Skip to content

Commit b961b81

Browse files
committed
CSHARP-3815: Make SemaphoreSlimSignalable.Signal end SemaphoreSlimSignalable.WaitSignaledAsync immediately
1 parent dede5e2 commit b961b81

File tree

4 files changed

+150
-25
lines changed

4 files changed

+150
-25
lines changed

src/MongoDB.Driver.Core/Core/Misc/SemaphoreSlimSignalable.cs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public void Dispose()
5858

5959
// private fields
6060
private CancellationTokenSource _signalCancellationTokenSource;
61-
private bool _isCancellationScheduled;
6261

6362
private readonly SemaphoreSlim _semaphore;
6463
private readonly object _syncRoot;
@@ -72,36 +71,27 @@ public SemaphoreSlimSignalable(int initialCount)
7271
_syncRoot = new object();
7372

7473
_signalCancellationTokenSource = new CancellationTokenSource();
75-
_isCancellationScheduled = false;
7674
}
7775

7876
public int Count => _semaphore.CurrentCount;
7977

8078
public void Signal()
8179
{
82-
if (!_isCancellationScheduled)
80+
lock (_syncRoot)
8381
{
84-
lock (_syncRoot)
85-
{
86-
if (!_isCancellationScheduled)
87-
{
88-
_signalCancellationTokenSource.CancelAfter(0);
89-
_isCancellationScheduled = true;
90-
}
91-
}
82+
_signalCancellationTokenSource.Cancel();
9283
}
9384
}
9485

9586
public void Reset()
9687
{
97-
if (_isCancellationScheduled)
88+
if (_signalCancellationTokenSource.IsCancellationRequested)
9889
{
9990
lock (_syncRoot)
10091
{
101-
if (_isCancellationScheduled)
92+
if (_signalCancellationTokenSource.IsCancellationRequested)
10293
{
10394
_signalCancellationTokenSource = new CancellationTokenSource();
104-
_isCancellationScheduled = false;
10595
}
10696
}
10797
}
@@ -146,7 +136,7 @@ public SemaphoreWaitResult WaitSignaled(TimeSpan timeout, CancellationToken canc
146136

147137
public async Task<SemaphoreWaitResult> WaitSignaledAsync(TimeSpan timeout, CancellationToken cancellationToken)
148138
{
149-
var (tokemSourceLinked, signalTokenSource, signaled) = GetLinkedTokenAndCheckForSignaled(cancellationToken);
139+
var (tokenSourceLinked, signalTokenSource, signaled) = GetLinkedTokenAndCheckForSignaled(cancellationToken);
150140

151141
if (signaled)
152142
{
@@ -155,13 +145,17 @@ public async Task<SemaphoreWaitResult> WaitSignaledAsync(TimeSpan timeout, Cance
155145

156146
try
157147
{
158-
var entered = await _semaphore.WaitAsync(timeout, tokemSourceLinked.Token).ConfigureAwait(false);
148+
var entered = await _semaphore.WaitAsync(timeout, tokenSourceLinked.Token).ConfigureAwait(false);
149+
159150
return entered ? SemaphoreWaitResult.Entered : SemaphoreWaitResult.TimedOut;
160151
}
161152
catch (OperationCanceledException)
162153
{
163154
if (IsSignaled(signalTokenSource.Token, cancellationToken))
164155
{
156+
// Request task rescheduling, to avoid resuming execution on Signal thread
157+
await TaskExtensions.YieldNoContext();
158+
165159
return SemaphoreWaitResult.Signaled;
166160
}
167161

src/MongoDB.Driver.Core/Core/Misc/TaskExtensions.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,48 @@
1313
* limitations under the License.
1414
*/
1515

16+
using System;
17+
using System.Runtime.CompilerServices;
1618
using System.Threading.Tasks;
1719

1820
namespace MongoDB.Driver.Core.Misc
1921
{
2022
internal static class TaskExtensions
2123
{
24+
internal struct YieldNoContextAwaitable
25+
{
26+
public YieldNoContextAwaiter GetAwaiter() { return new YieldNoContextAwaiter(); }
27+
28+
public struct YieldNoContextAwaiter : ICriticalNotifyCompletion
29+
{
30+
/// <summary>Gets whether a yield is not required.</summary>
31+
/// <remarks>This property is intended for compiler user rather than use directly in code.</remarks>
32+
public bool IsCompleted { get { return false; } } // yielding is always required for YieldNoContextAwaiter, hence false
33+
34+
public void OnCompleted(Action continuation)
35+
{
36+
Task.Factory.StartNew(continuation, default, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
37+
}
38+
39+
public void UnsafeOnCompleted(Action continuation)
40+
{
41+
Task.Factory.StartNew(continuation, default, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
42+
}
43+
44+
public void GetResult()
45+
{
46+
// no op
47+
}
48+
}
49+
}
50+
2251
public static void IgnoreExceptions(this Task task)
2352
{
2453
task.ContinueWith(t => { var ignored = t.Exception; },
2554
TaskContinuationOptions.OnlyOnFaulted |
2655
TaskContinuationOptions.ExecuteSynchronously);
2756
}
57+
58+
public static YieldNoContextAwaitable YieldNoContext() => new YieldNoContextAwaitable();
2859
}
2960
}

tests/MongoDB.Driver.Core.Tests/Core/ConnectionPools/ExclusiveConnectionPoolTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ public void AcquireConnection_should_throw_a_TimeoutException_when_all_connectio
589589

590590
[Theory]
591591
[ParameterAttributeData]
592-
public void AquireConnection_should_timeout_when_non_sufficient_reused_connections(
592+
public void AcquireConnection_should_timeout_when_non_sufficient_reused_connections(
593593
[Values(true, false)] bool async,
594594
[Values(1, 10, null)] int? maxConnectingOptional)
595595
{
@@ -755,7 +755,7 @@ public void AcquiredConnection_should_not_throw_exceptions_when_disposed_after_t
755755

756756
[Theory]
757757
[ParameterAttributeData]
758-
public void AquireConnection_should_timeout_when_no_sufficient_reused_connections(
758+
public void AcquireConnection_should_timeout_when_no_sufficient_reused_connections(
759759
[Values(true, false)]
760760
bool async)
761761
{
@@ -880,7 +880,7 @@ public void AquireConnection_should_timeout_when_no_sufficient_reused_connection
880880

881881
[Theory]
882882
[ParameterAttributeData]
883-
public void Aquire_and_release_connection_stress_test(
883+
public void Acquire_and_release_connection_stress_test(
884884
[RandomSeed(new[] { 0 })] int seed,
885885
[Values(2, 10, 30)] int threadsCount,
886886
[Values(true, false, null)] bool? asyncOrRandom,

tests/MongoDB.Driver.Core.Tests/Core/Misc/SemaphoreSlimSignalableTests.cs

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class SemaphoreSlimSignalableTests
2828
{
2929
[Theory]
3030
[ParameterAttributeData]
31-
public void SemaphoreSlimSignalable_constructor_should_check_arguments([Values(-2, -1, 1025)]int count)
31+
public void Constructor_should_check_arguments([Values(-2, -1, 1025)] int count)
3232
{
3333
var exception = Record.Exception(() => new SemaphoreSlimSignalable(count));
3434

@@ -39,7 +39,48 @@ public void SemaphoreSlimSignalable_constructor_should_check_arguments([Values(-
3939

4040
[Theory]
4141
[ParameterAttributeData]
42-
public async Task SemaphoreSlimSignalable_wait_should_enter(
42+
public async Task Reset_should_clear_signal(
43+
[Values(true, false)] bool async)
44+
{
45+
const int threadsCount = 4;
46+
var semaphore = new SemaphoreSlimSignalable(0);
47+
48+
semaphore.Signal();
49+
semaphore.Reset();
50+
51+
var waitTasks = WaitAsync(semaphore, async, true, threadsCount, TimeSpan.FromSeconds(5));
52+
53+
for (int i = 0; i < threadsCount; i++)
54+
{
55+
semaphore.Release();
56+
}
57+
58+
var results = await waitTasks;
59+
60+
Assert(results, SemaphoreSlimSignalable.SemaphoreWaitResult.Entered);
61+
}
62+
63+
[Theory]
64+
[ParameterAttributeData]
65+
public async Task Reset_should_not_reset_non_signaled(
66+
[Values(true, false)] bool async)
67+
{
68+
const int threadsCount = 4;
69+
var semaphore = new SemaphoreSlimSignalable(0);
70+
71+
var waitTasks = WaitAsync(semaphore, async, true, threadsCount, TimeSpan.FromSeconds(5));
72+
73+
semaphore.Reset();
74+
semaphore.Signal();
75+
76+
var results = await waitTasks;
77+
78+
Assert(results, SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled);
79+
}
80+
81+
[Theory]
82+
[ParameterAttributeData]
83+
public async Task Wait_should_enter(
4384
[Values(true, false)] bool async,
4485
[Values(true, false)] bool isSignaledWait,
4586
[Values(0, 1, 2)] int initialCount,
@@ -61,10 +102,10 @@ public async Task SemaphoreSlimSignalable_wait_should_enter(
61102

62103
[Theory]
63104
[ParameterAttributeData]
64-
public async Task SemaphoreSlimSignalable_wait_should_timeout(
105+
public async Task Wait_should_timeout(
65106
[Values(true, false)] bool async,
66107
[Values(true, false)] bool isSignaledWait,
67-
[Values(5, 10)]int timeoutMS)
108+
[Values(5, 10)] int timeoutMS)
68109
{
69110
const int threadsCount = 4;
70111
var semaphore = new SemaphoreSlimSignalable(0);
@@ -75,7 +116,7 @@ public async Task SemaphoreSlimSignalable_wait_should_timeout(
75116

76117
[Theory]
77118
[ParameterAttributeData]
78-
public async Task SemaphoreSlimSignalable_wait_should_signal(
119+
public async Task Wait_should_signal(
79120
[Values(true, false)] bool async,
80121
[Values(true, false)] bool signalBeforeWait)
81122
{
@@ -99,7 +140,7 @@ public async Task SemaphoreSlimSignalable_wait_should_signal(
99140

100141
[Theory]
101142
[ParameterAttributeData]
102-
public async Task SemaphoreSlimSignalable_wait_should_cancel(
143+
public async Task Wait_should_cancel(
103144
[Values(true, false)] bool async,
104145
[Values(true, false)] bool isSignaledWait)
105146
{
@@ -120,6 +161,65 @@ public async Task SemaphoreSlimSignalable_wait_should_cancel(
120161
}
121162
}
122163

164+
[Fact]
165+
public async Task WaitSignaledAsync_should_not_continue_on_signal_thread()
166+
{
167+
var cancelationTokenSource = new CancellationTokenSource();
168+
169+
var semaphore = new SemaphoreSlimSignalable(0);
170+
var waitStartedEvent = new ManualResetEventSlim(false);
171+
172+
var signalThreadTerminated = false;
173+
var signalThread = SignalThread();
174+
175+
for (int i = 0; i < 100; i++)
176+
{
177+
semaphore.Reset();
178+
waitStartedEvent.Reset();
179+
180+
var waitSignaledTask = WaitSignaledTask();
181+
await waitSignaledTask;
182+
183+
waitSignaledTask.Result.Should().NotBe(signalThread.ManagedThreadId);
184+
}
185+
186+
Volatile.Write(ref signalThreadTerminated, true);
187+
waitStartedEvent.Set();
188+
signalThread.Join(100);
189+
190+
async Task<int> WaitSignaledTask()
191+
{
192+
await Task.Yield();
193+
194+
var waitTask = semaphore.WaitSignaledAsync(Timeout.InfiniteTimeSpan, cancelationTokenSource.Token);
195+
waitStartedEvent.Set();
196+
197+
var waitResult = await waitTask;
198+
waitResult.Should().Be(SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled);
199+
200+
return Thread.CurrentThread.ManagedThreadId;
201+
}
202+
203+
Thread SignalThread()
204+
{
205+
var thread = new Thread(_ =>
206+
{
207+
while (!Volatile.Read(ref signalThreadTerminated))
208+
{
209+
waitStartedEvent.Wait();
210+
waitStartedEvent.Reset();
211+
212+
semaphore.Signal();
213+
}
214+
});
215+
216+
thread.IsBackground = true;
217+
thread.Start();
218+
219+
return thread;
220+
}
221+
}
222+
123223
// private methods
124224
private Task<SemaphoreSlimSignalable.SemaphoreWaitResult[]> WaitAsync(
125225
SemaphoreSlimSignalable semaphore,

0 commit comments

Comments
 (0)