Skip to content

Commit bb50fc1

Browse files
committed
map/reduce in core now returns a cursor. Added map/reduce to high-level api.
1 parent 553b4dc commit bb50fc1

17 files changed

+575
-1056
lines changed

src/MongoDB.Driver.Core.Tests/Core/Operations/MapReduceOperationBaseTests.cs

Lines changed: 58 additions & 52 deletions
Large diffs are not rendered by default.

src/MongoDB.Driver.Core.Tests/Core/Operations/MapReduceOperationTests.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using System.Collections.Generic;
1718
using System.Reflection;
1819
using System.Threading;
1920
using System.Threading.Tasks;
@@ -30,36 +31,34 @@ public class MapReduceOperationTests : OperationTestBase
3031
{
3132
// fields
3233
private readonly BsonJavaScript _mapFunction = "map";
33-
private readonly BsonDocument _query = new BsonDocument("query", 1);
3434
private readonly BsonJavaScript _reduceFunction = "reduce";
3535
private readonly IBsonSerializer<BsonDocument> _resultSerializer = BsonDocumentSerializer.Instance;
3636

3737
// test methods
3838
[Test]
3939
public void constructor_should_initialize_instance()
4040
{
41-
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _query, _resultSerializer, _messageEncoderSettings);
41+
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _resultSerializer, _messageEncoderSettings);
4242

4343
subject.CollectionNamespace.Should().BeSameAs(_collectionNamespace);
4444
subject.MapFunction.Should().BeSameAs(_mapFunction);
4545
subject.MessageEncoderSettings.Should().BeSameAs(_messageEncoderSettings);
46-
subject.Filter.Should().BeSameAs(_query);
4746
subject.ReduceFunction.Should().BeSameAs(_reduceFunction);
4847
subject.ResultSerializer.Should().BeSameAs(_resultSerializer);
4948
}
5049

5150
[Test]
5251
public void constructor_should_throw_when_resultSerializer_is_null()
5352
{
54-
Action action = () => new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _query, null, _messageEncoderSettings);
53+
Action action = () => new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, null, _messageEncoderSettings);
5554

5655
action.ShouldThrow<ArgumentNullException>().And.ParamName.Should().Be("resultSerializer");
5756
}
5857

5958
[Test]
6059
public void CreateOutputOptions_should_return_expected_result()
6160
{
62-
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _query, _resultSerializer, _messageEncoderSettings);
61+
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _resultSerializer, _messageEncoderSettings);
6362
var subjectReflector = new Reflector(subject);
6463
var expectedResult = new BsonDocument("inline", 1);
6564

@@ -77,33 +76,33 @@ public async Task ExecuteAsync_should_return_expected_results()
7776
var query = new BsonDocument();
7877
var mapFunction = "function() { emit(this.x, this.v); }";
7978
var reduceFunction = "function(key, values) { var sum = 0; for (var i = 0; i < values.length; i++) { sum += values[i]; }; return sum; }";
80-
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, mapFunction, reduceFunction, query, _resultSerializer, _messageEncoderSettings);
81-
BsonValue expectedResults = new BsonArray
79+
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, mapFunction, reduceFunction, _resultSerializer, _messageEncoderSettings);
80+
var expectedResults = new List<BsonDocument>
8281
{
8382
new BsonDocument { {"_id", 1 }, { "value", 3 } },
8483
new BsonDocument { {"_id", 2 }, { "value", 4 } },
8584
};
8685

87-
var response = await ExecuteOperationAsync(subject);
88-
var results = response["results"];
86+
var cursor = await ExecuteOperationAsync(subject);
87+
var results = await cursor.ToListAsync();
8988

90-
results.Should().Be(expectedResults);
89+
results.Should().Equal(expectedResults);
9190
}
9291

9392
[Test]
9493
public void ExecuteAsync_should_throw_when_binding_is_null()
9594
{
96-
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _query, _resultSerializer, _messageEncoderSettings);
95+
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _resultSerializer, _messageEncoderSettings);
9796

98-
Action action = () => subject.ExecuteAsync(null, CancellationToken.None);
97+
Func<Task> act = () => subject.ExecuteAsync(null, CancellationToken.None);
9998

100-
action.ShouldThrow<ArgumentNullException>().And.ParamName.Should().Be("binding");
99+
act.ShouldThrow<ArgumentNullException>().And.ParamName.Should().Be("binding");
101100
}
102101

103102
[Test]
104103
public void ResultSerializer_should_get_value()
105104
{
106-
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _query, _resultSerializer, _messageEncoderSettings);
105+
var subject = new MapReduceOperation<BsonDocument>(_collectionNamespace, _mapFunction, _reduceFunction, _resultSerializer, _messageEncoderSettings);
107106

108107
var result = subject.ResultSerializer;
109108

src/MongoDB.Driver.Core.Tests/Core/Operations/MapReduceOutputToCollectionOperationTests.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,29 +50,28 @@ public override void TestFixtureSetUp()
5050
[Test]
5151
public void constructor_should_initialize_instance()
5252
{
53-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
53+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings);
5454

5555
subject.CollectionNamespace.Should().BeSameAs(_collectionNamespace);
5656
subject.OutputCollectionNamespace.Should().BeSameAs(_outputCollectionNamespace);
5757
subject.MapFunction.Should().BeSameAs(_mapFunction);
5858
subject.MessageEncoderSettings.Should().BeSameAs(_messageEncoderSettings);
59-
subject.Filter.Should().BeSameAs(_query);
6059
subject.ReduceFunction.Should().BeSameAs(_reduceFunction);
6160
subject.OutputMode.Should().Be(MapReduceOutputMode.Replace);
6261
}
6362

6463
[Test]
6564
public void constructor_should_throw_when_outputCollectionNamespace_is_null()
6665
{
67-
Action action = () => new MapReduceOutputToCollectionOperation(_collectionNamespace, null, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
66+
Action action = () => new MapReduceOutputToCollectionOperation(_collectionNamespace, null, _mapFunction, _reduceFunction, _messageEncoderSettings);
6867

6968
action.ShouldThrow<ArgumentNullException>().And.ParamName.Should().Be("outputCollectionNamespace");
7069
}
7170

7271
[Test]
7372
public void CreateOutputOptions_should_return_expected_result()
7473
{
75-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
74+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings);
7675
var subjectReflector = new Reflector(subject);
7776
var expectedResult = new BsonDocument
7877
{
@@ -88,7 +87,7 @@ public void CreateOutputOptions_should_return_expected_result()
8887
[Test]
8988
public void CreateOutputOptions_should_return_expected_result_when_ShardedOutput_is_provided()
9089
{
91-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings)
90+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings)
9291
{
9392
ShardedOutput = true
9493
};
@@ -108,7 +107,7 @@ public void CreateOutputOptions_should_return_expected_result_when_ShardedOutput
108107
[Test]
109108
public void CreateOutputOptions_should_return_expected_result_when_NonAtomicOutput_is_provided()
110109
{
111-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings)
110+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings)
112111
{
113112
NonAtomicOutput = true
114113
};
@@ -131,10 +130,9 @@ public async Task ExecuteAsync_should_return_expected_result()
131130
{
132131
await EnsureTestDataAsync();
133132

134-
var query = new BsonDocument();
135133
var mapFunction = "function() { emit(this.x, this.v); }";
136134
var reduceFunction = "function(key, values) { var sum = 0; for (var i = 0; i < values.length; i++) { sum += values[i]; }; return sum; }";
137-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, mapFunction, reduceFunction, query, _messageEncoderSettings);
135+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, mapFunction, reduceFunction, _messageEncoderSettings);
138136
var expectedDocuments = new BsonDocument[]
139137
{
140138
new BsonDocument { {"_id", 1 }, { "value", 3 } },
@@ -152,7 +150,7 @@ public async Task ExecuteAsync_should_return_expected_result()
152150
[Test]
153151
public void ExecuteAsync_should_throw_when_binding_is_null()
154152
{
155-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
153+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings);
156154

157155
Action action = () => subject.ExecuteAsync(null, CancellationToken.None);
158156

@@ -162,7 +160,7 @@ public void ExecuteAsync_should_throw_when_binding_is_null()
162160
[Test]
163161
public void NonAtomicOutput_should_get_and_set_value()
164162
{
165-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
163+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings);
166164
var value = true;
167165

168166
subject.NonAtomicOutput = value;
@@ -174,7 +172,7 @@ public void NonAtomicOutput_should_get_and_set_value()
174172
[Test]
175173
public void OutputCollectionNamespace_should_get__value()
176174
{
177-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
175+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings);
178176

179177
var result = subject.OutputCollectionNamespace;
180178

@@ -184,7 +182,7 @@ public void OutputCollectionNamespace_should_get__value()
184182
[Test]
185183
public void OutputMode_should_get_and_set_value()
186184
{
187-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
185+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings);
188186
var value = MapReduceOutputMode.Merge;
189187

190188
subject.OutputMode = value;
@@ -196,7 +194,7 @@ public void OutputMode_should_get_and_set_value()
196194
[Test]
197195
public void ShardedOutput_should_get_and_set_value()
198196
{
199-
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _query, _messageEncoderSettings);
197+
var subject = new MapReduceOutputToCollectionOperation(_collectionNamespace, _outputCollectionNamespace, _mapFunction, _reduceFunction, _messageEncoderSettings);
200198
var value = true;
201199

202200
subject.ShardedOutput = value;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/* Copyright 2013-2014 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Collections.Generic;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
using MongoDB.Bson;
21+
using MongoDB.Bson.Serialization;
22+
using MongoDB.Bson.Serialization.Serializers;
23+
using MongoDB.Driver.Core.Bindings;
24+
using MongoDB.Driver.Core.Misc;
25+
using MongoDB.Driver.Core.Operations;
26+
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
27+
28+
namespace MongoDB.Driver.Operations
29+
{
30+
/// <summary>
31+
/// Represents a map reduce operation.
32+
/// </summary>
33+
public class MapReduceLegacyOperation : MapReduceOperationBase, IReadOperation<BsonDocument>
34+
{
35+
// constructors
36+
/// <summary>
37+
/// Initializes a new instance of the <see cref="MapReduceLegacyOperation"/> class.
38+
/// </summary>
39+
/// <param name="collectionNamespace">The collection namespace.</param>
40+
/// <param name="mapFunction">The map function.</param>
41+
/// <param name="reduceFunction">The reduce function.</param>
42+
/// <param name="messageEncoderSettings">The message encoder settings.</param>
43+
public MapReduceLegacyOperation(CollectionNamespace collectionNamespace, BsonJavaScript mapFunction, BsonJavaScript reduceFunction, MessageEncoderSettings messageEncoderSettings)
44+
: base(
45+
collectionNamespace,
46+
mapFunction,
47+
reduceFunction,
48+
messageEncoderSettings)
49+
{
50+
}
51+
52+
// methods
53+
/// <inheritdoc/>
54+
protected override BsonDocument CreateOutputOptions()
55+
{
56+
return new BsonDocument("inline", 1);
57+
}
58+
59+
/// <inheritdoc/>
60+
public Task<BsonDocument> ExecuteAsync(IReadBinding binding, CancellationToken cancellationToken)
61+
{
62+
Ensure.IsNotNull(binding, "binding");
63+
var command = CreateCommand();
64+
var operation = new ReadCommandOperation<BsonDocument>(CollectionNamespace.DatabaseNamespace, command, BsonDocumentSerializer.Instance, MessageEncoderSettings);
65+
return operation.ExecuteAsync(binding, cancellationToken);
66+
}
67+
}
68+
}

src/MongoDB.Driver.Core/Core/Operations/MapReduceOperation.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace MongoDB.Driver.Core.Operations
3030
/// Represents a map reduce operation.
3131
/// </summary>
3232
/// <typeparam name="TResult">The type of the result.</typeparam>
33-
public class MapReduceOperation<TResult> : MapReduceOperationBase, IReadOperation<TResult>
33+
public class MapReduceOperation<TResult> : MapReduceOperationBase, IReadOperation<IAsyncCursor<TResult>>
3434
{
3535
// fields
3636
private readonly IBsonSerializer<TResult> _resultSerializer;
@@ -42,15 +42,13 @@ public class MapReduceOperation<TResult> : MapReduceOperationBase, IReadOperatio
4242
/// <param name="collectionNamespace">The collection namespace.</param>
4343
/// <param name="mapFunction">The map function.</param>
4444
/// <param name="reduceFunction">The reduce function.</param>
45-
/// <param name="filter">The filter.</param>
4645
/// <param name="resultSerializer">The result serializer.</param>
4746
/// <param name="messageEncoderSettings">The message encoder settings.</param>
48-
public MapReduceOperation(CollectionNamespace collectionNamespace, BsonJavaScript mapFunction, BsonJavaScript reduceFunction, BsonDocument filter, IBsonSerializer<TResult> resultSerializer, MessageEncoderSettings messageEncoderSettings)
47+
public MapReduceOperation(CollectionNamespace collectionNamespace, BsonJavaScript mapFunction, BsonJavaScript reduceFunction, IBsonSerializer<TResult> resultSerializer, MessageEncoderSettings messageEncoderSettings)
4948
: base(
5049
collectionNamespace,
5150
mapFunction,
5251
reduceFunction,
53-
filter,
5452
messageEncoderSettings)
5553
{
5654
_resultSerializer = Ensure.IsNotNull(resultSerializer, "resultSerializer");
@@ -76,12 +74,15 @@ protected override BsonDocument CreateOutputOptions()
7674
}
7775

7876
/// <inheritdoc/>
79-
public Task<TResult> ExecuteAsync(IReadBinding binding, CancellationToken cancellationToken)
77+
public async Task<IAsyncCursor<TResult>> ExecuteAsync(IReadBinding binding, CancellationToken cancellationToken)
8078
{
8179
Ensure.IsNotNull(binding, "binding");
8280
var command = CreateCommand();
83-
var operation = new ReadCommandOperation<TResult>(CollectionNamespace.DatabaseNamespace, command, _resultSerializer, MessageEncoderSettings);
84-
return operation.ExecuteAsync(binding, cancellationToken);
81+
var resultArraySerializer = new ArraySerializer<TResult>(_resultSerializer);
82+
var resultSerializer = new ElementDeserializer<TResult[]>("results", resultArraySerializer);
83+
var operation = new ReadCommandOperation<TResult[]>(CollectionNamespace.DatabaseNamespace, command, resultSerializer, MessageEncoderSettings);
84+
var result = await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
85+
return new SingleBatchAsyncCursor<TResult>(result);
8586
}
8687
}
8788
}

src/MongoDB.Driver.Core/Core/Operations/MapReduceOperationBase.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public abstract class MapReduceOperationBase
3030
{
3131
// fields
3232
private readonly CollectionNamespace _collectionNamespace;
33-
private readonly BsonDocument _filter;
33+
private BsonDocument _filter;
3434
private BsonJavaScript _finalizeFunction;
3535
private bool? _javaScriptMode;
3636
private long? _limit;
@@ -49,14 +49,12 @@ public abstract class MapReduceOperationBase
4949
/// <param name="collectionNamespace">The collection namespace.</param>
5050
/// <param name="mapFunction">The map function.</param>
5151
/// <param name="reduceFunction">The reduce function.</param>
52-
/// <param name="filter">The filter.</param>
5352
/// <param name="messageEncoderSettings">The message encoder settings.</param>
54-
protected MapReduceOperationBase(CollectionNamespace collectionNamespace, BsonJavaScript mapFunction, BsonJavaScript reduceFunction, BsonDocument filter, MessageEncoderSettings messageEncoderSettings)
53+
protected MapReduceOperationBase(CollectionNamespace collectionNamespace, BsonJavaScript mapFunction, BsonJavaScript reduceFunction, MessageEncoderSettings messageEncoderSettings)
5554
{
5655
_collectionNamespace = Ensure.IsNotNull(collectionNamespace, "collectionNamespace");
5756
_mapFunction = Ensure.IsNotNull(mapFunction, "mapFunction");
5857
_reduceFunction = Ensure.IsNotNull(reduceFunction, "reduceFunction");
59-
_filter = Ensure.IsNotNull(filter, "filter");
6058
_messageEncoderSettings = Ensure.IsNotNull(messageEncoderSettings, "messageEncoderSettings");
6159
}
6260

@@ -73,14 +71,15 @@ public CollectionNamespace CollectionNamespace
7371
}
7472

7573
/// <summary>
76-
/// Gets the filter.
74+
/// Gets or sets the filter.
7775
/// </summary>
7876
/// <value>
7977
/// The filter.
8078
/// </value>
8179
public BsonDocument Filter
8280
{
8381
get { return _filter; }
82+
set { _filter = value; }
8483
}
8584

8685
/// <summary>

0 commit comments

Comments
 (0)