Skip to content

Commit 89f9223

Browse files
authored
incremental: handle Stream as stream rather than linked list (#4098)
The incremental graph can handle a stream as a stream, rather than creating a linked list where each incremental data record also includes the next record in addition to any new defers and/or streams. Enables easily batching all available stream items within the same incremental entry. Depends on #4094
1 parent 07d5025 commit 89f9223

File tree

4 files changed

+229
-289
lines changed

4 files changed

+229
-289
lines changed

src/execution/IncrementalGraph.ts

+82-15
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import { isPromise } from '../jsutils/isPromise.js';
22
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
33

4+
import type { GraphQLError } from '../error/GraphQLError.js';
5+
46
import type {
57
DeferredFragmentRecord,
68
DeferredGroupedFieldSetRecord,
79
IncrementalDataRecord,
810
IncrementalDataRecordResult,
911
ReconcilableDeferredGroupedFieldSetResult,
10-
StreamItemsRecord,
12+
StreamItemRecord,
1113
StreamRecord,
1214
SubsequentResultRecord,
1315
} from './types.js';
@@ -27,9 +29,9 @@ function isDeferredFragmentNode(
2729
}
2830

2931
function isStreamNode(
30-
subsequentResultNode: SubsequentResultNode,
31-
): subsequentResultNode is StreamRecord {
32-
return 'path' in subsequentResultNode;
32+
record: SubsequentResultNode | IncrementalDataRecord,
33+
): record is StreamRecord {
34+
return 'streamItemQueue' in record;
3335
}
3436

3537
type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
@@ -67,7 +69,7 @@ export class IncrementalGraph {
6769
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
6870
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
6971
} else {
70-
this._addStreamItemsRecord(incrementalDataRecord);
72+
this._addStreamRecord(incrementalDataRecord);
7173
}
7274
}
7375
}
@@ -95,6 +97,7 @@ export class IncrementalGraph {
9597
if (isStreamNode(node)) {
9698
this._pending.add(node);
9799
newPending.push(node);
100+
this._newIncrementalDataRecords.add(node);
98101
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
99102
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
100103
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
@@ -110,12 +113,20 @@ export class IncrementalGraph {
110113
this._newPending.clear();
111114

112115
for (const incrementalDataRecord of this._newIncrementalDataRecords) {
113-
const result = incrementalDataRecord.result.value;
114-
if (isPromise(result)) {
116+
if (isStreamNode(incrementalDataRecord)) {
115117
// eslint-disable-next-line @typescript-eslint/no-floating-promises
116-
result.then((resolved) => this._enqueue(resolved));
118+
this._onStreamItems(
119+
incrementalDataRecord,
120+
incrementalDataRecord.streamItemQueue,
121+
);
117122
} else {
118-
this._enqueue(result);
123+
const result = incrementalDataRecord.result.value;
124+
if (isPromise(result)) {
125+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
126+
result.then((resolved) => this._enqueue(resolved));
127+
} else {
128+
this._enqueue(result);
129+
}
119130
}
120131
}
121132
this._newIncrementalDataRecords.clear();
@@ -246,12 +257,8 @@ export class IncrementalGraph {
246257
}
247258
}
248259

249-
private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
250-
const streamRecord = streamItemsRecord.streamRecord;
251-
if (!this._pending.has(streamRecord)) {
252-
this._newPending.add(streamRecord);
253-
}
254-
this._newIncrementalDataRecords.add(streamItemsRecord);
260+
private _addStreamRecord(streamRecord: StreamRecord): void {
261+
this._newPending.add(streamRecord);
255262
}
256263

257264
private _addDeferredFragmentNode(
@@ -283,6 +290,66 @@ export class IncrementalGraph {
283290
return deferredFragmentNode;
284291
}
285292

293+
private async _onStreamItems(
294+
streamRecord: StreamRecord,
295+
streamItemQueue: Array<StreamItemRecord>,
296+
): Promise<void> {
297+
let items: Array<unknown> = [];
298+
let errors: Array<GraphQLError> = [];
299+
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
300+
let streamItemRecord: StreamItemRecord | undefined;
301+
while ((streamItemRecord = streamItemQueue.shift()) !== undefined) {
302+
let result = streamItemRecord.value;
303+
if (isPromise(result)) {
304+
if (items.length > 0) {
305+
this._enqueue({
306+
streamRecord,
307+
result:
308+
// TODO add additional test case or rework for coverage
309+
errors.length > 0 /* c8 ignore start */
310+
? { items, errors } /* c8 ignore stop */
311+
: { items },
312+
incrementalDataRecords,
313+
});
314+
items = [];
315+
errors = [];
316+
incrementalDataRecords = [];
317+
}
318+
// eslint-disable-next-line no-await-in-loop
319+
result = await result;
320+
// wait an additional tick to coalesce resolving additional promises
321+
// within the queue
322+
// eslint-disable-next-line no-await-in-loop
323+
await Promise.resolve();
324+
}
325+
if (result.item === undefined) {
326+
if (items.length > 0) {
327+
this._enqueue({
328+
streamRecord,
329+
result: errors.length > 0 ? { items, errors } : { items },
330+
incrementalDataRecords,
331+
});
332+
}
333+
this._enqueue(
334+
result.errors === undefined
335+
? { streamRecord }
336+
: {
337+
streamRecord,
338+
errors: result.errors,
339+
},
340+
);
341+
return;
342+
}
343+
items.push(result.item);
344+
if (result.errors !== undefined) {
345+
errors.push(...result.errors);
346+
}
347+
if (result.incrementalDataRecords !== undefined) {
348+
incrementalDataRecords.push(...result.incrementalDataRecords);
349+
}
350+
}
351+
}
352+
286353
private *_yieldCurrentCompletedIncrementalData(
287354
first: IncrementalDataRecordResult,
288355
): Generator<IncrementalDataRecordResult> {

0 commit comments

Comments
 (0)