Skip to content

Commit 48b437f

Browse files
committed
Extract protocol negotiation into a dedicated class
It encapsulates all the logic related to the Bolt protocol negotiation and versioned packstream components.
1 parent cb9ca48 commit 48b437f

File tree

4 files changed

+149
-114
lines changed

4 files changed

+149
-114
lines changed

src/v1/internal/connector.js

+31-51
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020
import WebSocketChannel from './ch-websocket';
2121
import NodeChannel from './ch-node';
2222
import {Chunker, Dechunker} from './chunking';
23-
import packStreamUtil from './packstream-util';
24-
import {alloc} from './buf';
2523
import {newError, PROTOCOL_ERROR} from './../error';
2624
import ChannelConfig from './ch-config';
2725
import urlUtil from './url-util';
2826
import StreamObserver from './stream-observer';
2927
import {ServerVersion, VERSION_3_2_0} from './server-version';
3028
import Logger from './logger';
29+
import ProtocolHandshaker from './protocol-handshaker';
3130

3231
let Channel;
3332
if( NodeChannel.available ) {
@@ -41,21 +40,17 @@ else {
4140
}
4241

4342

44-
let
4543
// Signature bytes for each message type
46-
INIT = 0x01, // 0000 0001 // INIT <user_agent>
47-
ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE - unused
48-
RESET = 0x0F, // 0000 1111 // RESET
49-
RUN = 0x10, // 0001 0000 // RUN <statement> <parameters>
50-
DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD *
51-
PULL_ALL = 0x3F, // 0011 1111 // PULL *
52-
SUCCESS = 0x70, // 0111 0000 // SUCCESS <metadata>
53-
RECORD = 0x71, // 0111 0001 // RECORD <value>
54-
IGNORED = 0x7E, // 0111 1110 // IGNORED <metadata>
55-
FAILURE = 0x7F, // 0111 1111 // FAILURE <metadata>
56-
57-
//sent before version negotiation
58-
MAGIC_PREAMBLE = 0x6060B017;
44+
const INIT = 0x01; // 0000 0001 // INIT <user_agent>
45+
const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused
46+
const RESET = 0x0F; // 0000 1111 // RESET
47+
const RUN = 0x10; // 0001 0000 // RUN <statement> <parameters>
48+
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD *
49+
const PULL_ALL = 0x3F; // 0011 1111 // PULL *
50+
const SUCCESS = 0x70; // 0111 0000 // SUCCESS <metadata>
51+
const RECORD = 0x71; // 0111 0001 // RECORD <value>
52+
const IGNORED = 0x7E; // 0111 1110 // IGNORED <metadata>
53+
const FAILURE = 0x7F; // 0111 1111 // FAILURE <metadata>
5954

6055
function NO_OP(){}
6156

@@ -102,9 +97,11 @@ class Connection {
10297
this._chunker = new Chunker( channel );
10398
this._log = log;
10499

100+
const protocolHandshaker = new ProtocolHandshaker(this._ch, this._chunker, this._disableLosslessIntegers, this._log);
101+
105102
// initially assume that database supports latest Bolt version, create latest packer and unpacker
106-
this._packer = packStreamUtil.createLatestPacker(this._chunker);
107-
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
103+
this._packer = protocolHandshaker.createLatestPacker();
104+
this._unpacker = protocolHandshaker.createLatestUnpacker();
108105

109106
this._currentFailure = null;
110107

@@ -116,17 +113,7 @@ class Connection {
116113
// TODO: Using `onmessage` and `onerror` came from the WebSocket API,
117114
// it reads poorly and has several annoying drawbacks. Swap to having
118115
// Channel extend EventEmitter instead, then we can use `on('data',..)`
119-
this._ch.onmessage = (buf) => {
120-
const proposed = buf.readInt32();
121-
if (proposed == 1 || proposed == 2) {
122-
this._initializeProtocol(proposed, buf);
123-
} else if (proposed == 1213486160) {//server responded 1213486160 == 0x48545450 == "HTTP"
124-
this._handleFatalError(newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
125-
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'));
126-
} else {
127-
this._handleFatalError(newError('Unknown Bolt protocol version: ' + proposed));
128-
}
129-
};
116+
this._ch.onmessage = buffer => this._initializeProtocol(buffer, protocolHandshaker);
130117

131118
// Listen to connection errors. Important note though;
132119
// In some cases we will get a channel that is already broken (for instance,
@@ -148,38 +135,31 @@ class Connection {
148135
this._log.debug(`${this} created towards ${hostPort}`);
149136
}
150137

151-
let handshake = alloc( 5 * 4 );
152-
//magic preamble
153-
handshake.writeInt32( MAGIC_PREAMBLE );
154-
//proposed versions
155-
handshake.writeInt32( 2 );
156-
handshake.writeInt32( 1 );
157-
handshake.writeInt32( 0 );
158-
handshake.writeInt32( 0 );
159-
handshake.reset();
160-
this._ch.write( handshake );
138+
protocolHandshaker.writeHandshakeRequest();
161139
}
162140

163141
/**
164142
* Complete protocol initialization.
165-
* @param {number} version the selected protocol version.
166143
* @param {BaseBuffer} buffer the handshake response buffer.
144+
* @param {ProtocolHandshaker} protocolHandshaker the handshaker utility.
167145
* @private
168146
*/
169-
_initializeProtocol(version, buffer) {
170-
if (this._log.isDebugEnabled()) {
171-
this._log.debug(`${this} negotiated protocol version ${version}`);
172-
}
147+
_initializeProtocol(buffer, protocolHandshaker) {
148+
try {
149+
const {packer, unpacker} = protocolHandshaker.readHandshakeResponse(buffer);
173150

174-
// re-create packer and unpacker because version might be lower than we initially assumed
175-
this._packer = packStreamUtil.createPackerForProtocolVersion(version, this._chunker);
176-
this._unpacker = packStreamUtil.createUnpackerForProtocolVersion(version, this._disableLosslessIntegers);
151+
// re-assign packer and unpacker because version might be lower than we initially assumed
152+
this._packer = packer;
153+
this._unpacker = unpacker;
177154

178-
// Ok, protocol running. Simply forward all messages to the dechunker
179-
this._ch.onmessage = buf => this._dechunker.write(buf);
155+
// Ok, protocol running. Simply forward all messages to the dechunker
156+
this._ch.onmessage = buf => this._dechunker.write(buf);
180157

181-
if (buffer.hasRemaining()) {
182-
this._dechunker.write(buffer.readSlice(buffer.remaining()));
158+
if (buffer.hasRemaining()) {
159+
this._dechunker.write(buffer.readSlice(buffer.remaining()));
160+
}
161+
} catch (e) {
162+
this._handleFatalError(e);
183163
}
184164
}
185165

src/v1/internal/packstream-util.js

-55
This file was deleted.
+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {alloc} from './buf';
21+
import {newError} from '../error';
22+
import * as v1 from './packstream-v1';
23+
import * as v2 from './packstream-v2';
24+
25+
const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP"
26+
const BOLT_MAGIC_PREAMBLE = 0x6060B017;
27+
28+
const PACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Packer, v2.Packer];
29+
const UNPACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Unpacker, v2.Unpacker];
30+
31+
export default class ProtocolHandshaker {
32+
33+
constructor(channel, chunker, disableLosslessIntegers, log) {
34+
this._channel = channel;
35+
this._chunker = chunker;
36+
this._disableLosslessIntegers = disableLosslessIntegers;
37+
this._log = log;
38+
}
39+
40+
createLatestPacker() {
41+
return this._createPackerForProtocolVersion(PACKER_CONSTRUCTORS_BY_VERSION.length - 1);
42+
}
43+
44+
createLatestUnpacker() {
45+
return this._createUnpackerForProtocolVersion(UNPACKER_CONSTRUCTORS_BY_VERSION.length - 1);
46+
}
47+
48+
writeHandshakeRequest() {
49+
this._channel.write(newHandshakeBuffer());
50+
}
51+
52+
readHandshakeResponse(buffer) {
53+
const proposedVersion = buffer.readInt32();
54+
55+
if (proposedVersion === 1 || proposedVersion === 2) {
56+
return this._createPackerAndUnpackerForProtocolVersion(proposedVersion);
57+
} else if (proposedVersion === HTTP_MAGIC_PREAMBLE) {
58+
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
59+
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
60+
} else {
61+
throw newError('Unknown Bolt protocol version: ' + proposedVersion);
62+
}
63+
}
64+
65+
_createPackerAndUnpackerForProtocolVersion(version) {
66+
if (this._log.isDebugEnabled()) {
67+
this._log.debug(`${this} negotiated protocol version ${version}`);
68+
}
69+
const packer = this._createPackerForProtocolVersion(version);
70+
const unpacker = this._createUnpackerForProtocolVersion(version);
71+
return {packer, unpacker};
72+
}
73+
74+
_createPackerForProtocolVersion(version) {
75+
const packerConstructor = PACKER_CONSTRUCTORS_BY_VERSION[version];
76+
if (!packerConstructor) {
77+
throw new Error(`Packer can't be created for protocol version ${version}`);
78+
}
79+
return new packerConstructor(this._chunker);
80+
}
81+
82+
_createUnpackerForProtocolVersion(version) {
83+
const unpackerConstructor = UNPACKER_CONSTRUCTORS_BY_VERSION[version];
84+
if (!unpackerConstructor) {
85+
throw new Error(`Unpacker can't be created for protocol version ${version}`);
86+
}
87+
return new unpackerConstructor(this._disableLosslessIntegers);
88+
}
89+
}
90+
91+
function newHandshakeBuffer() {
92+
const handshakeBuffer = alloc(5 * 4);
93+
94+
//magic preamble
95+
handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE);
96+
97+
//proposed versions
98+
handshakeBuffer.writeInt32(2);
99+
handshakeBuffer.writeInt32(1);
100+
handshakeBuffer.writeInt32(0);
101+
handshakeBuffer.writeInt32(0);
102+
103+
// reset the reader position
104+
handshakeBuffer.reset();
105+
106+
return handshakeBuffer;
107+
}

test/internal/packstream-util.test.js renamed to test/internal/protocol-handshaker.test.js

+11-8
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,37 @@
1717
* limitations under the License.
1818
*/
1919

20-
import packStreamUtil from '../../src/v1/internal/packstream-util';
2120
import * as v1 from '../../src/v1/internal/packstream-v1';
2221
import * as v2 from '../../src/v1/internal/packstream-v2';
22+
import ProtocolHandshaker from '../../src/v1/internal/protocol-handshaker';
23+
import Logger from '../../src/v1/internal/logger';
2324

24-
describe('packstream-util', () => {
25+
describe('ProtocolHandshaker', () => {
26+
27+
const protocolHandshaker = new ProtocolHandshaker(null, false, Logger.noOp());
2528

2629
it('should create packer of the specified version', () => {
27-
const packer1 = packStreamUtil.createPackerForProtocolVersion(1, null);
30+
const packer1 = protocolHandshaker._createPackerForProtocolVersion(1);
2831
expect(packer1 instanceof v1.Packer).toBeTruthy();
2932

30-
const packer2 = packStreamUtil.createPackerForProtocolVersion(2, null);
33+
const packer2 = protocolHandshaker._createPackerForProtocolVersion(2);
3134
expect(packer2 instanceof v2.Packer).toBeTruthy();
3235
});
3336

3437
it('should create unpacker of the specified version', () => {
35-
const unpacker1 = packStreamUtil.createUnpackerForProtocolVersion(1, null);
38+
const unpacker1 = protocolHandshaker._createUnpackerForProtocolVersion(1);
3639
expect(unpacker1 instanceof v1.Unpacker).toBeTruthy();
3740

38-
const unpacker2 = packStreamUtil.createUnpackerForProtocolVersion(2, null);
41+
const unpacker2 = protocolHandshaker._createUnpackerForProtocolVersion(2);
3942
expect(unpacker2 instanceof v2.Unpacker).toBeTruthy();
4043
});
4144

4245
it('should fail to create packer for unknown version', () => {
43-
expect(() => packStreamUtil.createPackerForProtocolVersion(42, null)).toThrow();
46+
expect(() => protocolHandshaker._createPackerForProtocolVersion(42)).toThrow();
4447
});
4548

4649
it('should fail to create unpacker for unknown version', () => {
47-
expect(() => packStreamUtil.createUnpackerForProtocolVersion(42, null)).toThrow();
50+
expect(() => protocolHandshaker._createUnpackerForProtocolVersion(42)).toThrow();
4851
});
4952

5053
});

0 commit comments

Comments
 (0)