Skip to content

Commit 0494b30

Browse files
committed
Merge pull request neo4j#39 from neo4j/1.0-pool
Add session pooling
2 parents 4a82ae2 + cd14db8 commit 0494b30

File tree

10 files changed

+270
-24
lines changed

10 files changed

+270
-24
lines changed

src/v1/driver.js

+61-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
import Session from './session';
21+
import {Pool} from './internal/pool';
2122
import {connect} from "./internal/connector";
2223

2324
/**
@@ -37,24 +38,74 @@ class Driver {
3738
this._openSessions = {};
3839
this._sessionIdGenerator = 0;
3940
this._token = token || {};
41+
this._pool = new Pool(
42+
this._createConnection.bind(this),
43+
this._destroyConnection.bind(this),
44+
this._validateConnection.bind(this)
45+
);
4046
}
4147

4248
/**
43-
* Create and return new session
44-
* @return {Session} new session.
49+
* Create a new connection instance.
50+
* @return {Connection} new connector-api session instance, a low level session API.
51+
* @access private
4552
*/
46-
session() {
53+
_createConnection( release ) {
4754
let sessionId = this._sessionIdGenerator++;
4855
let conn = connect(this._url);
4956
conn.initialize(this._userAgent, this._token);
50-
let _driver = this;
51-
let _session = new Session( conn, () => {
52-
// On close of session, remove it from the list of open sessions
53-
delete _driver._openSessions[sessionId];
54-
});
57+
conn._id = sessionId;
58+
conn._release = () => release(conn);
59+
60+
this._openSessions[sessionId] = conn;
61+
return conn;
62+
}
63+
64+
/**
65+
* Check that a connection is usable
66+
* @return {boolean} true if the connection is open
67+
* @access private
68+
**/
69+
_validateConnection( conn ) {
70+
return conn.isOpen();
71+
}
72+
73+
/**
74+
* Dispose of a live session, closing any associated resources.
75+
* @return {Session} new session.
76+
* @access private
77+
*/
78+
_destroyConnection( conn ) {
79+
delete this._openSessions[conn._id];
80+
conn.close();
81+
}
82+
83+
/**
84+
* Create and return new session
85+
* @return {Session} new session.
86+
*/
87+
session() {
88+
let conn = this._pool.acquire();
89+
return new Session( conn, (cb) => {
90+
// This gets called on Session#close(), and is where we return
91+
// the pooled 'connection' instance.
92+
93+
// We don't pool Session instances, to avoid users using the Session
94+
// after they've called close. The `Session` object is just a thin
95+
// wrapper around Connection anyway, so it makes little difference.
5596

56-
this._openSessions[sessionId] = _session;
57-
return _session;
97+
// Queue up a 'reset', to ensure the next user gets a clean
98+
// session to work with. No need to flush, this will get sent
99+
// along with whatever the next thing the user wants to do with
100+
// this session ends up being, so we save the network round trip.
101+
conn.reset();
102+
103+
// Return connection to the pool
104+
conn._release();
105+
106+
// Call user callback
107+
if(cb) { cb(); }
108+
});
58109
}
59110

60111
/**

src/v1/internal/ch-node.js

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ class NodeChannel {
6060
_self.onmessage( new NodeBuffer( buffer ) );
6161
}
6262
});
63+
64+
this._conn.on('error', function(err){
65+
if( _self.onerror ) {
66+
_self.onerror(err);
67+
}
68+
});
6369
}
6470

6571
/**

src/v1/internal/ch-websocket.js

+6
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ class WebSocketChannel {
5454
self.onmessage( b );
5555
}
5656
};
57+
58+
this._ws.onerror = () => {
59+
if( self.onerror ) {
60+
self.onerror();
61+
}
62+
}
5763
}
5864

5965
/**

src/v1/internal/connector.js

+22-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ else {
4040
let
4141
// Signature bytes for each message type
4242
INIT = 0x01, // 0000 0001 // INIT <user_agent>
43-
ACK_FAILURE = 0x0F, // 0000 1111 // ACK_FAILURE
43+
ACK_FAILURE = 0x0D, // 0000 1101 // ACK_FAILURE
44+
RESET = 0x0F, // 0000 1111 // RESET
4445
RUN = 0x10, // 0001 0000 // RUN <statement> <parameters>
4546
DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD *
4647
PULL_ALL = 0x3F, // 0011 1111 // PULL *
@@ -177,6 +178,9 @@ class Connection {
177178
this._unpacker = new packstream.Unpacker();
178179
this._isHandlingFailure = false;
179180

181+
// Set to true on fatal errors, to get this out of session pool.
182+
this._isBroken = false;
183+
180184
// For deserialization, explain to the unpacker how to unpack nodes, rels, paths;
181185
this._unpacker.structMappers[NODE] = _mappers.node;
182186
this._unpacker.structMappers[RELATIONSHIP] = _mappers.rel;
@@ -198,14 +202,16 @@ class Connection {
198202
}
199203

200204
} else {
201-
// TODO: Report error
205+
this._isBroken = true;
202206
console.log("FATAL, unknown protocol version:", proposed)
203207
}
204208
};
205209

210+
this._ch.onerror = () => self._isBroken = true;
211+
206212
this._dechunker.onmessage = (buf) => {
207213
self._handleMessage( self._unpacker.unpack( buf ) );
208-
}
214+
};
209215

210216
let handshake = alloc( 5 * 4 );
211217
//magic preamble
@@ -269,6 +275,7 @@ class Connection {
269275
}
270276
break;
271277
default:
278+
this._isBroken = true;
272279
console.log("UNKNOWN MESSAGE: ", msg);
273280
}
274281
}
@@ -301,6 +308,13 @@ class Connection {
301308
this._chunker.messageBoundary();
302309
}
303310

311+
/** Queue a RESET-message to be sent to the database */
312+
reset( observer ) {
313+
this._queueObserver(observer);
314+
this._packer.packStruct( RESET );
315+
this._chunker.messageBoundary();
316+
}
317+
304318
/** Queue a ACK_FAILURE-message to be sent to the database */
305319
_ackFailure( observer ) {
306320
this._queueObserver(observer);
@@ -325,6 +339,11 @@ class Connection {
325339
this._chunker.flush();
326340
}
327341

342+
/** Check if this connection is in working condition */
343+
isOpen() {
344+
return !this._isBroken && this._ch._open;
345+
}
346+
328347
/**
329348
* Call close on the channel.
330349
* @param {function} cb - Function to call on close.

src/v1/internal/pool.js

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
class Pool {
21+
/**
22+
* @param create an allocation function that creates a new resource. It's given
23+
* a single argument, a function that will return the resource to
24+
* the pool if invoked, which is meant to be called on .dispose
25+
* or .close or whatever mechanism the resource uses to finalize.
26+
* @param destroy called with the resource when it is evicted from this pool
27+
* @param validate called at various times (like when an instance is acquired and
28+
* when it is returned). If this returns false, the resource will
29+
* be evicted
30+
* @param maxIdle the max number of resources that are allowed idle in the pool at
31+
* any time. If this threshold is exceeded, resources will be evicted.
32+
*/
33+
constructor(create, destroy=(()=>true), validate=(()=>true), maxIdle=50) {
34+
this._create = create;
35+
this._destroy = destroy;
36+
this._validate = validate;
37+
this._maxIdle = maxIdle;
38+
this._pool = [];
39+
this._release = this._release.bind(this);
40+
}
41+
42+
acquire() {
43+
if( this._pool.length > 0 ) {
44+
return this._pool.pop();
45+
} else {
46+
return this._create( this._release );
47+
}
48+
}
49+
50+
_release(resource) {
51+
if( this._pool.length >= this._maxIdle || !this._validate(resource) ) {
52+
this._destroy(resource);
53+
} else {
54+
this._pool.push(resource);
55+
}
56+
}
57+
}
58+
59+
export default {
60+
Pool
61+
}

src/v1/session.js

+7-8
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ class Session {
4747
* @param {Object} parameters - Map with parameters to use in statement
4848
* @return {Result} - New Result
4949
*/
50-
run(statement, parameters) {
50+
run(statement, parameters = {}) {
5151
if(typeof statement === 'object' && statement.text) {
5252
parameters = statement.parameters || {};
5353
statement = statement.text;
5454
}
5555
let streamObserver = new StreamObserver();
5656
if (!this._hasTx) {
57-
this._conn.run(statement, parameters || {}, streamObserver);
57+
this._conn.run(statement, parameters, streamObserver);
5858
this._conn.pullAll(streamObserver);
5959
this._conn.sync();
6060
} else {
@@ -68,7 +68,7 @@ class Session {
6868
* Begin a new transaction in this session. A session can have at most one transaction running at a time, if you
6969
* want to run multiple concurrent transactions, you should use multiple concurrent sessions.
7070
*
71-
* While a transaction is open the session cannot be used to run statements.
71+
* While a transaction is open the session cannot be used to run statements outside the transaction.
7272
*
7373
* @returns {Transaction} - New Transaction
7474
*/
@@ -82,13 +82,12 @@ class Session {
8282
}
8383

8484
/**
85-
* Close connection
86-
* @param {function()} cb - Function to be called on connection close
85+
* Close this session
86+
* @param {function()} cb - Function to be called after the session has been closed
8787
* @return
8888
*/
89-
close(cb) {
90-
this._onClose();
91-
this._conn.close(cb);
89+
close(cb=(()=>null)) {
90+
this._onClose(cb);
9291
}
9392
}
9493

test/internal/connector.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ var connect = require("../../lib/v1/internal/connector.js").connect;
2121

2222
describe('connector', function() {
2323

24-
fit('should read/write basic messages', function(done) {
24+
it('should read/write basic messages', function(done) {
2525
// Given
2626
var conn = connect("bolt://localhost")
2727

0 commit comments

Comments
 (0)