Skip to content

Commit 551fb26

Browse files
committed
SERVER-22136 Attach term metadata to UpdatePosition command
1 parent 8d41cd9 commit 551fb26

12 files changed

+128
-85
lines changed

src/mongo/db/dbcommands.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
#include "mongo/db/write_concern.h"
9393
#include "mongo/rpc/metadata.h"
9494
#include "mongo/rpc/metadata/config_server_metadata.h"
95+
#include "mongo/rpc/metadata/repl_set_metadata.h"
9596
#include "mongo/rpc/metadata/server_selection_metadata.h"
9697
#include "mongo/rpc/metadata/sharding_metadata.h"
9798
#include "mongo/rpc/protocol.h"
@@ -1237,8 +1238,9 @@ void appendOpTimeMetadata(OperationContext* txn,
12371238
// Attach our own last opTime.
12381239
repl::OpTime lastOpTimeFromClient =
12391240
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
1240-
replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, metadataBob);
1241-
1241+
if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
1242+
replCoord->prepareReplMetadata(lastOpTimeFromClient, metadataBob);
1243+
}
12421244
// For commands from mongos, append some info to help getLastError(w) work.
12431245
// TODO: refactor out of here as part of SERVER-18236
12441246
if (isShardingAware || isConfig) {

src/mongo/db/repl/replication_coordinator.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -679,9 +679,8 @@ class ReplicationCoordinator : public SyncSourceSelector {
679679
/**
680680
* Prepares a metadata object describing the current term, primary, and lastOp information.
681681
*/
682-
virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request,
683-
const OpTime& lastOpTimeFromClient,
684-
BSONObjBuilder* builder) = 0;
682+
virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
683+
BSONObjBuilder* builder) const = 0;
685684

686685
/**
687686
* Returns true if the V1 election protocol is being used and false otherwise.
@@ -759,7 +758,7 @@ class ReplicationCoordinator : public SyncSourceSelector {
759758
/**
760759
* Gets the latest OpTime of the currentCommittedSnapshot.
761760
*/
762-
virtual OpTime getCurrentCommittedSnapshotOpTime() = 0;
761+
virtual OpTime getCurrentCommittedSnapshotOpTime() const = 0;
763762

764763
/**
765764
* Appends connection information to the provided BSONObjBuilder.

src/mongo/db/repl/replication_coordinator_impl.cpp

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ Date_t ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const {
338338
return _priorityTakeoverWhen;
339339
}
340340

341-
OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() {
341+
OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
342342
stdx::lock_guard<stdx::mutex> lk(_mutex);
343343
if (_currentCommittedSnapshot) {
344344
return _currentCommittedSnapshot->opTime;
@@ -1889,49 +1889,55 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const {
18891889

18901890
StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(
18911891
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) const {
1892-
stdx::lock_guard<stdx::mutex> lock(_mutex);
1893-
invariant(_rsConfig.isInitialized());
1894-
// Do not send updates if we have been removed from the config.
1895-
if (_selfIndex == -1) {
1896-
return Status(ErrorCodes::NodeNotFound,
1897-
"This node is not in the current replset configuration.");
1898-
}
18991892
BSONObjBuilder cmdBuilder;
1900-
cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
1901-
// Create an array containing objects each live member connected to us and for ourself.
1902-
BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
1903-
for (const auto& slaveInfo : _slaveInfo) {
1904-
if (slaveInfo.lastAppliedOpTime.isNull()) {
1905-
// Don't include info on members we haven't heard from yet.
1906-
continue;
1907-
}
1908-
// Don't include members we think are down.
1909-
if (!slaveInfo.self && slaveInfo.down) {
1910-
continue;
1911-
}
1893+
{
1894+
stdx::lock_guard<stdx::mutex> lock(_mutex);
1895+
invariant(_rsConfig.isInitialized());
1896+
// Do not send updates if we have been removed from the config.
1897+
if (_selfIndex == -1) {
1898+
return Status(ErrorCodes::NodeNotFound,
1899+
"This node is not in the current replset configuration.");
1900+
}
1901+
cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
1902+
// Create an array containing objects each live member connected to us and for ourself.
1903+
BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
1904+
for (const auto& slaveInfo : _slaveInfo) {
1905+
if (slaveInfo.lastAppliedOpTime.isNull()) {
1906+
// Don't include info on members we haven't heard from yet.
1907+
continue;
1908+
}
1909+
// Don't include members we think are down.
1910+
if (!slaveInfo.self && slaveInfo.down) {
1911+
continue;
1912+
}
19121913

1913-
BSONObjBuilder entry(arrayBuilder.subobjStart());
1914-
switch (commandStyle) {
1915-
case ReplSetUpdatePositionCommandStyle::kNewStyle:
1916-
slaveInfo.lastDurableOpTime.append(&entry,
1917-
UpdatePositionArgs::kDurableOpTimeFieldName);
1918-
slaveInfo.lastAppliedOpTime.append(&entry,
1919-
UpdatePositionArgs::kAppliedOpTimeFieldName);
1920-
break;
1921-
case ReplSetUpdatePositionCommandStyle::kOldStyle:
1922-
entry.append("_id", slaveInfo.rid);
1923-
if (isV1ElectionProtocol()) {
1924-
slaveInfo.lastDurableOpTime.append(&entry, "optime");
1925-
} else {
1926-
entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp());
1927-
}
1928-
break;
1914+
BSONObjBuilder entry(arrayBuilder.subobjStart());
1915+
switch (commandStyle) {
1916+
case ReplSetUpdatePositionCommandStyle::kNewStyle:
1917+
slaveInfo.lastDurableOpTime.append(&entry,
1918+
UpdatePositionArgs::kDurableOpTimeFieldName);
1919+
slaveInfo.lastAppliedOpTime.append(&entry,
1920+
UpdatePositionArgs::kAppliedOpTimeFieldName);
1921+
break;
1922+
case ReplSetUpdatePositionCommandStyle::kOldStyle:
1923+
entry.append("_id", slaveInfo.rid);
1924+
if (isV1ElectionProtocol()) {
1925+
slaveInfo.lastDurableOpTime.append(&entry, "optime");
1926+
} else {
1927+
entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp());
1928+
}
1929+
break;
1930+
}
1931+
entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId);
1932+
entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
19291933
}
1930-
entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId);
1931-
entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
1934+
arrayBuilder.done();
19321935
}
1933-
arrayBuilder.done();
19341936

1937+
// Add metadata to command. Old style parsing logic will reject the metadata.
1938+
if (commandStyle == ReplSetUpdatePositionCommandStyle::kNewStyle) {
1939+
prepareReplMetadata(OpTime(), &cmdBuilder);
1940+
}
19351941
return cmdBuilder.obj();
19361942
}
19371943

@@ -3121,18 +3127,15 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
31213127
return Status::OK();
31223128
}
31233129

3124-
void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request,
3125-
const OpTime& lastOpTimeFromClient,
3126-
BSONObjBuilder* builder) {
3127-
if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
3128-
rpc::ReplSetMetadata metadata;
3129-
LockGuard topoLock(_topoMutex);
3130+
void ReplicationCoordinatorImpl::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
3131+
BSONObjBuilder* builder) const {
3132+
rpc::ReplSetMetadata metadata;
3133+
LockGuard topoLock(_topoMutex);
31303134

3131-
OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
3132-
OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
3133-
_topCoord->prepareReplResponseMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime);
3134-
metadata.writeToMetadata(builder);
3135-
}
3135+
OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
3136+
OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
3137+
_topCoord->prepareReplMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime);
3138+
metadata.writeToMetadata(builder);
31363139
}
31373140

31383141
bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const {

src/mongo/db/repl/replication_coordinator_impl.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,8 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
286286
const ReplSetRequestVotesArgs& args,
287287
ReplSetRequestVotesResponse* response) override;
288288

289-
void prepareReplResponseMetadata(const rpc::RequestInterface&,
290-
const OpTime& lastOpTimeFromClient,
291-
BSONObjBuilder* builder) override;
289+
void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
290+
BSONObjBuilder* builder) const override;
292291

293292
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
294293
ReplSetHeartbeatResponse* response) override;
@@ -313,7 +312,7 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
313312

314313
virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override;
315314

316-
virtual OpTime getCurrentCommittedSnapshotOpTime() override;
315+
virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
317316

318317
virtual void waitUntilSnapshotCommitted(OperationContext* txn,
319318
const SnapshotName& untilSnapshot) override;

src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
155155
replMetadata = responseStatus;
156156
}
157157
if (replMetadata.isOK()) {
158-
// Asynchronous stepdown could happen, but it will be queued in executor after
159-
// this function, so we cannot and don't need to wait for it to finish.
158+
// Asynchronous stepdown could happen, but it will wait for _topoMutex and execute
159+
// after this function, so we cannot and don't need to wait for it to finish.
160160
_processReplSetMetadata_incallback(replMetadata.getValue());
161161
}
162162
}

src/mongo/db/repl/replication_coordinator_impl_test.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1911,7 +1911,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInUpdatePositionCommand) {
19111911
BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand(
19121912
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle));
19131913

1914-
ASSERT_EQUALS(2, cmd.nFields());
1914+
ASSERT_EQUALS(3, cmd.nFields());
19151915
ASSERT_EQUALS(UpdatePositionArgs::kCommandFieldName, cmd.firstElement().fieldNameStringData());
19161916

19171917
std::set<long long> memberIds;
@@ -4483,6 +4483,40 @@ TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelieve
44834483
ASSERT_EQUALS(1U, memberIds4.size());
44844484
}
44854485

4486+
TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) {
4487+
assertStartSuccess(
4488+
BSON("_id"
4489+
<< "mySet"
4490+
<< "version"
4491+
<< 1
4492+
<< "members"
4493+
<< BSON_ARRAY(BSON("_id" << 0 << "host"
4494+
<< "test1:1234")
4495+
<< BSON("_id" << 1 << "host"
4496+
<< "test2:1234")
4497+
<< BSON("_id" << 2 << "host"
4498+
<< "test3:1234"))
4499+
<< "protocolVersion"
4500+
<< 1
4501+
<< "settings"
4502+
<< BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)),
4503+
HostAndPort("test1", 1234));
4504+
OpTime optime(Timestamp(100, 2), 0);
4505+
getReplCoord()->setMyLastAppliedOpTime(optime);
4506+
getReplCoord()->setMyLastDurableOpTime(optime);
4507+
4508+
// Set last committed optime via metadata.
4509+
rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1);
4510+
getReplCoord()->processReplSetMetadata(syncSourceMetadata);
4511+
getReplCoord()->onSnapshotCreate(optime, SnapshotName(1));
4512+
4513+
BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand(
4514+
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle));
4515+
auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd));
4516+
ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm());
4517+
ASSERT_EQUALS(metadata.getLastOpVisible(), optime);
4518+
}
4519+
44864520
TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNodesDown) {
44874521
assertStartSuccess(
44884522
BSON("_id"

src/mongo/db/repl/replication_coordinator_mock.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,9 +377,8 @@ Status ReplicationCoordinatorMock::processReplSetRequestVotes(
377377
return Status::OK();
378378
}
379379

380-
void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request,
381-
const OpTime& lastOpTimeFromClient,
382-
BSONObjBuilder* builder) {}
380+
void ReplicationCoordinatorMock::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
381+
BSONObjBuilder* builder) const {}
383382

384383
Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
385384
ReplSetHeartbeatResponse* response) {
@@ -414,7 +413,7 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho
414413

415414
void ReplicationCoordinatorMock::dropAllSnapshots() {}
416415

417-
OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() {
416+
OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const {
418417
return OpTime();
419418
}
420419

src/mongo/db/repl/replication_coordinator_mock.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,8 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator {
216216
const ReplSetRequestVotesArgs& args,
217217
ReplSetRequestVotesResponse* response);
218218

219-
void prepareReplResponseMetadata(const rpc::RequestInterface& request,
220-
const OpTime& lastOpTimeFromClient,
221-
BSONObjBuilder* builder) override;
219+
void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
220+
BSONObjBuilder* builder) const override;
222221

223222
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
224223
ReplSetHeartbeatResponse* response);
@@ -241,7 +240,7 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator {
241240

242241
virtual void dropAllSnapshots() override;
243242

244-
virtual OpTime getCurrentCommittedSnapshotOpTime() override;
243+
virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
245244

246245
virtual void waitUntilSnapshotCommitted(OperationContext* txn,
247246
const SnapshotName& untilSnapshot) override;

src/mongo/db/repl/replset_commands.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,9 @@ class CmdReplSetUpdatePosition : public ReplSetCommand {
658658
int,
659659
string& errmsg,
660660
BSONObjBuilder& result) {
661-
Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result);
661+
auto replCoord = repl::ReplicationCoordinator::get(txn->getClient()->getServiceContext());
662+
663+
Status status = replCoord->checkReplEnabledForCommand(&result);
662664
if (!status.isOK())
663665
return appendCommandStatus(result, status);
664666

@@ -667,6 +669,14 @@ class CmdReplSetUpdatePosition : public ReplSetCommand {
667669
if (cmdObj.hasField("handshake"))
668670
return true;
669671

672+
auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj);
673+
if (metadataResult.isOK()) {
674+
// New style update position command has metadata, which may inform the
675+
// upstream of a higher term.
676+
auto metadata = metadataResult.getValue();
677+
replCoord->processReplSetMetadata(metadata);
678+
}
679+
670680
// In the case of an update from a member with an invalid replica set config,
671681
// we return our current config version.
672682
long long configVersion = -1;
@@ -676,8 +686,7 @@ class CmdReplSetUpdatePosition : public ReplSetCommand {
676686
status = args.initialize(cmdObj);
677687
if (status.isOK()) {
678688
// v3.2.4+ style replSetUpdatePosition command.
679-
status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition(
680-
args, &configVersion);
689+
status = replCoord->processReplSetUpdatePosition(args, &configVersion);
681690

682691
if (status == ErrorCodes::InvalidReplicaSetConfig) {
683692
result.append("configVersion", configVersion);
@@ -690,8 +699,7 @@ class CmdReplSetUpdatePosition : public ReplSetCommand {
690699
if (!status.isOK())
691700
return appendCommandStatus(result, status);
692701

693-
status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition(
694-
oldArgs, &configVersion);
702+
status = replCoord->processReplSetUpdatePosition(oldArgs, &configVersion);
695703

696704
if (status == ErrorCodes::InvalidReplicaSetConfig) {
697705
result.append("configVersion", configVersion);

src/mongo/db/repl/topology_coordinator.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,9 +414,9 @@ class TopologyCoordinator {
414414
/**
415415
* Prepares a BSONObj describing the current term, primary, and lastOp information.
416416
*/
417-
virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
418-
const OpTime& lastVisibleOpTime,
419-
const OpTime& lastCommittedOpTime) const = 0;
417+
virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata,
418+
const OpTime& lastVisibleOpTime,
419+
const OpTime& lastCommittedOpTime) const = 0;
420420

421421
/**
422422
* Writes into 'output' all the information needed to generate a summary of the current

0 commit comments

Comments
 (0)