Skip to content

RFC: incremental delivery with deduplication + concurrent execution #1026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 63 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
fca1c5d
Introduce @defer and @stream.
robrichard Aug 18, 2022
43e9997
fix typos
robrichard Feb 17, 2021
cb5a3f4
clear up that it is legal to support either defer or stream individually
robrichard Feb 17, 2021
0eb4426
Add sumary of arguments to Type System
robrichard Feb 17, 2021
43bfe01
Update Section 3 -- Type System.md
robrichard May 15, 2021
acb5bf0
clarification on defer/stream requirement
robrichard Nov 19, 2021
abea59b
clarify negative values of initialCount
robrichard Nov 20, 2021
139d69f
allow extensions only subsequent payloads
robrichard Nov 25, 2021
de5004b
fix typo
robrichard Nov 26, 2021
9e89f42
Raise a field error if initialCount is less than zero
robrichard Aug 18, 2022
f894ba3
data is not necessarily an object in subsequent payloads
robrichard Dec 6, 2021
08053d7
add Defer And Stream Directives Are Used On Valid Root Field rule
robrichard Dec 6, 2021
e19246b
wait for parent async record to ensure correct order of payloads
robrichard Aug 18, 2022
2ecd0af
Simplify execution, payloads should begin execution immediately
robrichard Dec 20, 2021
337bb87
Clarify error handling
robrichard Dec 20, 2021
2982dec
add isCompletedIterator to AsyncPayloadRecord to track completed iter…
robrichard Dec 30, 2021
32fb73b
fix typo
robrichard Jan 21, 2022
1ff999e
deferDirective and visitedFragments
robrichard Feb 2, 2022
270b409
stream if argument, indexPath -> itemPath
robrichard Feb 7, 2022
75f2258
Clarify stream only applies to outermost list of multi-dimensional ar…
robrichard Feb 7, 2022
d8c28d1
add validation “Defer And Stream Directive Labels Are Unique”
robrichard Mar 7, 2022
eb3a4e3
Clarification on labels
robrichard Mar 8, 2022
f2b50bf
fix wrong quotes
robrichard Mar 23, 2022
92f02f3
remove label/path requirement
robrichard Mar 23, 2022
049bce8
add missing line
robrichard Jun 9, 2022
9a07500
fix ExecuteRequest
robrichard Jun 9, 2022
7c5e1da
fix response
robrichard Jun 9, 2022
19cb9c3
Align deferred fragment field collection with reference implementation
robrichard Aug 3, 2022
c747f61
spec updates to reflect latest discussions
robrichard Aug 18, 2022
6f3c715
Note about mutation execution order
robrichard Aug 18, 2022
7c9ea0a
minor change for uniqueness
robrichard Aug 18, 2022
d84939e
fix typos
robrichard Aug 18, 2022
1ad7e9c
if: Boolean! = true
robrichard Aug 23, 2022
4b6554e
address pr feedback
robrichard Aug 23, 2022
9103fdb
clarify null behavior of if
robrichard Aug 24, 2022
3944d05
Add error boundary behavior
robrichard Sep 8, 2022
90b31ae
defer/stream response => payload
robrichard Sep 8, 2022
f1c0ec2
event stream => response stream
robrichard Sep 8, 2022
3830406
link to path section
robrichard Sep 8, 2022
f950efb
use case no dash
robrichard Sep 8, 2022
ad5b2e2
remove "or null"
robrichard Sep 8, 2022
c1f3f65
add detailed incremental example
robrichard Sep 8, 2022
2e41749
update label validation rule
robrichard Sep 8, 2022
abb14a0
clarify hasNext on incremental example
robrichard Sep 8, 2022
4ea2a34
clarify canceling of subsequent payloads
robrichard Sep 8, 2022
1565491
Add examples for non-null cases
robrichard Sep 8, 2022
a938f44
typo
robrichard Sep 9, 2022
a301f21
improve non-null example
robrichard Sep 9, 2022
38bfbb9
Add FilterSubsequentPayloads algorithm
robrichard Sep 9, 2022
8d07dee
link to note on should
robrichard Oct 12, 2022
008818d
update on hasNext
robrichard Nov 1, 2022
4adb05a
small fixes (#3)
yaacovCR Nov 7, 2022
ddd0fd7
remove ResolveFIeldGenerator (#4)
yaacovCR Nov 16, 2022
b54c9fe
fix typos (#6)
yaacovCR Nov 18, 2022
02d4676
Add error handling for stream iterators (#5)
yaacovCR Nov 21, 2022
3e74250
Raise a field error if defer/stream encountered during subscription e…
robrichard Nov 22, 2022
cb3ab46
Add validation rule for defer/stream on subscriptions
robrichard Nov 22, 2022
24cf072
clarify label is not required
robrichard Nov 23, 2022
d74430c
fix parentRecord argument in ExecuteStreamField (#7)
yaacovCR Nov 29, 2022
79da712
fix typo
robrichard Dec 5, 2022
8df13da
replace server with service
robrichard Jan 15, 2023
94363c9
CollectFields does not require path or asyncRecord (#11)
yaacovCR Jan 16, 2023
32ed5bc
incremental delivery with deduplication and concurrent delivery
yaacovCR May 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wait for parent async record to ensure correct order of payloads
# Conflicts:
#	spec/Section 6 -- Execution.md
  • Loading branch information
robrichard committed Jan 15, 2023
commit e19246b0f4fb21d78fc400638b711239149bd3a4
1 change: 0 additions & 1 deletion cspell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ words:
- openwebfoundation
- parallelization
- structs
- sublist
- subselection
# Fictional characters / examples
- alderaan
Expand Down
192 changes: 108 additions & 84 deletions spec/Section 6 -- Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,13 @@ First, the selection set is turned into a grouped field set; then, each
represented field in the grouped field set produces an entry into a response
map.

ExecuteSelectionSet(selectionSet, objectType, objectValue, variableValues,
subsequentPayloads, parentPath):
ExecuteSelectionSet(selectionSet, objectType, objectValue, variableValues, path,
subsequentPayloads, asyncRecord):

- If {path} is not provided, initialize it to an empty list.
- If {subsequentPayloads} is not provided, initialize it to the empty set.
- If {parentPath} is not provided, initialize it to an empty list.
- Let {groupedFieldSet} be the result of {CollectFields(objectType, objectValue,
selectionSet, variableValues, subsequentPayloads, parentPath)}.
selectionSet, variableValues, path subsequentPayloads, asyncRecord)}.
- Initialize {resultMap} to an empty ordered map.
- For each {groupedFieldSet} as {responseKey} and {fields}:
- Let {fieldName} be the name of the first entry in {fields}. Note: This value
Expand All @@ -433,7 +433,7 @@ subsequentPayloads, parentPath):
{objectType}.
- If {fieldType} is defined:
- Let {responseValue} be {ExecuteField(objectType, objectValue, fieldType,
fields, variableValues, subsequentPayloads, parentPath)}.
fields, variableValues, path, subsequentPayloads, asyncRecord)}.
- Set {responseValue} as the value for {responseKey} in {resultMap}.
- Return {resultMap}.

Expand Down Expand Up @@ -585,8 +585,8 @@ The depth-first-search order of the field groups produced by {CollectFields()}
is maintained through execution, ensuring that fields appear in the executed
response in a stable and predictable order.

CollectFields(objectType, objectValue, selectionSet, variableValues,
deferredFragments, parentPath, visitedFragments):
CollectFields(objectType, objectValue, selectionSet, variableValues, path,
subsequentPayloads, asyncRecord, visitedFragments):

- If {visitedFragments} is not provided, initialize it to the empty set.
- Initialize {groupedFields} to an empty ordered map of lists.
Expand Down Expand Up @@ -625,14 +625,16 @@ deferredFragments, parentPath, visitedFragments):
with the next {selection} in {selectionSet}.
- Let {fragmentSelectionSet} be the top-level selection set of {fragment}.
- If {deferDirective} is defined:
- Let {deferredFragment} be the result of calling
{DeferFragment(objectType, objectValue, fragmentSelectionSet,
parentPath)}.
- Append {deferredFragment} to {deferredFragments}.
- Let {label} be the value or the variable to {deferDirective}'s {label}
argument.
- Let {deferredFragmentRecord} be the result of calling
{CreateDeferredFragmentRecord(label, objectType, objectValue,
fragmentSelectionSet, path, asyncRecord)}.
- Append {deferredFragmentRecord} to {subsequentPayloads}.
- Continue with the next {selection} in {selectionSet}.
- Let {fragmentGroupedFieldSet} be the result of calling
{CollectFields(objectType, fragmentSelectionSet, variableValues,
visitedFragments)}.
{CollectFields(objectType, objectValue, fragmentSelectionSet,
variableValues, path, subsequentPayloads, asyncRecord, visitedFragments)}.
- For each {fragmentGroup} in {fragmentGroupedFieldSet}:
- Let {responseKey} be the response key shared by all fields in
{fragmentGroup}.
Expand All @@ -649,14 +651,16 @@ deferredFragments, parentPath, visitedFragments):
be that directive.
- If {deferDirective}'s {if} argument is {true} or is a variable in
{variableValues} with the value {true}:
- Let {deferredFragment} be the result of calling
{DeferFragment(objectType, objectValue, fragmentSelectionSet,
parentPath)}.
- Append {deferredFragment} to {deferredFragments}.
- Let {label} be the value or the variable to {deferDirective}'s {label}
argument.
- Let {deferredFragmentRecord} be the result of calling
{CreateDeferredFragmentRecord(label, objectType, objectValue,
fragmentSelectionSet, path, asyncRecord)}.
- Append {deferredFragmentRecord} to {subsequentPayloads}.
- Continue with the next {selection} in {selectionSet}.
- Let {fragmentGroupedFieldSet} be the result of calling
{CollectFields(objectType, fragmentSelectionSet, variableValues,
visitedFragments, parentPath)}.
{CollectFields(objectType, objectValue, fragmentSelectionSet,
variableValues, path, subsequentPayloads, asyncRecord, visitedFragments)}.
- For each {fragmentGroup} in {fragmentGroupedFieldSet}:
- Let {responseKey} be the response key shared by all fields in
{fragmentGroup}.
Expand All @@ -680,44 +684,53 @@ DoesFragmentTypeApply(objectType, fragmentType):
- if {objectType} is a possible type of {fragmentType}, return {true}
otherwise return {false}.

DeferFragment(objectType, objectValue, fragmentSelectionSet, parentPath):
#### Async Payload Record

An Async Payload Record is either a Deferred Fragment Record or a Stream Record.
All Async Payload Records are structures containing:

- Let {label} be the value or the variable to {deferDirective}'s {label}
argument.
- Let {deferredFragmentRecord} be the result of calling
{CreateDeferredFragmentRecord(label, objectType, objectValue,
fragmentSelectionSet, parentPath)}.
- return {deferredFragmentRecord}.
- {label}: value derived from the corresponding `@defer` or `@stream` directive.
- {parentRecord}: optionally an Async Payload Record.
- {errors}: a list of field errors encountered during execution.
- {dataExecution}: A result that can notify when the corresponding execution has
completed.
- {path}: a list of field names and indices from root to the location of the
corresponding `@defer` or `@stream` directive.

#### Deferred Fragment Record

Let {deferredFragmentRecord} be an inline fragment or fragment spread with
`@defer` provided.

Deferred Fragment Record is a structure containing:
Deferred Fragment Record is a structure containing all the entries of Async
Payload Record, and additionally:

- {label}: value derived from the `@defer` directive.
- {objectType}: of the {deferredFragmentRecord}.
- {objectValue}: of the {deferredFragmentRecord}.
- {fragmentSelectionSet}: the top level selection set of
{deferredFragmentRecord}.
- {path}: a list of field names and indices from root to
{deferredFragmentRecord}.

CreateDeferredFragmentRecord(label, objectType, objectValue,
fragmentSelectionSet, path):
fragmentSelectionSet, path, parentRecord):

- If {path} is not provided, initialize it to an empty list.
- Construct a deferred fragment record based on the parameters passed in.
- Initialize {errors} to an empty list.

ResolveDeferredFragmentRecord(deferredFragmentRecord, variableValues,
subsequentPayloads):

- Let {label}, {objectType}, {objectValue}, {fragmentSelectionSet}, {path} be
the corresponding fields in the deferred fragment record structure.
- Let {payload} be the result of calling
{ExecuteSelectionSet(fragmentSelectionSet, objectType, objectValue,
variableValues, subsequentPayloads, path)}.
- Let {label}, {objectType}, {objectValue}, {fragmentSelectionSet}, {path},
{parentRecord} be the corresponding fields in the deferred fragment record
structure.
- Let {dataExecution} be the asynchronous future value of:
- Let {payload} be the result of {ExecuteSelectionSet(fragmentSelectionSet,
objectType, objectValue, variableValues, path, subsequentPayloads,
deferredFragmentRecord)}.
- If {parentRecord} is defined:
- Wait for the result of {dataExecution} on {parentRecord}.
- Return {payload}.
- Set {dataExecution} on {deferredFragmentRecord}.
- Let {payload} be the result of waiting for {dataExecution}.
- Add an entry to {payload} named `label` with the value {label}.
- Add an entry to {payload} named `path` with the value {path}.
- Return {payload}.
Expand All @@ -730,28 +743,27 @@ coerces any provided argument values, then resolves a value for the field, and
finally completes that value either by recursively executing another selection
set or coercing a scalar value.

ExecuteField(objectType, objectValue, fieldType, fields, variableValues,
subsequentPayloads, parentPath):
ExecuteField(objectType, objectValue, fieldType, fields, variableValues, path,
subsequentPayloads, asyncRecord):

- Let {field} be the first entry in {fields}.
- Let {fieldName} be the field name of {field}.
- Append {fieldName} to {path}.
- Let {argumentValues} be the result of {CoerceArgumentValues(objectType, field,
variableValues)}
- If {field} provides the directive `@stream`, let {streamDirective} be that
directive.
- Let {initialCount} be the value or variable provided to {streamDirective}'s
{initialCount} argument.
- Let {resolvedValue} be {ResolveFieldGenerator(objectType, objectValue,
fieldName, argumentValues, initialCount)}.
fieldName, argumentValues)}.
- Let {result} be the result of calling {CompleteValue(fieldType, fields,
resolvedValue, variableValues, subsequentPayloads, parentPath)}.
- Append {fieldName} to the {path} field of every {subsequentPayloads}.
resolvedValue, variableValues, path, subsequentPayloads, asyncRecord)}.
- Return {result}.
- Let {resolvedValue} be {ResolveFieldValue(objectType, objectValue, fieldName,
argumentValues)}.
- Let {result} be the result of calling {CompleteValue(fieldType, fields,
resolvedValue, variableValues, subsequentPayloads)}.
- Append {fieldName} to the {path} for every {subsequentPayloads}.
resolvedValue, variableValues, path, subsequentPayloads, asyncRecord)}.
- Return {result}.

### Coercing Field Arguments
Expand Down Expand Up @@ -837,14 +849,13 @@ ResolveFieldValue(objectType, objectValue, fieldName, argumentValues):
- Return the result of calling {resolver}, providing {objectValue} and
{argumentValues}.

ResolveFieldGenerator(objectType, objectValue, fieldName, argumentValues,
initialCount):
ResolveFieldGenerator(objectType, objectValue, fieldName, argumentValues):

- If {objectType} provide an internal function {generatorResolver} for
generating partially resolved value of a list field named {fieldName}:
- Let {generatorResolver} be the internal function.
- Return the iterator from calling {generatorResolver}, providing
{objectValue}, {argumentValues} and {initialCount}.
{objectValue} and {argumentValues}.
- Create {generator} from {ResolveFieldValue(objectType, objectValue, fieldName,
argumentValues)}.
- Return {generator}.
Expand All @@ -864,52 +875,58 @@ field execution process continues recursively. In the case where a value
returned for a list type field is an iterator due to `@stream` specified on the
field, value completion iterates over the iterator until the number of items
yield by the iterator satisfies `initialCount` specified on the `@stream`
directive. Unresolved items in the iterator will be stored in a stream record
which the executor resumes to execute after the initial execution finishes.
directive.

#### Stream Record

Let {streamField} be a list field with a `@stream` directive provided.

A Stream Record is a structure containing:
A Stream Record is a structure containing all the entries of Async Payload
Record, and additionally:

- {label}: value derived from the `@stream` directive's `label` argument.
- {iterator}: created by {ResolveFieldGenerator}.
- {resolvedItems}: items resolved from the {iterator} but not yet delivered.
- {index}: indicating the position of the item in the complete list.
- {path}: a list of field names and indices from root to {streamField}.
- {fields}: the group of fields grouped by CollectFields() for {streamField}.
- {innerType}: inner type of {streamField}'s type.

CreateStreamRecord(label, initialCount, iterator, resolvedItems, index, fields,
innerType):
CreateStreamRecord(label, iterator, index, fields, innerType, path,
parentRecord):

- Construct a stream record based on the parameters passed in.
- Initialize {errors} to an empty list.

ResolveStreamRecord(streamRecord, variableValues, subsequentPayloads):

- Let {label}, {iterator}, {resolvedItems}, {index}, {path}, {fields},
- Let {label}, {parentRecord}, {iterator}, {index}, {path}, {fields},
{innerType} be the correspondent fields on the Stream Record structure.
- Wait for the next item from {iterator}.
- If an item is not retrieved because {iterator} has completed:
- Return {null}
- Let {item} be the item retrieved from {iterator}.
- Append {index} to {path}.
- Increment {index}.
- Let {payload} be the result of calling CompleteValue(innerType, fields, item,
variableValues, subsequentPayloads, path)}.
- Let {indexPath} be {path} with {index} appended.
- Let {dataExecution} be the asynchronous future value of:
- Wait for the next item from {iterator}.
- If an item is not retrieved because {iterator} has completed:
- Return {null}.
- Let {item} be the item retrieved from {iterator}.
- Let {payload} be the result of calling {CompleteValue(innerType, fields,
item, variableValues, indexPath, subsequentPayloads, parentRecord)}.
- Increment {index}.
- Let {nextStreamRecord} be the result of calling {CreateStreamRecord(label,
iterator, index, fields, innerType, path, streamRecord)}.
- Append {nextStreamRecord} to {subsequentPayloads}.
- If {parentRecord} is defined:
- Wait for the result of {dataExecution} on {parentRecord}.
- Return {payload}.
- Set {dataExecution} on {streamRecord}.
- Let {payload} be the result of waiting for {dataExecution}.
- Add an entry to {payload} named `label` with the value {label}.
- Add an entry to {payload} named `path` with the value {path}.
- Append {streamRecord} to {subsequentPayloads}.
- Add an entry to {payload} named `path` with the value {indexPath}.
- Return {payload}.

CompleteValue(fieldType, fields, result, variableValues, subsequentPayloads,
parentPath):
CompleteValue(fieldType, fields, result, variableValues, path,
subsequentPayloads, asyncRecord):

- If the {fieldType} is a Non-Null type:
- Let {innerType} be the inner type of {fieldType}.
- Let {completedResult} be the result of calling {CompleteValue(innerType,
fields, result, variableValues)}.
fields, result, variableValues, path)}.
- If {completedResult} is {null}, raise a _field error_.
- Return {completedResult}.
- If {result} is {null} (or another internal value similar to {null} such as
Expand All @@ -924,26 +941,33 @@ parentPath):
- If {initialCount} is less than zero, raise a _field error_.
- Let {label} be the value or variable provided to {streamDirective}'s
{label} argument.
- Let {resolvedItems} be an empty list
- For each {members} in {result}:
- Append all items from {members} to {resolvedItems}.
- If the length of {resolvedItems} is greater or equal to {initialCount}:
- Let {initialItems} be the sublist of the first {initialCount} items
from {resolvedItems}.
- Let {remainingItems} be the sublist of the items in {resolvedItems}
after the first {initialCount} items.
- Let {initialItems} be an empty list
- Let {index} be zero.
- While {result} is not closed:
- If {streamDirective} was not provided or {index} is not greater than or
equal to {initialCount}:
- Wait for the next item from {result}.
- Let {resultItem} be the item retrieved from {result}.
- Let {indexPath} be {path} with {index} appended.
- Let {resolvedItem} be the result of calling {CompleteValue(innerType,
fields, resultItem, variableValues, indexPath, subsequentPayloads,
asyncRecord)}.
- Append {resolvedItem} to {initialItems}.
- Increment {index}.
- If {streamDirective} was provided and {index} is greater than or equal
to {initialCount}:
- Let {streamRecord} be the result of calling {CreateStreamRecord(label,
initialCount, result, remainingItems, initialCount, fields, innerType,
parentPath)}
result, index, fields, innerType, path, asyncRecord)}.
- Append {streamRecord} to {subsequentPayloads}.
- Let {result} be {initialItems}.
- Exit for each loop.
- Exit while loop.
- Return {initialItems}.
- If {result} is not a collection of values, raise a _field error_.
- Let {innerType} be the inner type of {fieldType}.
- Return a list where each list item is the result of calling
{CompleteValue(innerType, fields, resultItem, variableValues,
subsequentPayloads, parentPath)}, where {resultItem} is each item in
{result}.
{CompleteValue(innerType, fields, resultItem, variableValues, indexPath,
subsequentPayloads, asyncRecord)}, where {resultItem} is each item in
{result} and {indexPath} is {path} with the index of the item appended.
- If {fieldType} is a Scalar or Enum type:
- Return the result of {CoerceResult(fieldType, result)}.
- If {fieldType} is an Object, Interface, or Union type:
Expand All @@ -953,7 +977,7 @@ parentPath):
- Let {objectType} be {ResolveAbstractType(fieldType, result)}.
- Let {subSelectionSet} be the result of calling {MergeSelectionSets(fields)}.
- Return the result of evaluating {ExecuteSelectionSet(subSelectionSet,
objectType, result, variableValues, subsequentPayloads, parentPath)}
objectType, result, variableValues, path, subsequentPayloads, asyncRecord)}
_normally_ (allowing for parallelization).

**Coercing Results**
Expand Down