Skip to content

Commit 31cee81

Browse files
committed
Added stream index implementation
1 parent 5571322 commit 31cee81

File tree

9 files changed

+182
-39
lines changed

9 files changed

+182
-39
lines changed

src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/Fixtures/SecondaryIndexingFixture.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public async Task<List<ResolvedEvent>> ReadUntil(
5656
TimeSpan? timeout = null,
5757
CancellationToken ct = default
5858
) {
59-
timeout ??= TimeSpan.FromMilliseconds(5000);
59+
timeout ??= TimeSpan.FromMilliseconds(10000);
6060
var endTime = DateTime.UtcNow.Add(timeout.Value);
6161

6262
var events = new List<ResolvedEvent>();
@@ -68,7 +68,7 @@ public async Task<List<ResolvedEvent>> ReadUntil(
6868
try {
6969
events = await ReadStream(streamName, ct).ToListAsync(ct);
7070

71-
reachedPosition = events.Count != 0 && events.Last().Event.LogPosition <= (long)position.CommitPosition;
71+
reachedPosition = events.Count != 0 && events.Last().Event.LogPosition >= (long)position.CommitPosition;
7272
} catch (ReadResponseException.StreamNotFound ex) {
7373
streamNotFound = ex;
7474
}

src/KurrentDB.SecondaryIndexing.Tests/IntegrationTests/SecondaryIndexingPluginEnabledIntegrationTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public async Task ReadsIndexStream_ForEnabledPlugin() {
5656
Assert.NotEmpty(eventTypeReadResult);
5757

5858
var allResults = allReadResult.Where(e => e.Event.EventStreamId == streamName).ToList();
59-
var categoryResults = allReadResult.Where(e => e.Event.EventStreamId == streamName).ToList();
60-
var eventTypeResults = allReadResult.Where(e => e.Event.EventStreamId == streamName).ToList();
59+
var categoryResults = categoryReadResult.Where(e => e.Event.EventStreamId == streamName).ToList();
60+
var eventTypeResults = eventTypeReadResult.Where(e => e.Event.EventStreamId == streamName).ToList();
6161

6262
Assert.Equal(_expectedEventData.Length, allResults.Count);
6363
Assert.Equal(_expectedEventData.Length, categoryResults.Count);

src/KurrentDB.SecondaryIndexing/Indices/Default/DefaultIndex.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using KurrentDB.Core.Services.Storage.ReaderIndex;
99
using KurrentDB.SecondaryIndexing.Indices.Category;
1010
using KurrentDB.SecondaryIndexing.Indices.EventType;
11+
using KurrentDB.SecondaryIndexing.Indices.Stream;
1112
using KurrentDB.SecondaryIndexing.Storage;
1213

1314
namespace KurrentDB.SecondaryIndexing.Indices.Default;
@@ -25,6 +26,10 @@ internal class DefaultIndex<TStreamId> : Disposable, ISecondaryIndex {
2526

2627
public CategoryIndex<TStreamId> CategoryIndex { get; }
2728

29+
public EventTypeIndex<TStreamId> EventTypeIndex { get; set; }
30+
31+
public StreamIndex StreamIndex { get; set; }
32+
2833
public DefaultIndex(DuckDbDataSource db, IReadIndex<TStreamId> readIndex) {
2934
_db = db;
3035
_db.InitDb();
@@ -33,6 +38,7 @@ public DefaultIndex(DuckDbDataSource db, IReadIndex<TStreamId> readIndex) {
3338

3439
CategoryIndex = new CategoryIndex<TStreamId>(db, connection, readIndex);
3540
EventTypeIndex = new EventTypeIndex<TStreamId>(db, connection, readIndex);
41+
StreamIndex = new StreamIndex(connection);
3642

3743
var processor = new DefaultSecondaryIndexProcessor<TStreamId>(connection, this);
3844
Processor = processor;
@@ -43,8 +49,6 @@ public DefaultIndex(DuckDbDataSource db, IReadIndex<TStreamId> readIndex) {
4349
];
4450
}
4551

46-
public EventTypeIndex<TStreamId> EventTypeIndex { get; set; }
47-
4852
public void Init() {
4953
}
5054

src/KurrentDB.SecondaryIndexing/Indices/Default/DefaultSecondaryIndexProcessor.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,18 @@ public async ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken toke
4040

4141
await _defaultIndex.CategoryIndex.Processor.Index(resolvedEvent, token);
4242
await _defaultIndex.EventTypeIndex.Processor.Index(resolvedEvent, token);
43+
await _defaultIndex.StreamIndex.Processor.Index(resolvedEvent, token);
4344

4445
var category = _defaultIndex.CategoryIndex.LastIndexed;
4546
var eventType = _defaultIndex.EventTypeIndex.LastIndexed;
47+
var streamId = _defaultIndex.StreamIndex.LastIndexed;
4648

4749
using (var row = _appender.CreateRow()) {
4850
row.Append(_seq++);
4951
row.Append((int)resolvedEvent.Event.EventNumber);
5052
row.Append(resolvedEvent.Event.LogPosition);
5153
row.Append(resolvedEvent.Event.TimeStamp);
52-
row.Append(0);
54+
row.Append(streamId);
5355
row.Append((int)eventType.Id);
5456
row.Append(eventType.Sequence);
5557
row.Append((int)category.Id);
@@ -71,5 +73,6 @@ public async ValueTask Commit(CancellationToken token = default) {
7173

7274
await _defaultIndex.CategoryIndex.Processor.Commit(token);
7375
await _defaultIndex.EventTypeIndex.Processor.Commit(token);
76+
await _defaultIndex.StreamIndex.Processor.Commit(token);
7477
}
7578
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
2+
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
3+
4+
using Kurrent.Quack;
5+
using KurrentDB.Core.Services.Storage.InMemory;
6+
7+
namespace KurrentDB.SecondaryIndexing.Indices.Stream;
8+
9+
internal class StreamIndex(DuckDBAdvancedConnection connection) : ISecondaryIndex {
10+
private readonly StreamIndexProcessor _processor = new(connection);
11+
12+
public void Init() {
13+
}
14+
15+
public ulong? GetLastPosition() =>
16+
(ulong)_processor.LastCommittedPosition;
17+
18+
public ulong? GetLastSequence() => (ulong)_processor.Seq;
19+
20+
public ISecondaryIndexProcessor Processor => _processor;
21+
public IReadOnlyList<IVirtualStreamReader> Readers { get; } = [];
22+
23+
public long LastIndexed => _processor.LastIndexed;
24+
25+
public void Dispose() { }
26+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
2+
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
3+
4+
using DotNext;
5+
using Kurrent.Quack;
6+
using KurrentDB.Core.Data;
7+
using KurrentDB.SecondaryIndexing.Storage;
8+
using Microsoft.Extensions.Caching.Memory;
9+
using static KurrentDB.SecondaryIndexing.Indices.Stream.StreamSql;
10+
11+
namespace KurrentDB.SecondaryIndexing.Indices.Stream;
12+
13+
internal class StreamIndexProcessor(DuckDBAdvancedConnection connection) : Disposable, ISecondaryIndexProcessor {
14+
private readonly MemoryCache _streamIdCache = new(new MemoryCacheOptions());
15+
private readonly MemoryCacheEntryOptions _options = new() { SlidingExpiration = TimeSpan.FromMinutes(10) };
16+
private long _lastLogPosition;
17+
private readonly Appender _appender = new(connection, "streams"u8);
18+
public long Seq { get; private set; } = connection.QueryFirstOrDefault<long, QueryStreamsMaxSequencesSql>() ?? 0;
19+
public long LastCommittedPosition { get; private set; }
20+
public long LastIndexed { get; private set; }
21+
22+
public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token = default) {
23+
if (IsDisposingOrDisposed)
24+
return ValueTask.CompletedTask;
25+
26+
var name = resolvedEvent.OriginalStreamId;
27+
_lastLogPosition = resolvedEvent.Event.LogPosition;
28+
29+
if (_streamIdCache.TryGetValue(name, out var existing)) {
30+
LastIndexed = (long)existing!;
31+
return ValueTask.CompletedTask;
32+
}
33+
34+
var fromDb = connection.QueryFirstOrDefault<QueryStreamArgs, long, QueryStreamIdSql>(
35+
new QueryStreamArgs { StreamName = name }
36+
);
37+
if (fromDb.HasValue) {
38+
_streamIdCache.Set(name, fromDb, _options);
39+
LastIndexed = fromDb.Value;
40+
return ValueTask.CompletedTask;
41+
}
42+
43+
var id = ++Seq;
44+
_streamIdCache.Set(name, id, _options);
45+
46+
using var row = _appender.CreateRow();
47+
row.Append(id);
48+
row.Append(name);
49+
row.AppendDefault();
50+
row.AppendDefault();
51+
52+
LastIndexed = id;
53+
return ValueTask.CompletedTask;
54+
}
55+
56+
public ValueTask Commit(CancellationToken token = default) {
57+
if (IsDisposingOrDisposed)
58+
return ValueTask.CompletedTask;
59+
60+
_appender.Flush();
61+
LastCommittedPosition = _lastLogPosition;
62+
63+
return ValueTask.CompletedTask;
64+
}
65+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
2+
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
3+
4+
using Kurrent.Quack;
5+
6+
namespace KurrentDB.SecondaryIndexing.Indices.Stream;
7+
8+
internal static class StreamSql {
9+
public struct QueryStreamArgs {
10+
public string StreamName { get; set; }
11+
}
12+
13+
public struct QueryStreamIdSql : IQuery<QueryStreamArgs, long> {
14+
public static BindingContext Bind(in QueryStreamArgs args, PreparedStatement statement) => new(statement) {
15+
args.StreamName
16+
};
17+
18+
public static ReadOnlySpan<byte> CommandText =>
19+
"select id from streams where name=$1"u8;
20+
21+
public static long Parse(ref DataChunk.Row row) =>
22+
row.ReadInt64();
23+
}
24+
25+
public struct QueryStreamsMaxSequencesSql : IQuery<long> {
26+
public static ReadOnlySpan<byte> CommandText =>
27+
"select max(id) from streams"u8;
28+
29+
public static long Parse(ref DataChunk.Row row) =>
30+
row.ReadInt64();
31+
}
32+
}

src/KurrentDB.SecondaryIndexing/Storage/1_Schema.sql

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,42 @@
11
create table if not exists event_type (
2-
id int4 primary key,
3-
name varchar,
4-
unique(name)
2+
id int4 primary key,
3+
name varchar not null,
4+
unique(name)
55
);
66

77
create table if not exists category (
8-
id int4 primary key,
9-
name varchar,
10-
unique(name)
8+
id int4 primary key,
9+
name varchar not null,
10+
unique(name)
1111
);
1212

1313
create table if not exists streams (
14-
id ubigint primary key,
15-
name varchar,
16-
unique(name),
17-
max_age int4,
18-
max_count int4
14+
id ubigint primary key,
15+
name varchar not null,
16+
unique(name),
17+
max_age int4 DEFAULT NULL,
18+
max_count int4 DEFAULT NULL
1919
);
2020

2121
create table if not exists idx_all (
22-
seq ubigint,
23-
event_number int4,
24-
log_position ubigint,
25-
created timestamp,
26-
stream ubigint,
27-
event_type int4,
28-
event_type_seq int8,
29-
category int4,
30-
category_seq int8
22+
seq ubigint not null,
23+
event_number int4 not null,
24+
log_position ubigint not null,
25+
created timestamp not null,
26+
stream ubigint not null,
27+
event_type int4 not null,
28+
event_type_seq int8 not null,
29+
category int4 not null,
30+
category_seq int8 not null
3131
);
3232

3333
create index if not exists idx_all_category on idx_all(category, category_seq);
3434
create index if not exists idx_all_event_type on idx_all(event_type, category_seq);
3535
create index if not exists idx_sequence on idx_all(seq);
3636

3737
create or replace macro read_category(name, startAt, finishAt) as table
38-
select
39-
category_seq as seq,
38+
select
39+
category_seq as seq,
4040
event->>'stream_id' as stream_id,
4141
event_number,
4242
event->>'event_type' as event_type,
@@ -53,8 +53,8 @@ from (
5353
) order by category_seq;
5454

5555
create or replace macro read_all(position) as table
56-
select
57-
seq,
56+
select
57+
seq,
5858
event->>'stream_id' as stream_id,
5959
event_number,
6060
event->>'event_type' as event_type,
@@ -67,8 +67,8 @@ from (
6767
);
6868

6969
create or replace macro read_category(name, start, count) as table
70-
select
71-
category_seq as seq,
70+
select
71+
category_seq as seq,
7272
event->>'stream_id' as stream_id,
7373
event_number,
7474
event->>'event_type' as event_type,

src/KurrentDB.SecondaryIndexing/Storage/DuckDbExtensions.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,36 @@ public static class DuckDbExtensions {
1111
where TRow : struct
1212
where TQuery : IParameterlessStatement, IDataRowParser<TRow> {
1313
using (pool.Rent(out var connection)) {
14-
using var result = connection.ExecuteQuery<TRow, TQuery>().GetEnumerator();
15-
16-
return result.MoveNext() ? null : result.Current;
14+
return connection.QueryFirstOrDefault<TRow, TQuery>();
1715
}
1816
}
1917

18+
public static TRow? QueryFirstOrDefault<TRow, TQuery>(this DuckDBAdvancedConnection connection)
19+
where TRow : struct
20+
where TQuery : IParameterlessStatement, IDataRowParser<TRow> {
21+
using var result = connection.ExecuteQuery<TRow, TQuery>().GetEnumerator();
22+
23+
return result.MoveNext() ? null : result.Current;
24+
}
25+
2026
public static TRow? QueryFirstOrDefault<TArgs, TRow, TQuery>(this DuckDBConnectionPool pool, TArgs args)
2127
where TArgs : struct
2228
where TRow : struct
2329
where TQuery : IPreparedStatement<TArgs>, IDataRowParser<TRow> {
2430
using (pool.Rent(out var connection)) {
25-
using var result = connection.ExecuteQuery<TArgs, TRow, TQuery>(args).GetEnumerator();
26-
27-
return result.MoveNext() ? null : result.Current;
31+
return connection.QueryFirstOrDefault<TArgs, TRow, TQuery>(args);
2832
}
2933
}
3034

35+
public static TRow? QueryFirstOrDefault<TArgs, TRow, TQuery>(this DuckDBAdvancedConnection connection, TArgs args)
36+
where TArgs : struct
37+
where TRow : struct
38+
where TQuery : IPreparedStatement<TArgs>, IDataRowParser<TRow> {
39+
using var result = connection.ExecuteQuery<TArgs, TRow, TQuery>(args).GetEnumerator();
40+
41+
return result.MoveNext() ? null : result.Current;
42+
}
43+
3144
public static List<TRow> Query<TRow, TQuery>(this DuckDBConnectionPool pool)
3245
where TRow : struct
3346
where TQuery : IDataRowParser<TRow>, IParameterlessStatement {

0 commit comments

Comments
 (0)