Skip to content

Commit 5593fd8

Browse files
jseysterEvergreen Agent
authored andcommitted
SERVER-50769 Change streams no longer balk at empty applyOps
(cherry picked from commit e9122ba)
1 parent 425a5ff commit 5593fd8

File tree

6 files changed

+411
-4
lines changed

6 files changed

+411
-4
lines changed

buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ selector:
5353
- jstests/sharding/agg_explain_fmt.js
5454
- jstests/sharding/change_stream_metadata_notifications.js
5555
- jstests/sharding/change_stream_transaction_sharded.js
56+
- jstests/sharding/change_stream_empty_apply_ops.js
5657
- jstests/sharding/change_streams.js
5758
- jstests/sharding/collation_lookup.js
5859
- jstests/sharding/collation_targeting.js

etc/backports_required_for_multiversion_tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ all:
8888
test_file: jstests/replsets/read_operations_during_step_down.js
8989
- ticket: SERVER-50417
9090
test_file: jstests/replsets/read_operations_during_step_up.js
91+
- ticket: SERVER-50769
92+
test_file: jstests/sharding/change_stream_empty_apply_ops.js
9193

9294
# Tests that should only be excluded from particular suites should be listed under that suite.
9395
suites:
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Confirms that change streams correctly handle prepared transactions with an empty applyOps entry.
2+
// This test creats a multi-shard transaction in which one of the participating shards has only a
3+
// no-op write, resulting in the empty applyOps scenario we wish to test. Exercises the fix for
4+
// SERVER-50769.
5+
// @tags: [
6+
// requires_sharding,
7+
// uses_change_streams,
8+
// uses_multi_shard_transaction,
9+
// uses_transactions,
10+
// ]
11+
(function() {
12+
"use strict";
13+
14+
const dbName = "test";
15+
const collName = "change_stream_empty_apply_ops";
16+
const namespace = dbName + "." + collName;
17+
18+
const st = new ShardingTest({
19+
shards: 2,
20+
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
21+
});
22+
23+
const mongosConn = st.s;
24+
const db = mongosConn.getDB(dbName);
25+
const coll = db.getCollection(collName);
26+
27+
assert.commandWorked(coll.createIndex({shard: 1}));
28+
st.ensurePrimaryShard(dbName, st.shard0.shardName);
29+
// Shard the test collection and split it into two chunks: one that contains all {shard: 1}
30+
// documents and one that contains all {shard: 2} documents.
31+
st.shardColl(collName,
32+
{shard: 1} /* shard key */,
33+
{shard: 2} /* split at */,
34+
{shard: 2} /* move the chunk containing {shard: 2} to its own shard */,
35+
dbName,
36+
true);
37+
// Seed each chunk with an initial document.
38+
assert.commandWorked(coll.insert({shard: 1}, {writeConcern: {w: "majority"}}));
39+
assert.commandWorked(coll.insert({shard: 2}, {writeConcern: {w: "majority"}}));
40+
41+
// Open up change streams.
42+
const changeStreamCursorColl = coll.watch();
43+
const changeStreamCursorDB = db.watch();
44+
const changeStreamCursorCluster = mongosConn.watch();
45+
46+
// Start a transaction, which will include both shards.
47+
const sesion = db.getMongo().startSession({causalConsistency: true});
48+
const sessionDb = sesion.getDatabase(dbName);
49+
const sessionColl = sessionDb[collName];
50+
51+
sesion.startTransaction({readConcern: {level: "majority"}});
52+
53+
// This no-op will make one of the shards a transaction participant without generating an actual
54+
// write. The transaction will send an empty prepared transaction to the shard, in the form of an
55+
// applyOps command with no operations.
56+
sessionColl.findAndModify({query: {shard: 1}, update: {$setOnInsert: {a: 1}}});
57+
58+
// This write, which is not a no-op, occurs on the other shard.
59+
sessionColl.findAndModify({query: {shard: 2}, update: {$set: {a: 1}}});
60+
61+
assert.commandWorked(sesion.commitTransaction_forTesting());
62+
63+
// Each change stream should see exactly one update, resulting from the valid write on shard 2.
64+
[changeStreamCursorColl, changeStreamCursorDB, changeStreamCursorCluster].forEach(function(
65+
changeStreamCursor) {
66+
assert.soon(() => changeStreamCursor.hasNext());
67+
const changeDoc = changeStreamCursor.next();
68+
assert.eq(changeDoc.documentKey.shard, 2);
69+
assert.eq(changeDoc.operationType, "update");
70+
71+
assert(!changeStreamCursor.hasNext());
72+
});
73+
74+
st.stop();
75+
})();

src/mongo/db/pipeline/document_source_change_stream.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,10 @@ BSONObj getTxnApplyOpsFilter(BSONElement nsMatch, const NamespaceString& nss) {
177177
BSONObjBuilder applyOpsBuilder;
178178
applyOpsBuilder.append("op", "c");
179179

180-
// "o.applyOps" must be an array with at least one element
181-
applyOpsBuilder.append("o.applyOps.0", BSON("$exists" << true));
180+
// "o.applyOps" stores the list of operations, so it must be an array.
181+
applyOpsBuilder.append("o.applyOps",
182+
BSON("$type"
183+
<< "array"));
182184
applyOpsBuilder.append("lsid", BSON("$exists" << true));
183185
applyOpsBuilder.append("txnNumber", BSON("$exists" << true));
184186
applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true)));

0 commit comments

Comments
 (0)