Skip to content

Add support for defer and stream directives (Feedback is welcome) #2839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts:
#	src/execution/execute.ts
  • Loading branch information
robrichard committed Jun 23, 2022
commit 05f1903844737f1b9e88d49333de08bdc203346d
209 changes: 209 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { assert } from 'chai';
import { describe, it } from 'mocha';

import { expectJSON } from '../../__testUtils__/expectJSON';
Expand Down Expand Up @@ -162,6 +163,37 @@ const query = new GraphQLObjectType({
yield await Promise.resolve({ string: friends[1].name });
},
},
asyncIterableListDelayed: {
type: new GraphQLList(friendType),
async *resolve() {
for (const friend of friends) {
// pause an additional ms before yielding to allow time
// for tests to return or throw before next value is processed.
// eslint-disable-next-line no-await-in-loop
await resolveOnNextTick();
yield friend; /* c8 ignore start */
// Not reachable, early return
}
} /* c8 ignore stop */,
},
asyncIterableListNoReturn: {
type: new GraphQLList(friendType),
resolve() {
let i = 0;
return {
[Symbol.asyncIterator]: () => ({
async next() {
const friend = friends[i++];
if (friend) {
await resolveOnNextTick();
return { value: friend, done: false };
}
return { value: undefined, done: true };
},
}),
};
},
},
asyncIterableListDelayedClose: {
type: new GraphQLList(friendType),
async *resolve() {
Expand Down Expand Up @@ -1189,4 +1221,181 @@ describe('Execute: stream directive', () => {
},
]);
});
it('Returns underlying async iterables when dispatcher is returned', async () => {
const document = parse(`
query {
asyncIterableListDelayed @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
assert(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expectJSON(result1).toDeepEqual({
done: false,
value: {
data: {
asyncIterableListDelayed: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

const returnPromise = iterator.return();

// this result had started processing before return was called
const result2 = await iterator.next();
expectJSON(result2).toDeepEqual({
done: false,
value: {
data: [
{
id: '2',
name: 'Han',
},
],
hasNext: true,
path: ['asyncIterableListDelayed', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: true,
value: undefined,
});
await returnPromise;
});
it('Can return async iterable when underlying iterable does not have a return method', async () => {
const document = parse(`
query {
asyncIterableListNoReturn @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
assert(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expectJSON(result1).toDeepEqual({
done: false,
value: {
data: {
asyncIterableListNoReturn: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

const returnPromise = iterator.return();

// this result had started processing before return was called
const result2 = await iterator.next();
expectJSON(result2).toDeepEqual({
done: false,
value: {
data: [
{
id: '2',
name: 'Han',
},
],
hasNext: true,
path: ['asyncIterableListNoReturn', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: true,
value: undefined,
});
await returnPromise;
});
it('Returns underlying async iterables when dispatcher is thrown', async () => {
const document = parse(`
query {
asyncIterableListDelayed @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
assert(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expectJSON(result1).toDeepEqual({
done: false,
value: {
data: {
asyncIterableListDelayed: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

const throwPromise = iterator.throw(new Error('bad'));

// this result had started processing before return was called
const result2 = await iterator.next();
expectJSON(result2).toDeepEqual({
done: false,
value: {
data: [
{
id: '2',
name: 'Han',
},
],
hasNext: true,
path: ['asyncIterableListDelayed', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: true,
value: undefined,
});
try {
await throwPromise; /* c8 ignore start */
// Not reachable, always throws
/* c8 ignore stop */
} catch (e) {
// ignore error
}
});
});
32 changes: 23 additions & 9 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,7 @@ async function executeStreamIterator(
label,
path: fieldPath,
parentContext,
iterator,
});

const dataPromise = executeStreamIteratorItem(
Expand Down Expand Up @@ -1786,6 +1787,7 @@ function yieldSubsequentPayloads(
initialResult: ExecutionResult,
): AsyncGenerator<AsyncExecutionResult, void, void> {
let _hasReturnedInitialResult = false;
let isDone = false;

async function race(): Promise<IteratorResult<AsyncExecutionResult>> {
if (exeContext.subsequentPayloads.length === 0) {
Expand Down Expand Up @@ -1856,19 +1858,31 @@ function yieldSubsequentPayloads(
},
done: false,
});
} else if (exeContext.subsequentPayloads.length === 0) {
} else if (exeContext.subsequentPayloads.length === 0 || isDone) {
return Promise.resolve({ value: undefined, done: true });
}
return race();
},
// TODO: implement return & throw
// c8 ignore next 2
// will be covered in follow up
return: () => Promise.resolve({ value: undefined, done: true }),

// c8 ignore next 2
// will be covered in follow up
throw: (error?: unknown) => Promise.reject(error),
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
await Promise.all(
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
asyncPayloadRecord.iterator?.return?.(),
),
);
isDone = true;
return { value: undefined, done: true };
},
async throw(
error?: unknown,
): Promise<IteratorResult<AsyncExecutionResult, void>> {
await Promise.all(
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
asyncPayloadRecord.iterator?.return?.(),
),
);
isDone = true;
return Promise.reject(error);
},
};
}

Expand Down