Skip to content

Commit 6fc0342

Browse files
committed
Refactoring to better support new protocol versions
Connection is now less coupled with protocol messages. It is able to write an arbitrary `RequestMessage` instead of exposing functions to write messages of a specific type, like `Connection#pullAll()`. Also introduced a `BoltProtocol` abstraction which exposes only logical operations, like "initialize the connection" or "run a query". It should make it simpler to introduce new protocol version that uses different messages to execute same logical operations.
1 parent 48b437f commit 6fc0342

14 files changed

+544
-200
lines changed

src/v1/driver.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ class Driver {
132132
* @access private
133133
*/
134134
_createConnection(hostPort, release) {
135-
let conn = connect(hostPort, this._config, this._connectionErrorCode(), this._log);
136-
let streamObserver = new _ConnectionStreamObserver(this, conn);
137-
conn.initialize(this._userAgent, this._token, streamObserver);
135+
const conn = connect(hostPort, this._config, this._connectionErrorCode(), this._log);
136+
const streamObserver = new _ConnectionStreamObserver(this, conn);
137+
conn.protocol().initialize(this._userAgent, this._token, streamObserver);
138138
conn._release = () => release(hostPort, conn);
139139

140140
this._openConnections[conn.id] = conn;

src/v1/internal/bolt-protocol.js

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import RequestMessage from './request-message';
20+
21+
export default class BoltProtocol {
22+
23+
/**
24+
* @constructor
25+
* @param {Connection} connection the connection.
26+
* @param {Packer} packer the packer.
27+
* @param {Unpacker} unpacker the unpacker.
28+
*/
29+
constructor(connection, packer, unpacker) {
30+
this._connection = connection;
31+
this._packer = packer;
32+
this._unpacker = unpacker;
33+
}
34+
35+
/**
36+
* Get the packer.
37+
* @return {Packer} the protocol's packer.
38+
*/
39+
packer() {
40+
return this._packer;
41+
}
42+
43+
/**
44+
* Get the unpacker.
45+
* @return {Unpacker} the protocol's unpacker.
46+
*/
47+
unpacker() {
48+
return this._unpacker;
49+
}
50+
51+
/**
52+
* Perform initialization and authentication of the underlying connection.
53+
* @param {string} clientName the client name.
54+
* @param {object} authToken the authentication token.
55+
* @param {StreamObserver} observer the response observer.
56+
*/
57+
initialize(clientName, authToken, observer) {
58+
const message = RequestMessage.init(clientName, authToken);
59+
this._connection.write(message, observer, true);
60+
}
61+
62+
/**
63+
* Send a Cypher statement through the underlying connection.
64+
* @param {string} statement the cypher statement.
65+
* @param {object} parameters the statement parameters.
66+
* @param {StreamObserver} observer the response observer.
67+
*/
68+
run(statement, parameters, observer) {
69+
const runMessage = RequestMessage.run(statement, parameters);
70+
const pullAllMessage = RequestMessage.pullAll();
71+
72+
this._connection.write(runMessage, observer, false);
73+
this._connection.write(pullAllMessage, observer, true);
74+
}
75+
76+
/**
77+
* Send a RESET through the underlying connection.
78+
* @param {StreamObserver} observer the response observer.
79+
*/
80+
reset(observer) {
81+
const message = RequestMessage.reset();
82+
this._connection.write(message, observer, true);
83+
}
84+
}

src/v1/internal/buf.js

+2-3
Original file line numberDiff line numberDiff line change
@@ -578,11 +578,10 @@ try {
578578
} catch(e) {}
579579

580580
/**
581-
* Allocate a new buffer using whatever mechanism is most sensible for the
582-
* current platform
581+
* Allocate a new buffer using whatever mechanism is most sensible for the current platform.
583582
* @access private
584583
* @param {Integer} size
585-
* @return new buffer
584+
* @return {BaseBuffer} new buffer
586585
*/
587586
function alloc (size) {
588587
return new _DefaultBuffer(size);

src/v1/internal/connector.js

+52-96
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import StreamObserver from './stream-observer';
2727
import {ServerVersion, VERSION_3_2_0} from './server-version';
2828
import Logger from './logger';
2929
import ProtocolHandshaker from './protocol-handshaker';
30+
import RequestMessage from './request-message';
3031

3132
let Channel;
3233
if( NodeChannel.available ) {
@@ -40,13 +41,7 @@ else {
4041
}
4142

4243

43-
// Signature bytes for each message type
44-
const INIT = 0x01; // 0000 0001 // INIT <user_agent>
45-
const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused
46-
const RESET = 0x0F; // 0000 1111 // RESET
47-
const RUN = 0x10; // 0001 0000 // RUN <statement> <parameters>
48-
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD *
49-
const PULL_ALL = 0x3F; // 0011 1111 // PULL *
44+
// Signature bytes for each response message type
5045
const SUCCESS = 0x70; // 0111 0000 // SUCCESS <metadata>
5146
const RECORD = 0x71; // 0111 0001 // RECORD <value>
5247
const IGNORED = 0x7E; // 0111 1110 // IGNORED <metadata>
@@ -97,11 +92,9 @@ class Connection {
9792
this._chunker = new Chunker( channel );
9893
this._log = log;
9994

100-
const protocolHandshaker = new ProtocolHandshaker(this._ch, this._chunker, this._disableLosslessIntegers, this._log);
101-
95+
const protocolHandshaker = new ProtocolHandshaker(this, channel, this._chunker, this._disableLosslessIntegers, this._log);
10296
// initially assume that database supports latest Bolt version, create latest packer and unpacker
103-
this._packer = protocolHandshaker.createLatestPacker();
104-
this._unpacker = protocolHandshaker.createLatestUnpacker();
97+
this._protocol = protocolHandshaker.createLatestProtocol();
10598

10699
this._currentFailure = null;
107100

@@ -128,7 +121,7 @@ class Connection {
128121
}
129122

130123
this._dechunker.onmessage = (buf) => {
131-
this._handleMessage(this._unpacker.unpack(buf));
124+
this._handleMessage(this._protocol.unpacker().unpack(buf));
132125
};
133126

134127
if (this._log.isDebugEnabled()) {
@@ -138,6 +131,45 @@ class Connection {
138131
protocolHandshaker.writeHandshakeRequest();
139132
}
140133

134+
/**
135+
* Get the Bolt protocol for the connection.
136+
* @return {BoltProtocol} the protocol.
137+
*/
138+
protocol() {
139+
return this._protocol;
140+
}
141+
142+
/**
143+
* Write a message to the network channel.
144+
* @param {RequestMessage} message the message to write.
145+
* @param {StreamObserver} observer the response observer.
146+
* @param {boolean} flush <code>true</code> if flush should happen after the message is written to the buffer.
147+
*/
148+
write(message, observer, flush) {
149+
if (message.isInitializationMessage) {
150+
observer = this._state.wrap(observer);
151+
}
152+
153+
const queued = this._queueObserver(observer);
154+
155+
if (queued) {
156+
if (this._log.isDebugEnabled()) {
157+
this._log.debug(`${this} C: ${message}`);
158+
}
159+
160+
this._protocol.packer().packStruct(
161+
message.signature,
162+
message.fields.map(field => this._packable(field)),
163+
err => this._handleFatalError(err));
164+
165+
this._chunker.messageBoundary();
166+
167+
if (flush) {
168+
this._chunker.flush();
169+
}
170+
}
171+
}
172+
141173
/**
142174
* Complete protocol initialization.
143175
* @param {BaseBuffer} buffer the handshake response buffer.
@@ -146,11 +178,8 @@ class Connection {
146178
*/
147179
_initializeProtocol(buffer, protocolHandshaker) {
148180
try {
149-
const {packer, unpacker} = protocolHandshaker.readHandshakeResponse(buffer);
150-
151-
// re-assign packer and unpacker because version might be lower than we initially assumed
152-
this._packer = packer;
153-
this._unpacker = unpacker;
181+
// re-assign the protocol because version might be lower than we initially assumed
182+
this._protocol = protocolHandshaker.readHandshakeResponse(buffer);
154183

155184
// Ok, protocol running. Simply forward all messages to the dechunker
156185
this._ch.onmessage = buf => this._dechunker.write(buf);
@@ -247,66 +276,13 @@ class Connection {
247276
}
248277
}
249278

250-
/** Queue an INIT-message to be sent to the database */
251-
initialize( clientName, token, observer ) {
252-
if (this._log.isDebugEnabled()) {
253-
this._log.debug(`${this} C: INIT ${clientName} {...}`);
254-
}
255-
const initObserver = this._state.wrap(observer);
256-
const queued = this._queueObserver(initObserver);
257-
if (queued) {
258-
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)],
259-
(err) => this._handleFatalError(err));
260-
this._chunker.messageBoundary();
261-
this.flush();
262-
}
263-
}
264-
265-
/** Queue a RUN-message to be sent to the database */
266-
run( statement, params, observer ) {
267-
if (this._log.isDebugEnabled()) {
268-
this._log.debug(`${this} C: RUN ${statement} ${JSON.stringify(params)}`);
269-
}
270-
const queued = this._queueObserver(observer);
271-
if (queued) {
272-
this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)],
273-
(err) => this._handleFatalError(err));
274-
this._chunker.messageBoundary();
275-
}
276-
}
277-
278-
/** Queue a PULL_ALL-message to be sent to the database */
279-
pullAll( observer ) {
280-
if (this._log.isDebugEnabled()) {
281-
this._log.debug(`${this} C: PULL_ALL`);
282-
}
283-
const queued = this._queueObserver(observer);
284-
if (queued) {
285-
this._packer.packStruct(PULL_ALL, [], (err) => this._handleFatalError(err));
286-
this._chunker.messageBoundary();
287-
}
288-
}
289-
290-
/** Queue a DISCARD_ALL-message to be sent to the database */
291-
discardAll( observer ) {
292-
if (this._log.isDebugEnabled()) {
293-
this._log.debug(`${this} C: DISCARD_ALL`);
294-
}
295-
const queued = this._queueObserver(observer);
296-
if (queued) {
297-
this._packer.packStruct(DISCARD_ALL, [], (err) => this._handleFatalError(err));
298-
this._chunker.messageBoundary();
299-
}
300-
}
301-
302279
/**
303-
* Send a RESET-message to the database. Mutes failure handling.
304-
* Message is immediately flushed to the network. Separate {@link Connection#flush()} call is not required.
280+
* Send a RESET-message to the database. Message is immediately flushed to the network.
305281
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
306282
*/
307283
resetAndFlush() {
308284
return new Promise((resolve, reject) => {
309-
this._reset({
285+
this._protocol.reset({
310286
onNext: record => {
311287
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
312288
reject(neo4jError);
@@ -328,7 +304,7 @@ class Connection {
328304
}
329305

330306
_resetOnFailure() {
331-
this._reset({
307+
this._protocol.reset({
332308
onNext: record => {
333309
this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
334310
},
@@ -342,19 +318,6 @@ class Connection {
342318
});
343319
}
344320

345-
_reset(observer) {
346-
if (this._log.isDebugEnabled()) {
347-
this._log.debug(`${this} C: RESET`);
348-
}
349-
350-
const queued = this._queueObserver(observer);
351-
if (queued) {
352-
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
353-
this._chunker.messageBoundary();
354-
this.flush();
355-
}
356-
}
357-
358321
_queueObserver(observer) {
359322
if( this._isBroken ) {
360323
if( observer && observer.onError ) {
@@ -376,7 +339,7 @@ class Connection {
376339

377340
/**
378341
* Get promise resolved when connection initialization succeed or rejected when it fails.
379-
* Connection is initialized using {@link initialize} function.
342+
* Connection is initialized using {@link BoltProtocol#initialize()} function.
380343
* @return {Promise<Connection>} the result of connection initialization.
381344
*/
382345
initializationCompleted() {
@@ -391,13 +354,6 @@ class Connection {
391354
this._currentObserver = this._pendingObservers.shift();
392355
}
393356

394-
/**
395-
* Flush all queued outgoing messages.
396-
*/
397-
flush() {
398-
this._chunker.flush();
399-
}
400-
401357
/** Check if this connection is in working condition */
402358
isOpen() {
403359
return !this._isBroken && this._ch._open;
@@ -419,7 +375,7 @@ class Connection {
419375
}
420376

421377
_packable(value) {
422-
return this._packer.packable(value, (err) => this._handleFatalError(err));
378+
return this._protocol.packer().packable(value, (err) => this._handleFatalError(err));
423379
}
424380

425381
/**
@@ -431,7 +387,7 @@ class Connection {
431387
this.server.version = serverVersion;
432388
const version = ServerVersion.fromString(serverVersion);
433389
if (version.compareTo(VERSION_3_2_0) < 0) {
434-
this._packer.disableByteArrays();
390+
this._protocol.packer().disableByteArrays();
435391
}
436392
}
437393
}

0 commit comments

Comments
 (0)