Skip to content

Commit e544d88

Browse files
fix(NODE-5064): consolidate connection cleanup logic and ensure socket is always closed (#3572)
Co-authored-by: Warren James <[email protected]>
1 parent 0df03ef commit e544d88

File tree

2 files changed

+519
-98
lines changed

2 files changed

+519
-98
lines changed

src/cmap/connection.ts

+55-60
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
154154
address: string;
155155
socketTimeoutMS: number;
156156
monitorCommands: boolean;
157+
/** Indicates that the connection (including underlying TCP socket) has been closed. */
157158
closed: boolean;
158-
destroyed: boolean;
159159
lastHelloMS?: number;
160160
serverApi?: ServerApi;
161161
helloOk?: boolean;
@@ -204,7 +204,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
204204
this.monitorCommands = options.monitorCommands;
205205
this.serverApi = options.serverApi;
206206
this.closed = false;
207-
this.destroyed = false;
208207
this[kHello] = null;
209208
this[kClusterTime] = null;
210209

@@ -294,56 +293,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
294293
}
295294

296295
onError(error: Error) {
297-
if (this.closed) {
298-
return;
299-
}
300-
301-
this[kStream].destroy(error);
302-
303-
this.closed = true;
304-
305-
for (const op of this[kQueue].values()) {
306-
op.cb(error);
307-
}
308-
309-
this[kQueue].clear();
310-
this.emit(Connection.CLOSE);
296+
this.cleanup(true, error);
311297
}
312298

313299
onClose() {
314-
if (this.closed) {
315-
return;
316-
}
317-
318-
this.closed = true;
319-
320300
const message = `connection ${this.id} to ${this.address} closed`;
321-
for (const op of this[kQueue].values()) {
322-
op.cb(new MongoNetworkError(message));
323-
}
324-
325-
this[kQueue].clear();
326-
this.emit(Connection.CLOSE);
301+
this.cleanup(true, new MongoNetworkError(message));
327302
}
328303

329304
onTimeout() {
330-
if (this.closed) {
331-
return;
332-
}
333-
334305
this[kDelayedTimeoutId] = setTimeout(() => {
335-
this[kStream].destroy();
336-
337-
this.closed = true;
338-
339306
const message = `connection ${this.id} to ${this.address} timed out`;
340307
const beforeHandshake = this.hello == null;
341-
for (const op of this[kQueue].values()) {
342-
op.cb(new MongoNetworkTimeoutError(message, { beforeHandshake }));
343-
}
344-
345-
this[kQueue].clear();
346-
this.emit(Connection.CLOSE);
308+
this.cleanup(true, new MongoNetworkTimeoutError(message, { beforeHandshake }));
347309
}, 1).unref(); // No need for this timer to hold the event loop open
348310
}
349311

@@ -364,7 +326,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
364326

365327
// First check if the map is of invalid size
366328
if (this[kQueue].size > 1) {
367-
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
329+
this.cleanup(true, new MongoRuntimeError(INVALID_QUEUE_SIZE));
368330
} else {
369331
// Get the first orphaned operation description.
370332
const entry = this[kQueue].entries().next();
@@ -444,34 +406,67 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
444406
}
445407

446408
destroy(options: DestroyOptions, callback?: Callback): void {
409+
if (this.closed) {
410+
process.nextTick(() => callback?.());
411+
return;
412+
}
413+
if (typeof callback === 'function') {
414+
this.once('close', () => process.nextTick(() => callback()));
415+
}
416+
417+
// load balanced mode requires that these listeners remain on the connection
418+
// after cleanup on timeouts, errors or close so we remove them before calling
419+
// cleanup.
447420
this.removeAllListeners(Connection.PINNED);
448421
this.removeAllListeners(Connection.UNPINNED);
422+
const message = `connection ${this.id} to ${this.address} closed`;
423+
this.cleanup(options.force, new MongoNetworkError(message));
424+
}
449425

450-
if (this[kStream] == null || this.destroyed) {
451-
this.destroyed = true;
452-
if (typeof callback === 'function') {
453-
callback();
454-
}
455-
426+
/**
427+
* A method that cleans up the connection. When `force` is true, this method
428+
* forcibly destroys the socket.
429+
*
430+
* If an error is provided, any in-flight operations will be closed with the error.
431+
*
432+
* This method does nothing if the connection is already closed.
433+
*/
434+
private cleanup(force: boolean, error?: Error): void {
435+
if (this.closed) {
456436
return;
457437
}
458438

459-
if (options.force) {
460-
this[kStream].destroy();
461-
this.destroyed = true;
462-
if (typeof callback === 'function') {
463-
callback();
439+
this.closed = true;
440+
441+
const completeCleanup = () => {
442+
for (const op of this[kQueue].values()) {
443+
op.cb(error);
464444
}
465445

446+
this[kQueue].clear();
447+
448+
this.emit(Connection.CLOSE);
449+
};
450+
451+
this[kStream].removeAllListeners();
452+
this[kMessageStream].removeAllListeners();
453+
454+
this[kMessageStream].destroy();
455+
456+
if (force) {
457+
this[kStream].destroy();
458+
completeCleanup();
466459
return;
467460
}
468461

469-
this[kStream].end(() => {
470-
this.destroyed = true;
471-
if (typeof callback === 'function') {
472-
callback();
473-
}
474-
});
462+
if (!this[kStream].writableEnded) {
463+
this[kStream].end(() => {
464+
this[kStream].destroy();
465+
completeCleanup();
466+
});
467+
} else {
468+
completeCleanup();
469+
}
475470
}
476471

477472
command(

0 commit comments

Comments
 (0)