Skip to content

Commit ded31b4

Browse files
authored
Merge pull request graphql-dotnet#431 from graphql-dotnet/subscriptions2
Add additional subscription support
2 parents da08542 + 08e4cd2 commit ded31b4

File tree

9 files changed

+193
-173
lines changed

9 files changed

+193
-173
lines changed

src/GraphQL.Tests/Subscription/SubscriptionSchema.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Concurrent;
33
using System.Linq;
44
using System.Reactive.Linq;
@@ -19,7 +19,7 @@ public ChatSchema(IChat chat)
1919
}
2020
}
2121

22-
public class ChatSubscriptions : ObjectGraphType<object>
22+
public class ChatSubscriptions : ObjectGraphType
2323
{
2424
private readonly IChat _chat;
2525

src/GraphQL/Builders/FieldBuilder.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using GraphQL.Types;
33
using GraphQL.Resolvers;
4+
using GraphQL.Subscription;
45

56
namespace GraphQL.Builders
67
{
@@ -14,18 +15,18 @@ public static FieldBuilder<TSourceType, TReturnType> Create<TSourceType, TReturn
1415

1516
public class FieldBuilder<TSourceType, TReturnType>
1617
{
17-
private readonly FieldType _fieldType;
18+
private readonly EventStreamFieldType _fieldType;
1819

19-
public FieldType FieldType => _fieldType;
20+
public EventStreamFieldType FieldType => _fieldType;
2021

21-
private FieldBuilder(FieldType fieldType)
22+
private FieldBuilder(EventStreamFieldType fieldType)
2223
{
2324
_fieldType = fieldType;
2425
}
2526

2627
public static FieldBuilder<TSourceType, TReturnType> Create(Type type = null)
2728
{
28-
var fieldType = new FieldType
29+
var fieldType = new EventStreamFieldType
2930
{
3031
Type = type,
3132
Arguments = new QueryArguments(),
@@ -106,5 +107,11 @@ public FieldBuilder<TSourceType, TReturnType> Configure(Action<FieldType> config
106107
configure(FieldType);
107108
return this;
108109
}
110+
111+
public FieldBuilder<TSourceType, TReturnType> Subscribe(Func<ResolveEventStreamContext<TSourceType>, IObservable<TReturnType>> subscribe)
112+
{
113+
FieldType.Subscriber = new EventStreamResolver<TSourceType, TReturnType>(subscribe);
114+
return this;
115+
}
109116
}
110117
}

src/GraphQL/Execution/DocumentExecuter.cs

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections;
33
using System.Collections.Generic;
44
using System.Linq;
@@ -81,14 +81,49 @@ public Task<ExecutionResult> ExecuteAsync(Action<ExecutionOptions> configure)
8181
return ExecuteAsync(options);
8282
}
8383

84+
protected virtual ExecutionResult InitializeResult(ExecutionOptions config)
85+
{
86+
var result = new ExecutionResult { Query = config.Query, ExposeExceptions = config.ExposeExceptions };
87+
return result;
88+
}
89+
90+
protected virtual async Task OnExecution(ExecutionOptions config, ExecutionContext context, ExecutionResult result)
91+
{
92+
var task = ExecuteOperationAsync(context).ConfigureAwait(false);
93+
94+
foreach (var listener in config.Listeners)
95+
{
96+
await listener.BeforeExecutionAwaitedAsync(config.UserContext, config.CancellationToken).ConfigureAwait(false);
97+
}
98+
99+
result.Data = await task;
100+
}
101+
102+
protected virtual void OnValidationError(IValidationResult validationResult, ExecutionResult result)
103+
{
104+
result.Data = null;
105+
result.Errors = validationResult.Errors;
106+
}
107+
108+
protected virtual void OnError(Exception exception, ExecutionResult result)
109+
{
110+
if (result.Errors == null)
111+
{
112+
result.Errors = new ExecutionErrors();
113+
}
114+
115+
result.Data = null;
116+
result.Errors.Add(new ExecutionError(exception.Message, exception));
117+
}
118+
84119
public async Task<ExecutionResult> ExecuteAsync(ExecutionOptions config)
85120
{
86121
var metrics = new Metrics();
87122
metrics.Start(config.OperationName);
88123

89124
config.Schema.FieldNameConverter = config.FieldNameConverter;
90125

91-
var result = new ExecutionResult { Query = config.Query, ExposeExceptions = config.ExposeExceptions };
126+
var result = InitializeResult(config);
92127
try
93128
{
94129
if (!config.Schema.Initialized)
@@ -166,14 +201,7 @@ await listener.AfterValidationAsync(
166201
await listener.BeforeExecutionAsync(config.UserContext, config.CancellationToken).ConfigureAwait(false);
167202
}
168203

169-
var task = ExecuteOperationAsync(context).ConfigureAwait(false);
170-
171-
foreach (var listener in config.Listeners)
172-
{
173-
await listener.BeforeExecutionAwaitedAsync(config.UserContext, config.CancellationToken).ConfigureAwait(false);
174-
}
175-
176-
result.Data = await task;
204+
await OnExecution(config, context, result);
177205

178206
foreach (var listener in config.Listeners)
179207
{
@@ -188,21 +216,14 @@ await listener.AfterValidationAsync(
188216
}
189217
else
190218
{
191-
result.Data = null;
192-
result.Errors = validationResult.Errors;
219+
OnValidationError(validationResult, result);
193220
}
194221

195222
return result;
196223
}
197224
catch (Exception exc)
198225
{
199-
if (result.Errors == null)
200-
{
201-
result.Errors = new ExecutionErrors();
202-
}
203-
204-
result.Data = null;
205-
result.Errors.Add(new ExecutionError(exc.Message, exc));
226+
OnError(exc, result);
206227
return result;
207228
}
208229
finally
@@ -237,7 +258,7 @@ public ExecutionContext BuildExecutionContext(
237258
return context;
238259
}
239260

240-
protected Operation GetOperation(string operationName, Document document)
261+
protected virtual Operation GetOperation(string operationName, Document document)
241262
{
242263
var operation = !string.IsNullOrWhiteSpace(operationName)
243264
? document.Operations.WithName(operationName)

src/GraphQL/Resolvers/EventStreamResolver.cs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System;
2-
using GraphQL.Execution;
1+
using System;
32
using GraphQL.Subscription;
43

54
namespace GraphQL.Resolvers
@@ -24,4 +23,25 @@ IObservable<object> IEventStreamResolver.Subscribe(ResolveEventStreamContext con
2423
return (IObservable<object>)Subscribe(context);
2524
}
2625
}
26+
27+
public class EventStreamResolver<TSourceType, TReturnType> : IEventStreamResolver<TReturnType>
28+
{
29+
private readonly Func<ResolveEventStreamContext<TSourceType>, IObservable<TReturnType>> _subscriber;
30+
31+
public EventStreamResolver(
32+
Func<ResolveEventStreamContext<TSourceType>, IObservable<TReturnType>> subscriber)
33+
{
34+
_subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
35+
}
36+
37+
public IObservable<TReturnType> Subscribe(ResolveEventStreamContext context)
38+
{
39+
return _subscriber(context.As<TSourceType>());
40+
}
41+
42+
IObservable<object> IEventStreamResolver.Subscribe(ResolveEventStreamContext context)
43+
{
44+
return (IObservable<object>)Subscribe(context);
45+
}
46+
}
2747
}

src/GraphQL/Resolvers/IEventStreamResolver.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System;
2-
using GraphQL.Execution;
1+
using System;
32
using GraphQL.Subscription;
43

54
namespace GraphQL.Resolvers
Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,38 @@
1-
using GraphQL.Types;
1+
using GraphQL.Types;
22

33
namespace GraphQL.Subscription
44
{
5-
public class ResolveEventStreamContext : ResolveFieldContext<object>
5+
public class ResolveEventStreamContext<T> : ResolveFieldContext<T>
66
{
7+
public ResolveEventStreamContext() { }
8+
9+
public ResolveEventStreamContext(ResolveEventStreamContext context)
10+
{
11+
Source = (T)context.Source;
12+
FieldName = context.FieldName;
13+
FieldAst = context.FieldAst;
14+
FieldDefinition = context.FieldDefinition;
15+
ReturnType = context.ReturnType;
16+
ParentType = context.ParentType;
17+
Arguments = context.Arguments;
18+
Schema = context.Schema;
19+
Document = context.Document;
20+
Fragments = context.Fragments;
21+
RootValue = context.RootValue;
22+
UserContext = context.UserContext;
23+
Operation = context.Operation;
24+
Variables = context.Variables;
25+
CancellationToken = context.CancellationToken;
26+
Metrics = context.Metrics;
27+
Errors = context.Errors;
28+
}
29+
}
30+
31+
public class ResolveEventStreamContext : ResolveEventStreamContext<object>
32+
{
33+
internal ResolveEventStreamContext<TSourceType> As<TSourceType>()
34+
{
35+
return new ResolveEventStreamContext<TSourceType>(this);
36+
}
737
}
838
}

0 commit comments

Comments
 (0)