Skip to content

Commit 09b99e5

Browse files
committed
SERVER-23163 Fix signaling of worker thread in replication executor.
1 parent dbccf90 commit 09b99e5

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

src/mongo/db/repl/replication_executor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::onEvent(
274274
queue = &event->_waiters;
275275
} else {
276276
queue = &_readyQueue;
277+
_networkInterface->signalWorkAvailable();
277278
}
278279
return enqueueWork_inlock(queue, work);
279280
}
@@ -382,6 +383,7 @@ StatusWith<ReplicationExecutor::CallbackHandle> ReplicationExecutor::scheduleWor
382383
++insertBefore;
383384
_sleepersQueue.splice(insertBefore, temp, temp.begin());
384385
++_counterScheduledWorkAts;
386+
_networkInterface->signalWorkAvailable();
385387
return cbHandle;
386388
}
387389

src/mongo/db/repl/replication_executor_test.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ namespace repl {
5252
namespace {
5353

5454
using executor::NetworkInterfaceMock;
55+
using unittest::assertGet;
5556

5657
const int64_t prngSeed = 1;
5758

@@ -171,6 +172,85 @@ TEST_F(ReplicationExecutorTest, CancelBeforeRunningFutureWork) {
171172
ASSERT_EQUALS(1, executor.getDiagnosticBSON().getFieldDotted("queues.ready").Int());
172173
}
173174

175+
// Equivalent to EventChainAndWaitingTest::onGo
176+
TEST_F(ReplicationExecutorTest, ScheduleCallbackOnFutureEvent) {
177+
launchExecutorThread();
178+
getNet()->exitNetwork();
179+
180+
ReplicationExecutor& executor = getReplExecutor();
181+
// We signal this "ping" event and the executor will signal "pong" event in return.
182+
auto ping = assertGet(executor.makeEvent());
183+
auto pong = assertGet(executor.makeEvent());
184+
auto fn = [&executor, pong](const ReplicationExecutor::CallbackArgs& cbData) {
185+
ASSERT_OK(cbData.status);
186+
executor.signalEvent(pong);
187+
};
188+
189+
// Wait for a future event.
190+
executor.onEvent(ping, fn);
191+
ASSERT_EQUALS(0, executor.getDiagnosticBSON().getFieldDotted("queues.ready").Int());
192+
executor.signalEvent(ping);
193+
executor.waitForEvent(pong);
194+
}
195+
196+
// Equivalent to EventChainAndWaitingTest::onGoAfterTriggered
197+
TEST_F(ReplicationExecutorTest, ScheduleCallbackOnSignaledEvent) {
198+
launchExecutorThread();
199+
getNet()->exitNetwork();
200+
201+
ReplicationExecutor& executor = getReplExecutor();
202+
// We signal this "ping" event and the executor will signal "pong" event in return.
203+
auto ping = assertGet(executor.makeEvent());
204+
auto pong = assertGet(executor.makeEvent());
205+
auto fn = [&executor, pong](const ReplicationExecutor::CallbackArgs& cbData) {
206+
ASSERT_OK(cbData.status);
207+
executor.signalEvent(pong);
208+
};
209+
210+
// Wait for a signaled event.
211+
executor.signalEvent(ping);
212+
executor.onEvent(ping, fn);
213+
executor.waitForEvent(pong);
214+
}
215+
216+
TEST_F(ReplicationExecutorTest, ScheduleCallbackAtNow) {
217+
launchExecutorThread();
218+
getNet()->exitNetwork();
219+
220+
ReplicationExecutor& executor = getReplExecutor();
221+
auto finishEvent = assertGet(executor.makeEvent());
222+
auto fn = [&executor, finishEvent](const ReplicationExecutor::CallbackArgs& cbData) {
223+
ASSERT_OK(cbData.status);
224+
executor.signalEvent(finishEvent);
225+
};
226+
227+
auto cb = executor.scheduleWorkAt(getNet()->now(), fn);
228+
executor.waitForEvent(finishEvent);
229+
}
230+
231+
TEST_F(ReplicationExecutorTest, ScheduleCallbackAtAFutureTime) {
232+
launchExecutorThread();
233+
getNet()->exitNetwork();
234+
235+
ReplicationExecutor& executor = getReplExecutor();
236+
auto finishEvent = assertGet(executor.makeEvent());
237+
auto fn = [&executor, finishEvent](const ReplicationExecutor::CallbackArgs& cbData) {
238+
ASSERT_OK(cbData.status);
239+
executor.signalEvent(finishEvent);
240+
};
241+
242+
auto now = getNet()->now();
243+
now += Milliseconds(1000);
244+
auto cb = executor.scheduleWorkAt(now, fn);
245+
246+
getNet()->enterNetwork();
247+
getNet()->runUntil(now);
248+
getNet()->exitNetwork();
249+
250+
executor.waitForEvent(finishEvent);
251+
}
252+
253+
174254
} // namespace
175255
} // namespace repl
176256
} // namespace mongo

0 commit comments

Comments
 (0)