Skip to content

Commit b83a9ba

Browse files
committed
SERVER-17969 Move shardCollection operation to the catalog manager
Used by both mapReduce and shardCollection commands.

Conflicts:
	src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp
1 parent 6610529 commit b83a9ba

File tree

7 files changed

+191
-163
lines changed

7 files changed

+191
-163
lines changed

src/mongo/s/catalog/catalog_manager.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ namespace mongo {
4444
class DatabaseType;
4545
class OperationContext;
4646
class Shard;
47+
class ShardKeyPattern;
4748
class ShardType;
4849
class Status;
4950
template<typename T> class StatusWith;
@@ -82,6 +83,26 @@ namespace mongo {
8283
*/
8384
virtual Status enableSharding(const std::string& dbName) = 0;
8485

86+
/**
87+
* Shards a collection. Assumes that the database is enabled for sharding.
88+
*
89+
* @param ns: namespace of collection to shard
90+
* @param fieldsAndOrder: shardKey pattern
91+
* @param unique: if true, ensure underlying index enforces a unique constraint.
92+
* @param initPoints: create chunks based on a set of specified split points.
93+
* @param initShards: if NULL, use primary shard as lone shard for DB.
94+
*
95+
* WARNING: It's not completely safe to place initial chunks onto non-primary
96+
* shards using this method because a conflict may result if multiple map-reduce
97+
* operations are writing to the same output collection, for instance.
98+
*
99+
*/
100+
virtual Status shardCollection(const std::string& ns,
101+
const ShardKeyPattern& fieldsAndOrder,
102+
bool unique,
103+
std::vector<BSONObj>* initPoints,
104+
std::vector<Shard>* initShards = NULL) = 0;
105+
85106
/**
86107
*
87108
* Adds a new shard. It expects a standalone mongod process or replica set to be running

src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,13 @@
4949
#include "mongo/s/catalog/type_changelog.h"
5050
#include "mongo/s/catalog/type_chunk.h"
5151
#include "mongo/s/catalog/type_shard.h"
52+
#include "mongo/s/chunk_manager.h"
5253
#include "mongo/s/client/dbclient_multi_command.h"
5354
#include "mongo/s/client/shard_connection.h"
5455
#include "mongo/s/distlock.h"
5556
#include "mongo/s/client/shard.h"
57+
#include "mongo/s/config.h"
58+
#include "mongo/s/shard_key_pattern.h"
5659
#include "mongo/s/type_database.h"
5760
#include "mongo/s/write_ops/batched_command_request.h"
5861
#include "mongo/s/write_ops/batched_command_response.h"
@@ -403,6 +406,94 @@ namespace {
403406
return updateDatabase(dbName, db);
404407
}
405408

409+
Status CatalogManagerLegacy::shardCollection(const string& ns,
410+
const ShardKeyPattern& fieldsAndOrder,
411+
bool unique,
412+
vector<BSONObj>* initPoints,
413+
vector<Shard>* initShards) {
414+
415+
StatusWith<DatabaseType> status = getDatabase(nsToDatabase(ns));
416+
if (!status.isOK()) {
417+
return status.getStatus();
418+
}
419+
DatabaseType dbt = status.getValue();
420+
Shard dbPrimary = Shard::make(dbt.getPrimary());
421+
422+
log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder;
423+
424+
// Record start in changelog
425+
BSONObjBuilder collectionDetail;
426+
collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
427+
collectionDetail.append("collection", ns);
428+
collectionDetail.append("primary", dbPrimary.toString());
429+
430+
BSONArray initialShards;
431+
if (initShards == NULL)
432+
initialShards = BSONArray();
433+
else {
434+
BSONArrayBuilder b;
435+
for (unsigned i = 0; i < initShards->size(); i++) {
436+
b.append((*initShards)[i].getName());
437+
}
438+
initialShards = b.arr();
439+
}
440+
441+
collectionDetail.append("initShards", initialShards);
442+
collectionDetail.append("numChunks", static_cast<int>(initPoints->size() + 1));
443+
444+
logChange(NULL, "shardCollection.start", ns, collectionDetail.obj());
445+
446+
ChunkManagerPtr manager(new ChunkManager(ns, fieldsAndOrder, unique));
447+
manager->createFirstChunks(_configServerConnectionString.toString(),
448+
dbPrimary,
449+
initPoints,
450+
initShards);
451+
manager->loadExistingRanges(_configServerConnectionString.toString(), NULL);
452+
453+
CollectionInfo collInfo;
454+
collInfo.useChunkManager(manager);
455+
collInfo.save(ns);
456+
manager->reload(true);
457+
458+
// Tell the primary mongod to refresh its data
459+
// TODO: Think the real fix here is for mongos to just
460+
// assume that all collections are sharded, when we get there
461+
for (int i = 0;i < 4;i++) {
462+
if (i == 3) {
463+
warning() << "too many tries updating initial version of " << ns
464+
<< " on shard primary " << dbPrimary
465+
<< ", other mongoses may not see the collection as sharded immediately";
466+
break;
467+
}
468+
try {
469+
ShardConnection conn(dbPrimary, ns);
470+
bool isVersionSet = conn.setVersion();
471+
conn.done();
472+
if (!isVersionSet) {
473+
warning() << "could not update initial version of "
474+
<< ns << " on shard primary " << dbPrimary;
475+
} else {
476+
break;
477+
}
478+
}
479+
catch (const DBException& e) {
480+
warning() << "could not update initial version of " << ns
481+
<< " on shard primary " << dbPrimary
482+
<< causedBy(e);
483+
}
484+
sleepsecs(i);
485+
}
486+
487+
// Record finish in changelog
488+
BSONObjBuilder finishDetail;
489+
490+
finishDetail.append("version", manager->getVersion().toString());
491+
492+
logChange(NULL, "shardCollection", ns, finishDetail.obj());
493+
494+
return Status::OK();
495+
}
496+
406497
Status CatalogManagerLegacy::createDatabase(const std::string& dbName, const Shard* shard) {
407498
invariant(nsIsDbOnly(dbName));
408499

@@ -559,7 +650,7 @@ namespace {
559650
}
560651

561652
StatusWith<ShardDrainingStatus> CatalogManagerLegacy::removeShard(OperationContext* txn,
562-
const std::string& name) {
653+
const std::string& name) {
563654
ScopedDbConnection conn(_configServerConnectionString, 30);
564655

565656
if (conn->count(ShardType::ConfigNS,

src/mongo/s/catalog/legacy/catalog_manager_legacy.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ namespace mongo {
5353

5454
virtual Status enableSharding(const std::string& dbName);
5555

56+
virtual Status shardCollection(const std::string& ns,
57+
const ShardKeyPattern& fieldsAndOrder,
58+
bool unique,
59+
std::vector<BSONObj>* initPoints,
60+
std::vector<Shard>* initShards);
61+
5662
virtual StatusWith<std::string> addShard(const std::string& name,
5763
const ConnectionString& shardConnectionString,
5864
const long long maxSize);

src/mongo/s/commands/cluster_shard_collection_cmd.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "mongo/db/hasher.h"
4747
#include "mongo/db/write_concern_options.h"
4848
#include "mongo/s/catalog/catalog_cache.h"
49+
#include "mongo/s/catalog/catalog_manager.h"
4950
#include "mongo/s/chunk_manager.h"
5051
#include "mongo/s/cluster_write.h"
5152
#include "mongo/s/grid.h"
@@ -406,7 +407,13 @@ namespace {
406407
proposedKey,
407408
careAboutUnique);
408409

409-
config->shardCollection(ns, proposedShardKey, careAboutUnique, &initSplits);
410+
Status status = grid.catalogManager()->shardCollection(ns,
411+
proposedShardKey,
412+
careAboutUnique,
413+
&initSplits);
414+
if (!status.isOK()) {
415+
return appendCommandStatus(result, status);
416+
}
410417

411418
result << "collectionsharded" << ns;
412419

src/mongo/s/commands_public.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1857,11 +1857,14 @@ namespace {
18571857
vector<Shard> outShards( shardSet.begin() , shardSet.end() );
18581858

18591859
ShardKeyPattern sortKeyPattern(sortKey);
1860-
confOut->shardCollection(finalColLong,
1861-
sortKeyPattern,
1862-
true,
1863-
&sortedSplitPts,
1864-
&outShards);
1860+
Status status = grid.catalogManager()->shardCollection(finalColLong,
1861+
sortKeyPattern,
1862+
true,
1863+
&sortedSplitPts,
1864+
&outShards);
1865+
if (!status.isOK()) {
1866+
return appendCommandStatus(result, status);
1867+
}
18651868

18661869
}
18671870

src/mongo/s/config.cpp

Lines changed: 16 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ namespace mongo {
7676
Shard Shard::EMPTY;
7777

7878

79-
DBConfig::CollectionInfo::CollectionInfo( const BSONObj& in ) {
79+
CollectionInfo::CollectionInfo(const BSONObj& in) {
8080
_dirty = false;
8181
_dropped = in[CollectionType::dropped()].trueValue();
8282

@@ -87,18 +87,18 @@ namespace mongo {
8787
_dirty = false;
8888
}
8989

90-
DBConfig::CollectionInfo::~CollectionInfo() {
90+
CollectionInfo::~CollectionInfo() {
9191

9292
}
9393

94-
void DBConfig::CollectionInfo::resetCM(ChunkManager* cm) {
94+
void CollectionInfo::resetCM(ChunkManager* cm) {
9595
invariant(cm);
9696
invariant(_cm);
9797

9898
_cm.reset(cm);
9999
}
100100

101-
void DBConfig::CollectionInfo::shard(ChunkManager* manager){
101+
void CollectionInfo::shard(ChunkManager* manager) {
102102
// Do this *first* so we're invisible to everyone else
103103
manager->loadExistingRanges(configServer.getPrimary().getConnString(), NULL);
104104

@@ -107,12 +107,8 @@ namespace mongo {
107107
// This helps prevent errors when dropping in a different process
108108
//
109109

110-
if (manager->numChunks() != 0){
111-
_cm = ChunkManagerPtr(manager);
112-
_key = manager->getShardKeyPattern().toBSON().getOwned();
113-
_unqiue = manager->isUnique();
114-
_dirty = true;
115-
_dropped = false;
110+
if (manager->numChunks() != 0) {
111+
useChunkManager(ChunkManagerPtr(manager));
116112
}
117113
else{
118114
warning() << "no chunks found for collection " << manager->getns()
@@ -121,14 +117,22 @@ namespace mongo {
121117
}
122118
}
123119

124-
void DBConfig::CollectionInfo::unshard() {
120+
void CollectionInfo::unshard() {
125121
_cm.reset();
126122
_dropped = true;
127123
_dirty = true;
128124
_key = BSONObj();
129125
}
130126

131-
void DBConfig::CollectionInfo::save( const string& ns ) {
127+
void CollectionInfo::useChunkManager(ChunkManagerPtr manager) {
128+
_cm = manager;
129+
_key = manager->getShardKeyPattern().toBSON().getOwned();
130+
_unique = manager->isUnique();
131+
_dirty = true;
132+
_dropped = false;
133+
}
134+
135+
void CollectionInfo::save(const string& ns) {
132136
BSONObj key = BSON( "_id" << ns );
133137

134138
BSONObjBuilder val;
@@ -218,97 +222,6 @@ namespace mongo {
218222
if( save ) _save();
219223
}
220224

221-
boost::shared_ptr<ChunkManager> DBConfig::shardCollection(
222-
const string& ns,
223-
const ShardKeyPattern& fieldsAndOrder,
224-
bool unique,
225-
vector<BSONObj>* initPoints,
226-
vector<Shard>* initShards) {
227-
228-
uassert(8042, "db doesn't have sharding enabled", _shardingEnabled);
229-
230-
ChunkManagerPtr manager;
231-
232-
{
233-
boost::lock_guard<boost::mutex> lk( _lock );
234-
235-
CollectionInfo& ci = _collections[ns];
236-
uassert( 8043 , "collection already sharded" , ! ci.isSharded() );
237-
238-
log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl;
239-
240-
// Record start in changelog
241-
BSONObjBuilder collectionDetail;
242-
collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
243-
collectionDetail.append("collection", ns);
244-
collectionDetail.append("primary", getPrimary().toString());
245-
246-
BSONArray initialShards;
247-
if (initShards == NULL)
248-
initialShards = BSONArray();
249-
else {
250-
BSONArrayBuilder b;
251-
for (unsigned i = 0; i < initShards->size(); i++) {
252-
b.append((*initShards)[i].getName());
253-
}
254-
initialShards = b.arr();
255-
}
256-
257-
collectionDetail.append("initShards", initialShards);
258-
collectionDetail.append("numChunks", (int)(initPoints->size() + 1));
259-
260-
grid.catalogManager()->logChange(NULL,
261-
"shardCollection.start",
262-
ns,
263-
collectionDetail.obj());
264-
265-
ChunkManager* cm = new ChunkManager( ns, fieldsAndOrder, unique );
266-
cm->createFirstChunks(configServer.getPrimary().getConnString(),
267-
getPrimary(),
268-
initPoints,
269-
initShards);
270-
ci.shard(cm);
271-
272-
_save();
273-
274-
// Save the initial chunk manager for later, no need to reload if we're in this lock
275-
manager = ci.getCM();
276-
verify( manager.get() );
277-
}
278-
279-
// Tell the primary mongod to refresh it's data
280-
// TODO: Think the real fix here is for mongos to just assume all collections sharded, when we get there
281-
for( int i = 0; i < 4; i++ ){
282-
if( i == 3 ){
283-
warning() << "too many tries updating initial version of " << ns << " on shard primary " << getPrimary() <<
284-
", other mongoses may not see the collection as sharded immediately" << endl;
285-
break;
286-
}
287-
try {
288-
ShardConnection conn( getPrimary(), ns );
289-
if (!conn.setVersion()) {
290-
warning() << "could not update initial version of "
291-
<< ns << " on shard primary " << getPrimary();
292-
}
293-
conn.done();
294-
break;
295-
}
296-
catch( DBException& e ){
297-
warning() << "could not update initial version of " << ns << " on shard primary " << getPrimary() <<
298-
causedBy( e ) << endl;
299-
}
300-
sleepsecs( i );
301-
}
302-
303-
// Record finish in changelog
304-
BSONObjBuilder finishDetail;
305-
finishDetail.append("version", manager->getVersion().toString());
306-
307-
grid.catalogManager()->logChange(NULL, "shardCollection", ns, finishDetail.obj());
308-
309-
return manager;
310-
}
311-
312225
bool DBConfig::removeSharding( const string& ns ) {
313226
if ( ! _shardingEnabled ) {
314227

0 commit comments

Comments (0)