Skip to content

incremental: introduce GraphQLWrappedResult to avoid filtering #4026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b97128f
incremental: introduce GraphQLResult type to avoid filtering
yaacovCR Apr 7, 2024
bd52975
remove unnecessary members from incrementalContext
yaacovCR Apr 10, 2024
25af8dc
add comment explaining strategy for when we add an extra tick
yaacovCR Apr 10, 2024
056c344
add comment
yaacovCR Apr 10, 2024
1105ee1
drop check for simpllicity
yaacovCR Apr 10, 2024
6cfd1f0
remove last traces of IncrementalContext
yaacovCR Apr 14, 2024
3edba1a
simplify stream handling
yaacovCR Apr 14, 2024
b39435a
remove unnecessary type
yaacovCR Apr 15, 2024
537ce5f
refactor classes into interfaces
yaacovCR Apr 15, 2024
26f089d
change logic to reduce extra ticks
yaacovCR Apr 15, 2024
0202ba6
use errors as discriminator rather than null
yaacovCR Apr 15, 2024
facaf19
remove unnecessary exports
yaacovCR Apr 15, 2024
26974f4
remove unnecessary guard
yaacovCR Apr 15, 2024
47b988c
remove unnecessary loop
yaacovCR Apr 15, 2024
679aeb3
rename fragmentResult
yaacovCR Apr 15, 2024
1b83ea5
add better typing for CancellableStreamRecord
yaacovCR Apr 15, 2024
7dcb8b3
introduce small helper
yaacovCR Apr 15, 2024
0b9842c
fix strict types
yaacovCR Apr 15, 2024
7511630
rename NonTerminatingStreamItemsResult to ReconcilableStreamItemsResult
yaacovCR Apr 16, 2024
765860e
break synchronous loop early on non-reconcilable result
yaacovCR Apr 16, 2024
74d092e
just push!
yaacovCR Apr 16, 2024
24136af
rename GraphQLResult to GraphQLWrappedResult
yaacovCR Apr 16, 2024
eb435a0
pass arguments instead of executor function
yaacovCR Apr 17, 2024
f917255
remove extra tick
yaacovCR Apr 17, 2024
8ffb37f
remove another extra tick
yaacovCR Apr 17, 2024
c22cd01
remove extra function
yaacovCR Apr 17, 2024
6e44e3a
add comment regarding stream terminators
yaacovCR Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add better typing for CancellableStreamRecord
  • Loading branch information
yaacovCR committed Apr 15, 2024
commit 1b83ea54dd1406b46fcb0af7d4d3792b834d07b7
38 changes: 24 additions & 14 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export function buildIncrementalResponse(
}

interface IncrementalPublisherContext {
cancellableStreams: Set<StreamRecord>;
cancellableStreams: Set<CancellableStreamRecord>;
}

/**
Expand Down Expand Up @@ -586,15 +586,19 @@ class IncrementalPublisher {
errors: streamItemsResult.errors,
});
this._pending.delete(streamRecord);
this._context.cancellableStreams.delete(streamRecord);
streamRecord.earlyReturn?.().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
if (isCancellableStreamRecord(streamRecord)) {
this._context.cancellableStreams.delete(streamRecord);
streamRecord.earlyReturn().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
}
} else if (streamItemsResult.result === undefined) {
this._completed.push({ id });
this._pending.delete(streamRecord);
this._context.cancellableStreams.delete(streamRecord);
if (isCancellableStreamRecord(streamRecord)) {
this._context.cancellableStreams.delete(streamRecord);
}
} else {
const incrementalEntry: IncrementalStreamResult = {
id,
Expand Down Expand Up @@ -694,7 +698,7 @@ export interface DeferredGroupedFieldSetRecord {
result: PromiseOrValue<DeferredGroupedFieldSetResult>;
}

interface SubsequentResultRecord {
export interface SubsequentResultRecord {
path: Path | undefined;
label: string | undefined;
id?: string | undefined;
Expand Down Expand Up @@ -726,25 +730,31 @@ export class DeferredFragmentRecord implements SubsequentResultRecord {
}
}

export interface StreamRecord extends SubsequentResultRecord {
earlyReturn?: (() => Promise<unknown>) | undefined;
export interface CancellableStreamRecord extends SubsequentResultRecord {
earlyReturn: () => Promise<unknown>;
}

function isCancellableStreamRecord(
subsequentResultRecord: SubsequentResultRecord,
): subsequentResultRecord is CancellableStreamRecord {
return 'earlyReturn' in subsequentResultRecord;
}

interface NonReconcilableStreamItemsResult {
streamRecord: StreamRecord;
streamRecord: SubsequentResultRecord;
errors: ReadonlyArray<GraphQLError>;
result?: never;
}

interface NonTerminatingStreamItemsResult {
streamRecord: StreamRecord;
streamRecord: SubsequentResultRecord;
result: BareStreamItemsResult;
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>;
errors?: never;
}

interface TerminatingStreamItemsResult {
streamRecord: StreamRecord;
streamRecord: SubsequentResultRecord;
result?: never;
incrementalDataRecords?: never;
errors?: never;
Expand All @@ -762,7 +772,7 @@ export function isNonTerminatingStreamItemsResult(
}

export interface StreamItemsRecord {
streamRecord: StreamRecord;
streamRecord: SubsequentResultRecord;
result: PromiseOrValue<StreamItemsResult>;
}

Expand Down
41 changes: 25 additions & 16 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,15 @@ import { buildFieldPlan } from './buildFieldPlan.js';
import type { DeferUsage, FieldDetails } from './collectFields.js';
import { collectFields, collectSubfields } from './collectFields.js';
import type {
CancellableStreamRecord,
DeferredGroupedFieldSetRecord,
DeferredGroupedFieldSetResult,
ExecutionResult,
ExperimentalIncrementalExecutionResults,
IncrementalDataRecord,
StreamItemsRecord,
StreamItemsResult,
StreamRecord,
SubsequentResultRecord,
} from './IncrementalPublisher.js';
import {
buildIncrementalResponse,
Expand Down Expand Up @@ -143,7 +144,7 @@ export interface ExecutionContext {
fieldResolver: GraphQLFieldResolver<any, any>;
typeResolver: GraphQLTypeResolver<any, any>;
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
cancellableStreams: Set<StreamRecord>;
cancellableStreams: Set<CancellableStreamRecord>;
}

export interface ExecutionArgs {
Expand Down Expand Up @@ -1031,13 +1032,21 @@ async function completeAsyncIteratorValue(
// eslint-disable-next-line no-constant-condition
while (true) {
if (streamUsage && index >= streamUsage.initialCount) {
const streamRecord: StreamRecord = {
label: streamUsage.label,
path,
earlyReturn: asyncIterator.return?.bind(asyncIterator),
};

exeContext.cancellableStreams.add(streamRecord);
const returnFn = asyncIterator.return;
let streamRecord;
if (returnFn === undefined) {
streamRecord = {
label: streamUsage.label,
path,
} as SubsequentResultRecord;
} else {
streamRecord = {
label: streamUsage.label,
path,
earlyReturn: returnFn.bind(asyncIterator),
} as CancellableStreamRecord;
exeContext.cancellableStreams.add(streamRecord);
}

const firstStreamItems = firstAsyncStreamItems(
streamRecord,
Expand Down Expand Up @@ -1178,7 +1187,7 @@ function completeListValue(
const item = iteration.value;

if (streamUsage && index >= streamUsage.initialCount) {
const streamRecord: StreamRecord = {
const streamRecord: SubsequentResultRecord = {
label: streamUsage.label,
path,
};
Expand Down Expand Up @@ -2031,7 +2040,7 @@ function getDeferredFragmentRecords(
}

function firstSyncStreamItems(
streamRecord: StreamRecord,
streamRecord: SubsequentResultRecord,
initialItem: PromiseOrValue<unknown>,
initialIndex: number,
iterator: Iterator<unknown>,
Expand Down Expand Up @@ -2107,7 +2116,7 @@ function prependNextStreamItems(
}

function firstAsyncStreamItems(
streamRecord: StreamRecord,
streamRecord: SubsequentResultRecord,
path: Path,
initialIndex: number,
nodes: ReadonlyArray<FieldNode>,
Expand All @@ -2134,7 +2143,7 @@ function firstAsyncStreamItems(
}

async function getNextAsyncStreamItemsResult(
streamRecord: StreamRecord,
streamRecord: SubsequentResultRecord,
path: Path,
index: number,
nodes: ReadonlyArray<FieldNode>,
Expand Down Expand Up @@ -2175,7 +2184,7 @@ async function getNextAsyncStreamItemsResult(
}

function nextAsyncStreamItems(
streamRecord: StreamRecord,
streamRecord: SubsequentResultRecord,
path: Path,
initialIndex: number,
nodes: ReadonlyArray<FieldNode>,
Expand All @@ -2202,7 +2211,7 @@ function nextAsyncStreamItems(
}

function completeStreamItems(
streamRecord: StreamRecord,
streamRecord: SubsequentResultRecord,
itemPath: Path,
item: unknown,
exeContext: ExecutionContext,
Expand Down Expand Up @@ -2276,7 +2285,7 @@ function completeStreamItems(

function buildStreamItemsResult(
errors: ReadonlyArray<GraphQLError>,
streamRecord: StreamRecord,
streamRecord: SubsequentResultRecord,
result: GraphQLResult<unknown>,
): StreamItemsResult {
return {
Expand Down