Skip to content

Commit 100032a

Browse files
committed
Expose transaction operations on BoltProtocol
Make it expose logical operations for explicit transactions like begin, commit and rollback. These operations require different set of messages in Bolt V3. Having operations exposed like this should make it easier to override them.
1 parent 6fc0342 commit 100032a

File tree

5 files changed

+109
-19
lines changed

5 files changed

+109
-19
lines changed

src/v1/internal/bolt-protocol.js renamed to src/v1/internal/bolt-protocol-v1.js

+29
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,35 @@ export default class BoltProtocol {
5959
this._connection.write(message, observer, true);
6060
}
6161

62+
/**
63+
* Begin an explicit transaction.
64+
* @param {Bookmark} bookmark the bookmark.
65+
* @param {StreamObserver} observer the response observer.
66+
*/
67+
beginTransaction(bookmark, observer) {
68+
const runMessage = RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters());
69+
const pullAllMessage = RequestMessage.pullAll();
70+
71+
this._connection.write(runMessage, observer, false);
72+
this._connection.write(pullAllMessage, observer, false);
73+
}
74+
75+
/**
76+
* Commit the explicit transaction.
77+
* @param {StreamObserver} observer the response observer.
78+
*/
79+
commitTransaction(observer) {
80+
this.run('COMMIT', {}, observer);
81+
}
82+
83+
/**
84+
* Rollback the explicit transaction.
85+
* @param {StreamObserver} observer the response observer.
86+
*/
87+
rollbackTransaction(observer) {
88+
this.run('ROLLBACK', {}, observer);
89+
}
90+
6291
/**
6392
* Send a Cypher statement through the underlying connection.
6493
* @param {string} statement the cypher statement.

src/v1/internal/protocol-handshaker.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {alloc} from './buf';
2121
import {newError} from '../error';
2222
import * as v1 from './packstream-v1';
2323
import * as v2 from './packstream-v2';
24-
import BoltProtocol from './bolt-protocol';
24+
import BoltProtocol from './bolt-protocol-v1';
2525

2626
const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP"
2727
const BOLT_MAGIC_PREAMBLE = 0x6060B017;

src/v1/transaction.js

+18-7
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class Transaction {
4141
const streamObserver = new _TransactionStreamObserver(this);
4242

4343
this._connectionHolder.getConnection(streamObserver)
44-
.then(conn => conn.protocol().run('BEGIN', bookmark.asBeginTransactionParameters(), streamObserver))
44+
.then(conn => conn.protocol().beginTransaction(bookmark, streamObserver))
4545
.catch(error => streamObserver.onError(error));
4646

4747
this._state = _states.ACTIVE;
@@ -151,11 +151,16 @@ let _states = {
151151
//The transaction is running with no explicit success or failure marked
152152
ACTIVE: {
153153
commit: (connectionHolder, observer) => {
154-
return {result: _runPullAll("COMMIT", connectionHolder, observer),
155-
state: _states.SUCCEEDED}
154+
return {
155+
result: finishTransaction(true, connectionHolder, observer),
156+
state: _states.SUCCEEDED
157+
};
156158
},
157159
rollback: (connectionHolder, observer) => {
158-
return {result: _runPullAll("ROLLBACK", connectionHolder, observer), state: _states.ROLLED_BACK};
160+
return {
161+
result: finishTransaction(false, connectionHolder, observer),
162+
state: _states.ROLLED_BACK
163+
};
159164
},
160165
run: (connectionHolder, observer, statement, parameters) => {
161166
connectionHolder.getConnection(observer)
@@ -233,14 +238,20 @@ let _states = {
233238
}
234239
};
235240

236-
function _runPullAll(msg, connectionHolder, observer) {
241+
function finishTransaction(commit, connectionHolder, observer) {
237242
connectionHolder.getConnection(observer)
238-
.then(conn => conn.protocol().run(msg, {}, observer))
243+
.then(connection => {
244+
if (commit) {
245+
return connection.protocol().commitTransaction(observer);
246+
} else {
247+
return connection.protocol().rollbackTransaction(observer);
248+
}
249+
})
239250
.catch(error => observer.onError(error));
240251

241252
// for commit & rollback we need result that uses real connection holder and notifies it when
242253
// connection is not needed and can be safely released to the pool
243-
return new Result(observer, msg, {}, emptyMetadataSupplier, connectionHolder);
254+
return new Result(observer, '', {}, emptyMetadataSupplier, connectionHolder);
244255
}
245256

246257
/**

test/internal/bolt-protocol.test.js renamed to test/internal/bolt-protocol-v1.test.js

+60-10
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
* limitations under the License.
1818
*/
1919

20-
import BoltProtocol from '../../src/v1/internal/bolt-protocol';
20+
import BoltProtocol from '../../src/v1/internal/bolt-protocol-v1';
2121
import RequestMessage from '../../src/v1/internal/request-message';
22+
import Bookmark from '../../src/v1/internal/bookmark';
2223

2324
class MessageRecorder {
2425

@@ -55,8 +56,8 @@ describe('BoltProtocol', () => {
5556

5657
recorder.verifyMessageCount(1);
5758
verifyMessage(RequestMessage.init(clientName, authToken), recorder.messages[0]);
58-
expect(recorder.observers[0]).toBe(observer);
59-
expect(recorder.flushes[0]).toBeTruthy();
59+
expect(recorder.observers).toEqual([observer]);
60+
expect(recorder.flushes).toEqual([true]);
6061
});
6162

6263
it('should run a statement', () => {
@@ -74,11 +75,8 @@ describe('BoltProtocol', () => {
7475
verifyMessage(RequestMessage.run(statement, parameters), recorder.messages[0]);
7576
verifyMessage(RequestMessage.pullAll(), recorder.messages[1]);
7677

77-
expect(recorder.observers[0]).toBe(observer);
78-
expect(recorder.observers[1]).toBe(observer);
79-
80-
expect(recorder.flushes[0]).toBeFalsy();
81-
expect(recorder.flushes[1]).toBeTruthy();
78+
expect(recorder.observers).toEqual([observer, observer]);
79+
expect(recorder.flushes).toEqual([false, true]);
8280
});
8381

8482
it('should reset the connection', () => {
@@ -91,8 +89,60 @@ describe('BoltProtocol', () => {
9189

9290
recorder.verifyMessageCount(1);
9391
verifyMessage(RequestMessage.reset(), recorder.messages[0]);
94-
expect(recorder.observers[0]).toBe(observer);
95-
expect(recorder.flushes[0]).toBeTruthy();
92+
expect(recorder.observers).toEqual([observer]);
93+
expect(recorder.flushes).toEqual([true]);
94+
});
95+
96+
it('should begin a transaction', () => {
97+
const recorder = new MessageRecorder();
98+
const protocol = new BoltProtocol(recorder, null, null);
99+
100+
const bookmark = new Bookmark('neo4j:bookmark:v1:tx42');
101+
const observer = {};
102+
103+
protocol.beginTransaction(bookmark, observer);
104+
105+
recorder.verifyMessageCount(2);
106+
107+
verifyMessage(RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters()), recorder.messages[0]);
108+
verifyMessage(RequestMessage.pullAll(), recorder.messages[1]);
109+
110+
expect(recorder.observers).toEqual([observer, observer]);
111+
expect(recorder.flushes).toEqual([false, false]);
112+
});
113+
114+
it('should commit a transaction', () => {
115+
const recorder = new MessageRecorder();
116+
const protocol = new BoltProtocol(recorder, null, null);
117+
118+
const observer = {};
119+
120+
protocol.commitTransaction(observer);
121+
122+
recorder.verifyMessageCount(2);
123+
124+
verifyMessage(RequestMessage.run('COMMIT', {}), recorder.messages[0]);
125+
verifyMessage(RequestMessage.pullAll(), recorder.messages[1]);
126+
127+
expect(recorder.observers).toEqual([observer, observer]);
128+
expect(recorder.flushes).toEqual([false, true]);
129+
});
130+
131+
it('should rollback a transaction', () => {
132+
const recorder = new MessageRecorder();
133+
const protocol = new BoltProtocol(recorder, null, null);
134+
135+
const observer = {};
136+
137+
protocol.rollbackTransaction(observer);
138+
139+
recorder.verifyMessageCount(2);
140+
141+
verifyMessage(RequestMessage.run('ROLLBACK', {}), recorder.messages[0]);
142+
verifyMessage(RequestMessage.pullAll(), recorder.messages[1]);
143+
144+
expect(recorder.observers).toEqual([observer, observer]);
145+
expect(recorder.flushes).toEqual([false, true]);
96146
});
97147

98148
});

test/internal/protocol-handshaker.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import ProtocolHandshaker from '../../src/v1/internal/protocol-handshaker';
2121
import Logger from '../../src/v1/internal/logger';
22-
import BoltProtocol from '../../src/v1/internal/bolt-protocol';
22+
import BoltProtocol from '../../src/v1/internal/bolt-protocol-v1';
2323
import {alloc} from '../../src/v1/internal/buf';
2424

2525
describe('ProtocolHandshaker', () => {

0 commit comments

Comments
 (0)