diff --git a/src/__tests__/client.ts b/src/__tests__/client.ts
index 181e7b8d373..7e99eed1c6a 100644
--- a/src/__tests__/client.ts
+++ b/src/__tests__/client.ts
@@ -2970,100 +2970,124 @@ describe("@connection", () => {
     const client = new ApolloClient({ cache });

     const obsQueries = new Set<ObservableQuery<any>>();
-    const subs = new Set<ObservableSubscription>();

     function watch(
       query: DocumentNode,
       fetchPolicy: WatchQueryFetchPolicy = "cache-first"
-    ): any[] {
-      const results: any[] = [];
+    ) {
       const obsQuery = client.watchQuery({
         query,
         fetchPolicy,
       });
       obsQueries.add(obsQuery);

-      subs.add(
-        obsQuery.subscribe({
-          next(result) {
-            results.push(result.data);
-          },
-        })
-      );
-      return results;
+      return new ObservableStream(obsQuery);
     }

-    const aResults = watch(gql`
+    const aStream = watch(gql`
       {
         a
       }
     `);

-    const bResults = watch(gql`
+    const bStream = watch(gql`
       {
         b
       }
     `);

-    const abResults = watch(gql`
+    const abStream = watch(gql`
       {
         a
         b
       }
     `);

-    await wait();
+    await expect(aStream).toEmitValue({
+      data: { a: 123 },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

-    function checkLastResult(
-      results: any[],
-      expectedData: Record<string, any>
-    ) {
-      const lastResult = results[results.length - 1];
-      expect(lastResult).toEqual(expectedData);
-      return lastResult;
-    }
+    await expect(bStream).toEmitValue({
+      data: { b: "asdf" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

-    checkLastResult(aResults, { a: 123 });
-    const bAsdf = checkLastResult(bResults, { b: "asdf" });
-    checkLastResult(abResults, { a: 123, b: "asdf" });
+    await expect(abStream).toEmitValue({
+      data: { a: 123, b: "asdf" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

     aVar(aVar() + 111);
-    await wait();
-    const a234 = checkLastResult(aResults, { a: 234 });
-    expect(checkLastResult(bResults, { b: "asdf" })).toBe(bAsdf);
-    checkLastResult(abResults, { a: 234, b: "asdf" });
+    await expect(aStream).toEmitValue({
+      data: { a: 234 },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });
+
+    await expect(bStream).not.toEmitAnything({ timeout: 10 });
+
+    await expect(abStream).toEmitValue({
+      data: { a: 234, b: "asdf" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

     bVar(bVar().toUpperCase());
-    await wait();
-    expect(checkLastResult(aResults, { a: 234 })).toBe(a234);
-    checkLastResult(bResults, { b: "ASDF" });
-    checkLastResult(abResults, { a: 234, b: "ASDF" });
+    await expect(aStream).not.toEmitAnything({ timeout: 10 });
+
+    await expect(bStream).toEmitValue({
+      data: { b: "ASDF" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });
+
+    await expect(abStream).toEmitValue({
+      data: { a: 234, b: "ASDF" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

     aVar(aVar() + 222);
     bVar("oyez");
-    await wait();
-    const a456 = checkLastResult(aResults, { a: 456 });
-    const bOyez = checkLastResult(bResults, { b: "oyez" });
-    const a456bOyez = checkLastResult(abResults, { a: 456, b: "oyez" });
+    await expect(aStream).toEmitValue({
+      data: { a: 456 },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });
+
+    await expect(bStream).toEmitValue({
+      data: { b: "oyez" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });
+
+    await expect(abStream).toEmitValue({
+      data: { a: 456, b: "oyez" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

     // Since the ObservableQuery skips results that are the same as the
     // previous result, and nothing is actually changing about the
     // ROOT_QUERY.a field, clear previous results to give the invalidated
     // results a chance to be delivered.
     obsQueries.forEach((obsQuery) => obsQuery.resetLastResults());
-    await wait();
+
     // Verify that resetting previous results did not trigger the delivery
     // of any new results, by itself.
-    expect(checkLastResult(aResults, a456)).toBe(a456);
-    expect(checkLastResult(bResults, bOyez)).toBe(bOyez);
-    expect(checkLastResult(abResults, a456bOyez)).toBe(a456bOyez);
+    await expect(aStream).not.toEmitAnything({ timeout: 10 });
+    await expect(bStream).not.toEmitAnything({ timeout: 10 });
+    await expect(abStream).not.toEmitAnything({ timeout: 10 });

     // Now invalidate the ROOT_QUERY.a field.
     client.cache.evict({ fieldName: "a" });
-    await wait();
-    expect(checkLastResult(aResults, a456)).toBe(a456);
-    expect(checkLastResult(bResults, bOyez)).toBe(bOyez);
-    expect(checkLastResult(abResults, a456bOyez)).toBe(a456bOyez);
+    await expect(aStream).not.toEmitAnything({ timeout: 10 });
+    await expect(bStream).not.toEmitAnything({ timeout: 10 });
+    await expect(abStream).not.toEmitAnything({ timeout: 10 });

     const cQuery = gql`
       {
@@ -3072,7 +3096,14 @@ describe("@connection", () => {
     `;
     // Passing cache-only as the fetchPolicy allows the { c: "see" }
    // result to be delivered even though networkStatus is still loading.
-    const cResults = watch(cQuery, "cache-only");
+    const cStream = watch(cQuery, "cache-only");
+
+    await expect(cStream).toEmitValue({
+      data: {},
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+      partial: true,
+    });

     // Now try writing directly to the cache, rather than calling
     // client.writeQuery.
@@ -3082,12 +3113,15 @@ describe("@connection", () => {
         c: "see",
       },
     });
-    await wait();

-    checkLastResult(aResults, a456);
-    checkLastResult(bResults, bOyez);
-    checkLastResult(abResults, a456bOyez);
-    checkLastResult(cResults, { c: "see" });
+    await expect(aStream).not.toEmitAnything();
+    await expect(bStream).not.toEmitAnything();
+    await expect(abStream).not.toEmitAnything();
+    await expect(cStream).toEmitValue({
+      data: { c: "see" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

     cache.modify({
       fields: {
@@ -3097,35 +3131,27 @@ describe("@connection", () => {
         },
       },
     });
-    await wait();

-    checkLastResult(aResults, a456);
-    checkLastResult(bResults, bOyez);
-    checkLastResult(abResults, a456bOyez);
-    checkLastResult(cResults, { c: "saw" });
+    await expect(aStream).not.toEmitAnything();
+    await expect(bStream).not.toEmitAnything();
+    await expect(abStream).not.toEmitAnything();
+    await expect(cStream).toEmitValue({
+      data: { c: "saw" },
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+    });

     client.cache.evict({ fieldName: "c" });
-    await wait();

-    checkLastResult(aResults, a456);
-    checkLastResult(bResults, bOyez);
-    checkLastResult(abResults, a456bOyez);
-    expect(checkLastResult(cResults, {}));
-
-    expect(aResults).toEqual([{ a: 123 }, { a: 234 }, { a: 456 }]);
-
-    expect(bResults).toEqual([{ b: "asdf" }, { b: "ASDF" }, { b: "oyez" }]);
-
-    expect(abResults).toEqual([
-      { a: 123, b: "asdf" },
-      { a: 234, b: "asdf" },
-      { a: 234, b: "ASDF" },
-      { a: 456, b: "oyez" },
-    ]);
-
-    expect(cResults).toEqual([{}, { c: "see" }, { c: "saw" }, {}]);
-
-    subs.forEach((sub) => sub.unsubscribe());
+    await expect(aStream).not.toEmitAnything();
+    await expect(bStream).not.toEmitAnything();
+    await expect(abStream).not.toEmitAnything();
+    await expect(cStream).toEmitValue({
+      data: {},
+      loading: false,
+      networkStatus: NetworkStatus.ready,
+      partial: true,
+    });
   });

   function wait(time = 10) {
diff --git a/src/testing/internal/ObservableStream.ts b/src/testing/internal/ObservableStream.ts
index ad5d7c05175..803407d431c 100644
--- a/src/testing/internal/ObservableStream.ts
+++ b/src/testing/internal/ObservableStream.ts
@@ -19,6 +19,7 @@ type ObservableEvent<T> =
 export class ObservableStream<T> {
   private reader: ReadableStreamDefaultReader<ObservableEvent<T>>;
   private subscription!: ObservableSubscription;
+  private readerQueue: Array<Promise<ObservableEvent<T>>> = [];

   constructor(observable: Observable<T>) {
     this.reader = new ReadableStream<ObservableEvent<T>>({
@@ -32,9 +33,36 @@
     }).getReader();
   }

+  peek({ timeout = 100 }: TakeOptions = {}) {
+    // Calling `peek` multiple times in a row should not advance the reader
+    // multiple times until this value has been consumed.
+    let readerPromise = this.readerQueue[0];
+
+    if (!readerPromise) {
+      // Since this.reader.read() advances the reader in the stream, we don't
+      // want to consume this promise entirely, otherwise we will miss it when
+      // calling `take`. Instead, we push it into a queue that can be consumed
+      // by `take` the next time it's called so that we avoid advancing the
+      // reader until we are finished processing all peeked values.
+      readerPromise = this.readNextValue();
+      this.readerQueue.push(readerPromise);
+    }
+
+    return Promise.race([
+      readerPromise,
+      new Promise<ObservableEvent<T>>((_, reject) => {
+        setTimeout(
+          reject,
+          timeout,
+          new Error("Timeout waiting for next event")
+        );
+      }),
+    ]);
+  }
+
   take({ timeout = 100 }: TakeOptions = {}) {
     return Promise.race([
-      this.reader.read().then((result) => result.value!),
+      this.readerQueue.shift() || this.readNextValue(),
       new Promise<ObservableEvent<T>>((_, reject) => {
         setTimeout(
           reject,
@@ -65,6 +93,10 @@
     const event = await this.take(options);
     validateEquals(event, { type: "complete" });
   }
+
+  private async readNextValue() {
+    return this.reader.read().then((result) => result.value!);
+  }
 }

 // Lightweight expect(...).toEqual(...) check that avoids using `expect` so that
diff --git a/src/testing/matchers/toEmitAnything.ts b/src/testing/matchers/toEmitAnything.ts
index 0096caf455d..13d1a061834 100644
--- a/src/testing/matchers/toEmitAnything.ts
+++ b/src/testing/matchers/toEmitAnything.ts
@@ -8,7 +8,7 @@ export const toEmitAnything: MatcherFunction<[options?: TakeOptions]> =
     const hint = this.utils.matcherHint("toEmitAnything", "stream", "");

     try {
-      const value = await stream.take(options);
+      const value = await stream.peek(options);

       return {
         pass: true,