Skip to content

Commit 7a0a398

Browse files
committed
Expose Bolt V3 features in the API
Bolt V3 allows additional metadata to be attached to BEGIN and RUN messages. This makes it possible to expose a couple new features in the API. Auto-commit transactions executed via `Session#run()` now support bookmarks. In previous protocol versions, there was no good place in the RUN message to attach bookmarks. Auto-commit transactions will now use bookmarks given in `Driver#session(bookmarkOrBookmarks)` function and participate in causal chaining within a session. Transactions functions, auto-commit, and explicit transactions now accept a configuration object with transaction timeout and metadata. Example of a configuration object: ``` { timeout: 3000, // 3 seconds metadata: {key1: 42, key2: '42'} } ``` where timeout is specified in milliseconds and metadata is an object containing valid Cypher types. Transactions that execute longer than the configured timeout will be terminated by the database. This functionality allows limiting query/transaction execution time. Specified timeout overrides the default timeout configured in the database using `dbms.transaction.timeout` setting. Examples: Specified transaction metadata will be attached to the executing transaction and visible in the output of `dbms.listQueries` and `dbms.listTransactions` procedures. It will also get logged to the `query.log`. This functionality makes it easier to tag transactions and is equivalent to `dbms.setTXMetaData` procedure. Examples: ``` var driver = neo4j.driver(...); var session = driver.session(); var txConfig = { timeout: 5000, // 5 seconds metadata: { type: 'My Query', application: 'My App neo4j#1', sequence_number: 42 } }; // transaction configuration for auto-commit transaction session.run('RETURN $x', {x: 42}, txConfig) .then(...) .catch(...) // transaction configuration for explicit transaction var tx = session.beginTransaction(txConfig); tx.run(...); // transaction configuration for read transaction function session.readTransaction(tx => tx.run('RETURN $x', {x: 42}), txConfig) .then(...) .catch(...) // transaction configuration for write transaction function session.writeTransaction(tx => tx.run('RETURN $x', {x: 42}), txConfig) .then(...) .catch(...) ```
1 parent 6338fc2 commit 7a0a398

20 files changed

+803
-49
lines changed

src/v1/driver.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class Driver {
192192
session(mode, bookmarkOrBookmarks) {
193193
const sessionMode = Driver._validateSessionMode(mode);
194194
const connectionProvider = this._getOrCreateConnectionProvider();
195-
const bookmark = new Bookmark(bookmarkOrBookmarks);
195+
const bookmark = bookmarkOrBookmarks ? new Bookmark(bookmarkOrBookmarks) : Bookmark.empty();
196196
return new Session(sessionMode, connectionProvider, bookmark, this._config);
197197
}
198198

src/v1/internal/bolt-protocol-v1.js

+32-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
import RequestMessage from './request-message';
2020
import * as v1 from './packstream-v1';
21+
import {newError} from '../error';
22+
import Bookmark from './bookmark';
23+
import TxConfig from './tx-config';
2124

2225
export default class BoltProtocol {
2326

@@ -72,9 +75,12 @@ export default class BoltProtocol {
7275
/**
7376
* Begin an explicit transaction.
7477
* @param {Bookmark} bookmark the bookmark.
78+
* @param {TxConfig} txConfig the configuration.
7579
* @param {StreamObserver} observer the response observer.
7680
*/
77-
beginTransaction(bookmark, observer) {
81+
beginTransaction(bookmark, txConfig, observer) {
82+
assertTxConfigIsEmpty(txConfig, this._connection, observer);
83+
7884
const runMessage = RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters());
7985
const pullAllMessage = RequestMessage.pullAll();
8086

@@ -87,24 +93,29 @@ export default class BoltProtocol {
8793
* @param {StreamObserver} observer the response observer.
8894
*/
8995
commitTransaction(observer) {
90-
this.run('COMMIT', {}, observer);
96+
this.run('COMMIT', {}, Bookmark.empty(), TxConfig.empty(), observer);
9197
}
9298

9399
/**
94100
* Rollback the explicit transaction.
95101
* @param {StreamObserver} observer the response observer.
96102
*/
97103
rollbackTransaction(observer) {
98-
this.run('ROLLBACK', {}, observer);
104+
this.run('ROLLBACK', {}, Bookmark.empty(), TxConfig.empty(), observer);
99105
}
100106

101107
/**
102108
* Send a Cypher statement through the underlying connection.
103109
* @param {string} statement the cypher statement.
104110
* @param {object} parameters the statement parameters.
111+
* @param {Bookmark} bookmark the bookmark.
112+
* @param {TxConfig} txConfig the auto-commit transaction configuration.
105113
* @param {StreamObserver} observer the response observer.
106114
*/
107-
run(statement, parameters, observer) {
115+
run(statement, parameters, bookmark, txConfig, observer) {
116+
// bookmark is ignored in this version of the protocol
117+
assertTxConfigIsEmpty(txConfig, this._connection, observer);
118+
108119
const runMessage = RequestMessage.run(statement, parameters);
109120
const pullAllMessage = RequestMessage.pullAll();
110121

@@ -129,3 +140,20 @@ export default class BoltProtocol {
129140
return new v1.Unpacker(disableLosslessIntegers);
130141
}
131142
}
143+
144+
/**
145+
* @param {TxConfig} txConfig the auto-commit transaction configuration.
146+
* @param {Connection} connection the connection.
147+
* @param {StreamObserver} observer the response observer.
148+
*/
149+
function assertTxConfigIsEmpty(txConfig, connection, observer) {
150+
if (!txConfig.isEmpty()) {
151+
const error = newError('Driver is connected to the database that does not support transaction configuration. ' +
152+
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality');
153+
154+
// unsupported API was used, consider this a fatal error for the current connection
155+
connection._handleFatalError(error);
156+
observer.onError(error);
157+
throw error;
158+
}
159+
}

src/v1/internal/bolt-protocol-v3.js

+4-5
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ export default class BoltProtocol extends BoltProtocolV2 {
4747
this._connection.write(message, observer, true);
4848
}
4949

50-
beginTransaction(bookmark, observer) {
50+
beginTransaction(bookmark, txConfig, observer) {
5151
prepareToHandleSingleResponse(observer);
52-
const message = RequestMessage.begin(bookmark);
52+
const message = RequestMessage.begin(bookmark, txConfig);
5353
this._connection.write(message, observer, true);
5454
}
5555

@@ -65,9 +65,8 @@ export default class BoltProtocol extends BoltProtocolV2 {
6565
this._connection.write(message, observer, true);
6666
}
6767

68-
run(statement, parameters, observer) {
69-
const metadata = {};
70-
const runMessage = RequestMessage.runWithMetadata(statement, parameters, metadata);
68+
run(statement, parameters, bookmark, txConfig, observer) {
69+
const runMessage = RequestMessage.runWithMetadata(statement, parameters, bookmark, txConfig);
7170
const pullAllMessage = RequestMessage.pullAll();
7271

7372
this._connection.write(runMessage, observer, false);

src/v1/internal/bookmark.js

+6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ export default class Bookmark {
3636
this._maxValue = maxBookmark(this._values);
3737
}
3838

39+
static empty() {
40+
return EMPTY_BOOKMARK;
41+
}
42+
3943
/**
4044
* Check if the given bookmark is meaningful and can be send to the database.
4145
* @return {boolean} returns <code>true</code> bookmark has a value, <code>false</code> otherwise.
@@ -80,6 +84,8 @@ export default class Bookmark {
8084
}
8185
}
8286

87+
const EMPTY_BOOKMARK = new Bookmark(null);
88+
8389
/**
8490
* Converts given value to an array.
8591
* @param {string|string[]} [value=undefined] argument to convert.

src/v1/internal/connection.js

-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ export default class Connection {
225225
* failing, and the connection getting ejected from the session pool.
226226
*
227227
* @param error an error object, forwarded to all current and future subscribers
228-
* @protected
229228
*/
230229
_handleFatalError(error) {
231230
this._isBroken = true;

src/v1/internal/request-message.js

+27-4
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,11 @@ export default class RequestMessage {
8888
/**
8989
* Create a new BEGIN message.
9090
* @param {Bookmark} bookmark the bookmark.
91+
* @param {TxConfig} txConfig the configuration.
9192
* @return {RequestMessage} new BEGIN message.
9293
*/
93-
static begin(bookmark) {
94-
const metadata = {bookmarks: bookmark.values()};
94+
static begin(bookmark, txConfig) {
95+
const metadata = buildTxMetadata(bookmark, txConfig);
9596
return new RequestMessage(BEGIN, [metadata], () => `BEGIN ${JSON.stringify(metadata)}`);
9697
}
9798

@@ -115,15 +116,37 @@ export default class RequestMessage {
115116
* Create a new RUN message with additional metadata.
116117
* @param {string} statement the cypher statement.
117118
* @param {object} parameters the statement parameters.
118-
* @param {object} metadata the additional metadata.
119+
* @param {Bookmark} bookmark the bookmark.
120+
* @param {TxConfig} txConfig the configuration.
119121
* @return {RequestMessage} new RUN message with additional metadata.
120122
*/
121-
static runWithMetadata(statement, parameters, metadata) {
123+
static runWithMetadata(statement, parameters, bookmark, txConfig) {
124+
const metadata = buildTxMetadata(bookmark, txConfig);
122125
return new RequestMessage(RUN, [statement, parameters, metadata],
123126
() => `RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(metadata)}`);
124127
}
125128
}
126129

130+
/**
131+
* Create an object that represent transaction metadata.
132+
* @param {Bookmark} bookmark the bookmark.
133+
* @param {TxConfig} txConfig the configuration.
134+
* @return {object} a metadata object.
135+
*/
136+
function buildTxMetadata(bookmark, txConfig) {
137+
const metadata = {};
138+
if (!bookmark.isEmpty()) {
139+
metadata['bookmarks'] = bookmark.values();
140+
}
141+
if (txConfig.timeout) {
142+
metadata['tx_timeout'] = txConfig.timeout;
143+
}
144+
if (txConfig.metadata) {
145+
metadata['tx_metadata'] = txConfig.metadata;
146+
}
147+
return metadata;
148+
}
149+
127150
// constants for messages that never change
128151
const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL');
129152
const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET');

src/v1/internal/routing-util.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error';
2121
import Integer, {int} from '../integer';
2222
import {ServerVersion, VERSION_3_2_0} from './server-version';
23+
import Bookmark from './bookmark';
24+
import TxConfig from './tx-config';
2325

2426
const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers';
2527
const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable($context)';
@@ -123,7 +125,7 @@ export default class RoutingUtil {
123125
params = {};
124126
}
125127

126-
connection.protocol().run(query, params, streamObserver);
128+
connection.protocol().run(query, params, Bookmark.empty(), TxConfig.empty(), streamObserver);
127129
});
128130
}
129131
}

src/v1/internal/server-version.js

+2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ function compareInts(x, y) {
112112
const VERSION_3_1_0 = new ServerVersion(3, 1, 0);
113113
const VERSION_3_2_0 = new ServerVersion(3, 2, 0);
114114
const VERSION_3_4_0 = new ServerVersion(3, 4, 0);
115+
const VERSION_3_5_0 = new ServerVersion(3, 5, 0);
115116
const maxVer = Number.MAX_SAFE_INTEGER;
116117
const VERSION_IN_DEV = new ServerVersion(maxVer, maxVer, maxVer);
117118

@@ -120,6 +121,7 @@ export {
120121
VERSION_3_1_0,
121122
VERSION_3_2_0,
122123
VERSION_3_4_0,
124+
VERSION_3_5_0,
123125
VERSION_IN_DEV
124126
};
125127

src/v1/internal/tx-config.js

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import * as util from './util';
21+
import {int} from '../integer';
22+
import {newError} from '../error';
23+
24+
/**
25+
* Internal holder of the transaction configuration.
26+
* It performs input validation and value conversion for further serialization by the Bolt protocol layer.
27+
* Users of the driver provide transaction configuration as regular objects `{timeout: 10, metadata: {key: 'value'}}`.
28+
* Driver converts such objects to {@link TxConfig} immediately and uses converted values everywhere.
29+
*/
30+
export default class TxConfig {
31+
32+
/**
33+
* @constructor
34+
* @param {object} config the raw configuration object.
35+
*/
36+
constructor(config) {
37+
assertValidConfig(config);
38+
this.timeout = extractTimeout(config);
39+
this.metadata = extractMetadata(config);
40+
}
41+
42+
/**
43+
* Get an empty config object.
44+
* @return {TxConfig} an empty config.
45+
*/
46+
static empty() {
47+
return EMPTY_CONFIG;
48+
}
49+
50+
/**
51+
* Check if this config object is empty. I.e. has no configuration values specified.
52+
* @return {boolean} `true` if this object is empty, `false` otherwise.
53+
*/
54+
isEmpty() {
55+
return Object.values(this).every(value => value == null);
56+
}
57+
}
58+
59+
const EMPTY_CONFIG = new TxConfig({});
60+
61+
/**
62+
* @return {Integer|null}
63+
*/
64+
function extractTimeout(config) {
65+
if (util.isObject(config) && (config.timeout || config.timeout === 0)) {
66+
util.assertNumberOrInteger(config.timeout, 'Transaction timeout');
67+
const timeout = int(config.timeout);
68+
if (timeout.isZero()) {
69+
throw newError('Transaction timeout should not be zero');
70+
}
71+
if (timeout.isNegative()) {
72+
throw newError('Transaction timeout should not be negative');
73+
}
74+
return timeout;
75+
}
76+
return null;
77+
}
78+
79+
/**
80+
* @return {object|null}
81+
*/
82+
function extractMetadata(config) {
83+
if (util.isObject(config) && config.metadata) {
84+
const metadata = config.metadata;
85+
util.assertObject(metadata);
86+
if (Object.keys(metadata).length !== 0) {
87+
// not an empty object
88+
return metadata;
89+
}
90+
}
91+
return null;
92+
}
93+
94+
function assertValidConfig(config) {
95+
if (config) {
96+
util.assertObject(config, 'Transaction config');
97+
}
98+
}

src/v1/internal/util.js

+9
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ function validateStatementAndParameters(statement, parameters) {
6666
return {query, params};
6767
}
6868

69+
function assertObject(obj, objName) {
70+
if (!isObject(obj)) {
71+
throw new TypeError(objName + ' expected to be an object but was: ' + JSON.stringify(obj));
72+
}
73+
return obj;
74+
}
75+
6976
function assertString(obj, objName) {
7077
if (!isString(obj)) {
7178
throw new TypeError(objName + ' expected to be string but was: ' + JSON.stringify(obj));
@@ -118,7 +125,9 @@ function isString(str) {
118125

119126
export {
120127
isEmptyObjectOrNull,
128+
isObject,
121129
isString,
130+
assertObject,
122131
assertString,
123132
assertNumber,
124133
assertNumberOrInteger,

0 commit comments

Comments
 (0)