Skip to content

Commit 3556fc9

Browse files
committed
CSHARP-2293: Implement new count API.
1 parent 4724787 commit 3556fc9

27 files changed

+1801
-46
lines changed
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/* Copyright 2018-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Collections.Generic;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
using MongoDB.Bson;
21+
using MongoDB.Bson.Serialization.Serializers;
22+
using MongoDB.Driver.Core.Bindings;
23+
using MongoDB.Driver.Core.Misc;
24+
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
25+
26+
namespace MongoDB.Driver.Core.Operations
27+
{
28+
/// <summary>
29+
/// Represents a count documents operation.
30+
/// </summary>
31+
public class CountDocumentsOperation : IReadOperation<long>
32+
{
33+
// private fields
34+
private Collation _collation;
35+
private readonly CollectionNamespace _collectionNamespace;
36+
private BsonDocument _filter;
37+
private BsonValue _hint;
38+
private long? _limit;
39+
private TimeSpan? _maxTime;
40+
private readonly MessageEncoderSettings _messageEncoderSettings;
41+
private ReadConcern _readConcern = ReadConcern.Default;
42+
private long? _skip;
43+
44+
// constructors
45+
/// <summary>
46+
/// Initializes a new instance of the <see cref="CountOperation"/> class.
47+
/// </summary>
48+
/// <param name="collectionNamespace">The collection namespace.</param>
49+
/// <param name="messageEncoderSettings">The message encoder settings.</param>
50+
public CountDocumentsOperation(CollectionNamespace collectionNamespace, MessageEncoderSettings messageEncoderSettings)
51+
{
52+
_collectionNamespace = Ensure.IsNotNull(collectionNamespace, nameof(collectionNamespace));
53+
_messageEncoderSettings = Ensure.IsNotNull(messageEncoderSettings, nameof(messageEncoderSettings));
54+
}
55+
56+
// public properties
57+
/// <summary>
58+
/// Gets or sets the collation.
59+
/// </summary>
60+
/// <value>
61+
/// The collation.
62+
/// </value>
63+
public Collation Collation
64+
{
65+
get { return _collation; }
66+
set { _collation = value; }
67+
}
68+
/// <summary>
69+
/// Gets the collection namespace.
70+
/// </summary>
71+
/// <value>
72+
/// The collection namespace.
73+
/// </value>
74+
public CollectionNamespace CollectionNamespace
75+
{
76+
get { return _collectionNamespace; }
77+
}
78+
79+
/// <summary>
80+
/// Gets or sets the filter.
81+
/// </summary>
82+
/// <value>
83+
/// The filter.
84+
/// </value>
85+
public BsonDocument Filter
86+
{
87+
get { return _filter; }
88+
set { _filter = value; }
89+
}
90+
91+
/// <summary>
92+
/// Gets or sets the index hint.
93+
/// </summary>
94+
/// <value>
95+
/// The index hint.
96+
/// </value>
97+
public BsonValue Hint
98+
{
99+
get { return _hint; }
100+
set { _hint = value; }
101+
}
102+
103+
/// <summary>
104+
/// Gets or sets a limit on the number of matching documents to count.
105+
/// </summary>
106+
/// <value>
107+
/// A limit on the number of matching documents to count.
108+
/// </value>
109+
public long? Limit
110+
{
111+
get { return _limit; }
112+
set { _limit = value; }
113+
}
114+
115+
/// <summary>
116+
/// Gets or sets the maximum time the server should spend on this operation.
117+
/// </summary>
118+
/// <value>
119+
/// The maximum time the server should spend on this operation.
120+
/// </value>
121+
public TimeSpan? MaxTime
122+
{
123+
get { return _maxTime; }
124+
set { _maxTime = Ensure.IsNullOrInfiniteOrGreaterThanOrEqualToZero(value, nameof(value)); }
125+
}
126+
127+
/// <summary>
128+
/// Gets the message encoder settings.
129+
/// </summary>
130+
/// <value>
131+
/// The message encoder settings.
132+
/// </value>
133+
public MessageEncoderSettings MessageEncoderSettings
134+
{
135+
get { return _messageEncoderSettings; }
136+
}
137+
138+
/// <summary>
139+
/// Gets or sets the read concern.
140+
/// </summary>
141+
/// <value>
142+
/// The read concern.
143+
/// </value>
144+
public ReadConcern ReadConcern
145+
{
146+
get { return _readConcern; }
147+
set { _readConcern = Ensure.IsNotNull(value, nameof(value)); }
148+
}
149+
150+
/// <summary>
151+
/// Gets or sets the number of documents to skip before counting the remaining matching documents.
152+
/// </summary>
153+
/// <value>
154+
/// The number of documents to skip before counting the remaining matching documents.
155+
/// </value>
156+
public long? Skip
157+
{
158+
get { return _skip; }
159+
set { _skip = value; }
160+
}
161+
162+
// public methods
163+
/// <inheritdoc/>
164+
public long Execute(IReadBinding binding, CancellationToken cancellationToken)
165+
{
166+
Ensure.IsNotNull(binding, nameof(binding));
167+
using (var channelSource = binding.GetReadChannelSource(cancellationToken))
168+
using (var channel = channelSource.GetChannel(cancellationToken))
169+
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
170+
{
171+
var operation = CreateOperation();
172+
var cursor = operation.Execute(channelBinding, cancellationToken);
173+
var result = cursor.ToList(cancellationToken);
174+
return ExtractCountFromResult(result);
175+
}
176+
}
177+
178+
/// <inheritdoc/>
179+
public async Task<long> ExecuteAsync(IReadBinding binding, CancellationToken cancellationToken)
180+
{
181+
Ensure.IsNotNull(binding, nameof(binding));
182+
using (var channelSource = await binding.GetReadChannelSourceAsync(cancellationToken).ConfigureAwait(false))
183+
using (var channel = await channelSource.GetChannelAsync(cancellationToken).ConfigureAwait(false))
184+
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
185+
{
186+
var operation = CreateOperation();
187+
var cursor = await operation.ExecuteAsync(channelBinding, cancellationToken).ConfigureAwait(false);
188+
var result = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
189+
return ExtractCountFromResult(result);
190+
}
191+
}
192+
193+
// private methods
194+
private AggregateOperation<BsonDocument> CreateOperation()
195+
{
196+
var pipeline = CreatePipeline();
197+
var operation = new AggregateOperation<BsonDocument>(_collectionNamespace, pipeline, BsonDocumentSerializer.Instance, _messageEncoderSettings)
198+
{
199+
Collation = _collation,
200+
Hint = _hint,
201+
MaxTime = _maxTime,
202+
ReadConcern = _readConcern
203+
};
204+
return operation;
205+
}
206+
207+
private List<BsonDocument> CreatePipeline()
208+
{
209+
var pipeline = new List<BsonDocument>();
210+
pipeline.Add(new BsonDocument("$match", _filter ?? new BsonDocument()));
211+
if (_skip.HasValue)
212+
{
213+
pipeline.Add(new BsonDocument("$skip", _skip.Value));
214+
}
215+
if (_limit.HasValue)
216+
{
217+
pipeline.Add(new BsonDocument("$limit", _limit.Value));
218+
}
219+
pipeline.Add(new BsonDocument("$group", new BsonDocument { { "_id", BsonNull.Value }, { "n", new BsonDocument("$sum", 1) } }));
220+
return pipeline;
221+
}
222+
223+
private long ExtractCountFromResult(List<BsonDocument> result)
224+
{
225+
switch (result.Count)
226+
{
227+
case 0:
228+
return 0;
229+
230+
case 1:
231+
return result[0]["n"].ToInt64();
232+
233+
default:
234+
throw new MongoClientException($"Expected aggregate command for CountDocuments to return 1 document, but got {result.Count}.");
235+
}
236+
}
237+
}
238+
}

src/MongoDB.Driver.Core/MongoDB.Driver.Core.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@
135135
<Compile Include="Core\Operations\ChangeStreamCursor.cs" />
136136
<Compile Include="Core\Operations\ChangeStreamOperation.cs" />
137137
<Compile Include="Core\Clusters\ClusterClock.cs" />
138+
<Compile Include="Core\Operations\CountDocumentsOperation.cs" />
138139
<Compile Include="Core\Operations\CreateViewOperation.cs" />
139140
<Compile Include="Core\Clusters\IClusterClock.cs" />
140141
<Compile Include="Core\Operations\DelayedEvaluationWriteConcernSerializer.cs" />
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/* Copyright 2018-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using MongoDB.Driver.Core.Misc;
18+
19+
namespace MongoDB.Driver
20+
{
21+
/// <summary>
22+
/// Options for estimated document count.
23+
/// </summary>
24+
public sealed class EstimatedDocumentCountOptions
25+
{
26+
// private fields
27+
private TimeSpan? _maxTime;
28+
29+
// public properties
30+
/// <summary>
31+
/// Gets or sets the maximum time.
32+
/// </summary>
33+
public TimeSpan? MaxTime
34+
{
35+
get { return _maxTime; }
36+
set { _maxTime = Ensure.IsNullOrInfiniteOrGreaterThanOrEqualToZero(value, nameof(value)); }
37+
}
38+
}
39+
}

src/MongoDB.Driver/FilteredMongoCollectionBase.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* limitations under the License.
1414
*/
1515

16+
using System;
1617
using System.Collections.Generic;
1718
using System.Linq;
1819
using System.Threading;
@@ -117,26 +118,50 @@ protected IMongoCollection<TDocument> WrappedCollection
117118
return _wrappedCollection.BulkWriteAsync(session, CombineModelFilters(requests), options, cancellationToken);
118119
}
119120

121+
[Obsolete("Use CountDocuments or EstimatedDocumentCount instead.")]
120122
public override long Count(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
121123
{
122124
return _wrappedCollection.Count(CombineFilters(filter), options, cancellationToken);
123125
}
124126

127+
[Obsolete("Use CountDocuments or EstimatedDocumentCount instead.")]
125128
public override long Count(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
126129
{
127130
return _wrappedCollection.Count(session, CombineFilters(filter), options, cancellationToken);
128131
}
129132

133+
[Obsolete("Use CountDocumentsAsync or EstimatedDocumentCountAsync instead.")]
130134
public override Task<long> CountAsync(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
131135
{
132136
return _wrappedCollection.CountAsync(CombineFilters(filter), options, cancellationToken);
133137
}
134138

139+
[Obsolete("Use CountDocumentsAsync or EstimatedDocumentCountAsync instead.")]
135140
public override Task<long> CountAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
136141
{
137142
return _wrappedCollection.CountAsync(session, CombineFilters(filter), options, cancellationToken);
138143
}
139144

145+
public override long CountDocuments(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
146+
{
147+
return _wrappedCollection.CountDocuments(CombineFilters(filter), options, cancellationToken);
148+
}
149+
150+
public override long CountDocuments(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
151+
{
152+
return _wrappedCollection.CountDocuments(session, CombineFilters(filter), options, cancellationToken);
153+
}
154+
155+
public override Task<long> CountDocumentsAsync(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
156+
{
157+
return _wrappedCollection.CountDocumentsAsync(CombineFilters(filter), options, cancellationToken);
158+
}
159+
160+
public override Task<long> CountDocumentsAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
161+
{
162+
return _wrappedCollection.CountDocumentsAsync(session, CombineFilters(filter), options, cancellationToken);
163+
}
164+
140165
public override IAsyncCursor<TField> Distinct<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
141166
{
142167
return _wrappedCollection.Distinct(field, CombineFilters(filter), options, cancellationToken);
@@ -157,6 +182,16 @@ protected IMongoCollection<TDocument> WrappedCollection
157182
return _wrappedCollection.DistinctAsync(session, field, CombineFilters(filter), options, cancellationToken);
158183
}
159184

185+
public override long EstimatedDocumentCount(EstimatedDocumentCountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
186+
{
187+
throw new NotSupportedException("EstimatedDocumentCount is not supported for filtered collections.");
188+
}
189+
190+
public override Task<long> EstimatedDocumentCountAsync(EstimatedDocumentCountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
191+
{
192+
throw new NotSupportedException("EstimatedDocumentCountAsync is not supported for filtered collections.");
193+
}
194+
160195
public override IAsyncCursor<TProjection> FindSync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
161196
{
162197
return _wrappedCollection.FindSync(CombineFilters(filter), options, cancellationToken);

0 commit comments

Comments
 (0)