Skip to content

Commit fd835cf

Browse files
W-A-Jamesbaileympearson
authored andcommitted
fix(NODE-4834): ensure that MessageStream is destroyed when connections are destroyed (#3482)
1 parent 796801c commit fd835cf

File tree

2 files changed

+218
-34
lines changed

2 files changed

+218
-34
lines changed

src/cmap/connection.ts

+14-29
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
154154
address: string;
155155
socketTimeoutMS: number;
156156
monitorCommands: boolean;
157+
/** Indicates that the connection (including underlying TCP socket) has been closed. */
157158
closed: boolean;
158-
destroyed: boolean;
159159
lastHelloMS?: number;
160160
serverApi?: ServerApi;
161161
helloOk?: boolean;
@@ -204,7 +204,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
204204
this.monitorCommands = options.monitorCommands;
205205
this.serverApi = options.serverApi;
206206
this.closed = false;
207-
this.destroyed = false;
208207
this[kHello] = null;
209208
this[kClusterTime] = null;
210209

@@ -297,10 +296,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
297296
if (this.closed) {
298297
return;
299298
}
300-
301-
this[kStream].destroy(error);
302-
303-
this.closed = true;
299+
this.destroy({ force: false });
304300

305301
for (const op of this[kQueue].values()) {
306302
op.cb(error);
@@ -314,8 +310,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
314310
if (this.closed) {
315311
return;
316312
}
317-
318-
this.closed = true;
313+
this.destroy({ force: false });
319314

320315
const message = `connection ${this.id} to ${this.address} closed`;
321316
for (const op of this[kQueue].values()) {
@@ -332,9 +327,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
332327
}
333328

334329
this[kDelayedTimeoutId] = setTimeout(() => {
335-
this[kStream].destroy();
336-
337-
this.closed = true;
330+
this.destroy({ force: false });
338331

339332
const message = `connection ${this.id} to ${this.address} timed out`;
340333
const beforeHandshake = this.hello == null;
@@ -447,31 +440,23 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
447440
this.removeAllListeners(Connection.PINNED);
448441
this.removeAllListeners(Connection.UNPINNED);
449442

450-
if (this[kStream] == null || this.destroyed) {
451-
this.destroyed = true;
452-
if (typeof callback === 'function') {
453-
callback();
454-
}
455-
456-
return;
457-
}
443+
this[kMessageStream].destroy();
444+
this.closed = true;
458445

459446
if (options.force) {
460447
this[kStream].destroy();
461-
this.destroyed = true;
462-
if (typeof callback === 'function') {
463-
callback();
448+
if (callback) {
449+
return process.nextTick(callback);
464450
}
465-
466-
return;
467451
}
468452

469-
this[kStream].end(() => {
470-
this.destroyed = true;
471-
if (typeof callback === 'function') {
472-
callback();
453+
if (!this[kStream].writableEnded) {
454+
this[kStream].end(callback);
455+
} else {
456+
if (callback) {
457+
return process.nextTick(callback);
473458
}
474-
});
459+
}
475460
}
476461

477462
command(

test/unit/cmap/connection.test.ts

+204-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const connectionOptionsDefaults = {
3131

3232
/** The absolute minimum socket API needed by Connection as of writing this test */
3333
class FakeSocket extends EventEmitter {
34+
writableEnded: boolean;
3435
address() {
3536
// is never called
3637
}
@@ -39,6 +40,14 @@ class FakeSocket extends EventEmitter {
3940
}
4041
destroy() {
4142
// is called, has no side effects
43+
this.writableEnded = true;
44+
}
45+
end(cb) {
46+
this.writableEnded = true;
47+
// nextTick to simulate I/O delay
48+
if (typeof cb === 'function') {
49+
process.nextTick(cb);
50+
}
4251
}
4352
get remoteAddress() {
4453
return 'iLoveJavaScript';
@@ -48,6 +57,20 @@ class FakeSocket extends EventEmitter {
4857
}
4958
}
5059

60+
class InputStream extends Readable {
61+
writableEnded: boolean;
62+
constructor(options?) {
63+
super(options);
64+
}
65+
66+
end(cb) {
67+
this.writableEnded = true;
68+
if (typeof cb === 'function') {
69+
process.nextTick(cb);
70+
}
71+
}
72+
}
73+
5174
describe('new Connection()', function () {
5275
let server;
5376
after(() => mock.cleanup());
@@ -106,7 +129,7 @@ describe('new Connection()', function () {
106129
expect(err).to.be.instanceOf(MongoNetworkTimeoutError);
107130
expect(result).to.not.exist;
108131

109-
expect(conn).property('stream').property('destroyed', true);
132+
expect(conn).property('stream').property('writableEnded', true);
110133

111134
done();
112135
});
@@ -175,7 +198,7 @@ describe('new Connection()', function () {
175198

176199
context('when multiple hellos exist on the stream', function () {
177200
let callbackSpy;
178-
const inputStream = new Readable();
201+
const inputStream = new InputStream();
179202
const document = { ok: 1 };
180203
const last = { isWritablePrimary: true };
181204

@@ -394,7 +417,7 @@ describe('new Connection()', function () {
394417
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
395418
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
396419
kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId');
397-
messageStream = connection[messageStreamSymbol];
420+
messageStream = sinon.spy(connection[messageStreamSymbol]);
398421
});
399422

400423
afterEach(() => {
@@ -407,13 +430,15 @@ describe('new Connection()', function () {
407430

408431
driverSocket.emit('timeout');
409432
expect(connection.onTimeout).to.have.been.calledOnce;
433+
expect(connection.destroy).to.not.have.been.called;
410434
expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass);
411435
expect(connection).to.have.property('closed', false);
412-
expect(driverSocket.destroy).to.not.have.been.called;
436+
expect(driverSocket.end).to.not.have.been.called;
413437

414438
clock.tick(1);
415439

416-
expect(driverSocket.destroy).to.have.been.calledOnce;
440+
expect(driverSocket.end).to.have.been.calledOnce;
441+
expect(connection.destroy).to.have.been.calledOnce;
417442
expect(connection).to.have.property('closed', true);
418443
});
419444

@@ -438,6 +463,88 @@ describe('new Connection()', function () {
438463
expect(connection).to.have.property('closed', false);
439464
expect(connection).to.have.property(kDelayedTimeoutId, null);
440465
});
466+
467+
it('destroys the message stream and socket', () => {
468+
expect(connection).to.have.property(kDelayedTimeoutId, null);
469+
470+
driverSocket.emit('timeout');
471+
472+
clock.tick(1);
473+
474+
expect(connection.onTimeout).to.have.been.calledOnce;
475+
expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass);
476+
477+
expect(messageStream.destroy).to.have.been.calledOnce;
478+
expect(driverSocket.destroy).to.not.have.been.called;
479+
expect(driverSocket.end).to.have.been.calledOnce;
480+
});
481+
});
482+
483+
describe('onError()', () => {
484+
let connection: sinon.SinonSpiedInstance<Connection>;
485+
let clock: sinon.SinonFakeTimers;
486+
let timerSandbox: sinon.SinonFakeTimers;
487+
let driverSocket: sinon.SinonSpiedInstance<FakeSocket>;
488+
let messageStream: MessageStream;
489+
beforeEach(() => {
490+
timerSandbox = createTimerSandbox();
491+
clock = sinon.useFakeTimers();
492+
driverSocket = sinon.spy(new FakeSocket());
493+
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
494+
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
495+
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
496+
messageStream = sinon.spy(connection[messageStreamSymbol]);
497+
});
498+
499+
afterEach(() => {
500+
timerSandbox.restore();
501+
clock.restore();
502+
});
503+
504+
it('destroys the message stream and socket', () => {
505+
messageStream.emit('error');
506+
clock.tick(1);
507+
expect(connection.onError).to.have.been.calledOnce;
508+
connection.destroy({ force: false });
509+
clock.tick(1);
510+
expect(messageStream.destroy).to.have.been.called;
511+
expect(driverSocket.destroy).to.not.have.been.called;
512+
expect(driverSocket.end).to.have.been.calledOnce;
513+
});
514+
});
515+
516+
describe('onClose()', () => {
517+
let connection: sinon.SinonSpiedInstance<Connection>;
518+
let clock: sinon.SinonFakeTimers;
519+
let timerSandbox: sinon.SinonFakeTimers;
520+
let driverSocket: sinon.SinonSpiedInstance<FakeSocket>;
521+
let messageStream: MessageStream;
522+
beforeEach(() => {
523+
timerSandbox = createTimerSandbox();
524+
clock = sinon.useFakeTimers();
525+
526+
driverSocket = sinon.spy(new FakeSocket());
527+
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
528+
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
529+
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
530+
messageStream = sinon.spy(connection[messageStreamSymbol]);
531+
});
532+
533+
afterEach(() => {
534+
timerSandbox.restore();
535+
clock.restore();
536+
});
537+
538+
it('destroys the message stream and socket', () => {
539+
driverSocket.emit('close');
540+
clock.tick(1);
541+
expect(connection.onClose).to.have.been.calledOnce;
542+
connection.destroy({ force: false });
543+
clock.tick(1);
544+
expect(messageStream.destroy).to.have.been.called;
545+
expect(driverSocket.destroy).to.not.have.been.called;
546+
expect(driverSocket.end).to.have.been.calledOnce;
547+
});
441548
});
442549

443550
describe('.hasSessionSupport', function () {
@@ -491,4 +598,96 @@ describe('new Connection()', function () {
491598
});
492599
});
493600
});
601+
602+
describe('destroy()', () => {
603+
let connection: sinon.SinonSpiedInstance<Connection>;
604+
let clock: sinon.SinonFakeTimers;
605+
let timerSandbox: sinon.SinonFakeTimers;
606+
let driverSocket: sinon.SinonSpiedInstance<FakeSocket>;
607+
let messageStream: MessageStream;
608+
beforeEach(() => {
609+
timerSandbox = createTimerSandbox();
610+
clock = sinon.useFakeTimers();
611+
612+
driverSocket = sinon.spy(new FakeSocket());
613+
// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
614+
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
615+
const messageStreamSymbol = getSymbolFrom(connection, 'messageStream');
616+
messageStream = sinon.spy(connection[messageStreamSymbol]);
617+
});
618+
619+
afterEach(() => {
620+
timerSandbox.restore();
621+
clock.restore();
622+
});
623+
624+
context('when options.force == true', function () {
625+
it('calls stream.destroy', () => {
626+
connection.destroy({ force: true });
627+
clock.tick(1);
628+
expect(driverSocket.destroy).to.have.been.calledOnce;
629+
});
630+
631+
it('does not call stream.end', () => {
632+
connection.destroy({ force: true });
633+
clock.tick(1);
634+
expect(driverSocket.end).to.not.have.been.called;
635+
});
636+
637+
it('destroys the tcp socket', () => {
638+
connection.destroy({ force: true });
639+
clock.tick(1);
640+
expect(driverSocket.destroy).to.have.been.calledOnce;
641+
});
642+
643+
it('destroys the messageStream', () => {
644+
connection.destroy({ force: true });
645+
clock.tick(1);
646+
expect(messageStream.destroy).to.have.been.calledOnce;
647+
});
648+
649+
it('calls stream.destroy whenever destroy is called ', () => {
650+
connection.destroy({ force: true });
651+
connection.destroy({ force: true });
652+
connection.destroy({ force: true });
653+
clock.tick(1);
654+
expect(driverSocket.destroy).to.have.been.calledThrice;
655+
});
656+
});
657+
658+
context('when options.force == false', function () {
659+
it('calls stream.end', () => {
660+
connection.destroy({ force: false });
661+
clock.tick(1);
662+
expect(driverSocket.end).to.have.been.calledOnce;
663+
});
664+
665+
it('does not call stream.destroy', () => {
666+
connection.destroy({ force: false });
667+
clock.tick(1);
668+
expect(driverSocket.destroy).to.not.have.been.called;
669+
});
670+
671+
it('ends the tcp socket', () => {
672+
connection.destroy({ force: false });
673+
clock.tick(1);
674+
expect(driverSocket.end).to.have.been.calledOnce;
675+
});
676+
677+
it('destroys the messageStream', () => {
678+
connection.destroy({ force: false });
679+
clock.tick(1);
680+
expect(messageStream.destroy).to.have.been.calledOnce;
681+
});
682+
683+
it('calls stream.end exactly once when destroy is called multiple times', () => {
684+
connection.destroy({ force: false });
685+
connection.destroy({ force: false });
686+
connection.destroy({ force: false });
687+
connection.destroy({ force: false });
688+
clock.tick(1);
689+
expect(driverSocket.end).to.have.been.calledOnce;
690+
});
691+
});
692+
});
494693
});

0 commit comments

Comments
 (0)