Skip to content

Commit 0686b1e

Browse files
committed
SERVER-29131 Support resumeAfter option to control where to start returning notifications from, which always errors if no entry with the given resumeToken exists
1 parent 07d4d94 commit 0686b1e

16 files changed

+950
-125
lines changed

jstests/aggregation/sources/changeNotification/change_notification.js

Lines changed: 167 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,39 @@
33
"use strict";
44

55
const oplogProjection = {$project: {"_id.ts": 0}};
6+
function getCollectionNameFromFullNamespace(ns) {
7+
return ns.split(/\.(.+)/)[1];
8+
}
69

7-
/**
8-
* Tests the output of a $changeNotification stage, asserting only that the result at the end of
9-
* the change stream on the collection 'collection' (the newest matching entry in the oplog) is
10-
* equal to 'expectedResult'.
11-
*
12-
* Note this change assumes that the set of changes will fit within one batch.
13-
*/
14-
function checkLatestChange(expectedResult, collection) {
15-
const cmdResponse = assert.commandWorked(db.runCommand({
16-
aggregate: collection.getName(),
17-
pipeline: [
18-
{$changeNotification: {}},
19-
// Strip the oplog fields we aren't testing.
20-
{$project: {"_id.ts": 0}}
21-
],
22-
cursor: {}
23-
}));
24-
const firstBatch = cmdResponse.cursor.firstBatch;
25-
assert.neq(firstBatch.length, 0);
26-
assert.docEq(firstBatch[firstBatch.length - 1], expectedResult);
10+
// Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges
11+
// with the pipeline, then insert the changes, then run assertNextBatchMatches with the result
12+
// of startWatchingChanges and the expected set of results.
13+
function startWatchingChanges(pipeline, collection) {
14+
// Strip the oplog fields we aren't testing.
15+
pipeline.push(oplogProjection);
16+
// Waiting for replication assures no previous operations will be included.
17+
replTest.awaitReplication();
18+
let res = assert.commandWorked(
19+
db.runCommand({aggregate: collection.getName(), "pipeline": pipeline, cursor: {}}));
20+
assert.neq(res.cursor.id, 0);
21+
return res.cursor;
2722
}
2823

29-
/**
30-
* Tests that there are no changes in the 'collection'.
31-
*/
32-
function assertNoLatestChange(collection) {
33-
const cmdResponse = assert.commandWorked(db.runCommand({
34-
aggregate: collection.getName(),
35-
pipeline: [
36-
{$changeNotification: {}},
37-
],
38-
cursor: {}
24+
function assertNextBatchMatches({cursor, expectedBatch}) {
25+
replTest.awaitReplication();
26+
if (expectedBatch.length == 0)
27+
assert.commandWorked(db.adminCommand(
28+
{configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
29+
let res = assert.commandWorked(db.runCommand({
30+
getMore: cursor.id,
31+
collection: getCollectionNameFromFullNamespace(cursor.ns),
32+
maxTimeMS: 5 * 60 * 1000,
33+
batchSize: (expectedBatch.length + 1)
3934
}));
40-
assert.eq(cmdResponse.cursor.firstBatch.length, 0);
35+
if (expectedBatch.length == 0)
36+
assert.commandWorked(db.adminCommand(
37+
{configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
38+
assert.docEq(res.cursor.nextBatch, expectedBatch);
4139
}
4240

4341
let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
@@ -49,6 +47,7 @@
4947
db.getMongo().forceReadMode('commands');
5048

5149
jsTestLog("Testing single insert");
50+
let cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
5251
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
5352
let expected = {
5453
_id: {
@@ -60,9 +59,10 @@
6059
ns: {coll: "t1", db: "test"},
6160
operationType: "insert",
6261
};
63-
checkLatestChange(expected, db.t1);
62+
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
6463

6564
jsTestLog("Testing second insert");
65+
cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
6666
assert.writeOK(db.t1.insert({_id: 1, a: 2}));
6767
expected = {
6868
_id: {
@@ -74,9 +74,10 @@
7474
ns: {coll: "t1", db: "test"},
7575
operationType: "insert",
7676
};
77-
checkLatestChange(expected, db.t1);
77+
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
7878

7979
jsTestLog("Testing update");
80+
cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
8081
assert.writeOK(db.t1.update({_id: 0}, {a: 3}));
8182
expected = {
8283
_id: {_id: 0, ns: "test.t1"},
@@ -85,9 +86,10 @@
8586
ns: {coll: "t1", db: "test"},
8687
operationType: "replace",
8788
};
88-
checkLatestChange(expected, db.t1);
89+
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
8990

9091
jsTestLog("Testing update of another field");
92+
cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
9193
assert.writeOK(db.t1.update({_id: 0}, {b: 3}));
9294
expected = {
9395
_id: {_id: 0, ns: "test.t1"},
@@ -96,9 +98,10 @@
9698
ns: {coll: "t1", db: "test"},
9799
operationType: "replace",
98100
};
99-
checkLatestChange(expected, db.t1);
101+
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
100102

101103
jsTestLog("Testing upsert");
104+
cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
102105
assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true}));
103106
expected = {
104107
_id: {
@@ -110,10 +113,11 @@
110113
ns: {coll: "t1", db: "test"},
111114
operationType: "insert",
112115
};
113-
checkLatestChange(expected, db.t1);
116+
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
114117

115118
jsTestLog("Testing partial update with $inc");
116119
assert.writeOK(db.t1.insert({_id: 3, a: 5, b: 1}));
120+
cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
117121
assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}}));
118122
expected = {
119123
_id: {_id: 3, ns: "test.t1"},
@@ -123,9 +127,10 @@
123127
operationType: "update",
124128
updateDescription: {removedFields: [], updatedFields: {b: 3}},
125129
};
126-
checkLatestChange(expected, db.t1);
130+
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
127131

128132
jsTestLog("Testing delete");
133+
cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
129134
assert.writeOK(db.t1.remove({_id: 1}));
130135
expected = {
131136
_id: {_id: 1, ns: "test.t1"},
@@ -134,11 +139,13 @@
134139
ns: {coll: "t1", db: "test"},
135140
operationType: "delete",
136141
};
137-
checkLatestChange(expected, db.t1);
142+
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
138143

139144
jsTestLog("Testing intervening write on another collection");
145+
cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
146+
let t2cursor = startWatchingChanges([{$changeNotification: {}}], db.t2);
140147
assert.writeOK(db.t2.insert({_id: 100, c: 1}));
141-
checkLatestChange(expected, db.t1);
148+
assertNextBatchMatches({cursor: cursor, expectedBatch: []});
142149
expected = {
143150
_id: {
144151
_id: 100,
@@ -149,21 +156,24 @@
149156
ns: {coll: "t2", db: "test"},
150157
operationType: "insert",
151158
};
152-
checkLatestChange(expected, db.t2);
159+
assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]});
153160

154161
jsTestLog("Testing rename");
162+
t2cursor = startWatchingChanges([{$changeNotification: {}}], db.t2);
155163
assert.writeOK(db.t2.renameCollection("t3"));
156164
expected = {_id: {ns: "test.$cmd"}, operationType: "invalidate", fullDocument: null};
157-
checkLatestChange(expected, db.t2);
165+
assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]});
158166

159167
jsTestLog("Testing insert that looks like rename");
168+
const dne1cursor = startWatchingChanges([{$changeNotification: {}}], db.dne1);
169+
const dne2cursor = startWatchingChanges([{$changeNotification: {}}], db.dne2);
160170
assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
161-
assertNoLatestChange(db.dne1);
162-
assertNoLatestChange(db.dne2);
171+
assertNextBatchMatches({cursor: dne1cursor, expectedBatch: []});
172+
assertNextBatchMatches({cursor: dne2cursor, expectedBatch: []});
163173

164174
// Now make sure the cursor behaves like a tailable awaitData cursor.
165175
jsTestLog("Testing tailability");
166-
let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
176+
const tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
167177
assert(!tailableCursor.hasNext());
168178
assert.writeOK(db.tailable1.insert({_id: 101, a: 1}));
169179
assert(tailableCursor.hasNext());
@@ -192,9 +202,11 @@
192202
// Initial batch size should be zero as there should be no data.
193203
assert.eq(aggcursor.firstBatch.length, 0);
194204

195-
// No data, so should return no results, but cursor should remain valid.
205+
// No data, so should return no results, but cursor should remain valid. Note we are
206+
// specifically testing awaitdata behavior here, so we cannot use the failpoint to skip the
207+
// wait.
196208
res = assert.commandWorked(
197-
db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50}));
209+
db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 1000}));
198210
aggcursor = res.cursor;
199211
assert.neq(aggcursor.id, 0);
200212
assert.eq(aggcursor.nextBatch.length, 0);
@@ -294,11 +306,117 @@
294306

295307
jsTestLog("Ensuring attempt to read with legacy operations fails.");
296308
db.getMongo().forceReadMode('legacy');
297-
tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
298-
{cursor: {batchSize: 0}});
309+
const legacyCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
310+
{cursor: {batchSize: 0}});
299311
assert.throws(function() {
300-
tailableCursor.next();
312+
legacyCursor.next();
301313
}, [], "Legacy getMore expected to fail on changeNotification cursor.");
302314

315+
/**
316+
* Gets one document from the cursor using getMore with awaitData disabled. Asserts if no
317+
* document is present.
318+
*/
319+
function getOneDoc(cursor) {
320+
replTest.awaitReplication();
321+
assert.commandWorked(db.adminCommand(
322+
{configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
323+
let res = assert.commandWorked(db.runCommand({
324+
getMore: cursor.id,
325+
collection: getCollectionNameFromFullNamespace(cursor.ns),
326+
batchSize: 1
327+
}));
328+
assert.eq(res.cursor.nextBatch.length, 1);
329+
assert.commandWorked(
330+
db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
331+
return res.cursor.nextBatch[0];
332+
}
333+
334+
/**
335+
* Attempts to get a document from the cursor with awaitData disabled, and asserts if a document
336+
* is present.
337+
*/
338+
function assertNextBatchIsEmpty(cursor) {
339+
replTest.awaitReplication();
340+
assert.commandWorked(db.adminCommand(
341+
{configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
342+
let res = assert.commandWorked(db.runCommand({
343+
getMore: cursor.id,
344+
collection: getCollectionNameFromFullNamespace(cursor.ns),
345+
batchSize: 1
346+
}));
347+
assert.eq(res.cursor.nextBatch.length, 0);
348+
assert.commandWorked(
349+
db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
350+
}
351+
352+
jsTestLog("Testing resumability");
353+
assert.commandWorked(db.createCollection("resume1"));
354+
355+
// Note we do not project away 'id.ts' as it is part of the resume token.
356+
res = assert.commandWorked(
357+
db.runCommand({aggregate: "resume1", pipeline: [{$changeNotification: {}}], cursor: {}}));
358+
let resumeCursor = res.cursor;
359+
assert.neq(resumeCursor.id, 0);
360+
assert.eq(resumeCursor.firstBatch.length, 0);
361+
362+
// Insert a document and save the resulting change notification.
363+
assert.writeOK(db.resume1.insert({_id: 1}));
364+
const firstInsertChangeDoc = getOneDoc(resumeCursor);
365+
assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
366+
367+
jsTestLog("Testing resume after one document.");
368+
res = assert.commandWorked(db.runCommand({
369+
aggregate: "resume1",
370+
pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}],
371+
cursor: {batchSize: 0}
372+
}));
373+
resumeCursor = res.cursor;
374+
assertNextBatchIsEmpty(resumeCursor);
375+
376+
jsTestLog("Inserting additional documents.");
377+
assert.writeOK(db.resume1.insert({_id: 2}));
378+
const secondInsertChangeDoc = getOneDoc(resumeCursor);
379+
assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
380+
assert.writeOK(db.resume1.insert({_id: 3}));
381+
const thirdInsertChangeDoc = getOneDoc(resumeCursor);
382+
assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
383+
assertNextBatchIsEmpty(resumeCursor);
384+
385+
jsTestLog("Testing resume after first document of three.");
386+
res = assert.commandWorked(db.runCommand({
387+
aggregate: "resume1",
388+
pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}],
389+
cursor: {batchSize: 0}
390+
}));
391+
resumeCursor = res.cursor;
392+
assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc);
393+
assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
394+
assertNextBatchIsEmpty(resumeCursor);
395+
396+
jsTestLog("Testing resume after second document of three.");
397+
res = assert.commandWorked(db.runCommand({
398+
aggregate: "resume1",
399+
pipeline: [{$changeNotification: {resumeAfter: secondInsertChangeDoc._id}}],
400+
cursor: {batchSize: 0}
401+
}));
402+
resumeCursor = res.cursor;
403+
assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
404+
assertNextBatchIsEmpty(resumeCursor);
405+
406+
jsTestLog("Testing that resume is possible after the collection is dropped.");
407+
assert(db.resume1.drop());
408+
const invalidateDoc = getOneDoc(resumeCursor);
409+
assert.eq(invalidateDoc.operationType, "invalidate");
410+
res = assert.commandWorked(db.runCommand({
411+
aggregate: "resume1",
412+
pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}],
413+
cursor: {batchSize: 0}
414+
}));
415+
resumeCursor = res.cursor;
416+
assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc);
417+
assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
418+
assert.docEq(getOneDoc(resumeCursor), invalidateDoc);
419+
assertNextBatchIsEmpty(resumeCursor);
420+
303421
replTest.stopSet();
304422
}());

0 commit comments

Comments
 (0)