Skip to content

Added implementation of Category, Event Type and Stream secondary indexes. #5100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Added Category Index imlementation
  • Loading branch information
oskardudycz committed May 23, 2025
commit 3affe99c7c2dcdef50818e74c5f3a7943f37441d
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System.Text;
using KurrentDB.SecondaryIndexing.Indices.Category;
using KurrentDB.SecondaryIndexing.Indices.Default;
using KurrentDB.SecondaryIndexing.Tests.IntegrationTests.Fixtures;
using Xunit.Abstractions;
Expand Down Expand Up @@ -29,10 +31,25 @@ ITestOutputHelper output

[Fact]
public async Task ReadsIndexStream_ForEnabledPlugin() {
var appendResult = await fixture.AppendToStream(RandomStreamName(), _expectedEventData);
// Given
var streamName = RandomStreamName();
var appendResult = await fixture.AppendToStream(streamName, _expectedEventData);

var readResult = await fixture.ReadUntil(DefaultIndexConstants.IndexName, appendResult.Position);
// When
var allReadResult = await fixture.ReadUntil(DefaultIndexConstants.IndexName, appendResult.Position);
var categoryReadResult = await fixture.ReadUntil($"{CategoryIndexConstants.IndexPrefix}test", appendResult.Position);

Assert.NotEmpty(readResult);
// Then
Assert.NotEmpty(allReadResult);
Assert.NotEmpty(categoryReadResult);

var allResults = allReadResult.Where(e => e.Event.EventStreamId == streamName).ToList();
var categoryResults = allReadResult.Where(e => e.Event.EventStreamId == streamName).ToList();

Assert.Equal(_expectedEventData.Length, allResults.Count);
Assert.Equal(_expectedEventData.Length, categoryResults.Count);

Assert.All(allResults, e => Assert.Contains(e.Event.DebugDataView, _expectedEventData));
Assert.All(categoryResults, e => Assert.Contains(e.Event.DebugDataView, _expectedEventData));
}
}
38 changes: 38 additions & 0 deletions src/KurrentDB.SecondaryIndexing/Indices/Category/CategoryIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).

using Kurrent.Quack;
using KurrentDB.Core.Services;
using KurrentDB.Core.Services.Storage.InMemory;
using KurrentDB.Core.Services.Storage.ReaderIndex;
using KurrentDB.SecondaryIndexing.Storage;

namespace KurrentDB.SecondaryIndexing.Indices.Category;

internal static class CategoryIndexConstants {
public const string IndexPrefix = $"{SystemStreams.IndexStreamPrefix}ce-";
}

internal class CategoryIndex<TStreamId> : ISecondaryIndex {
private readonly CategoryIndexProcessor<TStreamId> _processor;

public CategoryIndex(DuckDbDataSource db, DuckDBAdvancedConnection connection, IReadIndex<TStreamId> readIndex) {
_processor = new CategoryIndexProcessor<TStreamId>(connection);
Readers = [new CategoryIndexReader<TStreamId>(db, _processor, readIndex)];
}

public void Init() {
}

public ulong? GetLastPosition() =>
(ulong)_processor.LastCommittedPosition;

public ulong? GetLastSequence() => (ulong)_processor.Seq;

public ISecondaryIndexProcessor Processor => _processor;
public IReadOnlyList<IVirtualStreamReader> Readers { get; }

public SequenceRecord LastIndexed => _processor.LastIndexed;

public void Dispose() { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using DotNext;
using Kurrent.Quack;
using KurrentDB.Core.Data;
using KurrentDB.SecondaryIndexing.Storage;
using static KurrentDB.SecondaryIndexing.Indices.Category.CategorySql;

namespace KurrentDB.SecondaryIndexing.Indices.Category;

internal class CategoryIndexProcessor<TStreamId> : Disposable, ISecondaryIndexProcessor {
private readonly Dictionary<string, long> _categories;
private readonly Dictionary<long, long> _categorySizes = new();
private long _lastLogPosition;
private readonly Appender _appender;

public long Seq { get; private set; }
public long LastCommittedPosition { get; private set; }
public SequenceRecord LastIndexed { get; private set; }

public CategoryIndexProcessor(DuckDBAdvancedConnection connection) {
_appender = new Appender(connection, "category"u8);

var ids = connection.Query<ReferenceRecord, QueryCategorySql>();
_categories = ids.ToDictionary(x => x.name, x => x.id);

foreach (var id in ids) {
_categorySizes[id.id] = -1;
}

var sequences = connection.Query<(long Id, long Sequence), QueryCategoriesMaxSequencesSql>();
foreach (var sequence in sequences) {
_categorySizes[sequence.Id] = sequence.Sequence;
}

Seq = _categories.Count > 0 ? _categories.Values.Max() : 0;
}

public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token = default) {
if (IsDisposingOrDisposed)
return ValueTask.CompletedTask;

var categoryName = GetStreamCategory(resolvedEvent.OriginalStreamId);
_lastLogPosition = resolvedEvent.Event.LogPosition;

if (_categories.TryGetValue(categoryName, out var categoryId)) {
var next = _categorySizes[categoryId] + 1;
_categorySizes[categoryId] = next;

LastIndexed = new SequenceRecord(categoryId, next);
return ValueTask.CompletedTask;
}

var id = ++Seq;

_categories[categoryName] = id;
_categorySizes[id] = 0;

using (var row = _appender.CreateRow()) {
row.Append(id);
row.Append(categoryName);
}

_lastLogPosition = resolvedEvent.Event.LogPosition;
LastIndexed = new SequenceRecord(id, 0);

return ValueTask.CompletedTask;
}

public long GetLastEventNumber(long categoryId) =>
_categorySizes.TryGetValue(categoryId, out var size) ? size : ExpectedVersion.NoStream;

public long GetCategoryId(string categoryName) =>
_categories.TryGetValue(categoryName, out var categoryId) ? categoryId : ExpectedVersion.NoStream;

public ValueTask Commit(CancellationToken token = default) {
if (IsDisposingOrDisposed)
return ValueTask.CompletedTask;

_appender.Flush();
LastCommittedPosition = _lastLogPosition;

return ValueTask.CompletedTask;
}

private static string GetStreamCategory(string streamName) {
var dashIndex = streamName.IndexOf('-');
return dashIndex == -1 ? streamName : streamName[..dashIndex];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).

using KurrentDB.Core.Data;
using KurrentDB.Core.Services.Storage.ReaderIndex;
using KurrentDB.SecondaryIndexing.Indices.DuckDb;
using KurrentDB.SecondaryIndexing.Metrics;
using KurrentDB.SecondaryIndexing.Storage;
using static KurrentDB.SecondaryIndexing.Indices.Category.CategorySql;
using static KurrentDB.SecondaryIndexing.Indices.Category.CategoryIndexConstants;

namespace KurrentDB.SecondaryIndexing.Indices.Category;

internal class CategoryIndexReader<TStreamId>(
DuckDbDataSource db,
CategoryIndexProcessor<TStreamId> processor,
IReadIndex<TStreamId> index
)
: DuckDbIndexReader<TStreamId>(index) {

protected override long GetId(string streamName) {
if (!streamName.StartsWith(IndexPrefix)) {
return ExpectedVersion.Invalid;
}

var categoryName = streamName[8..];
return processor.GetCategoryId(categoryName);
}

protected override long GetLastSequence(long id) => processor.GetLastEventNumber(id);

protected override IEnumerable<IndexedPrepare> GetIndexRecords(long id, long fromEventNumber, long toEventNumber)
=> GetRecords(id, fromEventNumber, toEventNumber);

public override long GetLastIndexedPosition(string streamId) =>
processor.LastCommittedPosition;

public override bool CanReadStream(string streamId) =>
streamId.StartsWith(IndexPrefix);

private IEnumerable<IndexedPrepare> GetRecords(long id, long fromEventNumber, long toEventNumber) {
var range = QueryCategoryIndex(id, fromEventNumber, toEventNumber);
var indexPrepares = range.Select(x => new IndexedPrepare(x.category_seq, x.event_number, x.log_position));
return indexPrepares;
}

private List<CategoryRecord> QueryCategoryIndex(long id, long fromEventNumber, long toEventNumber) {
using var duration = SecondaryIndexMetrics.MeasureIndex("duck_get_cat_range");
return db.Pool.Query<(long, long, long), CategoryRecord, QueryCategoryIndexSql>((id, fromEventNumber, toEventNumber));
}
}
47 changes: 47 additions & 0 deletions src/KurrentDB.SecondaryIndexing/Indices/Category/CategorySql.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using Kurrent.Quack;
using KurrentDB.SecondaryIndexing.Storage;

namespace KurrentDB.SecondaryIndexing.Indices.Category;

internal static class CategorySql {
public struct QueryCategoryIndexSql : IQuery<(long, long, long), CategoryRecord> {
public static BindingContext Bind(in (long, long, long) args, PreparedStatement statement) => new(statement) {
args.Item1,
args.Item2,
args.Item3
};

public static ReadOnlySpan<byte> CommandText =>
"""
select category_seq, log_position, event_number
from idx_all where category=$1 and category_seq>=$2 and category_seq<=$3
"""u8;

public static CategoryRecord Parse(ref DataChunk.Row row) => new() {
category_seq = row.ReadInt32(),
log_position = row.ReadInt64(),
event_number = row.ReadInt32(),
};
}

public struct QueryCategorySql : IQuery<ReferenceRecord> {
public static ReadOnlySpan<byte> CommandText =>
"select id, name from category"u8;

public static ReferenceRecord Parse(ref DataChunk.Row row) => new() {
id = row.ReadInt32(),
name = row.ReadString()
};
}

public struct QueryCategoriesMaxSequencesSql : IQuery<(long Id, long Sequence)> {
public static ReadOnlySpan<byte> CommandText =>
"SELECT category, max(category_seq) FROM idx_all GROUP BY category"u8;

public static (long Id, long Sequence) Parse(ref DataChunk.Row row) =>
(row.ReadInt64(), row.ReadInt64());
}
}
14 changes: 11 additions & 3 deletions src/KurrentDB.SecondaryIndexing/Indices/Default/DefaultIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,38 @@
using KurrentDB.Core.Services;
using KurrentDB.Core.Services.Storage.InMemory;
using KurrentDB.Core.Services.Storage.ReaderIndex;
using KurrentDB.SecondaryIndexing.Indices.Category;
using KurrentDB.SecondaryIndexing.Storage;

namespace KurrentDB.SecondaryIndexing.Indices.Default;

public static class DefaultIndexConstants {
internal static class DefaultIndexConstants {
public const string IndexName = $"{SystemStreams.IndexStreamPrefix}all";
}

public class DefaultIndex<TStreamId> : Disposable, ISecondaryIndex {
internal class DefaultIndex<TStreamId> : Disposable, ISecondaryIndex {
private readonly DuckDbDataSource _db;

public ISecondaryIndexProcessor Processor { get; }

public IReadOnlyList<IVirtualStreamReader> Readers { get; }

public CategoryIndex<TStreamId> CategoryIndex { get; }

public DefaultIndex(DuckDbDataSource db, IReadIndex<TStreamId> readIndex) {
_db = db;
_db.InitDb();

var connection = db.OpenNewConnection();

CategoryIndex = new CategoryIndex<TStreamId>(db, connection, readIndex);

var processor = new DefaultSecondaryIndexProcessor<TStreamId>(connection, this);
Processor = processor;
Readers = [new DefaultIndexReader<TStreamId>(db, processor, readIndex)];
Readers = [
new DefaultIndexReader<TStreamId>(db, processor, readIndex),
..CategoryIndex.Readers
];
}

public void Init() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private struct QueryDefaultIndexSql : IQuery<(long, long), AllRecord> {
};

public static ReadOnlySpan<byte> CommandText =>
"select seq, log_position, event_number from idx_all where seq>=1 and seq<=$2"u8;
"select seq, log_position, event_number from idx_all where seq>=$1 and seq<=$2"u8;

public static AllRecord Parse(ref DataChunk.Row row) => new() {
seq = row.ReadInt64(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

namespace KurrentDB.SecondaryIndexing.Indices.Default;

public class DefaultSecondaryIndexProcessor<TStreamId> : Disposable, ISecondaryIndexProcessor {
internal class DefaultSecondaryIndexProcessor<TStreamId> : Disposable, ISecondaryIndexProcessor {
private readonly DefaultIndex<TStreamId> _defaultIndex;
private long _lastLogPosition;
private ulong _seq;
private int _page;
Expand All @@ -20,17 +21,26 @@ public class DefaultSecondaryIndexProcessor<TStreamId> : Disposable, ISecondaryI
public long LastCommittedPosition { get; private set; }
public long LastSequence => (long)_seq;

public DefaultSecondaryIndexProcessor(DuckDBAdvancedConnection connection, DefaultIndex<TStreamId> defaultIndex) {
public DefaultSecondaryIndexProcessor(
DuckDBAdvancedConnection connection,
DefaultIndex<TStreamId> defaultIndex
) {
_defaultIndex = defaultIndex;

_appender = new Appender(connection, "idx_all"u8);

var lastPosition = defaultIndex.GetLastSequence();
Logger.Information("Last known global sequence: {Seq}", lastPosition);
_seq = lastPosition.HasValue ? lastPosition.Value + 1 : 0;
}

public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token = default) {
public async ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token = default) {
if (IsDisposingOrDisposed)
return ValueTask.CompletedTask;
return;

await _defaultIndex.CategoryIndex.Processor.Index(resolvedEvent, token);

var category = _defaultIndex.CategoryIndex.LastIndexed;

using (var row = _appender.CreateRow()) {
row.Append(_seq++);
Expand All @@ -40,25 +50,23 @@ public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token = de
row.Append(0);
row.Append(0);
row.Append(0);
row.Append(0);
row.Append(0);
row.Append((int)category.Id);
row.Append(category.Sequence);
}

_lastLogPosition = resolvedEvent.Event.LogPosition;
_page++;

return ValueTask.CompletedTask;
}

public ValueTask Commit(CancellationToken token = default) {
public async ValueTask Commit(CancellationToken token = default) {
if (IsDisposingOrDisposed)
return ValueTask.CompletedTask;
return;

_appender.Flush();
Logger.Debug("Committed {Count} records to index at sequence {Seq}", _page, _seq);
_page = 0;
LastCommittedPosition = _lastLogPosition;

return ValueTask.CompletedTask;
await _defaultIndex.CategoryIndex.Processor.Commit(token);
}
}