Skip to content

Commit 9c402e5

Browse files
authored
Refactor bulk reads to use CursorAsync (#409)
1 parent c3aa78c commit 9c402e5

File tree

6 files changed

+57
-13
lines changed

6 files changed

+57
-13
lines changed

src/Akka.Persistence.MongoDb.Hosting/MongoDbJournalOptions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ public MongoDbJournalOptions(bool isDefaultPlugin, string identifier = "mongodb"
4646
/// Read Transaction
4747
/// </summary>
4848
public bool? UseReadTransaction { get; set; }
49+
50+
/// <summary>
51+
/// Set the batch size for read operations.
52+
/// If not set (null), this will use the default MongoDB behavior where all queries
53+
/// will have a batch size of 101 on the first page read and then will try to read
54+
/// as many documents as will fit within the 16 MiB limit.
55+
/// </summary>
56+
public int? ReadBatchSize { get; set; }
4957

5058
/// <summary>
5159
/// When true, enables BSON serialization (which breaks features like Akka.Cluster.Sharding, AtLeastOnceDelivery, and so on.)
@@ -82,6 +90,8 @@ protected override StringBuilder Build(StringBuilder sb)
8290
if(UseReadTransaction is not null)
8391
sb.AppendLine($"use-read-transaction = {UseReadTransaction.ToHocon()}");
8492

93+
sb.AppendLine($"read-batch-size = {(ReadBatchSize is not null ? ReadBatchSize.ToHocon() : "off")}");
94+
8595
if(LegacySerialization is not null)
8696
sb.AppendLine($"legacy-serialization = {LegacySerialization.ToHocon()}");
8797

src/Akka.Persistence.MongoDb.Tests/Hosting/MongoDbJournalOptionsSpec.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public void DefaultJournalOptionsTest()
4141
config.GetString("connection-string").Should().Be(baseConfig.GetString("connection-string"));
4242
config.GetBoolean("use-write-transaction").Should().Be(baseConfig.GetBoolean("use-write-transaction"));
4343
config.GetBoolean("use-read-transaction").Should().Be(baseConfig.GetBoolean("use-read-transaction"));
44+
config.GetBoolean("read-batch-size").Should().Be(baseConfig.GetBoolean("read-batch-size"));
4445
config.GetBoolean("auto-initialize").Should().Be(baseConfig.GetBoolean("auto-initialize"));
4546
config.GetString("plugin-dispatcher").Should().Be(baseConfig.GetString("plugin-dispatcher"));
4647
config.GetString("collection").Should().Be(baseConfig.GetString("collection"));
@@ -68,6 +69,7 @@ public void CustomIdJournalOptionsTest()
6869
config.GetString("connection-string").Should().Be(baseConfig.GetString("connection-string"));
6970
config.GetBoolean("use-write-transaction").Should().Be(baseConfig.GetBoolean("use-write-transaction"));
7071
config.GetBoolean("use-read-transaction").Should().Be(baseConfig.GetBoolean("use-read-transaction"));
72+
config.GetBoolean("read-batch-size").Should().Be(baseConfig.GetBoolean("read-batch-size"));
7173
config.GetBoolean("auto-initialize").Should().Be(baseConfig.GetBoolean("auto-initialize"));
7274
config.GetString("plugin-dispatcher").Should().Be(baseConfig.GetString("plugin-dispatcher"));
7375
config.GetString("collection").Should().Be(baseConfig.GetString("collection"));
@@ -88,6 +90,7 @@ public void JournalOptionsTest()
8890
MetadataCollection = "metadataCollection",
8991
UseWriteTransaction = true,
9092
UseReadTransaction = true,
93+
ReadBatchSize = 16,
9194
LegacySerialization = true,
9295
CallTimeout = TimeSpan.FromHours(2)
9396
};
@@ -104,6 +107,9 @@ public void JournalOptionsTest()
104107
config.GetString("metadata-collection").Should().Be(options.MetadataCollection);
105108
config.GetBoolean("use-write-transaction").Should().Be(options.UseWriteTransaction.Value);
106109
config.GetBoolean("use-read-transaction").Should().Be(options.UseReadTransaction.Value);
110+
config.GetString("read-batch-size").ToLowerInvariant().Should().NotBe("off");
111+
config.GetString("read-batch-size").ToLowerInvariant().Should().NotBe("false");
112+
config.GetInt("read-batch-size").Should().Be(options.ReadBatchSize.Value);
107113
config.GetBoolean("legacy-serialization").Should().Be(options.LegacySerialization.Value);
108114
config.GetTimeSpan("call-timeout").Should().Be(options.CallTimeout.Value);
109115
}
@@ -121,6 +127,7 @@ public void JournalOptionsTest()
121127
""ConnectionString"": ""mongodb://localhost:27017"",
122128
""UseWriteTransaction"": ""true"",
123129
""UseReadTransaction"": ""true"",
130+
""ReadBatchSize"": 16,
124131
""Identifier"": ""custommongodb"",
125132
""AutoInitialize"": true,
126133
""IsDefaultPlugin"": false,
@@ -143,6 +150,7 @@ public void JournalOptionsIConfigurationBindingTest()
143150
options.ConnectionString.Should().Be("mongodb://localhost:27017");
144151
options.UseWriteTransaction.Should().BeTrue();
145152
options.UseReadTransaction.Should().BeTrue();
153+
options.ReadBatchSize.Should().Be(16);
146154
options.Identifier.Should().Be("custommongodb");
147155
options.AutoInitialize.Should().BeTrue();
148156
options.IsDefaultPlugin.Should().BeFalse();

src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class MongoDbJournal : AsyncWriteJournal
4444
private readonly CancellationTokenSource _pendingCommandsCancellation = new();
4545

4646
private readonly Akka.Serialization.Serialization _serialization;
47+
48+
private readonly FindOptions? _findOptions;
4749

4850
public MongoDbJournal() : this(MongoDbPersistence.Get(Context.System).JournalSettings)
4951
{
@@ -58,6 +60,11 @@ private MongoDbJournal(MongoDbJournalSettings settings)
5860
{
5961
_settings = settings;
6062
_serialization = Context.System.Serialization;
63+
if (_settings.ReadBatchSize is not null)
64+
_findOptions = new FindOptions
65+
{
66+
BatchSize = settings.ReadBatchSize,
67+
};
6168
}
6269

6370
private IMongoDatabase GetMongoDb()
@@ -215,11 +222,11 @@ public override async Task ReplayMessagesAsync(
215222

216223
await MaybeReadWithTransaction(async (session, ct) =>
217224
{
218-
var collections = await journalCollection
219-
.ReplayMessagesQuery(session, persistenceId, fromSequenceNr, toSequenceNr, limitValue)
220-
.ToListAsync(ct);
225+
var cursor = await journalCollection
226+
.ReplayMessagesQuery(session, persistenceId, fromSequenceNr, toSequenceNr, limitValue, _findOptions)
227+
.ToCursorAsync(ct);
221228

222-
collections.ForEach(doc => recoveryCallback(ToPersistenceRepresentation(doc, sender)));
229+
await cursor.ForEachAsync(doc => recoveryCallback(ToPersistenceRepresentation(doc, sender)), ct);
223230
}, unitedCts.Token);
224231
}
225232

@@ -256,8 +263,11 @@ private async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
256263

257264
var maxOrderingId = maxSeqNoEntry.Value;
258265

259-
await journalCollection.MessagesQuery(s, fromSequenceNr, toSequenceNr, maxOrderingId, tag, limitValue)
260-
.ForEachAsync(entry =>
266+
var cursor = await journalCollection
267+
.MessagesQuery(s, fromSequenceNr, toSequenceNr, maxOrderingId, tag, limitValue, _findOptions)
268+
.ToCursorAsync(ct);
269+
270+
await cursor.ForEachAsync(entry =>
261271
{
262272
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
263273
foreach (var adapted in AdaptFromJournal(persistent))
@@ -579,8 +589,11 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
579589

580590
var maxOrderingId = maxSeqNoEntry.Value;
581591

582-
await journalCollection.MessagesQuery(session, fromSequenceNr, toSequenceNr, maxOrderingId, null, limitValue)
583-
.ForEachAsync(entry =>
592+
var cursor = await journalCollection
593+
.MessagesQuery(session, fromSequenceNr, toSequenceNr, maxOrderingId, null, limitValue, _findOptions)
594+
.ToCursorAsync(token);
595+
596+
await cursor.ForEachAsync(entry =>
584597
{
585598
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
586599
foreach (var adapted in AdaptFromJournal(persistent))

src/Akka.Persistence.MongoDb/Journal/MongoDbJournalQueries.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public static IFindFluent<JournalEntry, JournalEntry> ReplayMessagesQuery(
2929
string persistenceId,
3030
long fromSequenceNr,
3131
long toSequenceNr,
32-
int limit)
32+
int limit,
33+
FindOptions? findOptions)
3334
{
3435
var builder = Builders<JournalEntry>.Filter;
3536
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
@@ -40,7 +41,7 @@ public static IFindFluent<JournalEntry, JournalEntry> ReplayMessagesQuery(
4041

4142
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.SequenceNr);
4243

43-
return (session is not null ? collection.Find(session, filter) : collection.Find(filter))
44+
return (session is not null ? collection.Find(session, filter, findOptions) : collection.Find(filter, findOptions))
4445
.Sort(sort)
4546
.Limit(limit);
4647
}
@@ -85,7 +86,8 @@ public static IFindFluent<JournalEntry, JournalEntry> MessagesQuery(
8586
long toSequenceNr,
8687
long maxOrderingId,
8788
string? tag,
88-
int limit
89+
int limit,
90+
FindOptions? findOptions
8991
)
9092
{
9193
var builder = Builders<JournalEntry>.Filter;
@@ -101,7 +103,7 @@ int limit
101103

102104
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);
103105

104-
return (session is not null ? collection.Find(session, filter) : collection.Find(filter))
106+
return (session is not null ? collection.Find(session, filter, findOptions) : collection.Find(filter, findOptions))
105107
.Sort(sort)
106108
.Limit(limit);
107109
}

src/Akka.Persistence.MongoDb/MongoDbSettings.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ public class MongoDbJournalSettings : MongoDbSettings
7272
{
7373
public const string JournalConfigPath = "akka.persistence.journal.mongodb";
7474

75-
public string MetadataCollection { get; private set; }
75+
public string MetadataCollection { get; }
76+
77+
public int? ReadBatchSize { get; }
7678

7779
public MongoDbJournalSettings(Config config) : base(config)
7880
{
@@ -81,6 +83,9 @@ public MongoDbJournalSettings(Config config) : base(config)
8183
"MongoDb journal settings cannot be initialized, because required HOCON section couldn't been found");
8284

8385
MetadataCollection = config.GetString("metadata-collection");
86+
var readBatchSize = config.GetString("read-batch-size")?.ToLowerInvariant();
87+
if(!string.IsNullOrWhiteSpace(readBatchSize) && readBatchSize != "off" && readBatchSize != "false")
88+
ReadBatchSize = int.Parse(readBatchSize);
8489
}
8590
}
8691

src/Akka.Persistence.MongoDb/reference.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212

1313
use-read-transaction = on
1414

15+
# Set the batch size for read operations.
16+
# If set to off or false, this will use the default MongoDB behavior where all queries
17+
# will have a batch size of 101 on the first page read and then will try to read
18+
# as many documents as will fit within the 16 MiB limit.
19+
read-batch-size = off
20+
1521
# should corresponding journal table's indexes be initialized automatically
1622
auto-initialize = on
1723

0 commit comments

Comments
 (0)