Skip to content

Commit a6caba6

Browse files
committed
SERVER-23648 Move executorForAddShard out of ShardRegistry and into addShard implementation
1 parent 6b3b92d commit a6caba6

13 files changed

+163
-138
lines changed

src/mongo/db/commands/conn_pool_stats.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class PoolStats final : public Command {
8888
auto registry = grid.shardRegistry();
8989
if (registry) {
9090
registry->appendConnectionStats(&stats);
91+
grid.catalogManager(txn)->appendConnectionStats(&stats);
9192
}
9293

9394
// Output to a BSON object.

src/mongo/db/s/sharding_state_test.cpp

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,8 @@ void initGrid(OperationContext* txn, const ConnectionString& configConnString) {
7373
auto executorPool = stdx::make_unique<executor::TaskExecutorPool>();
7474
executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExec));
7575

76-
// Set up executor used for a few special operations during addShard.
77-
auto specialNet(stdx::make_unique<executor::NetworkInterfaceMock>());
78-
// auto specialMockNet = specialNet.get();
79-
auto specialExec = makeThreadPoolTestExecutor(std::move(specialNet));
80-
81-
auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory),
82-
std::move(executorPool),
83-
mockNetwork,
84-
std::move(specialExec),
85-
configConnString));
76+
auto shardRegistry(stdx::make_unique<ShardRegistry>(
77+
std::move(shardFactory), std::move(executorPool), mockNetwork, configConnString));
8678
shardRegistry->startup();
8779

8880
grid.init(

src/mongo/s/catalog/catalog_manager.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ template <typename T>
6363
class StatusWith;
6464
class TagsType;
6565

66+
namespace executor {
67+
struct ConnectionPoolStats;
68+
}
69+
6670
/**
6771
* Used to indicate to the caller of the removeShard method whether draining of chunks for
6872
* a particular shard has started, is ongoing, or has been completed.
@@ -439,6 +443,11 @@ class CatalogManager {
439443
virtual Status appendInfoForConfigServerDatabases(OperationContext* txn,
440444
BSONArrayBuilder* builder) = 0;
441445

446+
/**
447+
* Append information about the connection pools owned by the CatalogManager.
448+
*/
449+
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
450+
442451

443452
virtual StatusWith<DistLockManager::ScopedDistLock> distLock(
444453
OperationContext* txn,

src/mongo/s/catalog/catalog_manager_mock.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,4 +236,6 @@ Status CatalogManagerMock::appendInfoForConfigServerDatabases(OperationContext*
236236
return Status::OK();
237237
}
238238

239+
void CatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
240+
239241
} // namespace mongo

src/mongo/s/catalog/catalog_manager_mock.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ class CatalogManagerMock : public CatalogManager {
168168
Status appendInfoForConfigServerDatabases(OperationContext* txn,
169169
BSONArrayBuilder* builder) override;
170170

171+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
172+
171173
private:
172174
std::unique_ptr<DistLockManagerMock> _mockDistLockMgr;
173175
};

src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp

Lines changed: 70 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -124,27 +124,13 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
124124
response->setOk(false);
125125
}
126126

127-
/**
128-
* Validates that the specified connection string can serve as a shard server. In particular,
129-
* this function checks that the shard can be contacted, that it is not already member of
130-
* another sharded cluster and etc.
131-
*
132-
* @param shardRegistry Shard registry to use for opening connections to the shards.
133-
* @param connectionString Connection string to be attempted as a shard host.
134-
* @param shardProposedName Optional proposed name for the shard. Can be omitted in which case
135-
* a unique name for the shard will be generated from the shard's connection string. If it
136-
* is not omitted, the value cannot be the empty string.
137-
*
138-
* On success returns a partially initialized shard type object corresponding to the requested
139-
* shard. It will have the hostName field set and optionally the name, if the name could be
140-
* generated from either the proposed name or the connection string set name. The returned
141-
* shard's name should be checked and if empty, one should be generated using some uniform
142-
* algorithm.
143-
*/
144-
StatusWith<ShardType> validateHostAsShard(OperationContext* txn,
145-
ShardRegistry* shardRegistry,
146-
const ConnectionString& connectionString,
147-
const std::string* shardProposedName) {
127+
} // namespace
128+
129+
StatusWith<ShardType> CatalogManagerReplicaSet::_validateHostAsShard(
130+
OperationContext* txn,
131+
ShardRegistry* shardRegistry,
132+
const ConnectionString& connectionString,
133+
const std::string* shardProposedName) {
148134
if (connectionString.type() == ConnectionString::INVALID) {
149135
return {ErrorCodes::BadValue, "Invalid connection string"};
150136
}
@@ -153,14 +139,13 @@ StatusWith<ShardType> validateHostAsShard(OperationContext* txn,
153139
return {ErrorCodes::BadValue, "shard name cannot be empty"};
154140
}
155141

142+
// TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
156143
const std::shared_ptr<Shard> shardConn{shardRegistry->createConnection(connectionString)};
157144
invariant(shardConn);
158-
159-
const ReadPreferenceSetting readPref{ReadPreference::PrimaryOnly};
145+
auto targeter = shardConn->getTargeter();
160146

161147
// Is it mongos?
162-
auto cmdStatus = shardRegistry->runIdempotentCommandForAddShard(
163-
txn, shardConn, readPref, "admin", BSON("isdbgrid" << 1));
148+
auto cmdStatus = _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isdbgrid" << 1));
164149
if (!cmdStatus.isOK()) {
165150
return cmdStatus.getStatus();
166151
}
@@ -171,8 +156,7 @@ StatusWith<ShardType> validateHostAsShard(OperationContext* txn,
171156
}
172157

173158
// Is it a replica set?
174-
cmdStatus = shardRegistry->runIdempotentCommandForAddShard(
175-
txn, shardConn, readPref, "admin", BSON("isMaster" << 1));
159+
cmdStatus = _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1));
176160
if (!cmdStatus.isOK()) {
177161
return cmdStatus.getStatus();
178162
}
@@ -205,8 +189,7 @@ StatusWith<ShardType> validateHostAsShard(OperationContext* txn,
205189
}
206190

207191
// Is it a mongos config server?
208-
cmdStatus = shardRegistry->runIdempotentCommandForAddShard(
209-
txn, shardConn, readPref, "admin", BSON("replSetGetStatus" << 1));
192+
cmdStatus = _runCommandForAddShard(txn, targeter.get(), "admin", BSON("replSetGetStatus" << 1));
210193
if (!cmdStatus.isOK()) {
211194
return cmdStatus.getStatus();
212195
}
@@ -298,22 +281,15 @@ StatusWith<ShardType> validateHostAsShard(OperationContext* txn,
298281
return shard;
299282
}
300283

301-
/**
302-
* Runs the listDatabases command on the specified host and returns the names of all databases
303-
* it returns excluding those named local and admin, since they serve administrative purpose.
304-
*/
305-
StatusWith<std::vector<std::string>> getDBNamesListFromShard(
284+
StatusWith<std::vector<std::string>> CatalogManagerReplicaSet::_getDBNamesListFromShard(
306285
OperationContext* txn, ShardRegistry* shardRegistry, const ConnectionString& connectionString) {
286+
// TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
307287
const std::shared_ptr<Shard> shardConn{
308288
shardRegistry->createConnection(connectionString).release()};
309289
invariant(shardConn);
310290

311-
auto cmdStatus = shardRegistry->runIdempotentCommandForAddShard(
312-
txn,
313-
shardConn,
314-
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
315-
"admin",
316-
BSON("listDatabases" << 1));
291+
auto cmdStatus = _runCommandForAddShard(
292+
txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1));
317293
if (!cmdStatus.isOK()) {
318294
return cmdStatus.getStatus();
319295
}
@@ -338,15 +314,22 @@ StatusWith<std::vector<std::string>> getDBNamesListFromShard(
338314
return dbNames;
339315
}
340316

341-
} // namespace
342-
343-
CatalogManagerReplicaSet::CatalogManagerReplicaSet(std::unique_ptr<DistLockManager> distLockManager)
344-
: _distLockManager(std::move(distLockManager)) {}
317+
CatalogManagerReplicaSet::CatalogManagerReplicaSet(
318+
std::unique_ptr<DistLockManager> distLockManager,
319+
std::unique_ptr<executor::TaskExecutor> addShardExecutor)
320+
: _distLockManager(std::move(distLockManager)),
321+
_executorForAddShard(std::move(addShardExecutor)) {}
345322

346323
CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default;
347324

348325
Status CatalogManagerReplicaSet::startup(OperationContext* txn) {
326+
stdx::lock_guard<stdx::mutex> lk(_mutex);
327+
if (_started) {
328+
return Status::OK();
329+
}
330+
_started = true;
349331
_distLockManager->startUp();
332+
_executorForAddShard->startup();
350333
return Status::OK();
351334
}
352335

@@ -359,6 +342,43 @@ void CatalogManagerReplicaSet::shutDown(OperationContext* txn) {
359342

360343
invariant(_distLockManager);
361344
_distLockManager->shutDown(txn);
345+
_executorForAddShard->shutdown();
346+
_executorForAddShard->join();
347+
}
348+
349+
StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard(
350+
OperationContext* txn,
351+
RemoteCommandTargeter* targeter,
352+
const std::string& dbName,
353+
const BSONObj& cmdObj) {
354+
auto host = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly},
355+
RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
356+
if (!host.isOK()) {
357+
return host.getStatus();
358+
}
359+
360+
executor::RemoteCommandRequest request(
361+
host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30));
362+
StatusWith<executor::RemoteCommandResponse> responseStatus =
363+
Status(ErrorCodes::InternalError, "Internal error running command");
364+
365+
auto callStatus = _executorForAddShard->scheduleRemoteCommand(
366+
request,
367+
[&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
368+
responseStatus = args.response;
369+
});
370+
if (!callStatus.isOK()) {
371+
return callStatus.getStatus();
372+
}
373+
374+
// Block until the command is carried out
375+
_executorForAddShard->wait(callStatus.getValue());
376+
377+
if (!responseStatus.isOK()) {
378+
return responseStatus.getStatus();
379+
}
380+
381+
return responseStatus.getValue().data.getOwned();
362382
}
363383

364384
StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn,
@@ -367,7 +387,7 @@ StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn,
367387
const long long maxSize) {
368388
// Validate the specified connection string may serve as shard at all
369389
auto shardStatus =
370-
validateHostAsShard(txn, grid.shardRegistry(), shardConnectionString, shardProposedName);
390+
_validateHostAsShard(txn, grid.shardRegistry(), shardConnectionString, shardProposedName);
371391
if (!shardStatus.isOK()) {
372392
// TODO: This is a workaround for the case where we could have some bad shard being
373393
// requested to be added and we put that bad connection string on the global replica set
@@ -379,7 +399,7 @@ StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn,
379399

380400
ShardType& shardType = shardStatus.getValue();
381401

382-
auto dbNamesStatus = getDBNamesListFromShard(txn, grid.shardRegistry(), shardConnectionString);
402+
auto dbNamesStatus = _getDBNamesListFromShard(txn, grid.shardRegistry(), shardConnectionString);
383403
if (!dbNamesStatus.isOK()) {
384404
return dbNamesStatus.getStatus();
385405
}
@@ -1969,4 +1989,8 @@ Status CatalogManagerReplicaSet::appendInfoForConfigServerDatabases(OperationCon
19691989
return Status::OK();
19701990
}
19711991

1992+
void CatalogManagerReplicaSet::appendConnectionStats(executor::ConnectionPoolStats* stats) {
1993+
_executorForAddShard->appendConnectionStats(stats);
1994+
}
1995+
19721996
} // namespace mongo

src/mongo/s/catalog/replset/catalog_manager_replica_set.h

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class VersionType;
4545
*/
4646
class CatalogManagerReplicaSet final : public CatalogManager {
4747
public:
48-
explicit CatalogManagerReplicaSet(std::unique_ptr<DistLockManager> distLockManager);
48+
CatalogManagerReplicaSet(std::unique_ptr<DistLockManager> distLockManager,
49+
std::unique_ptr<executor::TaskExecutor> addShardExecutor);
4950
virtual ~CatalogManagerReplicaSet();
5051

5152
/**
@@ -179,6 +180,8 @@ class CatalogManagerReplicaSet final : public CatalogManager {
179180
Status appendInfoForConfigServerDatabases(OperationContext* txn,
180181
BSONArrayBuilder* builder) override;
181182

183+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
184+
182185
/**
183186
* Runs a read command against the config server with majority read concern.
184187
*/
@@ -212,6 +215,37 @@ class CatalogManagerReplicaSet final : public CatalogManager {
212215
*/
213216
StatusWith<std::string> _generateNewShardName(OperationContext* txn);
214217

218+
/**
219+
* Validates that the specified connection string can serve as a shard server. In particular,
220+
* this function checks that the shard can be contacted, that it is not already a member of
221+
* another sharded cluster, etc.
222+
*
223+
* @param shardRegistry Shard registry to use for getting a targeter to the shard-to-be.
224+
* @param connectionString Connection string to be attempted as a shard host.
225+
* @param shardProposedName Optional proposed name for the shard. Can be omitted in which case
226+
* a unique name for the shard will be generated from the shard's connection string. If it
227+
* is not omitted, the value cannot be the empty string.
228+
*
229+
* On success returns a partially initialized ShardType object corresponding to the requested
230+
* shard. It will have the hostName field set and optionally the name, if the name could be
231+
* generated from either the proposed name or the connection string set name. The returned
232+
* shard's name should be checked and if empty, one should be generated using some uniform
233+
* algorithm.
234+
*/
235+
StatusWith<ShardType> _validateHostAsShard(OperationContext* txn,
236+
ShardRegistry* shardRegistry,
237+
const ConnectionString& connectionString,
238+
const std::string* shardProposedName);
239+
240+
/**
241+
* Runs the listDatabases command on the specified host and returns the names of all databases
242+
* it returns excluding those named local and admin, since they serve administrative purpose.
243+
*/
244+
StatusWith<std::vector<std::string>> _getDBNamesListFromShard(
245+
OperationContext* txn,
246+
ShardRegistry* shardRegistry,
247+
const ConnectionString& connectionString);
248+
215249
/**
216250
* Creates the specified collection name in the config database.
217251
*/
@@ -236,6 +270,15 @@ class CatalogManagerReplicaSet final : public CatalogManager {
236270
const NamespaceString& ns,
237271
BSONObj query);
238272

273+
/**
274+
* Runs a command against a "shard" that is not yet in the cluster and thus not present in the
275+
* ShardRegistry.
276+
*/
277+
StatusWith<BSONObj> _runCommandForAddShard(OperationContext* txn,
278+
RemoteCommandTargeter* targeter,
279+
const std::string& dbName,
280+
const BSONObj& cmdObj);
281+
239282
StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig(
240283
OperationContext* txn,
241284
const ReadPreferenceSetting& readPref,
@@ -289,12 +332,20 @@ class CatalogManagerReplicaSet final : public CatalogManager {
289332

290333
stdx::mutex _mutex;
291334

292-
// Distribted lock manager singleton.
335+
// Distributed lock manager singleton.
293336
std::unique_ptr<DistLockManager> _distLockManager; // (R)
294337

338+
// Executor specifically used for sending commands to servers that are in the process of being
339+
// added as shards. Does not have any connection hook set on it, thus it can be used to talk
340+
// to servers that are not yet in the ShardRegistry.
341+
std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R)
342+
295343
// True if shutDown() has been called. False, otherwise.
296344
bool _inShutdown = false; // (M)
297345

346+
// True if startup() has been called.
347+
bool _started = false; // (M)
348+
298349
// Last known highest opTime from the config server.
299350
repl::OpTime _configOpTime; // (M)
300351

src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -485,12 +485,11 @@ TEST_F(AddShardTest, UnreachableHost) {
485485
ASSERT_EQUALS("host unreachable", status.getStatus().reason());
486486
});
487487

488-
for (int i = 0; i < 3; i++) { // ShardRegistry will retry 3 times
489-
onCommandForAddShard([](const RemoteCommandRequest& request) {
490-
ASSERT_EQ(request.target, HostAndPort("StandaloneHost:12345"));
491-
return StatusWith<BSONObj>{ErrorCodes::HostUnreachable, "host unreachable"};
492-
});
493-
}
488+
onCommandForAddShard([](const RemoteCommandRequest& request) {
489+
ASSERT_EQ(request.target, HostAndPort("StandaloneHost:12345"));
490+
return StatusWith<BSONObj>{ErrorCodes::HostUnreachable, "host unreachable"};
491+
});
492+
494493

495494
future.timed_get(kFutureTimeout);
496495
}

src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,9 @@ class DistLockCatalogFixture : public mongo::unittest::Test {
122122
auto executorPool = stdx::make_unique<executor::TaskExecutorPool>();
123123
executorPool->addExecutors(std::move(executorsForPool), std::move(fixedExecutor));
124124

125-
auto addShardExecutor = executor::makeThreadPoolTestExecutor(
126-
stdx::make_unique<executor::NetworkInterfaceMock>());
127-
128125
ConnectionString configCS(HostAndPort("dummy:1234"));
129-
_shardRegistry = stdx::make_unique<ShardRegistry>(stdx::make_unique<ShardFactoryMock>(),
130-
std::move(executorPool),
131-
network,
132-
std::move(addShardExecutor),
133-
configCS);
126+
_shardRegistry = stdx::make_unique<ShardRegistry>(
127+
stdx::make_unique<ShardFactoryMock>(), std::move(executorPool), network, configCS);
134128
_shardRegistry->startup();
135129

136130
_distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(_shardRegistry.get());

0 commit comments

Comments
 (0)