Skip to content

Commit 289d877

Browse files
committed
SERVER-20928 Synchronize stepdown process with events
1 parent b25a1fb commit 289d877

File tree

5 files changed

+90
-89
lines changed

5 files changed

+90
-89
lines changed

src/mongo/db/repl/replication_coordinator_impl.cpp

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
405405
_setCurrentRSConfig_inlock(cbData, localConfig, myIndex.getValue());
406406
_setMyLastOptimeAndReport_inlock(&lk, lastOpTime, false);
407407
_externalState->setGlobalTimestamp(lastOpTime.getTimestamp());
408+
// Step down is impossible, so we don't need to wait for the returned event.
408409
_updateTerm_incallback(term);
409410
LOG(1) << "Current term is now " << term;
410411
if (lk.owns_lock()) {
@@ -1724,22 +1725,27 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result)
17241725
}
17251726

17261727
void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
1727-
_scheduleWorkAndWaitForCompletion(stdx::bind(
1728-
&ReplicationCoordinatorImpl::_processReplSetMetadata_incallback, this, replMetadata));
1728+
EventHandle evh;
1729+
_scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) {
1730+
evh = _processReplSetMetadata_incallback(replMetadata);
1731+
});
1732+
if (evh.isValid()) {
1733+
_replExecutor.waitForEvent(evh);
1734+
}
17291735
}
17301736

17311737
void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() {
17321738
stdx::lock_guard<stdx::mutex> lock(_mutex);
17331739
_cancelAndRescheduleElectionTimeout_inlock();
17341740
}
17351741

1736-
void ReplicationCoordinatorImpl::_processReplSetMetadata_incallback(
1742+
EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback(
17371743
const rpc::ReplSetMetadata& replMetadata) {
17381744
if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
1739-
return;
1745+
return EventHandle();
17401746
}
17411747
_setLastCommittedOpTime(replMetadata.getLastOpCommitted());
1742-
_updateTerm_incallback(replMetadata.getTerm());
1748+
return _updateTerm_incallback(replMetadata.getTerm());
17431749
}
17441750

17451751
bool ReplicationCoordinatorImpl::getMaintenanceMode() {
@@ -1908,11 +1914,6 @@ Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs&
19081914
}
19091915
fassert(18508, cbh.getStatus());
19101916
_replExecutor.wait(cbh.getValue());
1911-
1912-
// Wait if heartbeat causes stepdown.
1913-
if (_stepDownFinishedEvent.isValid()) {
1914-
_replExecutor.waitForEvent(_stepDownFinishedEvent);
1915-
}
19161917
return result;
19171918
}
19181919

@@ -2849,12 +2850,6 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
28492850
if (!termStatus.isOK() && termStatus.code() != ErrorCodes::StaleTerm)
28502851
return termStatus;
28512852

2852-
// Term update may cause current primary step down, we need to wait until it
2853-
// finishes so that it won't close our connection.
2854-
if (_stepDownFinishedEvent.isValid()) {
2855-
_replExecutor.waitForEvent(_stepDownFinishedEvent);
2856-
}
2857-
28582853
Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"};
28592854
CBHStatus cbh = _replExecutor.scheduleWork(
28602855
stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish,
@@ -2993,10 +2988,6 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
29932988
fassert(28645, cbh.getStatus());
29942989
_replExecutor.wait(cbh.getValue());
29952990

2996-
// Wait if heartbeat causes stepdown.
2997-
if (_stepDownFinishedEvent.isValid()) {
2998-
_replExecutor.waitForEvent(_stepDownFinishedEvent);
2999-
}
30002991
return result;
30012992
}
30022993

@@ -3084,65 +3075,80 @@ void ReplicationCoordinatorImpl::_getTerm_helper(const ReplicationExecutor::Call
30843075
*term = _topCoord->getTerm();
30853076
}
30863077

3087-
StatusWith<ReplicationExecutor::CallbackHandle> ReplicationCoordinatorImpl::updateTerm_nonBlocking(
3088-
long long term, bool* updated) {
3078+
EventHandle ReplicationCoordinatorImpl::updateTerm_forTest(long long term, bool* updated) {
3079+
auto finishEvhStatus = _replExecutor.makeEvent();
3080+
invariantOK(finishEvhStatus.getStatus());
3081+
EventHandle finishEvh = finishEvhStatus.getValue();
3082+
auto signalFinishEvent =
3083+
[this, finishEvh](const CallbackArgs&) { this->_replExecutor.signalEvent(finishEvh); };
3084+
auto work = [this, term, updated, signalFinishEvent](const CallbackArgs& args) {
3085+
auto evh = _updateTerm_incallback(term, updated);
3086+
if (evh.isValid()) {
3087+
_replExecutor.onEvent(evh, signalFinishEvent);
3088+
} else {
3089+
signalFinishEvent(args);
3090+
}
3091+
};
3092+
_scheduleWork(work);
3093+
return finishEvh;
3094+
}
3095+
3096+
Status ReplicationCoordinatorImpl::updateTerm(long long term) {
30893097
// Term is only valid if we are replicating.
30903098
if (getReplicationMode() != modeReplSet) {
30913099
return {ErrorCodes::BadValue, "cannot supply 'term' without active replication"};
30923100
}
30933101

30943102
if (!isV1ElectionProtocol()) {
30953103
// Do not update if not in V1 protocol.
3096-
return ReplicationExecutor::CallbackHandle();
3104+
return Status::OK();
30973105
}
30983106

3099-
auto work =
3100-
[this, term, updated](const CallbackArgs&) { *updated = _updateTerm_incallback(term); };
3101-
return _scheduleWork(work);
3102-
}
3103-
3104-
Status ReplicationCoordinatorImpl::updateTerm(long long term) {
31053107
bool updated = false;
3106-
auto result = updateTerm_nonBlocking(term, &updated);
3107-
if (!result.isOK()) {
3108-
return result.getStatus();
3109-
}
3110-
auto handle = result.getValue();
3111-
if (handle.isValid()) {
3112-
_replExecutor.wait(handle);
3108+
EventHandle finishEvh;
3109+
auto work = [this, term, &updated, &finishEvh](const CallbackArgs&) {
3110+
finishEvh = _updateTerm_incallback(term, &updated);
3111+
};
3112+
_scheduleWorkAndWaitForCompletion(work);
3113+
// Wait for potential stepdown to finish.
3114+
if (finishEvh.isValid()) {
3115+
_replExecutor.waitForEvent(finishEvh);
31133116
}
3114-
31153117
if (updated) {
31163118
return {ErrorCodes::StaleTerm, "Replication term of this node was stale; retry query"};
31173119
}
31183120

31193121
return Status::OK();
31203122
}
31213123

3122-
bool ReplicationCoordinatorImpl::_updateTerm_incallback(long long term) {
3124+
EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback(long long term, bool* updated) {
31233125
if (!isV1ElectionProtocol()) {
31243126
LOG(3) << "Cannot update term in election protocol version 0";
3125-
return false;
3127+
return EventHandle();
31263128
}
31273129

31283130
auto now = _replExecutor.now();
3129-
bool updated = _topCoord->updateTerm(term, now);
3131+
bool termUpdated = _topCoord->updateTerm(term, now);
31303132
{
31313133
stdx::lock_guard<stdx::mutex> lock(_mutex);
31323134
_cachedTerm = _topCoord->getTerm();
31333135

3134-
if (updated) {
3136+
if (termUpdated) {
31353137
_cancelPriorityTakeover_inlock();
31363138
_cancelAndRescheduleElectionTimeout_inlock();
31373139
}
31383140
}
31393141

3140-
if (updated && getMemberState().primary()) {
3142+
if (updated) {
3143+
*updated = termUpdated;
3144+
}
3145+
3146+
if (termUpdated && getMemberState().primary()) {
31413147
log() << "stepping down from primary, because a new term has begun: " << term;
31423148
_topCoord->prepareForStepDown();
3143-
_stepDownStart();
3149+
return _stepDownStart();
31443150
}
3145-
return updated;
3151+
return EventHandle();
31463152
}
31473153

31483154
SnapshotName ReplicationCoordinatorImpl::reserveSnapshotName(OperationContext* txn) {
@@ -3246,12 +3252,6 @@ void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
32463252
}
32473253
}
32483254

3249-
void ReplicationCoordinatorImpl::waitForStepDownFinish_forTest() {
3250-
if (_stepDownFinishedEvent.isValid()) {
3251-
_replExecutor.waitForEvent(_stepDownFinishedEvent);
3252-
}
3253-
}
3254-
32553255
void ReplicationCoordinatorImpl::_resetElectionInfoOnProtocolVersionUpgrade(
32563256
const ReplicaSetConfig& newConfig) {
32573257
// On protocol version upgrade, reset last vote as if I just learned the term 0 from other

src/mongo/db/repl/replication_coordinator_impl.h

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,11 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
363363

364364
/**
365365
* Non-blocking version of updateTerm.
366-
* Returns callback handle that we can use to wait for the operation to complete.
367-
* When the operation is complete (wait() returns), 'updated' will be set to true
368-
* if the term increased.
366+
* Returns an event handle that we can use to wait for the operation to complete.
367+
* When the operation is complete (waitForEvent() returns), any potential stepdown has
368+
* finished and 'updated' will have been set to true if the term increased.
369369
*/
370-
StatusWith<ReplicationExecutor::CallbackHandle> updateTerm_nonBlocking(long long term,
371-
bool* updated);
370+
ReplicationExecutor::EventHandle updateTerm_forTest(long long term, bool* updated);
372371

373372
/**
374373
* If called after _startElectSelfV1(), blocks until all asynchronous
@@ -383,12 +382,6 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
383382
*/
384383
void waitForElectionDryRunFinish_forTest();
385384

386-
/**
387-
* If called after a stepdown starts, blocks until all asynchronous activities associated with
388-
* stepdown complete.
389-
*/
390-
void waitForStepDownFinish_forTest();
391-
392385
private:
393386
using CallbackFn = executor::TaskExecutor::CallbackFn;
394387

@@ -968,7 +961,7 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
968961
*/
969962
void _requestRemotePrimaryStepdown(const HostAndPort& target);
970963

971-
void _stepDownStart();
964+
ReplicationExecutor::EventHandle _stepDownStart();
972965

973966
/**
974967
* Completes a step-down of the current node. Must be run with a global
@@ -1050,16 +1043,19 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
10501043
/**
10511044
* Callback that attempts to set the current term in topology coordinator and
10521045
* relinquishes primary if the term actually changes and we are primary.
1053-
* Returns true if the term increased.
1046+
* *updated will be true if the term increased.
1047+
* Returns the finish event if the update does not finish in this function (for example,
1048+
* because of a stepdown); otherwise the returned EventHandle is invalid.
10541049
*/
1055-
bool _updateTerm_incallback(long long term);
1050+
EventHandle _updateTerm_incallback(long long term, bool* updated = nullptr);
10561051

10571052
/**
10581053
* Callback that processes the ReplSetMetadata returned from a command run against another
10591054
* replica set member and updates protocol version 1 information (most recent optime that is
10601055
* committed, member id of the current PRIMARY, the current config version and the current term)
1056+
* Returns the finish event, which is invalid if the processing has already finished.
10611057
*/
1062-
void _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata);
1058+
EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata);
10631059

10641060
/**
10651061
* Blesses a snapshot to be used for new committed reads.

src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,10 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
180180
}
181181

182182
log() << "dry election run succeeded, running for election";
183-
_updateTerm_incallback(originalTerm + 1);
183+
// Stepdown is impossible from this term update.
184+
bool updated = false;
185+
_updateTerm_incallback(originalTerm + 1, &updated);
186+
invariant(updated);
184187
// Secure our vote for ourself first
185188
_topCoord->voteForMyselfV1();
186189

src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
132132
StatusWith<rpc::ReplSetMetadata> replMetadata =
133133
rpc::ReplSetMetadata::readFromMetadata(cbData.response.getValue().metadata);
134134
if (replMetadata.isOK()) {
135+
// Asynchronous stepdown could happen, but it will be queued in the executor after
136+
// this function, so we cannot, and do not need to, wait for it to finish.
135137
_processReplSetMetadata_incallback(replMetadata.getValue());
136138
}
137139
}
@@ -142,6 +144,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
142144

143145
if (responseStatus.isOK()) {
144146
networkTime = cbData.response.getValue().elapsedMillis;
147+
// TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this
148+
// and update tests.
145149
_updateTerm_incallback(hbStatusResponse.getValue().getTerm());
146150
// Postpone election timeout if we have a successful heartbeat response from the primary.
147151
const auto& hbResponse = hbStatusResponse.getValue();
@@ -225,6 +229,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
225229
invariant(action.getPrimaryConfigIndex() == _selfIndex);
226230
log() << "Stepping down from primary in response to heartbeat";
227231
_topCoord->prepareForStepDown();
232+
// Don't need to wait for stepdown to finish.
228233
_stepDownStart();
229234
break;
230235
case HeartbeatResponseAction::StepDownRemotePrimary: {
@@ -281,17 +286,17 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort
281286
}
282287
}
283288

284-
void ReplicationCoordinatorImpl::_stepDownStart() {
285-
auto event = _makeEvent();
286-
if (!event) {
287-
return;
289+
ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() {
290+
auto finishEvent = _makeEvent();
291+
if (!finishEvent) {
292+
return finishEvent;
288293
}
289-
_stepDownFinishedEvent = event;
290294
_replExecutor.scheduleWorkWithGlobalExclusiveLock(
291295
stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
292296
this,
293297
stdx::placeholders::_1,
294-
_stepDownFinishedEvent));
298+
finishEvent));
299+
return finishEvent;
295300
}
296301

297302
void ReplicationCoordinatorImpl::_stepDownFinish(
@@ -588,6 +593,8 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout(
588593
// downstream.
589594
HeartbeatResponseAction action =
590595
_topCoord->setMemberAsDown(now, memberIndex, _getMyLastOptime_inlock());
596+
// We don't need to wait for a potential asynchronous stepdown, as this is the last
597+
// step of the liveness check.
591598
_handleHeartbeatResponseAction(action, makeStatusWith<ReplSetHeartbeatResponse>());
592599
}
593600
}

src/mongo/db/repl/replication_coordinator_impl_test.cpp

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,7 +1284,6 @@ TEST_F(ReplCoordTest, UpdateTerm) {
12841284
Handle cbHandle;
12851285
ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(2).code());
12861286
ASSERT_EQUALS(2, getReplCoord()->getTerm());
1287-
getReplCoord()->waitForStepDownFinish_forTest();
12881287
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
12891288
}
12901289

@@ -1322,33 +1321,25 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
13221321
replExec->scheduleWorkWithGlobalExclusiveLock(stepDownFinishBlocker);
13231322

13241323
bool termUpdated2 = false;
1325-
auto updateTermResult2 = getReplCoord()->updateTerm_nonBlocking(2, &termUpdated2);
1326-
ASSERT_OK(updateTermResult2.getStatus());
1324+
auto updateTermEvh2 = getReplCoord()->updateTerm_forTest(2, &termUpdated2);
1325+
ASSERT(updateTermEvh2.isValid());
13271326

13281327
bool termUpdated3 = false;
1329-
auto updateTermResult3 = getReplCoord()->updateTerm_nonBlocking(3, &termUpdated3);
1330-
ASSERT_OK(updateTermResult3.getStatus());
1328+
auto updateTermEvh3 = getReplCoord()->updateTerm_forTest(3, &termUpdated3);
1329+
ASSERT(updateTermEvh3.isValid());
13311330

13321331
// Unblock 'stepDownFinishBlocker'. Tasks for updateTerm and _stepDownFinish should proceed.
13331332
barrier.countDownAndWait();
13341333

13351334
// Both _updateTerm_incallback tasks should be scheduled.
1336-
auto handle2 = updateTermResult2.getValue();
1337-
ASSERT_TRUE(handle2.isValid());
1338-
replExec->wait(handle2);
1335+
replExec->waitForEvent(updateTermEvh2);
13391336
ASSERT_TRUE(termUpdated2);
1340-
1341-
auto handle3 = updateTermResult3.getValue();
1342-
ASSERT_TRUE(handle3.isValid());
1343-
replExec->wait(handle3);
1337+
replExec->waitForEvent(updateTermEvh3);
13441338
ASSERT_TRUE(termUpdated3);
13451339

13461340
ASSERT_EQUALS(3, getReplCoord()->getTerm());
13471341

1348-
// Ensure all global exclusive lock tasks (eg. _stepDownFinish) run to completion.
1349-
auto work = [](const executor::TaskExecutor::CallbackArgs&) {};
1350-
replExec->wait(unittest::assertGet(replExec->scheduleWorkWithGlobalExclusiveLock(work)));
1351-
getReplCoord()->waitForStepDownFinish_forTest();
1342+
// Update term event handles will wait for potential stepdown.
13521343
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
13531344
}
13541345

@@ -3310,7 +3301,11 @@ TEST_F(ReplCoordTest, LivenessElectionTimeout) {
33103301
}
33113302
}
33123303
getNet()->exitNetwork();
3313-
getReplCoord()->waitForStepDownFinish_forTest();
3304+
3305+
// Ensure all global exclusive lock tasks (eg. _stepDownFinish) run to completion.
3306+
auto exec = getReplExec();
3307+
auto work = [](const executor::TaskExecutor::CallbackArgs&) {};
3308+
exec->wait(unittest::assertGet(exec->scheduleWorkWithGlobalExclusiveLock(work)));
33143309
ASSERT_EQUALS(MemberState::RS_SECONDARY, getReplCoord()->getMemberState().s);
33153310
}
33163311

0 commit comments

Comments
 (0)