Skip to content

Commit ce8bc52

Browse files
bigmontzrobsdedude
andauthored
Fix connection getting timeout while idle (neo4j#1167)
When the connection goes idle, an observer is appended. And this causes the receive timeout to start to count. This behaviour causes connections timing out while being idle in the pool. For fixing this, the connection received methods to handle its status changing to idle and back from idle. When the connection goes idle, it should stop the timeouts and do not start any timeout until it gets busy again. This change also impacts the `hasOngoingRequests` method, since idle connections don't have ongoing requests. Co-authored-by: Robsdedude <[email protected]>
1 parent fda596c commit ce8bc52

File tree

8 files changed

+233
-15
lines changed

8 files changed

+233
-15
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-pooled.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
226226
}
227227

228228
static _installIdleObserverOnConnection (conn, observer) {
229-
conn._queueObserver(observer)
229+
conn._setIdle(observer)
230230
}
231231

232232
static _removeIdleObserverOnConnection (conn) {
233-
conn._updateCurrentObserver()
233+
conn._unsetIdle()
234234
}
235235

236236
_handleSecurityError (error, address, connection) {

packages/bolt-connection/src/connection/connection-channel.js

+25-2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export default class ChannelConnection extends Connection {
125125
) {
126126
super(errorHandler)
127127
this._authToken = null
128+
this._idle = false
128129
this._reseting = false
129130
this._resetObservers = []
130131
this._id = idGenerator++
@@ -393,7 +394,26 @@ export default class ChannelConnection extends Connection {
393394
}
394395

395396
/**
396-
* This method still here because it's used by the {@link PooledConnectionProvider}
397+
* This method is used by the {@link PooledConnectionProvider}
398+
*
399+
* @param {any} observer
400+
*/
401+
_setIdle (observer) {
402+
this._idle = true
403+
this._ch.stopReceiveTimeout()
404+
this._protocol.queueObserverIfProtocolIsNotBroken(observer)
405+
}
406+
407+
/**
408+
* This method is used by the {@link PooledConnectionProvider}
409+
*/
410+
_unsetIdle () {
411+
this._idle = false
412+
this._updateCurrentObserver()
413+
}
414+
415+
/**
416+
* This method still here because of the connection-channel.tests.js
397417
*
398418
* @param {any} observer
399419
*/
@@ -402,7 +422,7 @@ export default class ChannelConnection extends Connection {
402422
}
403423

404424
hasOngoingObservableRequests () {
405-
return this._protocol.hasOngoingObservableRequests()
425+
return !this._idle && this._protocol.hasOngoingObservableRequests()
406426
}
407427

408428
/**
@@ -500,6 +520,9 @@ export default class ChannelConnection extends Connection {
500520
* @param {number} requestsNumber Ongoing requests number
501521
*/
502522
_handleOngoingRequestsNumberChange (requestsNumber) {
523+
if (this._idle) {
524+
return
525+
}
503526
if (requestsNumber === 0) {
504527
this._ch.stopReceiveTimeout()
505528
} else {

packages/bolt-connection/test/connection/connection-channel.test.js

+139-4
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ describe('ChannelConnection', () => {
559559
})
560560

561561
describe('.__handleOngoingRequestsNumberChange()', () => {
562-
it('should call channel.stopReceiveTimeout when requets number equals to 0', () => {
562+
it('should call channel.stopReceiveTimeout when requests number equals to 0', () => {
563563
const channel = {
564564
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
565565
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
@@ -571,7 +571,7 @@ describe('ChannelConnection', () => {
571571
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
572572
})
573573

574-
it('should not call channel.startReceiveTimeout when requets number equals to 0', () => {
574+
it('should not call channel.startReceiveTimeout when requests number equals to 0', () => {
575575
const channel = {
576576
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
577577
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
@@ -585,7 +585,7 @@ describe('ChannelConnection', () => {
585585

586586
it.each([
587587
[1], [2], [3], [5], [8], [13], [3000]
588-
])('should call channel.startReceiveTimeout when requets number equals to %d', (requests) => {
588+
])('should call channel.startReceiveTimeout when requests number equals to %d', (requests) => {
589589
const channel = {
590590
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
591591
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
@@ -599,7 +599,7 @@ describe('ChannelConnection', () => {
599599

600600
it.each([
601601
[1], [2], [3], [5], [8], [13], [3000]
602-
])('should not call channel.stopReceiveTimeout when requets number equals to %d', (requests) => {
602+
])('should not call channel.stopReceiveTimeout when requests number equals to %d', (requests) => {
603603
const channel = {
604604
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
605605
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
@@ -610,6 +610,68 @@ describe('ChannelConnection', () => {
610610

611611
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
612612
})
613+
614+
it.each([
615+
[0], [1], [2], [3], [5], [8], [13], [3000]
616+
])('should not call channel.stopReceiveTimeout or startReceiveTimeout when requests number equals to %d and connection is idle', (requests) => {
617+
const channel = {
618+
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
619+
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
620+
}
621+
const protocol = {
622+
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
623+
}
624+
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
625+
connection._setIdle({})
626+
channel.stopReceiveTimeout.mockClear()
627+
628+
connection._handleOngoingRequestsNumberChange(requests)
629+
630+
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
631+
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
632+
})
633+
634+
it.each([
635+
[1], [2], [3], [5], [8], [13], [3000]
636+
])('should call channel.startReceiveTimeout when requests number equals to %d and connection is not idle anymore', (requests) => {
637+
const channel = {
638+
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
639+
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
640+
}
641+
const protocol = {
642+
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
643+
updateCurrentObserver: jest.fn(() => {})
644+
}
645+
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
646+
connection._setIdle({})
647+
connection._unsetIdle()
648+
channel.stopReceiveTimeout.mockClear()
649+
650+
connection._handleOngoingRequestsNumberChange(requests)
651+
652+
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
653+
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(1)
654+
})
655+
656+
it('should call channel.stopReceiveTimeout when requests number equals to 0 and connection is not idle anymore', () => {
657+
const channel = {
658+
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
659+
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
660+
}
661+
const protocol = {
662+
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
663+
updateCurrentObserver: jest.fn(() => {})
664+
}
665+
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
666+
connection._setIdle({})
667+
connection._unsetIdle()
668+
channel.stopReceiveTimeout.mockClear()
669+
670+
connection._handleOngoingRequestsNumberChange(0)
671+
672+
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
673+
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
674+
})
613675
})
614676

615677
describe('.resetAndFlush()', () => {
@@ -1181,6 +1243,44 @@ describe('ChannelConnection', () => {
11811243
})
11821244

11831245
describe('.hasOngoingObservableRequests()', () => {
1246+
it('should return false if connection is idle', () => {
1247+
const protocol = {
1248+
hasOngoingObservableRequests: jest.fn(() => true),
1249+
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
1250+
}
1251+
const channel = {
1252+
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
1253+
}
1254+
1255+
const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
1256+
connection._setIdle({})
1257+
1258+
const result = connection.hasOngoingObservableRequests()
1259+
1260+
expect(result).toBe(false)
1261+
expect(protocol.hasOngoingObservableRequests).not.toBeCalledWith()
1262+
})
1263+
1264+
it('should redirect request to the protocol when connection is not idle anymore', () => {
1265+
const protocol = {
1266+
hasOngoingObservableRequests: jest.fn(() => true),
1267+
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
1268+
updateCurrentObserver: jest.fn(() => {})
1269+
}
1270+
const channel = {
1271+
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
1272+
}
1273+
1274+
const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
1275+
connection._setIdle({})
1276+
connection._unsetIdle()
1277+
1278+
const result = connection.hasOngoingObservableRequests()
1279+
1280+
expect(result).toBe(true)
1281+
expect(protocol.hasOngoingObservableRequests).toBeCalledWith()
1282+
})
1283+
11841284
it('should call redirect request to the protocol', () => {
11851285
const protocol = {
11861286
hasOngoingObservableRequests: jest.fn(() => true)
@@ -1195,6 +1295,41 @@ describe('ChannelConnection', () => {
11951295
})
11961296
})
11971297

1298+
describe('._setIdle()', () => {
1299+
it('should stop receive timeout and enqueue observer', () => {
1300+
const protocol = {
1301+
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
1302+
}
1303+
const channel = {
1304+
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
1305+
}
1306+
const observer = {
1307+
onComplete: () => {}
1308+
}
1309+
1310+
const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
1311+
1312+
connection._setIdle(observer)
1313+
1314+
expect(channel.stopReceiveTimeout).toBeCalledTimes(1)
1315+
expect(protocol.queueObserverIfProtocolIsNotBroken).toBeCalledWith(observer)
1316+
})
1317+
})
1318+
1319+
describe('._unsetIdle()', () => {
1320+
it('should update current observer', () => {
1321+
const protocol = {
1322+
updateCurrentObserver: jest.fn(() => {})
1323+
}
1324+
1325+
const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol })
1326+
1327+
connection._unsetIdle()
1328+
1329+
expect(protocol.updateCurrentObserver).toBeCalledTimes(1)
1330+
})
1331+
})
1332+
11981333
function spyOnConnectionChannel ({
11991334
channel,
12001335
errorHandler,

packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
226226
}
227227

228228
static _installIdleObserverOnConnection (conn, observer) {
229-
conn._queueObserver(observer)
229+
conn._setIdle(observer)
230230
}
231231

232232
static _removeIdleObserverOnConnection (conn) {
233-
conn._updateCurrentObserver()
233+
conn._unsetIdle()
234234
}
235235

236236
_handleSecurityError (error, address, connection) {

packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js

+25-2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export default class ChannelConnection extends Connection {
125125
) {
126126
super(errorHandler)
127127
this._authToken = null
128+
this._idle = false
128129
this._reseting = false
129130
this._resetObservers = []
130131
this._id = idGenerator++
@@ -393,7 +394,26 @@ export default class ChannelConnection extends Connection {
393394
}
394395

395396
/**
396-
* This method still here because it's used by the {@link PooledConnectionProvider}
397+
* This method is used by the {@link PooledConnectionProvider}
398+
*
399+
* @param {any} observer
400+
*/
401+
_setIdle (observer) {
402+
this._idle = true
403+
this._ch.stopReceiveTimeout()
404+
this._protocol.queueObserverIfProtocolIsNotBroken(observer)
405+
}
406+
407+
/**
408+
* This method is used by the {@link PooledConnectionProvider}
409+
*/
410+
_unsetIdle () {
411+
this._idle = false
412+
this._updateCurrentObserver()
413+
}
414+
415+
/**
416+
* This method still here because of the connection-channel.tests.js
397417
*
398418
* @param {any} observer
399419
*/
@@ -402,7 +422,7 @@ export default class ChannelConnection extends Connection {
402422
}
403423

404424
hasOngoingObservableRequests () {
405-
return this._protocol.hasOngoingObservableRequests()
425+
return !this._idle && this._protocol.hasOngoingObservableRequests()
406426
}
407427

408428
/**
@@ -500,6 +520,9 @@ export default class ChannelConnection extends Connection {
500520
* @param {number} requestsNumber Ongoing requests number
501521
*/
502522
_handleOngoingRequestsNumberChange (requestsNumber) {
523+
if (this._idle) {
524+
return
525+
}
503526
if (requestsNumber === 0) {
504527
this._ch.stopReceiveTimeout()
505528
} else {

packages/neo4j-driver/test/internal/connection-provider-pooled.test.js

+25
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,29 @@ describe('#unit PooledConnectionProvider', () => {
7171
clock.uninstall()
7272
}
7373
})
74+
75+
it('_installIdleObserverOnConnection should set connection as idle', () => {
76+
const connection = new FakeConnection()
77+
const observer = { onCompleted: () => {} }
78+
79+
PooledConnectionProvider._installIdleObserverOnConnection(connection, observer)
80+
81+
expect(connection._idle).toBe(true)
82+
expect(connection._idleObserver).toBe(observer)
83+
})
84+
85+
it('_removeIdleObserverOnConnection should unset connection as idle', () => {
86+
const connection = new FakeConnection()
87+
const observer = { onCompleted: () => {} }
88+
89+
PooledConnectionProvider._installIdleObserverOnConnection(connection, observer)
90+
91+
expect(connection._idle).toBe(true)
92+
expect(connection._idleObserver).toBe(observer)
93+
94+
PooledConnectionProvider._removeIdleObserverOnConnection(connection)
95+
96+
expect(connection._idle).toBe(false)
97+
expect(connection._idleObserver).toBe(null)
98+
})
7499
})

packages/neo4j-driver/test/internal/fake-connection.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ export default class FakeConnection extends Connection {
3939
this._databaseId = null
4040
this._requestRoutingInformationMock = null
4141
this._creationTimestamp = Date.now()
42-
42+
this._idle = false
43+
this._idleObserver = null
4344
this.resetInvoked = 0
4445
this.releaseInvoked = 0
4546
this.seenQueries = []
@@ -101,6 +102,16 @@ export default class FakeConnection extends Connection {
101102
return this._idleTimestamp
102103
}
103104

105+
_setIdle (observer) {
106+
this._idle = true
107+
this._idleObserver = observer
108+
}
109+
110+
_unsetIdle () {
111+
this._idle = false
112+
this._idleObserver = null
113+
}
114+
104115
protocol () {
105116
// return fake protocol object that simply records seen queries and parameters
106117
return {

0 commit comments

Comments
 (0)