Skip to content

Commit 59a1268

Browse files
committed
CSHARP-3305: Rate limit new connection creations (maxConnecting)
1 parent a8b52c0 commit 59a1268

24 files changed

+2166
-687
lines changed

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.Helpers.cs

Lines changed: 719 additions & 0 deletions
Large diffs are not rendered by default.

src/MongoDB.Driver.Core/Core/ConnectionPools/ExclusiveConnectionPool.cs

Lines changed: 24 additions & 502 deletions
Large diffs are not rendered by default.
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/* Copyright 2021-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
20+
namespace MongoDB.Driver.Core.Misc
21+
{
22+
internal sealed class SemaphoreSlimSignalable : IDisposable
23+
{
24+
public enum SemaphoreWaitResult
25+
{
26+
None,
27+
Signaled,
28+
TimedOut,
29+
Entered
30+
}
31+
32+
// private fields
33+
private CancellationTokenSource _signalCancellationTokenSource;
34+
35+
private readonly SemaphoreSlim _semaphore;
36+
private readonly object _syncRoot;
37+
38+
public SemaphoreSlimSignalable(int initialCount)
39+
{
40+
// reasonable upper bound for initialCount to ensure overall correctness
41+
Ensure.IsBetween(initialCount, 0, 1024, nameof(initialCount));
42+
43+
_semaphore = new SemaphoreSlim(initialCount);
44+
_syncRoot = new object();
45+
46+
_signalCancellationTokenSource = new CancellationTokenSource();
47+
}
48+
49+
public int Count => _semaphore.CurrentCount;
50+
51+
public void Signal()
52+
{
53+
if (!_signalCancellationTokenSource.IsCancellationRequested)
54+
{
55+
lock (_syncRoot)
56+
{
57+
_signalCancellationTokenSource.Cancel();
58+
}
59+
}
60+
}
61+
62+
public void Reset()
63+
{
64+
if (_signalCancellationTokenSource.IsCancellationRequested)
65+
{
66+
lock (_syncRoot)
67+
{
68+
if (_signalCancellationTokenSource.IsCancellationRequested)
69+
{
70+
_signalCancellationTokenSource = new CancellationTokenSource();
71+
}
72+
}
73+
}
74+
}
75+
76+
public SemaphoreWaitResult Wait(TimeSpan timeout, CancellationToken cancellationToken)
77+
{
78+
var entered = _semaphore.Wait(timeout, cancellationToken);
79+
return entered ? SemaphoreWaitResult.Entered : SemaphoreWaitResult.TimedOut;
80+
}
81+
82+
public async Task<SemaphoreWaitResult> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
83+
{
84+
var entered = await _semaphore.WaitAsync(timeout, cancellationToken).ConfigureAwait(false);
85+
return entered ? SemaphoreWaitResult.Entered : SemaphoreWaitResult.TimedOut;
86+
}
87+
88+
public SemaphoreWaitResult WaitSignaled(TimeSpan timeout, CancellationToken cancellationToken)
89+
{
90+
var (tokenSourceLinked, signalTokenSource, signaled) = GetLinkedTokenAndCheckForSignaled(cancellationToken);
91+
92+
if (signaled)
93+
{
94+
return SemaphoreWaitResult.Signaled;
95+
}
96+
97+
try
98+
{
99+
var entered = _semaphore.Wait(timeout, tokenSourceLinked.Token);
100+
return entered ? SemaphoreWaitResult.Entered : SemaphoreWaitResult.TimedOut;
101+
}
102+
catch (OperationCanceledException)
103+
{
104+
if (IsSignaled(signalTokenSource.Token, cancellationToken))
105+
{
106+
return SemaphoreWaitResult.Signaled;
107+
}
108+
109+
throw;
110+
}
111+
}
112+
113+
public async Task<SemaphoreWaitResult> WaitSignaledAsync(TimeSpan timeout, CancellationToken cancellationToken)
114+
{
115+
var (tokemSourceLinked, signalTokenSource, signaled) = GetLinkedTokenAndCheckForSignaled(cancellationToken);
116+
117+
if (signaled)
118+
{
119+
return SemaphoreWaitResult.Signaled;
120+
}
121+
122+
try
123+
{
124+
var entered = await _semaphore.WaitAsync(timeout, tokemSourceLinked.Token).ConfigureAwait(false);
125+
return entered ? SemaphoreWaitResult.Entered : SemaphoreWaitResult.TimedOut;
126+
}
127+
catch (OperationCanceledException)
128+
{
129+
if (IsSignaled(signalTokenSource.Token, cancellationToken))
130+
{
131+
return SemaphoreWaitResult.Signaled;
132+
}
133+
134+
throw;
135+
}
136+
}
137+
138+
public void Release()
139+
{
140+
_semaphore.Release();
141+
}
142+
143+
public void Dispose()
144+
{
145+
_semaphore.Dispose();
146+
}
147+
148+
private (CancellationTokenSource TokenSourceLinked, CancellationTokenSource SignalTokenSource, bool Signaled) GetLinkedTokenAndCheckForSignaled(CancellationToken cancellationToken)
149+
{
150+
var signalTokenSource = _signalCancellationTokenSource;
151+
152+
if (IsSignaled(signalTokenSource.Token, cancellationToken))
153+
{
154+
return (default, default, true);
155+
}
156+
157+
var tokenSourceLinked = CancellationTokenSource.CreateLinkedTokenSource(
158+
signalTokenSource.Token,
159+
cancellationToken);
160+
161+
return (tokenSourceLinked, signalTokenSource, false);
162+
}
163+
164+
#pragma warning disable CA1068 // CancellationToken parameters must come last
165+
private bool IsSignaled(CancellationToken signalToken, CancellationToken cancellationToken)
166+
#pragma warning restore CA1068 // CancellationToken parameters must come last
167+
{
168+
cancellationToken.ThrowIfCancellationRequested();
169+
170+
return signalToken.IsCancellationRequested;
171+
}
172+
}
173+
}

src/MongoDB.Driver.Core/Core/Servers/Server.cs

Lines changed: 33 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ internal sealed class Server : IClusterableServer
6060
private readonly ClusterConnectionMode _clusterConnectionMode;
6161
private readonly ConnectionModeSwitch _connectionModeSwitch;
6262
#pragma warning restore CS0618 // Type or member is obsolete
63-
private IConnectionPool _connectionPool;
63+
private readonly IConnectionPool _connectionPool;
6464
private readonly bool? _directConnection;
6565
private ServerDescription _currentDescription;
6666
private readonly EndPoint _endPoint;
@@ -165,65 +165,37 @@ public void Dispose()
165165
public IChannelHandle GetChannel(CancellationToken cancellationToken)
166166
{
167167
ThrowIfNotOpen();
168-
IConnectionHandle connection = null;
169168

170169
try
171170
{
172171
Interlocked.Increment(ref _outstandingOperationsCount);
173-
connection = _connectionPool.AcquireConnection(cancellationToken);
174-
175-
// ignoring the user's cancellation token here because we don't
176-
// want to throw this connection away simply because the user
177-
// wanted to cancel their operation. It will be better for the
178-
// collective to complete opening the connection than the throw
179-
// it away.
180-
connection.Open(CancellationToken.None); // This results in the initial isMaster being sent
172+
var connection = _connectionPool.AcquireConnection(cancellationToken);
181173
return new ServerChannel(this, connection);
182174
}
183175
catch (Exception ex)
184176
{
185177
Interlocked.Decrement(ref _outstandingOperationsCount);
186178

187-
if (connection != null)
188-
{
189-
HandleBeforeHandshakeCompletesException(connection, ex);
190-
191-
connection.Dispose();
192-
}
193-
179+
HandleBeforeHandshakeCompletesException(ex);
194180
throw;
195181
}
196182
}
197183

198184
public async Task<IChannelHandle> GetChannelAsync(CancellationToken cancellationToken)
199185
{
200186
ThrowIfNotOpen();
201-
IConnectionHandle connection = null;
202187

203188
try
204189
{
205190
Interlocked.Increment(ref _outstandingOperationsCount);
206-
connection = await _connectionPool.AcquireConnectionAsync(cancellationToken).ConfigureAwait(false);
207-
208-
// ignoring the user's cancellation token here because we don't
209-
// want to throw this connection away simply because the user
210-
// wanted to cancel their operation. It will be better for the
211-
// collective to complete opening the connection than the throw
212-
// it away.
213-
await connection.OpenAsync(CancellationToken.None).ConfigureAwait(false);
191+
var connection = await _connectionPool.AcquireConnectionAsync(cancellationToken).ConfigureAwait(false);
214192
return new ServerChannel(this, connection);
215193
}
216194
catch (Exception ex)
217195
{
218196
Interlocked.Decrement(ref _outstandingOperationsCount);
219197

220-
if (connection != null)
221-
{
222-
HandleBeforeHandshakeCompletesException(connection, ex);
223-
224-
connection.Dispose();
225-
}
226-
198+
HandleBeforeHandshakeCompletesException(ex);
227199
throw;
228200
}
229201
}
@@ -336,16 +308,19 @@ private void HandleChannelException(IConnection connection, Exception ex)
336308

337309
lock (_monitor.Lock)
338310
{
339-
if (connection.Generation != _connectionPool.Generation)
311+
if (ex is MongoConnectionException mongoConnectionException)
340312
{
341-
return; // stale generation number
342-
}
313+
if (mongoConnectionException.Generation != null &&
314+
mongoConnectionException.Generation != _connectionPool.Generation)
315+
{
316+
return; // stale generation number
317+
}
343318

344-
if (ex is MongoConnectionException mongoConnectionException &&
345-
mongoConnectionException.IsNetworkException &&
346-
!mongoConnectionException.ContainsTimeoutException)
347-
{
348-
_monitor.CancelCurrentCheck();
319+
if (mongoConnectionException.IsNetworkException &&
320+
!mongoConnectionException.ContainsTimeoutException)
321+
{
322+
_monitor.CancelCurrentCheck();
323+
}
349324
}
350325

351326
var description = Description; // use Description property to access _description value safely
@@ -361,32 +336,34 @@ private void HandleChannelException(IConnection connection, Exception ex)
361336
}
362337
}
363338

364-
private void HandleBeforeHandshakeCompletesException(IConnection connection, Exception ex)
339+
private void HandleBeforeHandshakeCompletesException(Exception ex)
365340
{
366341
if (ex is MongoAuthenticationException)
367342
{
368343
_connectionPool.Clear();
369344
return;
370345
}
371346

372-
lock (_monitor.Lock)
347+
if (ex is MongoConnectionException mongoConnectionException)
373348
{
374-
if (connection.Generation != _connectionPool.Generation)
349+
lock (_monitor.Lock)
375350
{
376-
return; // stale generation number
377-
}
351+
if (mongoConnectionException.Generation != null &&
352+
mongoConnectionException.Generation != _connectionPool.Generation)
353+
{
354+
return; // stale generation number
355+
}
378356

379-
if (ex is MongoConnectionException mongoConnectionException &&
380-
mongoConnectionException.IsNetworkException &&
381-
!mongoConnectionException.ContainsTimeoutException)
382-
{
383-
_monitor.CancelCurrentCheck();
384-
}
357+
if (mongoConnectionException.IsNetworkException &&
358+
!mongoConnectionException.ContainsTimeoutException)
359+
{
360+
_monitor.CancelCurrentCheck();
361+
}
385362

386-
if (ex is MongoConnectionException connectionException &&
387-
(connectionException.IsNetworkException || connectionException.ContainsTimeoutException))
388-
{
389-
Invalidate($"ChannelException during handshake: {ex}.", clearConnectionPool: true, responseTopologyVersion: null);
363+
if (mongoConnectionException.IsNetworkException || mongoConnectionException.ContainsTimeoutException)
364+
{
365+
Invalidate($"ChannelException during handshake: {ex}.", clearConnectionPool: true, responseTopologyVersion: null);
366+
}
390367
}
391368
}
392369
}

src/MongoDB.Driver.Core/MongoConnectionException.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class MongoConnectionException : MongoException
3333
{
3434
// fields
3535
private readonly ConnectionId _connectionId;
36+
private int? _generation = null;
3637

3738
// constructors
3839
/// <summary>
@@ -137,5 +138,22 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont
137138
info.AddValue("_connectionId", _connectionId);
138139
}
139140
#endif
141+
142+
// properties
143+
// TODO temporary property for propagating exception generation to server
144+
// Will be reconsider after SDAM spec error handling adjustments
145+
internal int? Generation
146+
{
147+
get { return _generation; }
148+
set
149+
{
150+
if (_generation != null)
151+
{
152+
throw new InvalidOperationException("Generation is already set.");
153+
}
154+
155+
_generation = value;
156+
}
157+
}
140158
}
141159
}

src/MongoDB.Driver.Core/MongoDB.Driver.Core.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
<ItemGroup Condition="'$(TargetFramework)' == 'net452'">
135135
<PackageReference Include="System.Runtime.InteropServices.RuntimeInformation" Version="4.3.0" />
136136
<PackageReference Include="System.Net.Http" Version="4.3.4" />
137+
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
137138
</ItemGroup>
138139

139140
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard1.5'">
@@ -144,6 +145,7 @@
144145
<PackageReference Include="System.Net.Security" Version="4.3.2" />
145146
<PackageReference Include="System.Security.SecureString" Version="4.0.0" />
146147
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
148+
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
147149
</ItemGroup>
148150

149151
<ItemGroup>

0 commit comments

Comments
 (0)