Skip to content

Commit 7bf7e4e

Browse files
committed
SERVER-23660 Implement scatter gather runner to scan all nodes for highest oplog entry
1 parent 2a7f7fa commit 7bf7e4e

File tree

8 files changed

+569
-44
lines changed

8 files changed

+569
-44
lines changed

src/mongo/db/repl/SConscript

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ env.Library('repl_coordinator_impl',
337337
'check_quorum_for_config_change.cpp',
338338
'elect_cmd_runner.cpp',
339339
'freshness_checker.cpp',
340+
'freshness_scanner.cpp',
340341
'repl_client_info.cpp',
341342
'replica_set_config_checks.cpp',
342343
'replication_coordinator_impl.cpp',
@@ -935,3 +936,17 @@ env.CppUnitTest(
935936
],
936937
)
937938

939+
env.CppUnitTest(
940+
target='freshness_scanner_test',
941+
source=[
942+
'freshness_scanner_test.cpp',
943+
],
944+
LIBDEPS=[
945+
'repl_coordinator_impl',
946+
'replica_set_messages',
947+
'replmocks',
948+
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
949+
'$BUILD_DIR/mongo/db/commands_test_crutch',
950+
'$BUILD_DIR/mongo/db/service_context_noop_init',
951+
],
952+
)
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/**
2+
* Copyright 2016 MongoDB Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License, version 3,
6+
* as published by the Free Software Foundation.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* GNU Affero General Public License for more details.
12+
*
13+
* You should have received a copy of the GNU Affero General Public License
14+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
*
16+
* As a special exception, the copyright holders give permission to link the
17+
* code of portions of this program with the OpenSSL library under certain
18+
* conditions as described in each individual source file and distribute
19+
* linked combinations including the program with the OpenSSL library. You
20+
* must comply with the GNU Affero General Public License in all respects for
21+
* all of the code used other than as permitted herein. If you modify file(s)
22+
* with this exception, you may extend this exception to your version of the
23+
* file(s), but you are not obligated to do so. If you do not wish to do so,
24+
* delete this exception statement from your version. If you delete this
25+
* exception statement from all source files in the program, then also delete
26+
* it in the license file.
27+
*/
28+
29+
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
30+
31+
#include "mongo/platform/basic.h"
32+
33+
#include "mongo/db/repl/freshness_scanner.h"
34+
35+
#include "mongo/base/status.h"
36+
#include "mongo/db/repl/bson_extract_optime.h"
37+
#include "mongo/db/repl/optime.h"
38+
#include "mongo/db/repl/replication_executor.h"
39+
#include "mongo/db/repl/scatter_gather_runner.h"
40+
#include "mongo/rpc/get_status_from_command_result.h"
41+
#include "mongo/util/log.h"
42+
43+
namespace mongo {
44+
namespace repl {
45+
46+
using executor::RemoteCommandRequest;
47+
48+
FreshnessScanner::Algorithm::Algorithm(const ReplicaSetConfig& rsConfig,
49+
int myIndex,
50+
Milliseconds timeout)
51+
: _rsConfig(rsConfig), _myIndex(myIndex), _timeout(timeout) {
52+
for (int index = 0; index < _rsConfig.getNumMembers(); index++) {
53+
if (index != _myIndex) {
54+
_targets.push_back(_rsConfig.getMemberAt(index).getHostAndPort());
55+
}
56+
}
57+
_totalRequests = _targets.size();
58+
}
59+
60+
std::vector<RemoteCommandRequest> FreshnessScanner::Algorithm::getRequests() const {
61+
BSONObjBuilder cmdBuilder;
62+
cmdBuilder << "replSetGetStatus" << 1;
63+
const BSONObj getStatusCmd = cmdBuilder.obj();
64+
65+
std::vector<RemoteCommandRequest> requests;
66+
for (auto& target : _targets) {
67+
requests.push_back(RemoteCommandRequest(target, "admin", getStatusCmd, _timeout));
68+
}
69+
return requests;
70+
}
71+
72+
void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& request,
73+
const ResponseStatus& response) {
74+
_responsesProcessed++;
75+
if (!response.isOK()) { // failed response
76+
LOG(2) << "FreshnessScanner: Got failed response from " << request.target << ": "
77+
<< response.getStatus();
78+
} else {
79+
BSONObj opTimesObj = response.getValue().data.getObjectField("optimes");
80+
OpTime lastOpTime;
81+
Status status = bsonExtractOpTimeField(opTimesObj, "appliedOpTime", &lastOpTime);
82+
if (!status.isOK()) {
83+
return;
84+
}
85+
86+
int index = _rsConfig.findMemberIndexByHostAndPort(request.target);
87+
FreshnessInfo freshnessInfo{index, lastOpTime};
88+
89+
auto cmp =
90+
[](const FreshnessInfo& a, const FreshnessInfo& b) { return a.opTime > b.opTime; };
91+
auto iter =
92+
std::upper_bound(_freshnessInfos.begin(), _freshnessInfos.end(), freshnessInfo, cmp);
93+
_freshnessInfos.insert(iter, freshnessInfo);
94+
}
95+
}
96+
97+
bool FreshnessScanner::Algorithm::hasReceivedSufficientResponses() const {
98+
return _responsesProcessed == _totalRequests;
99+
}
100+
101+
FreshnessScanner::Result FreshnessScanner::Algorithm::getResult() const {
102+
invariant(hasReceivedSufficientResponses());
103+
return _freshnessInfos;
104+
}
105+
106+
StatusWith<ReplicationExecutor::EventHandle> FreshnessScanner::start(
107+
ReplicationExecutor* executor,
108+
const ReplicaSetConfig& rsConfig,
109+
int myIndex,
110+
Milliseconds timeout) {
111+
_algorithm.reset(new Algorithm(rsConfig, myIndex, timeout));
112+
_runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
113+
return _runner->start();
114+
}
115+
116+
void FreshnessScanner::cancel() {
117+
_runner->cancel();
118+
}
119+
120+
FreshnessScanner::Result FreshnessScanner::getResult() const {
121+
return _algorithm->getResult();
122+
}
123+
124+
} // namespace repl
125+
} // namespace mongo
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright (C) 2016 MongoDB Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License, version 3,
6+
* as published by the Free Software Foundation.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* GNU Affero General Public License for more details.
12+
*
13+
* You should have received a copy of the GNU Affero General Public License
14+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
*
16+
* As a special exception, the copyright holders give permission to link the
17+
* code of portions of this program with the OpenSSL library under certain
18+
* conditions as described in each individual source file and distribute
19+
* linked combinations including the program with the OpenSSL library. You
20+
* must comply with the GNU Affero General Public License in all respects for
21+
* all of the code used other than as permitted herein. If you modify file(s)
22+
* with this exception, you may extend this exception to your version of the
23+
* file(s), but you are not obligated to do so. If you do not wish to do so,
24+
* delete this exception statement from your version. If you delete this
25+
* exception statement from all source files in the program, then also delete
26+
* it in the license file.
27+
*/
28+
29+
#pragma once
30+
31+
#include <memory>
32+
#include <vector>
33+
34+
#include "mongo/base/disallow_copying.h"
35+
#include "mongo/bson/timestamp.h"
36+
#include "mongo/db/repl/optime.h"
37+
#include "mongo/db/repl/replica_set_config.h"
38+
#include "mongo/db/repl/replication_executor.h"
39+
#include "mongo/db/repl/scatter_gather_algorithm.h"
40+
#include "mongo/db/repl/scatter_gather_runner.h"
41+
#include "mongo/stdx/functional.h"
42+
43+
namespace mongo {
44+
45+
class Status;
46+
47+
namespace repl {
48+
49+
class ScatterGatherRunner;
50+
51+
class FreshnessScanner {
52+
MONGO_DISALLOW_COPYING(FreshnessScanner);
53+
54+
public:
55+
struct FreshnessInfo {
56+
// The index of node in ReplicaSetConfig.
57+
int index;
58+
// The latest applied opTime on that node.
59+
OpTime opTime;
60+
};
61+
62+
using Result = std::vector<FreshnessInfo>;
63+
64+
class Algorithm : public ScatterGatherAlgorithm {
65+
public:
66+
Algorithm(const ReplicaSetConfig& rsConfig, int myIndex, Milliseconds timeout);
67+
virtual std::vector<executor::RemoteCommandRequest> getRequests() const;
68+
virtual void processResponse(const executor::RemoteCommandRequest& request,
69+
const ResponseStatus& response);
70+
virtual bool hasReceivedSufficientResponses() const;
71+
72+
/**
73+
* Returns a sorted list of nodes in descending lastAppliedOptime order.
74+
*
75+
* It is invalid to call this before hasReceivedSufficientResponses returns true.
76+
*/
77+
Result getResult() const;
78+
79+
private:
80+
const ReplicaSetConfig _rsConfig;
81+
std::vector<HostAndPort> _targets;
82+
const int _myIndex;
83+
const Milliseconds _timeout;
84+
Result _freshnessInfos;
85+
int _responsesProcessed = 0;
86+
int _totalRequests = 0;
87+
};
88+
89+
FreshnessScanner() = default;
90+
virtual ~FreshnessScanner() = default;
91+
92+
/**
93+
* Begins the process of sending replSetGetFreshness commands to all nodes
94+
* in currentConfig, in attempt to find the most up-to-date oplog.
95+
*
96+
* evh can be used to schedule a callback when the process is complete.
97+
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
98+
**/
99+
StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
100+
const ReplicaSetConfig& rsConfig,
101+
int myIndex,
102+
Milliseconds timeout);
103+
104+
/**
105+
* Informs the FreshnessScanner to cancel further processing.
106+
*/
107+
void cancel();
108+
109+
Result getResult() const;
110+
111+
private:
112+
std::unique_ptr<Algorithm> _algorithm;
113+
std::unique_ptr<ScatterGatherRunner> _runner;
114+
};
115+
116+
} // namespace repl
117+
} // namespace mongo

0 commit comments

Comments
 (0)