Skip to content

Commit cb26085

Browse files
add regression test for off by one error in subscriptions
1 parent 2730403 commit cb26085

File tree

2 files changed

+132
-1
lines changed

2 files changed

+132
-1
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
using Xunit;
7+
using Xunit.Abstractions;
8+
9+
#pragma warning disable 1998
10+
11+
namespace EventStore.Client.Bugs {
12+
public class Issue_2544 : IAsyncLifetime {
13+
private const int BatchSize = 18;
14+
private const int Batches = 4;
15+
private readonly Fixture _fixture;
16+
private readonly Dictionary<StreamPosition, bool> _seen;
17+
private readonly TaskCompletionSource<bool> _completed;
18+
19+
public Issue_2544(ITestOutputHelper outputHelper) {
20+
_fixture = new Fixture();
21+
_fixture.CaptureLogs(outputHelper);
22+
_seen = Enumerable.Range(0, 1 + Batches * BatchSize)
23+
.Select(i => new StreamPosition((ulong)i))
24+
.ToDictionary(r => r, _ => false);
25+
_completed = new TaskCompletionSource<bool>();
26+
}
27+
28+
public static IEnumerable<object[]> TestCases() => Enumerable.Range(0, 5)
29+
.Select(i => new object[] {i});
30+
31+
[Theory, MemberData(nameof(TestCases))]
32+
public async Task subscribe_to_stream(int iteration) {
33+
var streamName = $"{_fixture.GetStreamName()}_{iteration}";
34+
35+
using var _ = await _fixture.Client.SubscribeToStreamAsync(streamName,
36+
(_, e, ct) => EventAppeared(e, streamName), subscriptionDropped: SubscriptionDropped);
37+
38+
await AppendEvents(streamName);
39+
40+
await _completed.Task.WithTimeout();
41+
}
42+
43+
[Theory, MemberData(nameof(TestCases))]
44+
public async Task subscribe_to_all(int iteration) {
45+
var streamName = $"{_fixture.GetStreamName()}_{iteration}";
46+
47+
using var _ = await _fixture.Client.SubscribeToAllAsync((_, e, ct) => EventAppeared(e, streamName),
48+
subscriptionDropped: SubscriptionDropped);
49+
50+
await AppendEvents(streamName);
51+
52+
await _completed.Task.WithTimeout();
53+
}
54+
55+
[Theory, MemberData(nameof(TestCases))]
56+
public async Task subscribe_to_all_filtered(int iteration) {
57+
var streamName = $"{_fixture.GetStreamName()}_{iteration}";
58+
59+
using var _ = await _fixture.Client.SubscribeToAllAsync((_, e, ct) => EventAppeared(e, streamName),
60+
subscriptionDropped: SubscriptionDropped,
61+
filterOptions: new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents()));
62+
63+
await AppendEvents(streamName);
64+
65+
await _completed.Task.WithTimeout();
66+
}
67+
68+
private async Task AppendEvents(string streamName) {
69+
await Task.Delay(TimeSpan.FromMilliseconds(10));
70+
71+
var expectedRevision = StreamRevision.None;
72+
73+
for (var i = 0; i < Batches; i++) {
74+
if (expectedRevision == StreamRevision.None) {
75+
var result = await _fixture.Client.AppendToStreamAsync(streamName, StreamState.NoStream,
76+
_fixture.CreateTestEvents(BatchSize));
77+
expectedRevision = result.NextExpectedStreamRevision;
78+
} else {
79+
var result = await _fixture.Client.AppendToStreamAsync(streamName, expectedRevision,
80+
_fixture.CreateTestEvents(BatchSize));
81+
expectedRevision = result.NextExpectedStreamRevision;
82+
}
83+
84+
await Task.Delay(TimeSpan.FromMilliseconds(10));
85+
}
86+
87+
await _fixture.Client.AppendToStreamAsync(streamName, expectedRevision, new[] {
88+
new EventData(Uuid.NewUuid(), "completed", Array.Empty<byte>(), contentType: "application/octet-stream")
89+
});
90+
}
91+
92+
private void SubscriptionDropped(StreamSubscription _, SubscriptionDroppedReason reason, Exception ex) {
93+
if (ex == null) return;
94+
_completed.TrySetException(ex);
95+
}
96+
97+
private Task EventAppeared(ResolvedEvent e, string streamName) {
98+
if (e.OriginalStreamId != streamName) {
99+
return Task.CompletedTask;
100+
}
101+
102+
if (_seen[e.Event.EventNumber]) {
103+
throw new Exception($"Event {e.Event.EventNumber} was already seen");
104+
}
105+
106+
_seen[e.Event.EventNumber] = true;
107+
if (e.Event.EventType == "completed") {
108+
_completed.TrySetResult(true);
109+
}
110+
111+
return Task.CompletedTask;
112+
}
113+
114+
public Task InitializeAsync() => _fixture.InitializeAsync();
115+
116+
public Task DisposeAsync() => _fixture.DisposeAsync();
117+
118+
public class Fixture : EventStoreClientFixture {
119+
public Fixture() : base(env: new Dictionary<string, string> {
120+
["EVENTSTORE_LOG_LEVEL"] = "Verbose"
121+
}) {
122+
}
123+
124+
protected override Task Given() => Client.SetStreamMetadataAsync("$all", StreamState.Any,
125+
new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root);
126+
127+
protected override Task When() => Task.CompletedTask;
128+
}
129+
}
130+
}

test/EventStore.Client.Tests.Common/EventStoreClientFixtureBase.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ public EventStoreTestServer(Uri address, IDictionary<string, string>? env) {
182182
["EVENTSTORE_MEM_DB"] = "true",
183183
["EVENTSTORE_CERTIFICATE_FILE"] = "/etc/eventstore/certs/node/node.crt",
184184
["EVENTSTORE_CERTIFICATE_PRIVATE_KEY_FILE"] = "/etc/eventstore/certs/node/node.key",
185-
["EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH"] = "/etc/eventstore/certs/ca"
185+
["EVENTSTORE_TRUSTED_ROOT_CERTIFICATES_PATH"] = "/etc/eventstore/certs/ca",
186+
["EVENTSTORE_LOG_LEVEL"] = "Verbose"
186187
}.Select(pair => $"{pair.Key}={pair.Value}").ToArray())
187188
.WithName(ContainerName)
188189
.MountVolume(HostCertificatePath, "/etc/eventstore/certs", MountType.ReadOnly)

0 commit comments

Comments
 (0)