Skip to content

feat(NODE-4683): make ChangeStream an async iterable #3454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Nov 2, 2022
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test: finish tests for asyncIterator
  • Loading branch information
andymina committed Oct 27, 2022
commit c6b409960f3b2156bf12e51c6165c4efd3f4e971
130 changes: 81 additions & 49 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2200,13 +2200,41 @@ describe('ChangeStream resumability', function () {
});

context.only('#asyncIterator', function () {
/**
* TODO(andymina): three test cases to cover
*
* happy path - asyncIterable works
* unhappy path - it errors out
* resumable error - continues but also throws the error out
*/
for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

await collection.insertOne({ city: 'New York City' });

let total_changes = 0;
for await (const change of changeStream) {
total_changes++;
if (total_changes === 1) {
changeStream.close();
}
}

expect(aggregateEvents).to.have.lengthOf(2);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
Expand All @@ -2220,10 +2248,8 @@ describe('ChangeStream resumability', function () {
// In order to test the resume, we need to ensure that at least one document has
// been iterated so we have a resume token to resume on.

// insert the doc
await collection.insertOne({ city: 'New York City' });

// fail the call
const mock = sinon
.stub(changeStream.cursor, '_getMore')
.callsFake((_batchSize, callback) => {
Expand All @@ -2249,58 +2275,64 @@ describe('ChangeStream resumability', function () {
);
}

// happy path
it('happy path', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }];
await collection.insertMany(docs);

let count = 0;
for await (const change of changeStream) {
const { fullDocument } = change;
expect(fullDocument.city).to.equal(docs[count].city);

count++;
if (count === 3) {
expect(docs.length).to.equal(count);
changeStream.close();
}
}
});

// unhappy path
it(
'unhappy path',
'can iterate through changes',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

const unresumableErrorCode = 1000;
await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
} as FailPoint);
const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }];
await collection.insertMany(docs);

await collection.insertOne({ city: 'New York City' });
let count = 0;
for await (const change of changeStream) {
const { fullDocument } = change;
expect(fullDocument.city).to.equal(docs[count].city);

try {
for await (const change of changeStream) {
// should not run
count++;
if (count === 3) {
changeStream.close();
}
} catch (error) {
expect(error).to.be.instanceOf(MongoServerError);
}

expect(docs.length).to.equal(count);
}
);

context('when the error is not a resumable error', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

const unresumableErrorCode = 1000;
await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
} as FailPoint);

await collection.insertOne({ city: 'New York City' });

try {
for await (const change of changeStream) {
// should not run
}
} catch (error) {
expect(error).to.be.instanceOf(MongoServerError);
}
}
);
});

});
});

Expand Down