Skip to content

Commit bc38879

Browse files
committed
CSHARP-2280: Add Watch method to IMongoClient and IMongoDatabase.
1 parent b476119 commit bc38879

21 files changed

+921
-91
lines changed

src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>
4343
private Collation _collation;
4444
private readonly CollectionNamespace _collectionNamespace;
4545
private string _comment;
46+
private readonly DatabaseNamespace _databaseNamespace;
4647
private BsonValue _hint;
4748
private TimeSpan? _maxAwaitTime;
4849
private TimeSpan? _maxTime;
@@ -53,6 +54,19 @@ public class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>
5354
private bool? _useCursor;
5455

5556
// constructors
57+
/// <summary>
58+
/// Initializes a new instance of the <see cref="AggregateOperation{TResult}"/> class.
59+
/// </summary>
60+
/// <param name="databaseNamespace">The database namespace.</param>
61+
/// <param name="pipeline">The pipeline.</param>
62+
/// <param name="resultSerializer">The result value serializer.</param>
63+
/// <param name="messageEncoderSettings">The message encoder settings.</param>
64+
public AggregateOperation(DatabaseNamespace databaseNamespace, IEnumerable<BsonDocument> pipeline, IBsonSerializer<TResult> resultSerializer, MessageEncoderSettings messageEncoderSettings)
65+
: this(pipeline, resultSerializer, messageEncoderSettings)
66+
{
67+
_databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace));
68+
}
69+
5670
/// <summary>
5771
/// Initializes a new instance of the <see cref="AggregateOperation{TResult}"/> class.
5872
/// </summary>
@@ -61,8 +75,13 @@ public class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>
6175
/// <param name="resultSerializer">The result value serializer.</param>
6276
/// <param name="messageEncoderSettings">The message encoder settings.</param>
6377
public AggregateOperation(CollectionNamespace collectionNamespace, IEnumerable<BsonDocument> pipeline, IBsonSerializer<TResult> resultSerializer, MessageEncoderSettings messageEncoderSettings)
78+
: this(pipeline, resultSerializer, messageEncoderSettings)
6479
{
6580
_collectionNamespace = Ensure.IsNotNull(collectionNamespace, nameof(collectionNamespace));
81+
}
82+
83+
private AggregateOperation(IEnumerable<BsonDocument> pipeline, IBsonSerializer<TResult> resultSerializer, MessageEncoderSettings messageEncoderSettings)
84+
{
6685
_pipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).ToList();
6786
_resultSerializer = Ensure.IsNotNull(resultSerializer, nameof(resultSerializer));
6887
_messageEncoderSettings = Ensure.IsNotNull(messageEncoderSettings, nameof(messageEncoderSettings));
@@ -125,6 +144,17 @@ public string Comment
125144
set { _comment = value; }
126145
}
127146

147+
/// <summary>
148+
/// Gets the database namespace.
149+
/// </summary>
150+
/// <value>
151+
/// The database namespace.
152+
/// </value>
153+
public DatabaseNamespace DatabaseNamespace
154+
{
155+
get { return _databaseNamespace; }
156+
}
157+
128158
/// <summary>
129159
/// Gets or sets the hint. This must either be a BsonString representing the index name or a BsonDocument representing the key pattern of the index.
130160
/// </summary>
@@ -276,7 +306,7 @@ internal BsonDocument CreateCommand(ConnectionDescription connectionDescription,
276306
var readConcern = ReadConcernHelper.GetReadConcernForCommand(session, connectionDescription, _readConcern);
277307
var command = new BsonDocument
278308
{
279-
{ "aggregate", _collectionNamespace.CollectionName },
309+
{ "aggregate", _collectionNamespace == null ? (BsonValue)1 : _collectionNamespace.CollectionName },
280310
{ "pipeline", new BsonArray(_pipeline) },
281311
{ "allowDiskUse", () => _allowDiskUse.Value, _allowDiskUse.HasValue },
282312
{ "maxTimeMS", () => MaxTimeHelper.ToMaxTimeMS(_maxTime.Value), _maxTime.HasValue },
@@ -325,7 +355,7 @@ private AsyncCursor<TResult> CreateCursorFromCursorResult(IChannelSourceHandle c
325355
var getMoreChannelSource = new ServerChannelSource(channelSource.Server, channelSource.Session.Fork());
326356
return new AsyncCursor<TResult>(
327357
getMoreChannelSource,
328-
CollectionNamespace,
358+
result.CollectionNamespace,
329359
command,
330360
result.Results,
331361
result.CursorId.GetValueOrDefault(0),
@@ -362,6 +392,7 @@ private void EnsureIsReadOnlyPipeline()
362392
private class AggregateResult
363393
{
364394
public long? CursorId;
395+
public CollectionNamespace CollectionNamespace;
365396
public TResult[] Results;
366397
}
367398

@@ -420,18 +451,28 @@ public override AggregateResult Deserialize(BsonDeserializationContext context,
420451
while (reader.ReadBsonType() != 0)
421452
{
422453
var elementName = reader.ReadName();
423-
if (elementName == "id")
424-
{
425-
result.CursorId = new Int64Serializer().Deserialize(context);
426-
}
427-
else if (elementName == "firstBatch")
454+
switch (elementName)
428455
{
429-
var arraySerializer = new ArraySerializer<TResult>(_resultSerializer);
430-
result.Results = arraySerializer.Deserialize(context);
431-
}
432-
else
433-
{
434-
reader.SkipValue();
456+
case "id":
457+
result.CursorId = new Int64Serializer().Deserialize(context);
458+
break;
459+
460+
case "ns":
461+
var ns = reader.ReadString();
462+
var separatorIndex = ns.IndexOf('.');
463+
var databaseName = ns.Substring(0, separatorIndex);
464+
var collectionName = ns.Substring(separatorIndex + 1);
465+
result.CollectionNamespace = new CollectionNamespace(new DatabaseNamespace(databaseName), collectionName);
466+
break;
467+
468+
case "firstBatch":
469+
var arraySerializer = new ArraySerializer<TResult>(_resultSerializer);
470+
result.Results = arraySerializer.Deserialize(context);
471+
break;
472+
473+
default:
474+
reader.SkipValue();
475+
break;
435476
}
436477
}
437478
reader.ReadEndDocument();

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamCursor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ internal sealed class ChangeStreamCursor<TDocument> : IAsyncCursor<TDocument>
4040
private IAsyncCursor<RawBsonDocument> _cursor;
4141
private bool _disposed;
4242
private IBsonSerializer<TDocument> _documentSerializer;
43-
private BsonDocument _resumeToken;
4443

4544
// public properties
4645
/// <inheritdoc />
@@ -90,7 +89,7 @@ public void Dispose()
9089
{
9190
if (RetryabilityHelper.IsResumableChangeStreamException(ex))
9291
{
93-
_cursor = _changeStreamOperation.Resume(_binding, _resumeToken, cancellationToken);
92+
_cursor = _changeStreamOperation.Resume(_binding, cancellationToken);
9493
hasMore = _cursor.MoveNext(cancellationToken);
9594
}
9695
else
@@ -115,7 +114,7 @@ public void Dispose()
115114
{
116115
if (RetryabilityHelper.IsResumableChangeStreamException(ex))
117116
{
118-
_cursor = await _changeStreamOperation.ResumeAsync(_binding, _resumeToken, cancellationToken).ConfigureAwait(false);
117+
_cursor = await _changeStreamOperation.ResumeAsync(_binding, cancellationToken).ConfigureAwait(false);
119118
hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
120119
}
121120
else
@@ -160,7 +159,8 @@ private IEnumerable<TDocument> DeserializeDocuments(IEnumerable<RawBsonDocument>
160159

161160
if (lastRawDocument != null)
162161
{
163-
_resumeToken = lastRawDocument["_id"].DeepClone().AsBsonDocument;
162+
_changeStreamOperation.ResumeAfter = lastRawDocument["_id"].DeepClone().AsBsonDocument;
163+
_changeStreamOperation.StartAtOperationTime = null;
164164
}
165165

166166
return documents;

0 commit comments

Comments
 (0)