Skip to content

fix(NODE-4783): handle orphaned operation descriptions #3463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: error on invalid queue size
  • Loading branch information
durran committed Nov 15, 2022
commit cf2650e550e6fdec9e3591586472f6403ec72777
26 changes: 17 additions & 9 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
MongoMissingDependencyError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoRuntimeError,
MongoServerError,
MongoWriteConcernError
} from '../error';
Expand Down Expand Up @@ -68,6 +69,8 @@ const kAutoEncrypter = Symbol('autoEncrypter');
/** @internal */
const kDelayedTimeoutId = Symbol('delayedTimeoutId');

const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description';

/** @internal */
export interface CommandOptions extends BSONSerializeOptions {
command?: boolean;
Expand Down Expand Up @@ -374,15 +377,20 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (!operationDescription && this.isMonitoringConnection) {
// NODE-4783: How do we recover from this when the initial hello's requestId is not
// the responseTo when hello responses have been skipped?
//
// Get the first orphaned operation description.
const entry = this[kQueue].entries().next();
if (entry) {
const [requestId, orphaned]: [number, OperationDescription] = entry.value;
// If the orphaned operation description exists then set it.
operationDescription = orphaned;
// Remove the entry with the bad request id from the queue.
this[kQueue].delete(requestId);

// First check if the map is of invalid size
if (this[kQueue].size > 1) {
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
} else {
// Get the first orphaned operation description.
const entry = this[kQueue].entries().next();
if (entry) {
const [requestId, orphaned]: [number, OperationDescription] = entry.value;
// If the orphaned operation description exists then set it.
operationDescription = orphaned;
// Remove the entry with the bad request id from the queue.
this[kQueue].delete(requestId);
}
}
}

Expand Down
70 changes: 62 additions & 8 deletions test/unit/cmap/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { EventEmitter, on } from 'events';
import { EventEmitter, once } from 'events';
import { Socket } from 'net';
import * as sinon from 'sinon';
import { Readable } from 'stream';
Expand All @@ -9,7 +9,7 @@ import { BinMsg } from '../../../src/cmap/commands';
import { connect } from '../../../src/cmap/connect';
import { Connection, hasSessionSupport } from '../../../src/cmap/connection';
import { MessageStream } from '../../../src/cmap/message_stream';
import { MongoNetworkTimeoutError } from '../../../src/error';
import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error';
import { isHello, ns } from '../../../src/utils';
import * as mock from '../../tools/mongodb-mock/index';
import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils';
Expand Down Expand Up @@ -172,12 +172,13 @@ describe('new Connection()', function () {
let callbackSpy;
const inputStream = new Readable();
const document = { ok: 1 };
const last = { isWritablePrimary: true };

beforeEach(function () {
callbackSpy = sinon.spy();
const firstHello = generateOpMsgBuffer(document);
const secondHello = generateOpMsgBuffer(document);
const thirdHello = generateOpMsgBuffer(document);
const thirdHello = generateOpMsgBuffer(last);
const buffer = Buffer.concat([firstHello, secondHello, thirdHello]);

connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults));
Expand All @@ -199,9 +200,10 @@ describe('new Connection()', function () {
inputStream.push(null);
});

it('calls the operation description callback with the document', async function () {
await on(inputStream, 'message');
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
it('calls the callback with the last hello document', async function () {
const messages = await once(connection, 'message');
expect(messages[0].responseTo).to.equal(0);
expect(callbackSpy).to.be.calledOnceWith(undefined, last);
});
});

Expand Down Expand Up @@ -230,8 +232,8 @@ describe('new Connection()', function () {
const msg = generateOpMsgBuffer(document);
const msgHeader: MessageHeader = {
length: msg.readInt32LE(0),
requestId: msg.readInt32LE(4),
responseTo: msg.readInt32LE(8),
requestId: 1,
responseTo: 0, // This will not match.
opCode: msg.readInt32LE(12)
};
const msgBody = msg.subarray(16);
Expand Down Expand Up @@ -284,6 +286,58 @@ describe('new Connection()', function () {
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
});
});

context('when more than one operation description is in the queue', function () {
let spyOne;
let spyTwo;
const document = { ok: 1 };

beforeEach(function () {
spyOne = sinon.spy();
spyTwo = sinon.spy();

// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
connection.isMonitoringConnection = true;
const queueSymbol = getSymbolFrom(connection, 'queue');
queue = connection[queueSymbol];

// Create the operation descriptions.
const descriptionOne: OperationDescription = {
requestId: 1,
cb: spyOne
};
const descriptionTwo: OperationDescription = {
requestId: 2,
cb: spyTwo
};

// Stick an operation description in the queue.
queue.set(2, descriptionOne);
queue.set(3, descriptionTwo);
// Emit a message that matches the existing operation description.
const msg = generateOpMsgBuffer(document);
const msgHeader: MessageHeader = {
length: msg.readInt32LE(0),
requestId: 2,
responseTo: 1,
opCode: msg.readInt32LE(12)
};
const msgBody = msg.subarray(16);

const message = new BinMsg(msg, msgHeader, msgBody);
connection.onMessage(message);
});

it('calls all operation description callbacks with an error', function () {
expect(spyOne).to.be.calledOnce;
expect(spyTwo).to.be.calledOnce;
const errorOne = spyOne.firstCall.args[0];
const errorTwo = spyTwo.firstCall.args[0];
expect(errorOne).to.be.instanceof(MongoRuntimeError);
expect(errorTwo).to.be.instanceof(MongoRuntimeError);
});
});
});
});

Expand Down