Skip to content

Commit 018b95b

Browse files
committed
SERVER-14449 Implement processReplSetSyncFrom in ReplicationCoordinatorImpl
1 parent 0b190a5 commit 018b95b

8 files changed

+271
-4
lines changed

src/mongo/db/repl/member_heartbeat_data.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,11 @@ namespace repl {
8888
_authIssue = false;
8989
}
9090

91+
void MemberHeartbeatData::setAuthIssue() {
92+
_state = MemberState::RS_UNKNOWN;
93+
_health = 0; // set health to 0 so that this doesn't count towards majority.
94+
_authIssue = true;
95+
}
96+
9197
} // namespace repl
9298
} // namespace mongo

src/mongo/db/repl/member_heartbeat_data.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ namespace repl {
9191
*/
9292
void setDownValues(Date_t now, const std::string& heartbeatMessage);
9393

94+
/**
95+
* Sets values in this object that indicate there was an auth issue on the last heartbeat
96+
* command.
97+
*/
98+
void setAuthIssue();
99+
94100
private:
95101
// This member's index into the ReplicaSetConfig
96102
int _configIndex;

src/mongo/db/repl/repl_coordinator_impl.cpp

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -706,8 +706,68 @@ namespace repl {
706706

707707
Status ReplicationCoordinatorImpl::processReplSetSyncFrom(const std::string& target,
708708
BSONObjBuilder* resultObj) {
709-
// TODO
710-
return Status::OK();
709+
resultObj->append("syncFromRequested", target);
710+
711+
HostAndPort targetHostAndPort;
712+
Status status = targetHostAndPort.initialize(target);
713+
if (!status.isOK()) {
714+
return status;
715+
}
716+
717+
boost::lock_guard<boost::mutex> lock(_mutex);
718+
const MemberConfig& selfConfig = _rsConfig.getMemberAt(_thisMembersConfigIndex);
719+
if (selfConfig.isArbiter()) {
720+
return Status(ErrorCodes::NotSecondary, "arbiters don't sync");
721+
}
722+
if (_getCurrentMemberState_inlock().primary()) {
723+
return Status(ErrorCodes::NotSecondary, "primaries don't sync");
724+
}
725+
726+
ReplicaSetConfig::MemberIterator targetConfig = _rsConfig.membersEnd();
727+
int targetIndex = 0;
728+
for (ReplicaSetConfig::MemberIterator it = _rsConfig.membersBegin();
729+
it != _rsConfig.membersEnd(); ++it) {
730+
if (it->getHostAndPort() == targetHostAndPort) {
731+
targetConfig = it;
732+
break;
733+
}
734+
++targetIndex;
735+
}
736+
if (targetConfig == _rsConfig.membersEnd()) {
737+
return Status(ErrorCodes::NodeNotFound,
738+
str::stream() << "Could not find member \"" << target <<
739+
"\" in replica set");
740+
}
741+
if (targetIndex == _thisMembersConfigIndex) {
742+
return Status(ErrorCodes::InvalidOptions, "I cannot sync from myself");
743+
}
744+
if (targetConfig->isArbiter()) {
745+
return Status(ErrorCodes::InvalidOptions,
746+
str::stream() << "Cannot sync from \"" << target <<
747+
"\" because it is an arbiter");
748+
}
749+
if (!targetConfig->shouldBuildIndexes() && selfConfig.shouldBuildIndexes()) {
750+
// TODO(spencer): Is this check actually necessary?
751+
return Status(ErrorCodes::InvalidOptions,
752+
str::stream() << "Cannot sync from \"" << target <<
753+
"\" because it does not build indexes");
754+
}
755+
756+
Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
757+
CBHStatus cbh = _replExecutor.scheduleWork(
758+
stdx::bind(&TopologyCoordinator::prepareSyncFromResponse,
759+
_topCoord.get(),
760+
stdx::placeholders::_1,
761+
targetIndex,
762+
_getLastOpApplied_inlock(),
763+
resultObj,
764+
&result));
765+
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
766+
return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
767+
}
768+
fassert(18649, cbh.getStatus());
769+
_replExecutor.wait(cbh.getValue());
770+
return result;
711771
}
712772

713773
Status ReplicationCoordinatorImpl::processReplSetMaintenance(OperationContext* txn,

src/mongo/db/repl/repl_coordinator_impl_test.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,71 @@ namespace {
693693
ASSERT_NOT_EQUALS(electionID1, electionID2);
694694
}
695695

696+
TEST_F(ReplCoordTest, TestProcessReplSetSyncFromSelfArbiter) {
697+
init("mySet/test1:1234,test2:1234,test3:1234");
698+
assertStartSuccess(
699+
BSON("_id" << "mySet" <<
700+
"version" << 2 <<
701+
"members" << BSON_ARRAY(BSON("_id" << 0 <<
702+
"host" << "test0:1234" <<
703+
"arbiterOnly" << true) <<
704+
BSON("_id" << 1 << "host" << "test1:1234"))),
705+
HostAndPort("test0", 1234));
706+
707+
// Try to sync while we are an arbiter
708+
BSONObjBuilder response;
709+
Status result = getReplCoord()->processReplSetSyncFrom("test0:1234", &response);
710+
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
711+
}
712+
713+
TEST_F(ReplCoordTest, TestProcessReplSetSyncFrom) {
714+
init("mySet/test1:1234,test2:1234,test3:1234");
715+
assertStartSuccess(
716+
BSON("_id" << "mySet" <<
717+
"version" << 2 <<
718+
"members" << BSON_ARRAY(BSON("_id" << 0 <<
719+
"host" << "test0:1234" <<
720+
"arbiterOnly" << true) <<
721+
BSON("_id" << 1 << "host" << "test1:1234") <<
722+
BSON("_id" << 2 <<
723+
"host" << "test2:1234" <<
724+
"priority" << 0 <<
725+
"buildIndexes" << false) <<
726+
BSON("_id" << 3 << "host" << "test3:1234"))),
727+
HostAndPort("test1", 1234));
728+
729+
730+
// Try to sync from an invalid URL
731+
BSONObjBuilder response;
732+
Status result = getReplCoord()->processReplSetSyncFrom("", &response);
733+
ASSERT_EQUALS(ErrorCodes::FailedToParse, result);
734+
735+
// Try to sync while in PRIMARY state
736+
getTopoCoord()._changeMemberState(MemberState::RS_PRIMARY);
737+
result = getReplCoord()->processReplSetSyncFrom("test1:1234", &response);
738+
ASSERT_EQUALS(ErrorCodes::NotSecondary, result);
739+
740+
// Try to sync from self
741+
getTopoCoord()._changeMemberState(MemberState::RS_SECONDARY);
742+
result = getReplCoord()->processReplSetSyncFrom("test1:1234", &response);
743+
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
744+
745+
// Try to sync from non-existent member
746+
result = getReplCoord()->processReplSetSyncFrom("fakemember:1234", &response);
747+
ASSERT_EQUALS(ErrorCodes::NodeNotFound, result);
748+
749+
// Try to sync from an arbiter
750+
result = getReplCoord()->processReplSetSyncFrom("test0:1234", &response);
751+
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
752+
753+
// Try to sync from a node that doesn't build indexes
754+
result = getReplCoord()->processReplSetSyncFrom("test2:1234", &response);
755+
ASSERT_EQUALS(ErrorCodes::InvalidOptions, result);
756+
757+
// Finally sync from someone valid
758+
result = getReplCoord()->processReplSetSyncFrom("test3:1234", &response);
759+
ASSERT_OK(result);
760+
}
696761
// TODO(spencer): Unit test replSetFreeze
697762

698763
} // namespace

src/mongo/db/repl/topology_coordinator.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ namespace repl {
9191
// Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY
9292
virtual void signalDrainComplete() = 0;
9393

94+
// produces a reply to a replSetSyncFrom command
95+
virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackData& data,
96+
int targetIndex,
97+
const OpTime& lastOpApplied,
98+
BSONObjBuilder* response,
99+
Status* result) = 0;
100+
94101
// produce a reply to a RAFT-style RequestVote RPC
95102
virtual void prepareRequestVoteResponse(const Date_t now,
96103
const BSONObj& cmdObj,

src/mongo/db/repl/topology_coordinator_impl.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "mongo/db/repl/repl_set_heartbeat_args.h"
4040
#include "mongo/db/repl/repl_set_heartbeat_response.h"
4141
#include "mongo/db/repl/replication_executor.h"
42+
#include "mongo/db/repl/rslog.h"
4243
#include "mongo/db/server_parameters.h"
4344
#include "mongo/util/log.h"
4445
#include "mongo/util/mongoutils/str.h"
@@ -283,6 +284,51 @@ namespace repl {
283284
*/
284285
}
285286

287+
void TopologyCoordinatorImpl::prepareSyncFromResponse(
288+
const ReplicationExecutor::CallbackData& data,
289+
int targetIndex,
290+
const OpTime& lastOpApplied,
291+
BSONObjBuilder* response,
292+
Status* result) {
293+
if (data.status == ErrorCodes::CallbackCanceled) {
294+
*result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
295+
return;
296+
}
297+
298+
const MemberConfig& targetConfig = _currentConfig.getMemberAt(targetIndex);
299+
const MemberHeartbeatData& hbdata = _hbdata[targetIndex];
300+
const std::string targetHostAndPort = targetConfig.getHostAndPort().toString();
301+
if (hbdata.hasAuthIssue()) {
302+
*result = Status(ErrorCodes::Unauthorized,
303+
"not authorized to communicate with " + targetHostAndPort);
304+
return;
305+
}
306+
if (hbdata.getHealth() == 0) {
307+
*result = Status(ErrorCodes::HostUnreachable,
308+
str::stream() << "I cannot reach the requested member: \"" <<
309+
targetHostAndPort << "\"");
310+
return;
311+
}
312+
if (hbdata.getOpTime().getSecs()+10 < lastOpApplied.getSecs()) {
313+
warning() << "attempting to sync from " << targetHostAndPort
314+
<< ", but its latest opTime is " << hbdata.getOpTime().getSecs()
315+
<< " and ours is " << lastOpApplied.getSecs() << " so this may not work"
316+
<< rsLog;
317+
response->append("warning",
318+
"requested member \"" + targetHostAndPort + "\" is more than 10 "
319+
"seconds behind us");
320+
// not returning bad Status, just warning
321+
}
322+
323+
HostAndPort prevSyncSource = getSyncSourceAddress();
324+
if (!prevSyncSource.empty()) {
325+
response->append("prevSyncTarget", prevSyncSource.toString());
326+
}
327+
328+
setForceSyncSourceIndex(targetIndex);
329+
*result = Status::OK();
330+
}
331+
286332
// Produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command
287333
// The caller should validate that the message is for the correct set, and has the required data
288334
void TopologyCoordinatorImpl::prepareRequestVoteResponse(const Date_t now,

src/mongo/db/repl/topology_coordinator_impl.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ namespace repl {
7474

7575
virtual void setCommitOkayThrough(const OpTime& optime);
7676
virtual void setLastReceived(const OpTime& optime);
77+
// TODO(spencer): Can this be made private?
7778
virtual void setForceSyncSourceIndex(int index);
7879

7980
// Looks up syncSource's address and returns it, for use by the Applier
@@ -94,6 +95,13 @@ namespace repl {
9495
// Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY
9596
virtual void signalDrainComplete();
9697

98+
// produces a reply to a replSetSyncFrom command
99+
virtual void prepareSyncFromResponse(const ReplicationExecutor::CallbackData& data,
100+
int targetIndex,
101+
const OpTime& lastOpApplied,
102+
BSONObjBuilder* response,
103+
Status* result);
104+
97105
// produces a reply to a RAFT-style RequestVote RPC
98106
virtual void prepareRequestVoteResponse(const Date_t now,
99107
const BSONObj& cmdObj,
@@ -208,7 +216,7 @@ namespace repl {
208216
// the member we currently believe is primary, if one exists
209217
int _currentPrimaryIndex;
210218
// the member we are currently syncing from
211-
// NULL if no sync source (we are primary, or we cannot connect to anyone yet)
219+
// -1 if no sync source (we are primary, or we cannot connect to anyone yet)
212220
int _syncSourceIndex;
213221
// These members are not chosen as sync sources for a period of time, due to connection
214222
// issues with them
@@ -238,7 +246,10 @@ namespace repl {
238246
const MemberConfig& _selfConfig(); // Helper shortcut to self config
239247

240248
ReplicaSetConfig _currentConfig; // The current config, including a vector of MemberConfigs
241-
std::vector<MemberHeartbeatData> _hbdata; // heartbeat data for each member
249+
// heartbeat data for each member. It is guaranteed that this vector will be maintained
250+
// in the same order as the MemberConfigs in _currentConfig, therefore the member config
251+
// index can be used to index into this vector as well.
252+
std::vector<MemberHeartbeatData> _hbdata;
242253

243254
// Time when stepDown command expires
244255
Date_t _stepDownUntil;

src/mongo/db/repl/topology_coordinator_impl_test.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,72 @@ namespace {
386386
ASSERT_EQUALS(topocoord.getSyncSourceAddress(),HostAndPort("h3"));
387387
}
388388

389+
TEST(TopologyCoordinator, PrepareSyncFromResponse) {
390+
ReplicationExecutor::CallbackHandle cbh;
391+
ReplicationExecutor::CallbackData cbData(NULL,
392+
cbh,
393+
Status::OK());
394+
ReplicaSetConfig config;
395+
396+
ASSERT_OK(config.initialize(BSON("_id" << "rs0" <<
397+
"version" << 1 <<
398+
"members" << BSON_ARRAY(
399+
BSON("_id" << 10 << "host" << "hself") <<
400+
BSON("_id" << 20 << "host" << "h1") <<
401+
BSON("_id" << 30 << "host" << "h2") <<
402+
BSON("_id" << 40 << "host" << "h3")))));
403+
404+
OpTime staleOpTime(1, 1);
405+
OpTime ourOpTime(staleOpTime.getSecs() + 11, 1);
406+
407+
408+
TopologyCoordinatorImpl topocoord((Seconds(999)));
409+
Date_t now = 0;
410+
topocoord.updateConfig(cbData, config, 0, now++, OpTime(0,0));
411+
412+
MemberHeartbeatData hselfInfo(0);
413+
hselfInfo.setAuthIssue();
414+
topocoord.updateHeartbeatData(now++, hselfInfo, 10, OpTime(0,0));
415+
416+
MemberHeartbeatData h1Info(1);
417+
h1Info.setDownValues(now, "");
418+
topocoord.updateHeartbeatData(now++, h1Info, 20, OpTime(0,0));
419+
420+
MemberHeartbeatData h2Info(2);
421+
h2Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), staleOpTime, "", "");
422+
topocoord.updateHeartbeatData(now++, h2Info, 30, OpTime(0,0));
423+
424+
MemberHeartbeatData h3Info(2);
425+
h3Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), ourOpTime, "", "");
426+
topocoord.updateHeartbeatData(now++, h3Info, 40, OpTime(0,0));
427+
428+
Status result = Status::OK();
429+
BSONObjBuilder response0;
430+
topocoord.prepareSyncFromResponse(cbData, 0, ourOpTime, &response0, &result);
431+
ASSERT_EQUALS(ErrorCodes::Unauthorized, result);
432+
433+
BSONObjBuilder response1;
434+
topocoord.prepareSyncFromResponse(cbData, 1, ourOpTime, &response1, &result);
435+
ASSERT_EQUALS(ErrorCodes::HostUnreachable, result);
436+
437+
BSONObjBuilder response2;
438+
topocoord.prepareSyncFromResponse(cbData, 2, ourOpTime, &response2, &result);
439+
ASSERT_OK(result);
440+
topocoord.chooseNewSyncSource(now++, ourOpTime);
441+
ASSERT_EQUALS(topocoord.getSyncSourceAddress(), HostAndPort("h2"));
442+
ASSERT_EQUALS("requested member \"h2:27017\" is more than 10 seconds behind us",
443+
response2.obj()["warning"].String());
444+
445+
BSONObjBuilder response3;
446+
topocoord.prepareSyncFromResponse(cbData, 3, ourOpTime, &response3, &result);
447+
ASSERT_OK(result);
448+
topocoord.chooseNewSyncSource(now++, ourOpTime);
449+
ASSERT_EQUALS(topocoord.getSyncSourceAddress(), HostAndPort("h3"));
450+
BSONObj response3Obj = response3.obj();
451+
ASSERT_FALSE(response3Obj.hasField("warning"));
452+
ASSERT_EQUALS(HostAndPort("h2").toString(), response3Obj["prevSyncTarget"].String());
453+
}
454+
389455
TEST(TopologyCoordinator, ReplSetGetStatus) {
390456
// This test starts by configuring a TopologyCoordinator as a member of a 4 node replica
391457
// set, with each node in a different state.

0 commit comments

Comments
 (0)