Skip to content

Commit 767e041

Browse files
committed
SERVER-29609 Enable updateLookup for sharded change streams.
1 parent 45d35fe commit 767e041

15 files changed

+859
-244
lines changed

buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ selector:
3030
# New feature in v3.6 mongos and mongod.
3131
- jstests/sharding/advance_logical_time_with_valid_signature.js
3232
- jstests/sharding/after_cluster_time.js
33+
- jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
34+
- jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
35+
- jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
3336
- jstests/sharding/change_stream_invalidation.js
3437
- jstests/sharding/change_stream_remove_shard.js
3538
- jstests/sharding/change_streams.js

jstests/sharding/change_streams.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292

9393
// Test that using change streams with any stages not allowed to run on mongos results in an
9494
// error.
95-
assertErrorCode(mongosColl, [{$changeStream: {fullDocument: "updateLookup"}}], 40470);
9695
assertErrorCode(
9796
mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
9897

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Tests the behavior of looking up the post image for change streams on collections which are
2+
// sharded with a compound shard key.
3+
(function() {
4+
"use strict";
5+
6+
// For supportsMajorityReadConcern().
7+
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
8+
9+
if (!supportsMajorityReadConcern()) {
10+
jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
11+
return;
12+
}
13+
14+
const st = new ShardingTest({
15+
shards: 2,
16+
rs: {
17+
nodes: 1,
18+
enableMajorityReadConcern: '',
19+
// Use a higher frequency for periodic noops to speed up the test.
20+
setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
21+
}
22+
});
23+
24+
const mongosDB = st.s0.getDB(jsTestName());
25+
const mongosColl = mongosDB[jsTestName()];
26+
27+
assert.commandWorked(mongosDB.dropDatabase());
28+
29+
// Enable sharding on the test DB and ensure its primary is shard0000.
30+
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
31+
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
32+
33+
// Shard the test collection with a compound shard key: a, b, c. Then split it into two chunks,
34+
// and put one chunk on each shard.
35+
assert.commandWorked(mongosDB.adminCommand(
36+
{shardCollection: mongosColl.getFullName(), key: {a: 1, b: 1, c: 1}}));
37+
38+
// Split the collection into 2 chunks:
39+
// [{a: MinKey, b: MinKey, c: MinKey}, {a: 1, b: MinKey, c: MinKey})
40+
// and
41+
// [{a: 1, b: MinKey, c: MinKey}, {a: MaxKey, b: MaxKey, c: MaxKey}).
42+
assert.commandWorked(mongosDB.adminCommand(
43+
{split: mongosColl.getFullName(), middle: {a: 1, b: MinKey, c: MinKey}}));
44+
45+
// Move the upper chunk to shard 1.
46+
assert.commandWorked(mongosDB.adminCommand({
47+
moveChunk: mongosColl.getFullName(),
48+
find: {a: 1, b: MinKey, c: MinKey},
49+
to: st.rs1.getURL()
50+
}));
51+
52+
const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
53+
54+
const nDocs = 6;
55+
const bValues = ["one", "two", "three", "four", "five", "six"];
56+
57+
// This shard key function results in 1/3rd of documents on shard0 and 2/3rds on shard1.
58+
function shardKeyFromId(id) {
59+
return {a: id % 3, b: bValues[id], c: id % 2};
60+
}
61+
62+
// Do some writes.
63+
for (let id = 0; id < nDocs; ++id) {
64+
const documentKey = Object.merge({_id: id}, shardKeyFromId(id));
65+
assert.writeOK(mongosColl.insert(documentKey));
66+
assert.writeOK(mongosColl.update(documentKey, {$set: {updatedCount: 1}}));
67+
}
68+
69+
for (let id = 0; id < nDocs; ++id) {
70+
assert.soon(() => changeStream.hasNext());
71+
let next = changeStream.next();
72+
assert.eq(next.operationType, "insert");
73+
assert.eq(next.documentKey, {_id: id});
74+
75+
assert.soon(() => changeStream.hasNext());
76+
next = changeStream.next();
77+
assert.eq(next.operationType, "update");
78+
assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
79+
assert.docEq(next.fullDocument,
80+
Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 1}));
81+
}
82+
83+
// Test that the change stream can still see the updated post image, even if a chunk is
84+
// migrated.
85+
for (let id = 0; id < nDocs; ++id) {
86+
const documentKey = Object.merge({_id: id}, shardKeyFromId(id));
87+
assert.writeOK(mongosColl.update(documentKey, {$set: {updatedCount: 2}}));
88+
}
89+
90+
// Move the upper chunk back to shard 0.
91+
assert.commandWorked(mongosDB.adminCommand({
92+
moveChunk: mongosColl.getFullName(),
93+
find: {a: 1, b: MinKey, c: MinKey},
94+
to: st.rs0.getURL()
95+
}));
96+
97+
for (let id = 0; id < nDocs; ++id) {
98+
assert.soon(() => changeStream.hasNext());
99+
let next = changeStream.next();
100+
assert.eq(next.operationType, "update");
101+
assert.eq(next.documentKey, Object.merge(shardKeyFromId(id), {_id: id}));
102+
assert.docEq(next.fullDocument,
103+
Object.merge(shardKeyFromId(id), {_id: id, updatedCount: 2}));
104+
}
105+
106+
st.stop();
107+
})();
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Tests the behavior of looking up the post image for change streams on collections which are
2+
// sharded with a hashed shard key.
3+
(function() {
4+
"use strict";
5+
6+
// For supportsMajorityReadConcern().
7+
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
8+
9+
if (!supportsMajorityReadConcern()) {
10+
jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
11+
return;
12+
}
13+
14+
const st = new ShardingTest({
15+
shards: 2,
16+
enableBalancer: false,
17+
rs: {
18+
nodes: 1,
19+
enableMajorityReadConcern: '',
20+
// Use a higher frequency for periodic noops to speed up the test.
21+
setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
22+
}
23+
});
24+
25+
const mongosDB = st.s0.getDB(jsTestName());
26+
const mongosColl = mongosDB[jsTestName()];
27+
28+
assert.commandWorked(mongosDB.dropDatabase());
29+
30+
// Enable sharding on the test DB and ensure its primary is shard0000.
31+
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
32+
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
33+
34+
// Shard the test collection on the field "shardKey", and split it into two chunks.
35+
assert.commandWorked(mongosDB.adminCommand({
36+
shardCollection: mongosColl.getFullName(),
37+
numInitialChunks: 2,
38+
key: {shardKey: "hashed"}
39+
}));
40+
41+
// Make sure the negative chunk is on shard 0.
42+
assert.commandWorked(mongosDB.adminCommand({
43+
moveChunk: mongosColl.getFullName(),
44+
bounds: [{shardKey: MinKey}, {shardKey: NumberLong("0")}],
45+
to: st.rs0.getURL()
46+
}));
47+
48+
// Make sure the positive chunk is on shard 1.
49+
assert.commandWorked(mongosDB.adminCommand({
50+
moveChunk: mongosColl.getFullName(),
51+
bounds: [{shardKey: NumberLong("0")}, {shardKey: MaxKey}],
52+
to: st.rs1.getURL()
53+
}));
54+
55+
const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
56+
57+
// Write enough documents that we likely have some on each shard.
58+
const nDocs = 1000;
59+
for (let id = 0; id < nDocs; ++id) {
60+
assert.writeOK(mongosColl.insert({_id: id, shardKey: id}));
61+
assert.writeOK(mongosColl.update({shardKey: id}, {$set: {updatedCount: 1}}));
62+
}
63+
64+
for (let id = 0; id < nDocs; ++id) {
65+
assert.soon(() => changeStream.hasNext());
66+
let next = changeStream.next();
67+
assert.eq(next.operationType, "insert");
68+
// TODO SERVER-30599 this documentKey should contain the shard key.
69+
assert.eq(next.documentKey, {_id: id});
70+
71+
assert.soon(() => changeStream.hasNext());
72+
next = changeStream.next();
73+
assert.eq(next.operationType, "update");
74+
assert.eq(next.documentKey, {shardKey: id, _id: id});
75+
assert.docEq(next.fullDocument, {_id: id, shardKey: id, updatedCount: 1});
76+
}
77+
78+
st.stop();
79+
})();
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Tests the behavior of looking up the post image for change streams on collections which are
2+
// sharded with a key which is just the "_id" field.
3+
(function() {
4+
"use strict";
5+
6+
// For supportsMajorityReadConcern().
7+
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
8+
9+
if (!supportsMajorityReadConcern()) {
10+
jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
11+
return;
12+
}
13+
14+
const st = new ShardingTest({
15+
shards: 2,
16+
rs: {
17+
nodes: 1,
18+
enableMajorityReadConcern: '',
19+
// Use a higher frequency for periodic noops to speed up the test.
20+
setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
21+
}
22+
});
23+
24+
const mongosDB = st.s0.getDB(jsTestName());
25+
const mongosColl = mongosDB[jsTestName()];
26+
27+
assert.commandWorked(mongosDB.dropDatabase());
28+
29+
// Enable sharding on the test DB and ensure its primary is shard0000.
30+
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
31+
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
32+
33+
// Shard the test collection on _id.
34+
assert.commandWorked(
35+
mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
36+
37+
// Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
38+
assert.commandWorked(
39+
mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
40+
41+
// Move the [0, MaxKey) chunk to shard0001.
42+
assert.commandWorked(mongosDB.adminCommand(
43+
{moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
44+
45+
// Write a document to each chunk.
46+
assert.writeOK(mongosColl.insert({_id: -1}));
47+
assert.writeOK(mongosColl.insert({_id: 1}));
48+
49+
const changeStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}]);
50+
51+
// Do some writes.
52+
assert.writeOK(mongosColl.insert({_id: 1000}));
53+
assert.writeOK(mongosColl.insert({_id: -1000}));
54+
assert.writeOK(mongosColl.update({_id: 1000}, {$set: {updatedCount: 1}}));
55+
assert.writeOK(mongosColl.update({_id: -1000}, {$set: {updatedCount: 1}}));
56+
57+
for (let nextId of[1000, -1000]) {
58+
assert.soon(() => changeStream.hasNext());
59+
let next = changeStream.next();
60+
assert.eq(next.operationType, "insert");
61+
assert.eq(next.documentKey, {_id: nextId});
62+
}
63+
64+
for (let nextId of[1000, -1000]) {
65+
assert.soon(() => changeStream.hasNext());
66+
let next = changeStream.next();
67+
assert.eq(next.operationType, "update");
68+
// Only the "_id" field is present in next.documentKey because the shard key is the _id.
69+
assert.eq(next.documentKey, {_id: nextId});
70+
assert.docEq(next.fullDocument, {_id: nextId, updatedCount: 1});
71+
}
72+
73+
// Test that the change stream can still see the updated post image, even if a chunk is
74+
// migrated.
75+
assert.writeOK(mongosColl.update({_id: 1000}, {$set: {updatedCount: 2}}));
76+
assert.writeOK(mongosColl.update({_id: -1000}, {$set: {updatedCount: 2}}));
77+
78+
// Split the [0, MaxKey) chunk into 2: [0, 500), [500, MaxKey).
79+
assert.commandWorked(
80+
mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 500}}));
81+
// Move the [500, MaxKey) chunk back to shard0000.
82+
assert.commandWorked(mongosDB.adminCommand(
83+
{moveChunk: mongosColl.getFullName(), find: {_id: 1000}, to: st.rs0.getURL()}));
84+
85+
for (let nextId of[1000, -1000]) {
86+
assert.soon(() => changeStream.hasNext());
87+
let next = changeStream.next();
88+
assert.eq(next.operationType, "update");
89+
assert.eq(next.documentKey, {_id: nextId});
90+
assert.docEq(next.fullDocument, {_id: nextId, updatedCount: 2});
91+
}
92+
93+
st.stop();
94+
})();

src/mongo/db/pipeline/document_source_change_stream.cpp

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,12 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
351351
// There should only be one close cursor stage. If we're on the shards and producing input
352352
// to be merged, do not add a close cursor stage, since the mongos will already have one.
353353
stages.push_back(DocumentSourceCloseCursor::create(expCtx));
354-
}
355-
if (shouldLookupPostImage) {
356-
uassert(
357-
40470,
358-
str::stream() << "looking up the full document after an update is not yet supported on "
359-
"sharded collections",
360-
!expCtx->inMongos);
361-
stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
354+
355+
// There should be only one post-image lookup stage. If we're on the shards and producing
356+
// input to be merged, the lookup is done on the mongos.
357+
if (shouldLookupPostImage) {
358+
stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
359+
}
362360
}
363361
return stages;
364362
}

src/mongo/db/pipeline/document_source_cursor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ void DocumentSourceCursor::loadBatch() {
106106
// Furthermore, if we need to return the latest oplog time (in the tailable and
107107
// needs-merge case), batching will result in a wrong time.
108108
if (shouldWaitForInserts(pExpCtx->opCtx) ||
109-
(pExpCtx->isTailable() && pExpCtx->needsMerge) ||
109+
(pExpCtx->isTailableAwaitData() && pExpCtx->needsMerge) ||
110110
memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
111111
// End this batch and prepare PlanExecutor for yielding.
112112
_exec->saveState();
@@ -115,7 +115,7 @@ void DocumentSourceCursor::loadBatch() {
115115
}
116116
// Special case for tailable cursor -- EOF doesn't preclude more results, so keep
117117
// the PlanExecutor alive.
118-
if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
118+
if (state == PlanExecutor::IS_EOF && pExpCtx->isTailableAwaitData()) {
119119
_exec->saveState();
120120
return;
121121
}

src/mongo/db/pipeline/document_source_lookup_change_post_image.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,12 @@ class DocumentSourceLookupChangePostImage final : public DocumentSourceNeedsMong
6262
}
6363

6464
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
65+
invariant(pipeState != Pipeline::SplitState::kSplitForShards);
6566
StageConstraints constraints(StreamType::kStreaming,
6667
PositionRequirement::kNone,
67-
HostTypeRequirement::kAnyShard,
68+
pipeState == Pipeline::SplitState::kUnsplit
69+
? HostTypeRequirement::kNone
70+
: HostTypeRequirement::kMongoS,
6871
DiskUseRequirement::kNoDiskUse,
6972
FacetRequirement::kNotAllowed,
7073
ChangeStreamRequirement::kChangeStreamStage);

src/mongo/db/pipeline/expression_context.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,10 @@ class ExpressionContext : public RefCountable {
113113
};
114114

115115
/**
116-
* Convenience call that returns true if the tailableMode indicate a tailable query.
116+
* Convenience call that returns true if the tailableMode indicates a tailable and awaitData
117+
* query.
117118
*/
118-
bool isTailable() const {
119+
bool isTailableAwaitData() const {
119120
return tailableMode == TailableMode::kTailableAndAwaitData;
120121
}
121122

0 commit comments

Comments
 (0)