Skip to content

Commit fd5e3f7

Browse files
CSHARP-3761: Reimplement SDAM to use a dedicated thread. (mongodb#722)
CSHARP-3761: Reimplement SDAM to use a dedicated thread.
1 parent 169c616 commit fd5e3f7

File tree

8 files changed

+151
-67
lines changed

8 files changed

+151
-67
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/* Copyright 2010-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
20+
namespace MongoDB.Driver.Core.Misc
21+
{
22+
internal static class ThreadHelper
23+
{
24+
public static void Sleep(TimeSpan timeout, CancellationToken cancellationToken)
25+
{
26+
Task.Delay(timeout, cancellationToken).Wait(cancellationToken);
27+
}
28+
}
29+
}

src/MongoDB.Driver.Core/Core/Servers/HeartbeatDelay.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public void RequestHeartbeat()
7474
}
7575
}
7676

77+
public void Wait(CancellationToken cancellationToken)
78+
{
79+
_taskCompletionSource.Task.Wait();
80+
}
81+
7782
private void TimerCallback(object state)
7883
{
7984
_taskCompletionSource.TrySetResult(true);

src/MongoDB.Driver.Core/Core/Servers/RoundTripTimeMonitor.cs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
using System.Diagnostics;
1818
using System.Net;
1919
using System.Threading;
20-
using System.Threading.Tasks;
2120
using MongoDB.Driver.Core.Connections;
2221
using MongoDB.Driver.Core.Misc;
2322

@@ -28,7 +27,7 @@ internal interface IRoundTripTimeMonitor : IDisposable
2827
TimeSpan Average { get; }
2928
void AddSample(TimeSpan roundTripTime);
3029
void Reset();
31-
Task RunAsync();
30+
void Start();
3231
}
3332

3433
internal sealed class RoundTripTimeMonitor : IRoundTripTimeMonitor
@@ -42,6 +41,7 @@ internal sealed class RoundTripTimeMonitor : IRoundTripTimeMonitor
4241
private readonly TimeSpan _heartbeatInterval;
4342
private readonly object _lock = new object();
4443
private IConnection _roundTripTimeConnection;
44+
private Thread _roundTripTimeMonitorThread;
4545
private readonly ServerApi _serverApi;
4646
private readonly ServerId _serverId;
4747

@@ -85,7 +85,26 @@ public void Dispose()
8585
}
8686
}
8787

88-
public async Task RunAsync()
88+
public void Start()
89+
{
90+
_roundTripTimeMonitorThread = new Thread(ThreadStart) { IsBackground = true };
91+
_roundTripTimeMonitorThread.Start();
92+
93+
void ThreadStart()
94+
{
95+
try
96+
{
97+
MonitorServer();
98+
}
99+
catch (OperationCanceledException)
100+
{
101+
// ignore OperationCanceledException
102+
}
103+
}
104+
}
105+
106+
// private methods
107+
private void MonitorServer()
89108
{
90109
var helloOk = false;
91110
while (!_cancellationToken.IsCancellationRequested)
@@ -94,15 +113,15 @@ public async Task RunAsync()
94113
{
95114
if (_roundTripTimeConnection == null)
96115
{
97-
await InitializeConnectionAsync().ConfigureAwait(false); // sets _roundTripTimeConnection
116+
InitializeConnection(); // sets _roundTripTimeConnection
98117
}
99118
else
100119
{
101120
var helloCommand = HelloHelper.CreateCommand(_serverApi, helloOk);
102121
var helloProtocol = HelloHelper.CreateProtocol(helloCommand, _serverApi);
103122

104123
var stopwatch = Stopwatch.StartNew();
105-
var helloResult = await HelloHelper.GetResultAsync(_roundTripTimeConnection, helloProtocol, _cancellationToken).ConfigureAwait(false);
124+
var helloResult = HelloHelper.GetResult(_roundTripTimeConnection, helloProtocol, _cancellationToken);
106125
stopwatch.Stop();
107126
AddSample(stopwatch.Elapsed);
108127
helloOk = helloResult.HelloOk;
@@ -118,13 +137,11 @@ public async Task RunAsync()
118137
}
119138
toDispose?.Dispose();
120139
}
121-
122-
await Task.Delay(_heartbeatInterval, _cancellationToken).ConfigureAwait(false);
140+
ThreadHelper.Sleep(_heartbeatInterval, _cancellationToken);
123141
}
124142
}
125143

126-
// private methods
127-
private async Task InitializeConnectionAsync()
144+
private void InitializeConnection()
128145
{
129146
_cancellationToken.ThrowIfCancellationRequested();
130147

@@ -135,7 +152,7 @@ private async Task InitializeConnectionAsync()
135152
{
136153
// if we are cancelling, it's because the server has
137154
// been shut down and we really don't need to wait.
138-
await roundTripTimeConnection.OpenAsync(_cancellationToken).ConfigureAwait(false);
155+
roundTripTimeConnection.Open(_cancellationToken);
139156
_cancellationToken.ThrowIfCancellationRequested();
140157
}
141158
catch

src/MongoDB.Driver.Core/Core/Servers/ServerMonitor.cs

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
*/
1515

1616
using System;
17+
using System.Collections.Concurrent;
1718
using System.Diagnostics;
1819
using System.Net;
1920
using System.Threading;
20-
using System.Threading.Tasks;
2121
using MongoDB.Bson;
2222
using MongoDB.Driver.Core.Connections;
2323
using MongoDB.Driver.Core.Events;
@@ -43,6 +43,8 @@ internal sealed class ServerMonitor : IServerMonitor
4343
private readonly InterlockedInt32 _state;
4444
private readonly ServerMonitorSettings _serverMonitorSettings;
4545

46+
private Thread _serverMonitorThread;
47+
4648
private readonly Action<ServerHeartbeatStartedEvent> _heartbeatStartedEventHandler;
4749
private readonly Action<ServerHeartbeatSucceededEvent> _heartbeatSucceededEventHandler;
4850
private readonly Action<ServerHeartbeatFailedEvent> _heartbeatFailedEventHandler;
@@ -144,15 +146,21 @@ public void Initialize()
144146
{
145147
if (_state.TryChange(State.Initial, State.Open))
146148
{
147-
// the call to Task.Factory.StartNew is not normally recommended or necessary
148-
// we are using it temporarily to work around a race condition in some of our tests
149-
// the issue is that we set up some mocked async methods to return results immediately synchronously
150-
// which results in the MonitorServerAsync method making more progress synchronously than the test expected
151-
// by using Task.Factory.StartNew we introduce a short delay before the MonitorServerAsync Task starts executing
152-
// the delay is whatever time it takes for the new Task to be activated and scheduled
153-
// and the delay is usually long enough for the test to get past the race condition (though not guaranteed)
154-
_ = Task.Factory.StartNew(() => _ = MonitorServerAsync().ConfigureAwait(false)).ConfigureAwait(false);
155-
_ = _roundTripTimeMonitor.RunAsync().ConfigureAwait(false);
149+
_roundTripTimeMonitor.Start();
150+
_serverMonitorThread = new Thread(ThreadStart) { IsBackground = true };
151+
_serverMonitorThread.Start();
152+
}
153+
154+
void ThreadStart()
155+
{
156+
try
157+
{
158+
MonitorServer();
159+
}
160+
catch (OperationCanceledException)
161+
{
162+
// ignore OperationCanceledException
163+
}
156164
}
157165
}
158166

@@ -187,7 +195,7 @@ private CommandWireProtocol<BsonDocument> InitializeHelloProtocol(IConnection co
187195
return HelloHelper.CreateProtocol(helloCommand, _serverApi, commandResponseHandling);
188196
}
189197

190-
private async Task<IConnection> InitializeConnectionAsync(CancellationToken cancellationToken) // called setUpConnection in spec
198+
private IConnection InitializeConnection(CancellationToken cancellationToken) // called setUpConnection in spec
191199
{
192200
var connection = _connectionFactory.CreateConnection(_serverId, _endPoint);
193201

@@ -196,7 +204,7 @@ private async Task<IConnection> InitializeConnectionAsync(CancellationToken canc
196204
{
197205
// if we are cancelling, it's because the server has
198206
// been shut down and we really don't need to wait.
199-
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
207+
connection.Open(cancellationToken);
200208
}
201209
catch
202210
{
@@ -210,7 +218,7 @@ private async Task<IConnection> InitializeConnectionAsync(CancellationToken canc
210218
return connection;
211219
}
212220

213-
private async Task MonitorServerAsync()
221+
private void MonitorServer()
214222
{
215223
var metronome = new Metronome(_serverMonitorSettings.HeartbeatInterval);
216224
var monitorCancellationToken = _monitorCancellationTokenSource.Token;
@@ -227,7 +235,7 @@ private async Task MonitorServerAsync()
227235

228236
try
229237
{
230-
await HeartbeatAsync(cachedHeartbeatCancellationToken).ConfigureAwait(false);
238+
Heartbeat(cachedHeartbeatCancellationToken);
231239
}
232240
catch (OperationCanceledException) when (cachedHeartbeatCancellationToken.IsCancellationRequested)
233241
{
@@ -244,7 +252,7 @@ private async Task MonitorServerAsync()
244252
{
245253
handler.Invoke(new SdamInformationEvent(() =>
246254
string.Format(
247-
"Unexpected exception in ServerMonitor.MonitorServerAsync: {0}",
255+
"Unexpected exception in ServerMonitor.MonitorServer: {0}",
248256
unexpectedException.ToString())));
249257
}
250258
catch
@@ -276,7 +284,7 @@ private async Task MonitorServerAsync()
276284
}
277285
_heartbeatDelay = newHeartbeatDelay;
278286
}
279-
await newHeartbeatDelay.Task.ConfigureAwait(false); // corresponds to wait method in spec
287+
newHeartbeatDelay.Wait(monitorCancellationToken); // corresponds to wait method in spec
280288
}
281289
catch
282290
{
@@ -285,7 +293,7 @@ private async Task MonitorServerAsync()
285293
}
286294
}
287295

288-
private async Task HeartbeatAsync(CancellationToken cancellationToken)
296+
private void Heartbeat(CancellationToken cancellationToken)
289297
{
290298
CommandWireProtocol<BsonDocument> helloProtocol = null;
291299
bool processAnother = true;
@@ -304,7 +312,7 @@ private async Task HeartbeatAsync(CancellationToken cancellationToken)
304312
}
305313
if (connection == null)
306314
{
307-
var initializedConnection = await InitializeConnectionAsync(cancellationToken).ConfigureAwait(false);
315+
var initializedConnection = InitializeConnection(cancellationToken);
308316
lock (_lock)
309317
{
310318
if (_state.Value == State.Disposed)
@@ -326,7 +334,7 @@ private async Task HeartbeatAsync(CancellationToken cancellationToken)
326334
{
327335
helloProtocol = InitializeHelloProtocol(connection, previousDescription?.HelloOk ?? false);
328336
}
329-
heartbeatHelloResult = await GetHelloResultAsync(connection, helloProtocol, cancellationToken).ConfigureAwait(false);
337+
heartbeatHelloResult = GetHelloResult(connection, helloProtocol, cancellationToken);
330338
}
331339
}
332340
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
@@ -420,7 +428,7 @@ bool IsNetworkError(Exception ex)
420428
}
421429
}
422430

423-
private async Task<HelloResult> GetHelloResultAsync(
431+
private HelloResult GetHelloResult(
424432
IConnection connection,
425433
CommandWireProtocol<BsonDocument> helloProtocol,
426434
CancellationToken cancellationToken)
@@ -434,7 +442,7 @@ private async Task<HelloResult> GetHelloResultAsync(
434442
try
435443
{
436444
var stopwatch = Stopwatch.StartNew();
437-
var helloResult = await HelloHelper.GetResultAsync(connection, helloProtocol, cancellationToken).ConfigureAwait(false);
445+
var helloResult = HelloHelper.GetResult(connection, helloProtocol, cancellationToken);
438446
stopwatch.Stop();
439447

440448
if (_heartbeatSucceededEventHandler != null)

0 commit comments

Comments
 (0)