Skip to content

Commit 47ecdb5

Browse files
committed
CSHARP-2634: Add support for $merge.
1 parent 62d4865 commit 47ecdb5

File tree

167 files changed

+4565
-1762
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

167 files changed

+4565
-1762
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ public class Feature
3535
private static readonly Feature __aggregateGraphLookupStage = new Feature("AggregateGraphLookupStage", new SemanticVersion(3, 4, 0, "rc0"));
3636
private static readonly Feature __aggregateHint = new Feature("AggregateHint", new SemanticVersion(3, 6, 0, "rc0"));
3737
private static readonly Feature __aggregateLet = new Feature("AggregateLet", new SemanticVersion(3, 6, 0));
38-
private static readonly Feature __aggregateOut = new Feature("Aggregate", new SemanticVersion(2, 6, 0));
38+
private static readonly Feature __aggregateMerge = new Feature("AggregateMerge", new SemanticVersion(4, 2, 0));
39+
private static readonly Feature __aggregateOut = new Feature("AggregateOut", new SemanticVersion(2, 6, 0));
3940
private static readonly ArrayFiltersFeature __arrayFilters = new ArrayFiltersFeature("ArrayFilters", new SemanticVersion(3, 5, 11));
4041
private static readonly Feature __bypassDocumentValidation = new Feature("BypassDocumentValidation", new SemanticVersion(3, 2, 0));
4142
private static readonly Feature __changeStreamStage = new Feature("ChangeStreamStage", new SemanticVersion(3, 5, 11));
@@ -139,6 +140,11 @@ public class Feature
139140
/// </summary>
140141
public static Feature AggregateLet => __aggregateLet;
141142

143+
/// <summary>
144+
/// Gets the aggregate merge feature.
145+
/// </summary>
146+
public static Feature AggregateMerge => __aggregateMerge;
147+
142148
/// <summary>
143149
/// Gets the aggregate out feature.
144150
/// </summary>

src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -416,9 +416,9 @@ private AsyncCursor<TResult> CreateCursorFromInlineResult(BsonDocument command,
416416

417417
private void EnsureIsReadOnlyPipeline()
418418
{
419-
if (Pipeline.Any(s => s.GetElement(0).Name == "$out"))
419+
if (Pipeline.Any(s => { var n = s.GetElement(0).Name; return n == "$out" || n == "$merge"; }))
420420
{
421-
throw new ArgumentException("The pipeline for an AggregateOperation contains a $out operator. Use AggregateOutputToCollectionOperation instead.", "pipeline");
421+
throw new ArgumentException("The pipeline for an AggregateOperation contains a $out or $merge operator. Use AggregateOutputToCollectionOperation instead.", "pipeline");
422422
}
423423
}
424424

src/MongoDB.Driver.Core/Core/Operations/AggregateToCollectionOperation.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,10 @@ private WriteCommandOperation<BsonDocument> CreateOperation(ICoreSessionHandle s
282282
private void EnsureIsOutputToCollectionPipeline()
283283
{
284284
var lastStage = _pipeline.LastOrDefault();
285-
if (lastStage == null || lastStage.GetElement(0).Name != "$out")
285+
var lastStageName = lastStage?.GetElement(0).Name;
286+
if (lastStage == null || (lastStageName != "$out" && lastStageName != "$merge"))
286287
{
287-
throw new ArgumentException("The last stage of the pipeline for an AggregateOutputToCollectionOperation must have a $out operator.", "pipeline");
288+
throw new ArgumentException("The last stage of the pipeline for an AggregateOutputToCollectionOperation must have a $out or $merge operator.", "pipeline");
288289
}
289290
}
290291
}

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamOperation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ private BsonDocument CreateChangeStreamStage()
403403
{
404404
var changeStreamOptions = new BsonDocument
405405
{
406-
{ "fullDocument", ToString(_fullDocument) },
406+
{ "fullDocument", () => ToString(_fullDocument), _fullDocument != ChangeStreamFullDocumentOption.Default },
407407
{ "allChangesForCluster", true, _collectionNamespace == null && _databaseNamespace == null },
408408
{ "startAfter", _startAfter, _startAfter != null},
409409
{ "startAtOperationTime", _startAtOperationTime, _startAtOperationTime != null },

src/MongoDB.Driver.Legacy/MongoCollection.cs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,9 @@ private IEnumerable<BsonDocument> Aggregate(IClientSessionHandle session, Aggreg
144144

145145
var messageEncoderSettings = GetMessageEncoderSettings();
146146

147-
var last = args.Pipeline.LastOrDefault();
148-
if (last != null && last.GetElement(0).Name == "$out")
147+
var lastStage = args.Pipeline.LastOrDefault();
148+
var lastStageName = lastStage?.GetElement(0).Name;
149+
if (lastStage != null && (lastStageName == "$out" || lastStageName== "$merge"))
149150
{
150151
var aggregateOperation = new AggregateToCollectionOperation(_collectionNamespace, args.Pipeline, messageEncoderSettings)
151152
{
@@ -158,8 +159,39 @@ private IEnumerable<BsonDocument> Aggregate(IClientSessionHandle session, Aggreg
158159
};
159160
ExecuteWriteOperation(session, aggregateOperation);
160161

161-
var outputCollectionName = last[0].AsString;
162-
var outputCollectionNamespace = new CollectionNamespace(_collectionNamespace.DatabaseNamespace, outputCollectionName);
162+
CollectionNamespace outputCollectionNamespace;
163+
var stageName = lastStage.GetElement(0).Name;
164+
switch (stageName)
165+
{
166+
case "$out":
167+
{
168+
var outputCollectionName = lastStage[0].AsString;
169+
outputCollectionNamespace = new CollectionNamespace(_collectionNamespace.DatabaseNamespace, outputCollectionName);
170+
}
171+
break;
172+
case "$merge":
173+
{
174+
var mergeArguments = lastStage[0].AsBsonDocument;
175+
DatabaseNamespace outputDatabaseNamespace;
176+
string outputCollectionName;
177+
var into = mergeArguments["into"];
178+
if (into.IsString)
179+
{
180+
outputDatabaseNamespace = _collectionNamespace.DatabaseNamespace;
181+
outputCollectionName = into.AsString;
182+
}
183+
else
184+
{
185+
outputDatabaseNamespace = new DatabaseNamespace(into["db"].AsString);
186+
outputCollectionName = into["coll"].AsString;
187+
}
188+
outputCollectionNamespace = new CollectionNamespace(outputDatabaseNamespace, outputCollectionName);
189+
}
190+
break;
191+
default:
192+
throw new ArgumentException($"Unexpected stage name: {stageName}.");
193+
}
194+
163195
var resultSerializer = BsonDocumentSerializer.Instance;
164196
var findOperation = new FindOperation<BsonDocument>(outputCollectionNamespace, resultSerializer, messageEncoderSettings)
165197
{

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,22 @@ public override IAggregateFluent<TResult> Match(FilterDefinition<TResult> filter
156156
return WithPipeline(_pipeline.Match(filter));
157157
}
158158

159+
public override IAsyncCursor<TOutput> Merge<TOutput>(IMongoCollection<TOutput> outputCollection, MergeStageOptions<TOutput> mergeOptions = null, CancellationToken cancellationToken = default)
160+
{
161+
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
162+
mergeOptions = mergeOptions ?? new MergeStageOptions<TOutput>();
163+
var aggregate = WithPipeline(_pipeline.Merge<TInput, TResult, TOutput>(outputCollection, mergeOptions));
164+
return aggregate.ToCursor(cancellationToken);
165+
}
166+
167+
public override async Task<IAsyncCursor<TOutput>> MergeAsync<TOutput>(IMongoCollection<TOutput> outputCollection, MergeStageOptions<TOutput> mergeOptions = null, CancellationToken cancellationToken = default)
168+
{
169+
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
170+
mergeOptions = mergeOptions ?? new MergeStageOptions<TOutput>();
171+
var aggregate = WithPipeline(_pipeline.Merge<TInput, TResult, TOutput>(outputCollection, mergeOptions));
172+
return await aggregate.ToCursorAsync(cancellationToken).ConfigureAwait(false);
173+
}
174+
159175
public override IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer)
160176
{
161177
return WithPipeline(_pipeline.OfType(newResultSerializer));

src/MongoDB.Driver/AggregateFluentBase.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,18 @@ public virtual IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement,
146146
/// <inheritdoc />
147147
public abstract IAggregateFluent<TResult> Match(FilterDefinition<TResult> filter);
148148

149+
/// <inheritdoc />
150+
public virtual IAsyncCursor<TOutput> Merge<TOutput>(IMongoCollection<TOutput> outputCollection, MergeStageOptions<TOutput> mergeOptions = null, CancellationToken cancellationToken = default)
151+
{
152+
throw new NotImplementedException();
153+
}
154+
155+
/// <inheritdoc />
156+
public virtual Task<IAsyncCursor<TOutput>> MergeAsync<TOutput>(IMongoCollection<TOutput> outputCollection, MergeStageOptions<TOutput> mergeOptions = null, CancellationToken cancellationToken = default)
157+
{
158+
throw new NotImplementedException();
159+
}
160+
149161
/// <inheritdoc />
150162
public abstract IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer) where TNewResult : TResult;
151163

src/MongoDB.Driver/BulkWriteUpsert.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,30 @@ public int Index
6262
get { return _index; }
6363
}
6464

65+
// public methods
66+
/// <inheritdoc />
67+
public override bool Equals(object obj)
68+
{
69+
if (object.ReferenceEquals(obj, null) || obj.GetType() != typeof(BulkWriteUpsert))
70+
{
71+
return false;
72+
}
73+
74+
var other = (BulkWriteUpsert)obj;
75+
return
76+
_index == other._index &&
77+
_id.Equals(other._id);
78+
}
79+
80+
/// <inheritdoc />
81+
public override int GetHashCode()
82+
{
83+
return new Hasher()
84+
.Hash(_index)
85+
.Hash(_id)
86+
.GetHashCode();
87+
}
88+
6589
// internal static methods
6690
internal static BulkWriteUpsert FromCore(Core.Operations.BulkWriteOperationUpsert upsert)
6791
{

src/MongoDB.Driver/IAggregateFluent.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,24 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
234234
/// <returns>The fluent aggregate interface.</returns>
235235
IAggregateFluent<TResult> Match(FilterDefinition<TResult> filter);
236236

237+
/// <summary>
238+
/// Appends a merge stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
239+
/// </summary>
240+
/// <param name="outputCollection">The output collection.</param>
241+
/// <param name="mergeOptions">The merge options.</param>
242+
/// <param name="cancellationToken">The cancellation token.</param>
243+
/// <returns>A cursor.</returns>
244+
IAsyncCursor<TOutput> Merge<TOutput>(IMongoCollection<TOutput> outputCollection, MergeStageOptions<TOutput> mergeOptions = null, CancellationToken cancellationToken = default);
245+
246+
/// <summary>
247+
/// Appends a merge stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
248+
/// </summary>
249+
/// <param name="outputCollection">The output collection.</param>
250+
/// <param name="mergeOptions">The merge options.</param>
251+
/// <param name="cancellationToken">The cancellation token.</param>
252+
/// <returns>A cursor.</returns>
253+
Task<IAsyncCursor<TOutput>> MergeAsync<TOutput>(IMongoCollection<TOutput> outputCollection, MergeStageOptions<TOutput> mergeOptions = null, CancellationToken cancellationToken = default);
254+
237255
/// <summary>
238256
/// Appends a match stage to the pipeline that matches derived documents and changes the result type to the derived type.
239257
/// </summary>
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/* Copyright 2019-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Collections.Generic;
17+
using MongoDB.Bson;
18+
using MongoDB.Bson.Serialization;
19+
20+
namespace MongoDB.Driver
21+
{
22+
/// <summary>
23+
/// The behavior of $merge is a result document and an existing document in the collection
24+
/// have the same value for the specified on field(s).
25+
/// </summary>
26+
public enum MergeStageWhenMatched
27+
{
28+
/// <summary>
29+
/// Replace the existing document in the output collection with the matching results document.
30+
/// </summary>
31+
Replace,
32+
33+
/// <summary>
34+
/// Keep the existing document in the output collection.
35+
/// </summary>
36+
KeepExisting,
37+
38+
/// <summary>
39+
/// Merge the matching documents (similar to the $mergeObjects operator).
40+
/// </summary>
41+
Merge,
42+
43+
/// <summary>
44+
/// Stop and fail the aggregation. Any changes to the output collection from previous documents are not reverted.
45+
/// </summary>
46+
Fail,
47+
48+
/// <summary>
49+
/// Use an aggregation pipeline to update the document in the collection.
50+
/// </summary>
51+
Pipeline
52+
}
53+
54+
/// <summary>
55+
/// The behavior of $merge if a result document does not match an existing document in the output collection.
56+
/// </summary>
57+
public enum MergeStageWhenNotMatched
58+
{
59+
/// <summary>
60+
/// Insert the document into the output collection.
61+
/// </summary>
62+
Insert,
63+
64+
/// <summary>
65+
/// Discard the document; i.e. $merge does not insert the document into the output collection.
66+
/// </summary>
67+
Discard,
68+
69+
/// <summary>
70+
/// Stop and fail the aggregation operation. Any changes to the output collection from previous documents are not reverted.
71+
/// </summary>
72+
Fail
73+
}
74+
75+
/// <summary>
76+
/// Options for the $merge aggregation pipeline stage.
77+
/// </summary>
78+
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
79+
public class MergeStageOptions<TOutput>
80+
{
81+
// private fields
82+
private BsonDocument _letVariables;
83+
private IReadOnlyList<string> _onFieldNames;
84+
private IBsonSerializer<TOutput> _outputSerializer;
85+
private MergeStageWhenMatched? _whenMatched;
86+
private PipelineDefinition<TOutput, TOutput> _whenMatchedPipeline;
87+
private MergeStageWhenNotMatched? _whenNotMatched;
88+
89+
// public properties
90+
/// <summary>
91+
/// Specifies variables accessible for use in the WhenMatchedPipeline.
92+
/// </summary>
93+
public BsonDocument LetVariables
94+
{
95+
get => _letVariables;
96+
set => _letVariables = value;
97+
}
98+
99+
/// <summary>
100+
/// Field or fields that act as a unique identifier for a document. The identifier determines if a results
101+
/// document matches an already existing document in the output collection.
102+
/// </summary>
103+
public IReadOnlyList<string> OnFieldNames
104+
{
105+
get => _onFieldNames;
106+
set => _onFieldNames = value;
107+
}
108+
109+
/// <summary>
110+
/// The output serializer.
111+
/// </summary>
112+
public IBsonSerializer<TOutput> OutputSerializer
113+
{
114+
get => _outputSerializer;
115+
set => _outputSerializer = value;
116+
}
117+
118+
/// <summary>
119+
/// The behavior of $merge if a result document and an existing document in the collectoin have the
120+
/// same value for the specified on field(s).
121+
/// </summary>
122+
public MergeStageWhenMatched? WhenMatched
123+
{
124+
get => _whenMatched;
125+
set => _whenMatched = value;
126+
}
127+
128+
/// <summary>
129+
/// An aggregation pipeline to update the document in the collection.
130+
/// Used when WhenMatched is Pipeline.
131+
/// </summary>
132+
public PipelineDefinition<TOutput, TOutput> WhenMatchedPipeline
133+
{
134+
get => _whenMatchedPipeline;
135+
set => _whenMatchedPipeline = value;
136+
}
137+
138+
/// <summary>
139+
/// The behavior of $merge if a result document does not match an existing document in the output collection.
140+
/// </summary>
141+
public MergeStageWhenNotMatched? WhenNotMatched
142+
{
143+
get => _whenNotMatched;
144+
set => _whenNotMatched = value;
145+
}
146+
}
147+
}

src/MongoDB.Driver/MongoClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ private MessageEncoderSettings GetMessageEncoderSettings()
530530

531531
private IClientSessionHandle StartImplicitSession(bool areSessionsSupported)
532532
{
533-
var options = new ClientSessionOptions();
533+
var options = new ClientSessionOptions { CausalConsistency = false };
534534

535535
ICoreSessionHandle coreSession;
536536
#pragma warning disable 618

0 commit comments

Comments
 (0)