Skip to content

Commit f69a63a

Browse files
committed
[DEVEX-227] Updated ProjectState to keep dictionary of states instead of the single state
That gives possibility to cache multiple single stream projection states instead of rolling always a single one
1 parent 3644406 commit f69a63a

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

src/Kurrent.Client/Streams/GettingState/StateBuilder.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,19 +180,22 @@ public static async IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TS
180180
Func<TState, ResolvedEvent, TState> evolve,
181181
[EnumeratorCancellation] CancellationToken ct
182182
) where TState : notnull {
183-
var state = initialState;
184-
185183
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
186184
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) {
187-
yield return new StateAtPointInTime<TState>(state);
185+
yield return new StateAtPointInTime<TState>(initialState);
188186

189187
yield break;
190188
}
191189
}
192190

191+
var states = new Dictionary<string, TState>();
192+
193193
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
194+
var state = states.GetValueOrDefault(resolvedEvent.OriginalStreamId, initialState);
194195
state = evolve(state, resolvedEvent);
195196

197+
states[resolvedEvent.OriginalStreamId] = state;
198+
196199
yield return new StateAtPointInTime<TState>(
197200
state,
198201
resolvedEvent.Event.EventNumber,
@@ -210,14 +213,13 @@ public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
210213
) where TState : notnull {
211214
StateAtPointInTime<TState>? stateAtPointInTime = null;
212215

213-
options ??= new GetStreamStateOptions<TState> ();
214-
215-
if (options.GetSnapshot != null) {
216+
options ??= new GetStreamStateOptions<TState>();
217+
218+
if (options.GetSnapshot != null)
216219
stateAtPointInTime = await options.GetSnapshot(
217220
GetSnapshotOptions.ForStream(streamName),
218221
ct
219222
);
220-
}
221223

222224
options.StreamPosition = stateAtPointInTime?.LastStreamPosition ?? StreamPosition.Start;
223225

0 commit comments

Comments
 (0)