Skip to content

Commit cd14db8

Browse files
committed
Plug new pool into driver
1 parent 77937ce commit cd14db8

File tree

7 files changed

+101
-20
lines changed

7 files changed

+101
-20
lines changed

src/v1/driver.js

+61-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
import Session from './session';
21+
import {Pool} from './internal/pool';
2122
import {connect} from "./internal/connector";
2223

2324
/**
@@ -37,24 +38,74 @@ class Driver {
3738
this._openSessions = {};
3839
this._sessionIdGenerator = 0;
3940
this._token = token || {};
41+
this._pool = new Pool(
42+
this._createConnection.bind(this),
43+
this._destroyConnection.bind(this),
44+
this._validateConnection.bind(this)
45+
);
4046
}
4147

4248
/**
43-
* Create and return new session
44-
* @return {Session} new session.
49+
* Create a new connection instance.
50+
* @return {Connection} new connector-api session instance, a low level session API.
51+
* @access private
4552
*/
46-
session() {
53+
_createConnection( release ) {
4754
let sessionId = this._sessionIdGenerator++;
4855
let conn = connect(this._url);
4956
conn.initialize(this._userAgent, this._token);
50-
let _driver = this;
51-
let _session = new Session( conn, () => {
52-
// On close of session, remove it from the list of open sessions
53-
delete _driver._openSessions[sessionId];
54-
});
57+
conn._id = sessionId;
58+
conn._release = () => release(conn);
59+
60+
this._openSessions[sessionId] = conn;
61+
return conn;
62+
}
63+
64+
/**
65+
* Check that a connection is usable
66+
* @return {boolean} true if the connection is open
67+
* @access private
68+
**/
69+
_validateConnection( conn ) {
70+
return conn.isOpen();
71+
}
72+
73+
/**
74+
* Dispose of a live session, closing any associated resources.
75+
* @return {Session} new session.
76+
* @access private
77+
*/
78+
_destroyConnection( conn ) {
79+
delete this._openSessions[conn._id];
80+
conn.close();
81+
}
82+
83+
/**
84+
* Create and return new session
85+
* @return {Session} new session.
86+
*/
87+
session() {
88+
let conn = this._pool.acquire();
89+
return new Session( conn, (cb) => {
90+
// This gets called on Session#close(), and is where we return
91+
// the pooled 'connection' instance.
92+
93+
// We don't pool Session instances, to avoid users using the Session
94+
// after they've called close. The `Session` object is just a thin
95+
// wrapper around Connection anyway, so it makes little difference.
5596

56-
this._openSessions[sessionId] = _session;
57-
return _session;
97+
// Queue up a 'reset', to ensure the next user gets a clean
98+
// session to work with. No need to flush, this will get sent
99+
// along with whatever the next thing the user wants to do with
100+
// this session ends up being, so we save the network round trip.
101+
conn.reset();
102+
103+
// Return connection to the pool
104+
conn._release();
105+
106+
// Call user callback
107+
if(cb) { cb(); }
108+
});
58109
}
59110

60111
/**

src/v1/internal/ch-node.js

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ class NodeChannel {
6060
_self.onmessage( new NodeBuffer( buffer ) );
6161
}
6262
});
63+
64+
this._conn.on('error', function(err){
65+
if( _self.onerror ) {
66+
_self.onerror(err);
67+
}
68+
});
6369
}
6470

6571
/**

src/v1/internal/ch-websocket.js

+6
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ class WebSocketChannel {
5454
self.onmessage( b );
5555
}
5656
};
57+
58+
this._ws.onerror = () => {
59+
if( self.onerror ) {
60+
self.onerror();
61+
}
62+
}
5763
}
5864

5965
/**

src/v1/internal/connector.js

+22-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ else {
4040
let
4141
// Signature bytes for each message type
4242
INIT = 0x01, // 0000 0001 // INIT <user_agent>
43-
ACK_FAILURE = 0x0F, // 0000 1111 // ACK_FAILURE
43+
ACK_FAILURE = 0x0D, // 0000 1101 // ACK_FAILURE
44+
RESET = 0x0F, // 0000 1111 // RESET
4445
RUN = 0x10, // 0001 0000 // RUN <statement> <parameters>
4546
DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD *
4647
PULL_ALL = 0x3F, // 0011 1111 // PULL *
@@ -176,6 +177,9 @@ class Connection {
176177
this._unpacker = new packstream.Unpacker();
177178
this._isHandlingFailure = false;
178179

180+
// Set to true on fatal errors, to get this out of session pool.
181+
this._isBroken = false;
182+
179183
// For deserialization, explain to the unpacker how to unpack nodes, rels, paths;
180184
this._unpacker.structMappers[NODE] = _mappers.node;
181185
this._unpacker.structMappers[RELATIONSHIP] = _mappers.rel;
@@ -197,14 +201,16 @@ class Connection {
197201
}
198202

199203
} else {
200-
// TODO: Report error
204+
this._isBroken = true;
201205
console.log("FATAL, unknown protocol version:", proposed)
202206
}
203207
};
204208

209+
this._ch.onerror = () => self._isBroken = true;
210+
205211
this._dechunker.onmessage = (buf) => {
206212
self._handleMessage( self._unpacker.unpack( buf ) );
207-
}
213+
};
208214

209215
let handshake = alloc( 5 * 4 );
210216
//magic preamble
@@ -268,6 +274,7 @@ class Connection {
268274
}
269275
break;
270276
default:
277+
this._isBroken = true;
271278
console.log("UNKNOWN MESSAGE: ", msg);
272279
}
273280
}
@@ -300,6 +307,13 @@ class Connection {
300307
this._chunker.messageBoundary();
301308
}
302309

310+
/** Queue a RESET-message to be sent to the database */
311+
reset( observer ) {
312+
this._queueObserver(observer);
313+
this._packer.packStruct( RESET );
314+
this._chunker.messageBoundary();
315+
}
316+
303317
/** Queue a ACK_FAILURE-message to be sent to the database */
304318
_ackFailure( observer ) {
305319
this._queueObserver(observer);
@@ -324,6 +338,11 @@ class Connection {
324338
this._chunker.flush();
325339
}
326340

341+
/** Check if this connection is in working condition */
342+
isOpen() {
343+
return !this._isBroken && this._ch._open;
344+
}
345+
327346
/**
328347
* Call close on the channel.
329348
* @param {function} cb - Function to call on close.

src/v1/session.js

+4-5
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ class Session {
4747
* @param {Object} parameters - Map with parameters to use in statement
4848
* @return {Result} - New Result
4949
*/
50-
run(statement, parameters) {
50+
run(statement, parameters = {}) {
5151
if(typeof statement === 'object' && statement.text) {
5252
parameters = statement.parameters || {};
5353
statement = statement.text;
5454
}
5555
let streamObserver = new StreamObserver();
5656
if (!this._hasTx) {
57-
this._conn.run(statement, parameters || {}, streamObserver);
57+
this._conn.run(statement, parameters, streamObserver);
5858
this._conn.pullAll(streamObserver);
5959
this._conn.sync();
6060
} else {
@@ -86,9 +86,8 @@ class Session {
8686
* @param {function()} cb - Function to be called after the session has been closed
8787
* @return
8888
*/
89-
close(cb) {
90-
this._onClose();
91-
this._conn.close(cb);
89+
close(cb=(()=>null)) {
90+
this._onClose(cb);
9291
}
9392
}
9493

test/internal/pool.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
var Pool = require('../../lib/v1/internal/pool').Pool;
2121

22-
fdescribe('Pool', function() {
22+
describe('Pool', function() {
2323
it('allocates if pool is empty', function() {
2424
// Given
2525
var counter = 0;

test/v1/examples.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ var neo4j = require("../../lib/v1");
2121

2222
var _console = console;
2323

24-
describe('transaction', function() {
24+
describe('examples', function() {
2525

2626
var driver, session, out, console;
2727

0 commit comments

Comments
 (0)