Skip to content

Commit eec9e45

Browse files
committed
Bolt V3 messaging part
Add protocol negotiation and messaging for Bolt V3. This commit does not expose new features in the API. It only makes code use new protocol version to execute existing operations.
1 parent 3c42269 commit eec9e45

13 files changed

+266
-23
lines changed

src/v1/internal/bolt-protocol-v1.js

+9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ export default class BoltProtocol {
4949
return this._unpacker;
5050
}
5151

52+
/**
53+
* Transform metadata received in SUCCESS message before it is passed to the handler.
54+
* @param {object} metadata the received metadata.
55+
* @return {object} transformed metadata.
56+
*/
57+
transformMetadata(metadata) {
58+
return metadata;
59+
}
60+
5261
/**
5362
* Perform initialization and authentication of the underlying connection.
5463
* @param {string} clientName the client name.

src/v1/internal/bolt-protocol-v3.js

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import BoltProtocolV2 from './bolt-protocol-v2';
20+
import RequestMessage from './request-message';
21+
22+
export default class BoltProtocol extends BoltProtocolV2 {
23+
24+
constructor(connection, chunker, disableLosslessIntegers) {
25+
super(connection, chunker, disableLosslessIntegers);
26+
}
27+
28+
transformMetadata(metadata) {
29+
if (metadata.t_first) {
30+
// Bolt V3 uses shorter key 't_first' to represent 'result_available_after'
31+
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
32+
metadata.result_available_after = metadata.t_first;
33+
delete metadata.t_first;
34+
}
35+
if (metadata.t_last) {
36+
// Bolt V3 uses shorter key 't_last' to represent 'result_consumed_after'
37+
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
38+
metadata.result_consumed_after = metadata.t_last;
39+
delete metadata.t_last;
40+
}
41+
return metadata;
42+
}
43+
44+
initialize(userAgent, authToken, observer) {
45+
prepareToHandleSingleResponse(observer);
46+
const message = RequestMessage.hello(userAgent, authToken);
47+
this._connection.write(message, observer, true);
48+
}
49+
50+
beginTransaction(bookmark, observer) {
51+
prepareToHandleSingleResponse(observer);
52+
const message = RequestMessage.begin(bookmark);
53+
this._connection.write(message, observer, true);
54+
}
55+
56+
commitTransaction(observer) {
57+
prepareToHandleSingleResponse(observer);
58+
const message = RequestMessage.commit();
59+
this._connection.write(message, observer, true);
60+
}
61+
62+
rollbackTransaction(observer) {
63+
prepareToHandleSingleResponse(observer);
64+
const message = RequestMessage.rollback();
65+
this._connection.write(message, observer, true);
66+
}
67+
68+
run(statement, parameters, observer) {
69+
const metadata = {};
70+
const runMessage = RequestMessage.runWithMetadata(statement, parameters, metadata);
71+
const pullAllMessage = RequestMessage.pullAll();
72+
73+
this._connection.write(runMessage, observer, false);
74+
this._connection.write(pullAllMessage, observer, true);
75+
}
76+
}
77+
78+
function prepareToHandleSingleResponse(observer) {
79+
if (observer && typeof observer.prepareToHandleSingleResponse === 'function') {
80+
observer.prepareToHandleSingleResponse();
81+
}
82+
}

src/v1/internal/bookmark.js

+8
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ export default class Bookmark {
5252
return this._maxValue;
5353
}
5454

55+
/**
56+
* Get all bookmark values as an array.
57+
* @return {string[]} all values.
58+
*/
59+
values() {
60+
return this._values;
61+
}
62+
5563
/**
5664
* Get this bookmark as an object for begin transaction call.
5765
* @return {object} the value of this bookmark as object.

src/v1/internal/connection.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,8 @@ export default class Connection {
267267
this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`);
268268
}
269269
try {
270-
this._currentObserver.onCompleted( payload );
270+
const metadata = this._protocol.transformMetadata(payload);
271+
this._currentObserver.onCompleted(metadata);
271272
} finally {
272273
this._updateCurrentObserver();
273274
}

src/v1/internal/protocol-handshaker.js

+14-10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {alloc} from './buf';
2121
import {newError} from '../error';
2222
import BoltProtocolV1 from './bolt-protocol-v1';
2323
import BoltProtocolV2 from './bolt-protocol-v2';
24+
import BoltProtocolV3 from './bolt-protocol-v3';
2425

2526
const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP"
2627
const BOLT_MAGIC_PREAMBLE = 0x6060B017;
@@ -69,15 +70,18 @@ export default class ProtocolHandshaker {
6970
* @private
7071
*/
7172
_createProtocolWithVersion(version) {
72-
if (version === 1) {
73-
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
74-
} else if (version === 2) {
75-
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
76-
} else if (version === HTTP_MAGIC_PREAMBLE) {
77-
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
78-
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
79-
} else {
80-
throw newError('Unknown Bolt protocol version: ' + version);
73+
switch (version) {
74+
case 1:
75+
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
76+
case 2:
77+
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
78+
case 3:
79+
return new BoltProtocolV3(this._connection, this._chunker, this._disableLosslessIntegers);
80+
case HTTP_MAGIC_PREAMBLE:
81+
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
82+
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
83+
default:
84+
throw newError('Unknown Bolt protocol version: ' + version);
8185
}
8286
}
8387
}
@@ -93,10 +97,10 @@ function newHandshakeBuffer() {
9397
handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE);
9498

9599
//proposed versions
100+
handshakeBuffer.writeInt32(3);
96101
handshakeBuffer.writeInt32(2);
97102
handshakeBuffer.writeInt32(1);
98103
handshakeBuffer.writeInt32(0);
99-
handshakeBuffer.writeInt32(0);
100104

101105
// reset the reader position
102106
handshakeBuffer.reset();

src/v1/internal/request-message.js

+59-3
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@
1818
*/
1919

2020
// Signature bytes for each request message type
21-
const INIT = 0x01; // 0000 0001 // INIT <user_agent>
21+
const INIT = 0x01; // 0000 0001 // INIT <user_agent> <authentication_token>
2222
const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused
2323
const RESET = 0x0F; // 0000 1111 // RESET
2424
const RUN = 0x10; // 0001 0000 // RUN <statement> <parameters>
25-
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD * - unused
26-
const PULL_ALL = 0x3F; // 0011 1111 // PULL *
25+
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD_ALL - unused
26+
const PULL_ALL = 0x3F; // 0011 1111 // PULL_ALL
27+
28+
const HELLO = 0x01; // 0000 0001 // HELLO <metadata>
29+
const BEGIN = 0x11; // 0001 0001 // BEGIN <metadata>
30+
const COMMIT = 0x12; // 0001 0010 // COMMIT
31+
const ROLLBACK = 0x13; // 0001 0011 // ROLLBACK
2732

2833
export default class RequestMessage {
2934

@@ -68,8 +73,59 @@ export default class RequestMessage {
6873
static reset() {
6974
return RESET_MESSAGE;
7075
}
76+
77+
/**
78+
* Create a new HELLO message.
79+
* @param {string} userAgent the user agent.
80+
* @param {object} authToken the authentication token.
81+
* @return {RequestMessage} new HELLO message.
82+
*/
83+
static hello(userAgent, authToken) {
84+
const metadata = Object.assign({user_agent: userAgent}, authToken);
85+
return new RequestMessage(HELLO, [metadata], () => `HELLO {user_agent: '${userAgent}', ...}`);
86+
}
87+
88+
/**
89+
* Create a new BEGIN message.
90+
* @param {Bookmark} bookmark the bookmark.
91+
* @return {RequestMessage} new BEGIN message.
92+
*/
93+
static begin(bookmark) {
94+
const metadata = {bookmarks: bookmark.values()};
95+
return new RequestMessage(BEGIN, [metadata], () => `BEGIN ${JSON.stringify(metadata)}`);
96+
}
97+
98+
/**
99+
* Get a COMMIT message.
100+
* @return {RequestMessage} the COMMIT message.
101+
*/
102+
static commit() {
103+
return COMMIT_MESSAGE;
104+
}
105+
106+
/**
107+
* Get a ROLLBACK message.
108+
* @return {RequestMessage} the ROLLBACK message.
109+
*/
110+
static rollback() {
111+
return ROLLBACK_MESSAGE;
112+
}
113+
114+
/**
115+
* Create a new RUN message with additional metadata.
116+
* @param {string} statement the cypher statement.
117+
* @param {object} parameters the statement parameters.
118+
* @param {object} metadata the additional metadata.
119+
* @return {RequestMessage} new RUN message with additional metadata.
120+
*/
121+
static runWithMetadata(statement, parameters, metadata) {
122+
return new RequestMessage(RUN, [statement, parameters, metadata],
123+
() => `RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(metadata)}`);
124+
}
71125
}
72126

73127
// constants for messages that never change
74128
const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL');
75129
const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET');
130+
const COMMIT_MESSAGE = new RequestMessage(COMMIT, [], () => 'COMMIT');
131+
const ROLLBACK_MESSAGE = new RequestMessage(ROLLBACK, [], () => 'ROLLBACK');

src/v1/internal/stream-observer.js

+14
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,20 @@ class StreamObserver {
9999
this._conn = conn;
100100
}
101101

102+
/**
103+
* Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL.
104+
* Response for RUN initializes statement keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream.
105+
*
106+
* However, some operations can be represented as a single message which receives full metadata in a single response.
107+
* For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3.
108+
* Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3.
109+
*
110+
* This function prepares the observer to only handle a single response message.
111+
*/
112+
prepareToHandleSingleResponse() {
113+
this._fieldKeys = [];
114+
}
115+
102116
/**
103117
* Will be called on errors.
104118
* If user-provided observer is present, pass the error

test/internal/bolt-protocol-v1.test.js

+9
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ class MessageRecorder {
4444

4545
describe('BoltProtocolV1', () => {
4646

47+
it('should not change metadata', () => {
48+
const metadata = {result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4};
49+
const protocol = new BoltProtocolV1(new MessageRecorder(), null, false);
50+
51+
const transformedMetadata = protocol.transformMetadata(metadata);
52+
53+
expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4});
54+
});
55+
4756
it('should initialize the connection', () => {
4857
const recorder = new MessageRecorder();
4958
const protocol = new BoltProtocolV1(recorder, null, false);
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import BoltProtocolV3 from '../../src/v1/internal/bolt-protocol-v3';
21+
22+
describe('BoltProtocolV3', () => {
23+
24+
it('should update metadata', () => {
25+
const metadata = {t_first: 1, t_last: 2, db_hits: 3, some_other_key: 4};
26+
const protocol = new BoltProtocolV3(null, null, false);
27+
28+
const transformedMetadata = protocol.transformMetadata(metadata);
29+
30+
expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, db_hits: 3, some_other_key: 4});
31+
});
32+
33+
});

test/internal/bookmark.test.js

+11
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,15 @@ describe('Bookmark', () => {
118118
});
119119
});
120120

121+
it('should expose bookmark values', () => {
122+
expect(new Bookmark(undefined).values()).toEqual([]);
123+
expect(new Bookmark(null).values()).toEqual([]);
124+
125+
const bookmarkString = 'neo4j:bookmark:v1:tx123';
126+
expect(new Bookmark(bookmarkString).values()).toEqual([bookmarkString]);
127+
128+
const bookmarkStrings = ['neo4j:bookmark:v1:tx1', 'neo4j:bookmark:v1:tx2', 'neo4j:bookmark:v1:tx3'];
129+
expect(new Bookmark(bookmarkStrings).values()).toEqual(bookmarkStrings);
130+
});
131+
121132
});

test/internal/connection.test.js

+9-8
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import {ServerVersion} from '../../src/v1/internal/server-version';
2828
import lolex from 'lolex';
2929
import Logger from '../../src/v1/internal/logger';
3030
import StreamObserver from '../../src/v1/internal/stream-observer';
31-
import RequestMessage from '../../src/v1/internal/request-message';
3231
import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler';
3332
import testUtils from '../internal/test-utils';
3433

@@ -88,16 +87,17 @@ describe('Connection', () => {
8887
records.push(record);
8988
},
9089
onCompleted: () => {
91-
expect(records[0][0]).toBe(1);
90+
expect(records[0].get(0)).toBe(1);
9291
done();
9392
}
9493
};
94+
const streamObserver = new StreamObserver();
95+
streamObserver.subscribe(pullAllObserver);
9596

96-
connection._negotiateProtocol().then(() => {
97-
connection.protocol().initialize('mydriver/0.0.0', basicAuthToken());
98-
connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false);
99-
connection.write(RequestMessage.pullAll(), pullAllObserver, true);
100-
});
97+
connection.connect('mydriver/0.0.0', basicAuthToken())
98+
.then(() => {
99+
connection.protocol().run('RETURN 1.0', {}, streamObserver);
100+
});
101101
});
102102

103103
it('should write protocol handshake', () => {
@@ -107,10 +107,11 @@ describe('Connection', () => {
107107
connection._negotiateProtocol();
108108

109109
const boltMagicPreamble = '60 60 b0 17';
110+
const protocolVersion3 = '00 00 00 03';
110111
const protocolVersion2 = '00 00 00 02';
111112
const protocolVersion1 = '00 00 00 01';
112113
const noProtocolVersion = '00 00 00 00';
113-
expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `);
114+
expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `);
114115
});
115116

116117
it('should provide error message when connecting to http-port', done => {

test/internal/protocol-handshaker.test.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ describe('ProtocolHandshaker', () => {
3737
expect(writtenBuffers.length).toEqual(1);
3838

3939
const boltMagicPreamble = '60 60 b0 17';
40+
const protocolVersion3 = '00 00 00 03';
4041
const protocolVersion2 = '00 00 00 02';
4142
const protocolVersion1 = '00 00 00 01';
4243
const noProtocolVersion = '00 00 00 00';
4344

44-
expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `);
45+
expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `);
4546
});
4647

4748
it('should create protocol with valid version', () => {

0 commit comments

Comments
 (0)