Skip to content

Commit

Permalink
Merge branch 'main' into release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jerelmiller authored Jan 24, 2025
2 parents 4ea1622 + 4bd6362 commit 3a60a6f
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 79 deletions.
180 changes: 103 additions & 77 deletions src/__tests__/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2970,100 +2970,124 @@ describe("@connection", () => {
const client = new ApolloClient({ cache });

const obsQueries = new Set<ObservableQuery<any>>();
const subs = new Set<ObservableSubscription>();
function watch(
query: DocumentNode,
fetchPolicy: WatchQueryFetchPolicy = "cache-first"
): any[] {
const results: any[] = [];
) {
const obsQuery = client.watchQuery({
query,
fetchPolicy,
});
obsQueries.add(obsQuery);
subs.add(
obsQuery.subscribe({
next(result) {
results.push(result.data);
},
})
);
return results;
return new ObservableStream(obsQuery);
}

const aResults = watch(gql`
const aStream = watch(gql`
{
a
}
`);
const bResults = watch(gql`
const bStream = watch(gql`
{
b
}
`);
const abResults = watch(gql`
const abStream = watch(gql`
{
a
b
}
`);

await wait();
await expect(aStream).toEmitValue({
data: { a: 123 },
loading: false,
networkStatus: NetworkStatus.ready,
});

function checkLastResult(
results: any[],
expectedData: Record<string, any>
) {
const lastResult = results[results.length - 1];
expect(lastResult).toEqual(expectedData);
return lastResult;
}
await expect(bStream).toEmitValue({
data: { b: "asdf" },
loading: false,
networkStatus: NetworkStatus.ready,
});

checkLastResult(aResults, { a: 123 });
const bAsdf = checkLastResult(bResults, { b: "asdf" });
checkLastResult(abResults, { a: 123, b: "asdf" });
await expect(abStream).toEmitValue({
data: { a: 123, b: "asdf" },
loading: false,
networkStatus: NetworkStatus.ready,
});

aVar(aVar() + 111);
await wait();

const a234 = checkLastResult(aResults, { a: 234 });
expect(checkLastResult(bResults, { b: "asdf" })).toBe(bAsdf);
checkLastResult(abResults, { a: 234, b: "asdf" });
await expect(aStream).toEmitValue({
data: { a: 234 },
loading: false,
networkStatus: NetworkStatus.ready,
});

await expect(bStream).not.toEmitAnything({ timeout: 10 });

await expect(abStream).toEmitValue({
data: { a: 234, b: "asdf" },
loading: false,
networkStatus: NetworkStatus.ready,
});

bVar(bVar().toUpperCase());
await wait();

expect(checkLastResult(aResults, { a: 234 })).toBe(a234);
checkLastResult(bResults, { b: "ASDF" });
checkLastResult(abResults, { a: 234, b: "ASDF" });
await expect(aStream).not.toEmitAnything({ timeout: 10 });

await expect(bStream).toEmitValue({
data: { b: "ASDF" },
loading: false,
networkStatus: NetworkStatus.ready,
});

await expect(abStream).toEmitValue({
data: { a: 234, b: "ASDF" },
loading: false,
networkStatus: NetworkStatus.ready,
});

aVar(aVar() + 222);
bVar("oyez");
await wait();

const a456 = checkLastResult(aResults, { a: 456 });
const bOyez = checkLastResult(bResults, { b: "oyez" });
const a456bOyez = checkLastResult(abResults, { a: 456, b: "oyez" });
await expect(aStream).toEmitValue({
data: { a: 456 },
loading: false,
networkStatus: NetworkStatus.ready,
});

await expect(bStream).toEmitValue({
data: { b: "oyez" },
loading: false,
networkStatus: NetworkStatus.ready,
});

await expect(abStream).toEmitValue({
data: { a: 456, b: "oyez" },
loading: false,
networkStatus: NetworkStatus.ready,
});

// Since the ObservableQuery skips results that are the same as the
// previous result, and nothing is actually changing about the
// ROOT_QUERY.a field, clear previous results to give the invalidated
// results a chance to be delivered.
obsQueries.forEach((obsQuery) => obsQuery.resetLastResults());
await wait();

// Verify that resetting previous results did not trigger the delivery
// of any new results, by itself.
expect(checkLastResult(aResults, a456)).toBe(a456);
expect(checkLastResult(bResults, bOyez)).toBe(bOyez);
expect(checkLastResult(abResults, a456bOyez)).toBe(a456bOyez);
await expect(aStream).not.toEmitAnything({ timeout: 10 });
await expect(bStream).not.toEmitAnything({ timeout: 10 });
await expect(abStream).not.toEmitAnything({ timeout: 10 });

// Now invalidate the ROOT_QUERY.a field.
client.cache.evict({ fieldName: "a" });
await wait();

expect(checkLastResult(aResults, a456)).toBe(a456);
expect(checkLastResult(bResults, bOyez)).toBe(bOyez);
expect(checkLastResult(abResults, a456bOyez)).toBe(a456bOyez);
await expect(aStream).not.toEmitAnything({ timeout: 10 });
await expect(bStream).not.toEmitAnything({ timeout: 10 });
await expect(abStream).not.toEmitAnything({ timeout: 10 });

const cQuery = gql`
{
Expand All @@ -3072,7 +3096,14 @@ describe("@connection", () => {
`;
// Passing cache-only as the fetchPolicy allows the { c: "see" }
// result to be delivered even though networkStatus is still loading.
const cResults = watch(cQuery, "cache-only");
const cStream = watch(cQuery, "cache-only");

await expect(cStream).toEmitValue({
data: {},
loading: false,
networkStatus: NetworkStatus.ready,
partial: true,
});

// Now try writing directly to the cache, rather than calling
// client.writeQuery.
Expand All @@ -3082,12 +3113,15 @@ describe("@connection", () => {
c: "see",
},
});
await wait();

checkLastResult(aResults, a456);
checkLastResult(bResults, bOyez);
checkLastResult(abResults, a456bOyez);
checkLastResult(cResults, { c: "see" });
await expect(aStream).not.toEmitAnything();
await expect(bStream).not.toEmitAnything();
await expect(abStream).not.toEmitAnything();
await expect(cStream).toEmitValue({
data: { c: "see" },
loading: false,
networkStatus: NetworkStatus.ready,
});

cache.modify({
fields: {
Expand All @@ -3097,35 +3131,27 @@ describe("@connection", () => {
},
},
});
await wait();

checkLastResult(aResults, a456);
checkLastResult(bResults, bOyez);
checkLastResult(abResults, a456bOyez);
checkLastResult(cResults, { c: "saw" });
await expect(aStream).not.toEmitAnything();
await expect(bStream).not.toEmitAnything();
await expect(abStream).not.toEmitAnything();
await expect(cStream).toEmitValue({
data: { c: "saw" },
loading: false,
networkStatus: NetworkStatus.ready,
});

client.cache.evict({ fieldName: "c" });
await wait();

checkLastResult(aResults, a456);
checkLastResult(bResults, bOyez);
checkLastResult(abResults, a456bOyez);
expect(checkLastResult(cResults, {}));

expect(aResults).toEqual([{ a: 123 }, { a: 234 }, { a: 456 }]);

expect(bResults).toEqual([{ b: "asdf" }, { b: "ASDF" }, { b: "oyez" }]);

expect(abResults).toEqual([
{ a: 123, b: "asdf" },
{ a: 234, b: "asdf" },
{ a: 234, b: "ASDF" },
{ a: 456, b: "oyez" },
]);

expect(cResults).toEqual([{}, { c: "see" }, { c: "saw" }, {}]);

subs.forEach((sub) => sub.unsubscribe());
await expect(aStream).not.toEmitAnything();
await expect(bStream).not.toEmitAnything();
await expect(abStream).not.toEmitAnything();
await expect(cStream).toEmitValue({
data: {},
loading: false,
networkStatus: NetworkStatus.ready,
partial: true,
});
});

function wait(time = 10) {
Expand Down
34 changes: 33 additions & 1 deletion src/testing/internal/ObservableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ObservableEvent<T> =
export class ObservableStream<T> {
private reader: ReadableStreamDefaultReader<ObservableEvent<T>>;
private subscription!: ObservableSubscription;
private readerQueue: Array<Promise<ObservableEvent<T>>> = [];

constructor(observable: Observable<T>) {
this.reader = new ReadableStream<ObservableEvent<T>>({
Expand All @@ -32,9 +33,36 @@ export class ObservableStream<T> {
}).getReader();
}

/**
 * Resolves with the next event in the stream without consuming it: a
 * subsequent `take` (or another `peek`) will still observe the same event.
 * Rejects if no event arrives within `timeout` milliseconds.
 */
peek({ timeout = 100 }: TakeOptions = {}) {
  // Repeated `peek` calls must not advance the reader again before the
  // pending value has been consumed, so reuse a queued read if one exists.
  let pending = this.readerQueue[0];

  if (!pending) {
    // `this.reader.read()` advances the underlying stream, so the promise
    // is stashed in `readerQueue` for `take` to pick up later rather than
    // being consumed here; otherwise the peeked event would be lost.
    pending = this.readNextValue();
    this.readerQueue.push(pending);
  }

  const timedOut = new Promise<ObservableEvent<T>>((_, reject) => {
    setTimeout(reject, timeout, new Error("Timeout waiting for next event"));
  });

  return Promise.race([pending, timedOut]);
}

take({ timeout = 100 }: TakeOptions = {}) {
return Promise.race([
this.reader.read().then((result) => result.value!),
this.readerQueue.shift() || this.readNextValue(),
new Promise<ObservableEvent<T>>((_, reject) => {
setTimeout(
reject,
Expand Down Expand Up @@ -65,6 +93,10 @@ export class ObservableStream<T> {
const event = await this.take(options);
validateEquals(event, { type: "complete" });
}

/** Reads one event from the stream reader, advancing the reader. */
private async readNextValue() {
  const result = await this.reader.read();
  // The stream only closes via reader cancellation, so `value` is set here.
  return result.value!;
}
}

// Lightweight expect(...).toEqual(...) check that avoids using `expect` so that
Expand Down
2 changes: 1 addition & 1 deletion src/testing/matchers/toEmitAnything.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const toEmitAnything: MatcherFunction<[options?: TakeOptions]> =
const hint = this.utils.matcherHint("toEmitAnything", "stream", "");

try {
const value = await stream.take(options);
const value = await stream.peek(options);

return {
pass: true,
Expand Down

0 comments on commit 3a60a6f

Please sign in to comment.