Skip to content

Commit 504c299

Browse files
committed
SERVER-24222 Update current known primary from command metadata
This reverts commit ed3f25c. Fixed replication legacy test suite.
1 parent 190c3c7 commit 504c299

20 files changed

+146
-134
lines changed

src/mongo/db/repl/bgsync.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ class DataReplicatorExternalStateBackgroundSync : public DataReplicatorExternalS
8585
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
8686
BackgroundSync* bgsync);
8787
bool shouldStopFetching(const HostAndPort& source,
88-
const OpTime& sourceOpTime,
89-
bool sourceHasSyncSource) override;
88+
const rpc::ReplSetMetadata& metadata) override;
9089

9190
private:
9291
BackgroundSync* _bgsync;
@@ -99,15 +98,13 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground
9998
: DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState),
10099
_bgsync(bgsync) {}
101100

102-
bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source,
103-
const OpTime& sourceOpTime,
104-
bool sourceHasSyncSource) {
101+
bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
102+
const HostAndPort& source, const rpc::ReplSetMetadata& metadata) {
105103
if (_bgsync->shouldStopFetching()) {
106104
return true;
107105
}
108106

109-
return DataReplicatorExternalStateImpl::shouldStopFetching(
110-
source, sourceOpTime, sourceHasSyncSource);
107+
return DataReplicatorExternalStateImpl::shouldStopFetching(source, metadata);
111108
}
112109

113110
/**

src/mongo/db/repl/data_replicator_external_state.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ class DataReplicatorExternalState {
7777
* metadata).
7878
*/
7979
virtual bool shouldStopFetching(const HostAndPort& source,
80-
const OpTime& sourceOpTime,
81-
bool sourceHasSyncSource) = 0;
80+
const rpc::ReplSetMetadata& metadata) = 0;
8281

8382
private:
8483
/**

src/mongo/db/repl/data_replicator_external_state_impl.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,12 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata
6060
}
6161

6262
bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source,
63-
const OpTime& sourceOpTime,
64-
bool sourceHasSyncSource) {
63+
const rpc::ReplSetMetadata& metadata) {
6564
// Re-evaluate quality of sync target.
66-
if (_replicationCoordinator->shouldChangeSyncSource(
67-
source, sourceOpTime, sourceHasSyncSource)) {
65+
if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) {
6866
LOG(1) << "Canceling oplog query because we have to choose a sync source. Current source: "
69-
<< source << ", OpTime " << sourceOpTime
70-
<< ", hasSyncSource:" << sourceHasSyncSource;
67+
<< source << ", OpTime " << metadata.getLastOpVisible()
68+
<< ", its sync source index:" << metadata.getSyncSourceIndex();
7169
return true;
7270
}
7371
return false;

src/mongo/db/repl/data_replicator_external_state_impl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ class DataReplicatorExternalStateImpl : public DataReplicatorExternalState {
5151
void processMetadata(const rpc::ReplSetMetadata& metadata) override;
5252

5353
bool shouldStopFetching(const HostAndPort& source,
54-
const OpTime& sourceOpTime,
55-
bool sourceHasSyncSource) override;
54+
const rpc::ReplSetMetadata& metadata) override;
5655

5756
private:
5857
StatusWith<OpTime> _multiApply(OperationContext* txn,

src/mongo/db/repl/data_replicator_external_state_mock.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,10 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata
4747
}
4848

4949
bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source,
50-
const OpTime& sourceOpTime,
51-
bool sourceHasSyncSource) {
50+
const rpc::ReplSetMetadata& metadata) {
5251
lastSyncSourceChecked = source;
53-
syncSourceLastOpTime = sourceOpTime;
54-
syncSourceHasSyncSource = sourceHasSyncSource;
52+
syncSourceLastOpTime = metadata.getLastOpVisible();
53+
syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
5554
return shouldStopFetchingResult;
5655
}
5756

src/mongo/db/repl/data_replicator_external_state_mock.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ class DataReplicatorExternalStateMock : public DataReplicatorExternalState {
4848
void processMetadata(const rpc::ReplSetMetadata& metadata) override;
4949

5050
bool shouldStopFetching(const HostAndPort& source,
51-
const OpTime& sourceOpTime,
52-
bool sourceHasSyncSource) override;
51+
const rpc::ReplSetMetadata& metadata) override;
5352

5453
// Returned by getCurrentTermAndLastCommittedOpTime.
5554
long long currentTerm = OpTime::kUninitializedTerm;

src/mongo/db/repl/data_replicator_test.cpp

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,7 @@ class SyncSourceSelectorMock : public SyncSourceSelector {
8383
_blacklistedSource = host;
8484
}
8585
bool shouldChangeSyncSource(const HostAndPort& currentSource,
86-
const OpTime& sourcesOpTime,
87-
bool syncSourceHasSyncSource) override {
86+
const rpc::ReplSetMetadata& metadata) override {
8887
return false;
8988
}
9089
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
@@ -126,10 +125,8 @@ class DataReplicatorTest : public ReplicationExecutorTest, public SyncSourceSele
126125
_syncSourceSelector->blacklistSyncSource(host, until);
127126
}
128127
bool shouldChangeSyncSource(const HostAndPort& currentSource,
129-
const OpTime& sourcesOpTime,
130-
bool syncSourceHasSyncSource) override {
131-
return _syncSourceSelector->shouldChangeSyncSource(
132-
currentSource, sourcesOpTime, syncSourceHasSyncSource);
128+
const rpc::ReplSetMetadata& metadata) override {
129+
return _syncSourceSelector->shouldChangeSyncSource(currentSource, metadata);
133130
}
134131
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
135132
const OpTime& lastOpTimeFetched) override {
@@ -718,8 +715,7 @@ class TestSyncSourceSelector2 : public SyncSourceSelector {
718715
_blacklistedSource = host;
719716
}
720717
bool shouldChangeSyncSource(const HostAndPort& currentSource,
721-
const OpTime& sourcesOpTime,
722-
bool syncSourceHasSyncSource) override {
718+
const rpc::ReplSetMetadata& metadata) override {
723719
return false;
724720
}
725721
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
@@ -850,8 +846,7 @@ class ShutdownExecutorSyncSourceSelector : public SyncSourceSelector {
850846
}
851847
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {}
852848
bool shouldChangeSyncSource(const HostAndPort& currentSource,
853-
const OpTime& sourcesOpTime,
854-
bool syncSourceHasSyncSource) override {
849+
const rpc::ReplSetMetadata& metadata) override {
855850
return false;
856851
}
857852
SyncSourceResolverResponse selectSyncSource(OperationContext* txn,

src/mongo/db/repl/oplog_fetcher.cpp

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,8 @@ namespace {
5050
* Calculates await data timeout based on the current replica set configuration.
5151
*/
5252
Milliseconds calculateAwaitDataTimeout(const ReplicaSetConfig& config) {
53-
// Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent
54-
// on the election
55-
// timeout. This enables the sync source to communicate liveness of the
56-
// primary to secondaries.
53+
// Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election
54+
// timeout. This enables the sync source to communicate liveness of the primary to secondaries.
5755
// Under protocol version 0, use a default timeout of 2 seconds for awaitData.
5856
if (config.getProtocolVersion() == 1LL) {
5957
return config.getElectionTimeoutPeriod() / 2;
@@ -111,19 +109,15 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) {
111109

112110
/**
113111
* Checks the first batch of results from query.
114-
* 'documents' are the first batch of results returned from tailing the remote
115-
* oplog.
116-
* 'lastFetched' optime and hash should be consistent with the predicate in the
117-
* query.
112+
* 'documents' are the first batch of results returned from tailing the remote oplog.
113+
* 'lastFetched' optime and hash should be consistent with the predicate in the query.
118114
* Returns RemoteOplogStale if the oplog query has no results.
119-
* Returns OplogStartMissing if we cannot find the optime of the last fetched
120-
* operation in
115+
* Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
121116
* the remote oplog.
122117
*/
123118
Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) {
124119
if (documents.empty()) {
125-
// The GTE query from upstream returns nothing, so we're ahead of the
126-
// upstream.
120+
// The GTE query from upstream returns nothing, so we're ahead of the upstream.
127121
return Status(ErrorCodes::RemoteOplogStale,
128122
str::stream() << "We are ahead of the sync source. Our last op time fetched: "
129123
<< lastFetched.opTime.toString());
@@ -176,8 +170,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
176170
info.networkDocumentBytes += doc.objsize();
177171
++info.networkDocumentCount;
178172

179-
// If this is the first response (to the $gte query) then we already applied
180-
// the first doc.
173+
// If this is the first response (to the $gte query) then we already applied the first doc.
181174
if (first && info.networkDocumentCount == 1U) {
182175
continue;
183176
}
@@ -208,8 +201,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
208201
info.toApplyDocumentCount = documents.size();
209202
info.toApplyDocumentBytes = info.networkDocumentBytes;
210203
if (first) {
211-
// The count is one less since the first document found was already applied
212-
// ($gte $ts query)
204+
// The count is one less since the first document found was already applied ($gte $ts query)
213205
// and we will not apply it again.
214206
--info.toApplyDocumentCount;
215207
auto alreadyAppliedDocument = documents.cbegin();
@@ -302,11 +294,9 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
302294
}
303295

304296
const auto& queryResponse = result.getValue();
305-
OpTime sourcesLastOpTime;
306-
bool syncSourceHasSyncSource = false;
297+
rpc::ReplSetMetadata metadata;
307298

308-
// Forward metadata (containing liveness information) to data replicator
309-
// external state.
299+
// Forward metadata (containing liveness information) to data replicator external state.
310300
bool receivedMetadata =
311301
queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
312302
if (receivedMetadata) {
@@ -318,10 +308,8 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
318308
_onShutdown(metadataResult.getStatus());
319309
return;
320310
}
321-
auto metadata = metadataResult.getValue();
311+
metadata = metadataResult.getValue();
322312
_dataReplicatorExternalState->processMetadata(metadata);
323-
sourcesLastOpTime = metadata.getLastOpVisible();
324-
syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
325313
}
326314

327315
const auto& documents = queryResponse.documents;
@@ -337,8 +325,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
337325

338326
auto opTimeWithHash = getLastOpTimeWithHashFetched();
339327

340-
// Check start of remote oplog and, if necessary, stop fetcher to execute
341-
// rollback.
328+
// Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
342329
if (queryResponse.first) {
343330
auto status = checkRemoteOplogStart(documents, opTimeWithHash);
344331
if (!status.isOK()) {
@@ -347,8 +334,7 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
347334
return;
348335
}
349336

350-
// If this is the first batch and no rollback is needed, skip the first
351-
// document.
337+
// If this is the first batch and no rollback is needed, skip the first document.
352338
firstDocToApply++;
353339
}
354340

@@ -373,14 +359,15 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
373359
_lastFetched = opTimeWithHash;
374360
}
375361

376-
if (_dataReplicatorExternalState->shouldStopFetching(
377-
_fetcher.getSource(), sourcesLastOpTime, syncSourceHasSyncSource)) {
362+
if (_dataReplicatorExternalState->shouldStopFetching(_fetcher.getSource(), metadata)) {
378363
_onShutdown(Status(ErrorCodes::InvalidSyncSource,
379364
str::stream() << "sync source " << _fetcher.getSource().toString()
380365
<< " (last optime: "
381-
<< sourcesLastOpTime.toString()
382-
<< "; has sync source: "
383-
<< syncSourceHasSyncSource
366+
<< metadata.getLastOpVisible().toString()
367+
<< "; sync source index: "
368+
<< metadata.getSyncSourceIndex()
369+
<< "; primary index: "
370+
<< metadata.getPrimaryIndex()
384371
<< ") is no longer valid"),
385372
opTimeWithHash);
386373
return;
@@ -407,5 +394,6 @@ void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) {
407394
_onShutdownCallbackFn(status, opTimeWithHash);
408395
}
409396

397+
410398
} // namespace repl
411399
} // namespace mongo

src/mongo/db/repl/replication_coordinator_impl.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2901,14 +2901,10 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn
29012901
}
29022902

29032903
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
2904-
const OpTime& syncSourceLastOpTime,
2905-
bool syncSourceHasSyncSource) {
2904+
const rpc::ReplSetMetadata& metadata) {
29062905
LockGuard topoLock(_topoMutex);
2907-
return _topCoord->shouldChangeSyncSource(currentSource,
2908-
getMyLastAppliedOpTime(),
2909-
syncSourceLastOpTime,
2910-
syncSourceHasSyncSource,
2911-
_replExecutor.now());
2906+
return _topCoord->shouldChangeSyncSource(
2907+
currentSource, getMyLastAppliedOpTime(), metadata, _replExecutor.now());
29122908
}
29132909

29142910
SyncSourceResolverResponse ReplicationCoordinatorImpl::selectSyncSource(

src/mongo/db/repl/replication_coordinator_impl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,7 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
275275
virtual void resetLastOpTimesFromOplog(OperationContext* txn) override;
276276

277277
virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
278-
const OpTime& syncSourceLastOpTime,
279-
bool syncSourceHasSyncSource) override;
278+
const rpc::ReplSetMetadata& metadata) override;
280279

281280
virtual SyncSourceResolverResponse selectSyncSource(OperationContext* txn,
282281
const OpTime& lastOpTimeFetched) override;

0 commit comments

Comments
 (0)