Skip to content

Commit 61453cc

Browse files
committed
Revert "SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries"
This reverts commit 3bab157.
1 parent a9a2772 commit 61453cc

17 files changed

+97
-413
lines changed

jstests/aggregation/sources/changeNotification/change_notification.js

Lines changed: 5 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,20 @@
44
(function() {
55
"use strict";
66

7-
var oplogProjection = {$project: {"_id.ts": 0}};
8-
97
// Helper for testing that pipeline returns correct set of results.
108
function testPipeline(pipeline, expectedResult, collection) {
11-
// Limit to the last N documents from the end of the oplog, because currently
12-
// $changeNotification always comes from the start of the oplog.
13-
pipeline.push({$sort: {"_id.ts": -1}});
14-
if (expectedResult.length > 0) {
15-
pipeline.push({$limit: expectedResult.length});
16-
}
179
// Strip the oplog fields we aren't testing.
18-
pipeline.push(oplogProjection);
19-
assert.docEq(collection.aggregate(pipeline).toArray().reverse(), expectedResult);
10+
pipeline.push({$limit: 1});
11+
pipeline.push({$project: {"_id.ts": 0}});
12+
assert.docEq(collection.aggregate(pipeline).toArray(), expectedResult);
2013
}
2114

22-
let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
23-
let nodes = replTest.startSet();
15+
var replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
16+
var nodes = replTest.startSet();
2417
replTest.initiate();
2518
replTest.awaitReplication();
2619

2720
db = replTest.getPrimary().getDB('test');
28-
db.getMongo().forceReadMode('commands');
2921

3022
jsTestLog("Testing single insert");
3123
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
@@ -137,145 +129,5 @@
137129
assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
138130
testPipeline([{$changeNotification: {}}], [], db.dne1);
139131
testPipeline([{$changeNotification: {}}], [], db.dne2);
140-
141-
// Now make sure the cursor behaves like a tailable awaitData cursor.
142-
jsTestLog("Testing tailability");
143-
let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
144-
assert(!tailableCursor.hasNext());
145-
assert.writeOK(db.tailable1.insert({_id: 101, a: 1}));
146-
assert(tailableCursor.hasNext());
147-
assert.docEq(tailableCursor.next(), {
148-
"_id": {
149-
"_id": 101,
150-
"ns": "test.tailable1",
151-
},
152-
"documentKey": {"_id": 101},
153-
"newDocument": {"_id": 101, "a": 1},
154-
"ns": {"coll": "tailable1", "db": "test"},
155-
"operationType": "insert"
156-
});
157-
158-
jsTestLog("Testing awaitdata");
159-
let res = assert.commandWorked(db.runCommand({
160-
aggregate: "tailable2",
161-
pipeline: [{$changeNotification: {}}, oplogProjection],
162-
cursor: {}
163-
}));
164-
let aggcursor = res.cursor;
165-
166-
// We should get a valid cursor.
167-
assert.neq(aggcursor.id, 0);
168-
169-
// Initial batch size should be zero as there should be no data.
170-
assert.eq(aggcursor.firstBatch.length, 0);
171-
172-
// No data, so should return no results, but cursor should remain valid.
173-
res = assert.commandWorked(
174-
db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50}));
175-
aggcursor = res.cursor;
176-
assert.neq(aggcursor.id, 0);
177-
assert.eq(aggcursor.nextBatch.length, 0);
178-
179-
// Now insert something in parallel while waiting for it.
180-
let insertshell = startParallelShell(function() {
181-
// Wait for the getMore to appear in currentop.
182-
assert.soon(function() {
183-
return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length ==
184-
1;
185-
});
186-
assert.writeOK(db.tailable2.insert({_id: 102, a: 2}));
187-
});
188-
res = assert.commandWorked(
189-
db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000}));
190-
aggcursor = res.cursor;
191-
assert.eq(aggcursor.nextBatch.length, 1);
192-
assert.docEq(aggcursor.nextBatch[0], {
193-
"_id": {
194-
"_id": 102,
195-
"ns": "test.tailable2",
196-
},
197-
"documentKey": {"_id": 102},
198-
"newDocument": {"_id": 102, "a": 2},
199-
"ns": {"coll": "tailable2", "db": "test"},
200-
"operationType": "insert"
201-
});
202-
203-
// Wait for insert shell to terminate.
204-
insertshell();
205-
206-
jsTestLog("Testing awaitdata - no wake on insert to another collection");
207-
res = assert.commandWorked(db.runCommand({
208-
aggregate: "tailable3",
209-
pipeline: [{$changeNotification: {}}, oplogProjection],
210-
cursor: {}
211-
}));
212-
aggcursor = res.cursor;
213-
// We should get a valid cursor.
214-
assert.neq(aggcursor.id, 0);
215-
216-
// Initial batch size should be zero as there should be no data.
217-
assert.eq(aggcursor.firstBatch.length, 0);
218-
219-
// Now insert something in a different collection in parallel while waiting.
220-
insertshell = startParallelShell(function() {
221-
// Wait for the getMore to appear in currentop.
222-
assert.soon(function() {
223-
return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
224-
1;
225-
});
226-
assert.writeOK(db.tailable3a.insert({_id: 103, a: 2}));
227-
});
228-
let start = new Date();
229-
res = assert.commandWorked(
230-
db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 1000}));
231-
let diff = (new Date()).getTime() - start.getTime();
232-
assert.gt(diff, 900, "AwaitData returned prematurely on insert to unrelated collection.");
233-
aggcursor = res.cursor;
234-
// Cursor should be valid with no data.
235-
assert.neq(aggcursor.id, 0);
236-
assert.eq(aggcursor.nextBatch.length, 0);
237-
238-
// Wait for insert shell to terminate.
239-
insertshell();
240-
241-
// This time, put something in a different collection, then in the correct collection.
242-
// We should wake up with just the correct data.
243-
insertshell = startParallelShell(function() {
244-
// Wait for the getMore to appear in currentop.
245-
assert.soon(function() {
246-
return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length ==
247-
1;
248-
});
249-
assert.writeOK(db.tailable3a.insert({_id: 104, a: 2}));
250-
assert(db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == 1);
251-
assert.writeOK(db.tailable3.insert({_id: 105, a: 3}));
252-
});
253-
res = assert.commandWorked(
254-
db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 5 * 60 * 1000}));
255-
aggcursor = res.cursor;
256-
assert.neq(aggcursor.id, 0);
257-
assert.eq(aggcursor.nextBatch.length, 1);
258-
assert.docEq(aggcursor.nextBatch[0], {
259-
"_id": {
260-
"_id": 105,
261-
"ns": "test.tailable3",
262-
},
263-
"documentKey": {"_id": 105},
264-
"newDocument": {"_id": 105, "a": 3},
265-
"ns": {"coll": "tailable3", "db": "test"},
266-
"operationType": "insert"
267-
});
268-
269-
// Wait for insert shell to terminate.
270-
insertshell();
271-
272-
jsTestLog("Ensuring attempt to read with legacy operations fails.");
273-
db.getMongo().forceReadMode('legacy');
274-
tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
275-
{cursor: {batchSize: 0}});
276-
assert.throws(function() {
277-
tailableCursor.next();
278-
}, [], "Legacy getMore expected to fail on changeNotification cursor.");
279-
280132
replTest.stopSet();
281133
}());

jstests/noPassthrough/awaitdata_getmore_cmd.js

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,8 @@
2828
cmdRes = db.runCommand({find: collName, tailable: true, awaitData: true});
2929
assert.commandFailed(cmdRes);
3030

31-
// With a non-existent collection, should succeed but return no data and a closed cursor.
32-
coll.drop();
33-
cmdRes = assert.commandWorked(db.runCommand({find: collName, tailable: true}));
34-
assert.eq(cmdRes.cursor.id, NumberLong(0));
35-
assert.eq(cmdRes.cursor.firstBatch.length, 0);
36-
3731
// Create a capped collection with 10 documents.
32+
coll.drop();
3833
assert.commandWorked(db.createCollection(collName, {capped: true, size: 2048}));
3934
for (var i = 0; i < 10; i++) {
4035
assert.writeOK(coll.insert({a: i}));
@@ -128,47 +123,4 @@
128123
}
129124
assert.gte((new Date()) - now, 2000);
130125

131-
// Test filtered inserts while writing to a capped collection.
132-
// Find with a filter which doesn't match any documents in the collection.
133-
cmdRes = assert.commandWorked(db.runCommand(
134-
{find: collName, batchSize: 2, filter: {x: 1}, awaitData: true, tailable: true}));
135-
assert.gt(cmdRes.cursor.id, NumberLong(0));
136-
assert.eq(cmdRes.cursor.ns, coll.getFullName());
137-
assert.eq(cmdRes.cursor.firstBatch.length, 0);
138-
139-
// getMore should time out if we insert a non-matching document.
140-
let insertshell = startParallelShell(function() {
141-
assert.soon(function() {
142-
return db.currentOp({op: "getmore", "command.collection": "await_data"})
143-
.inprog.length == 1;
144-
});
145-
assert.writeOK(db.await_data.insert({x: 0}));
146-
}, mongo.port);
147-
148-
now = new Date();
149-
cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 4000});
150-
assert.commandWorked(cmdRes);
151-
assert.gt(cmdRes.cursor.id, NumberLong(0));
152-
assert.eq(cmdRes.cursor.ns, coll.getFullName());
153-
assert.eq(cmdRes.cursor.nextBatch.length, 0);
154-
assert.gte((new Date()) - now,
155-
4000,
156-
"Insert not matching filter caused awaitData getMore to return prematurely.");
157-
insertshell();
158-
159-
// getMore should succeed if we insert a non-matching document followed by a matching one.
160-
insertshell = startParallelShell(function() {
161-
assert.writeOK(db.await_data.insert({x: 0}));
162-
assert.writeOK(db.await_data.insert({_id: "match", x: 1}));
163-
jsTestLog("Written");
164-
}, mongo.port);
165-
166-
cmdRes =
167-
db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 5 * 60 * 1000});
168-
assert.commandWorked(cmdRes);
169-
assert.gt(cmdRes.cursor.id, NumberLong(0));
170-
assert.eq(cmdRes.cursor.ns, coll.getFullName());
171-
assert.eq(cmdRes.cursor.nextBatch.length, 1);
172-
assert.docEq(cmdRes.cursor.nextBatch[0], {_id: "match", x: 1});
173-
insertshell();
174126
})();

src/mongo/db/clientcursor.h

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,6 @@ struct ClientCursorParams {
7171
}
7272
}
7373

74-
void setTailable(bool tailable) {
75-
if (tailable)
76-
queryOptions |= QueryOption_CursorTailable;
77-
else
78-
queryOptions &= ~QueryOption_CursorTailable;
79-
}
80-
81-
void setAwaitData(bool awaitData) {
82-
if (awaitData)
83-
queryOptions |= QueryOption_AwaitData;
84-
else
85-
queryOptions &= ~QueryOption_AwaitData;
86-
}
87-
8874
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
8975
const NamespaceString nss;
9076
std::vector<UserName> authenticatedUsers;
@@ -141,23 +127,10 @@ class ClientCursor {
141127
return _exec.get();
142128
}
143129

144-
/**
145-
* Returns the query options bitmask. If you'd like to know if the cursor is tailable or
146-
* awaitData, prefer using the specific methods isTailable() and isAwaitData() over using this
147-
* method.
148-
*/
149130
int queryOptions() const {
150131
return _queryOptions;
151132
}
152133

153-
bool isTailable() const {
154-
return _queryOptions & QueryOption_CursorTailable;
155-
}
156-
157-
bool isAwaitData() const {
158-
return _queryOptions & QueryOption_AwaitData;
159-
}
160-
161134
const BSONObj& getOriginatingCommandObj() const {
162135
return _originatingCommand;
163136
}

0 commit comments

Comments
 (0)