Skip to content

Commit b1ed174

Browse files
committed
Rollback transaction when HTTP session is closed
HTTP driver uses transactional Cypher endpoint instead of Bolt. It used to simply submit queries via `POST db/data/transaction/commit` in auto-commit mode. This made it impossible to cancel such queries because transaction id was not known. Canceling is needed in `Session#close()` to terminate all ongoing queries and transactions. `Session#close()` is used by neo4j-browser to terminate running queries. This commit makes `Session#close()` terminate all ongoing transactions and thus cancel all running queries. `Session#run()` now does not submit query directly via `POST db/data/transaction/commit`. Instead, it first starts a transaction via `POST db/data/transaction` and gets id of the newly started transaction. Then a query is submitted via `POST db/data/transaction/XXX` where `XXX` is the transaction id. After a successful query, separate commit request is issued via `POST db/data/transaction/XXX/commit`. After a failed query, rollback is issued via `DELETE db/data/transaction/XXX`. Transaction id is thus known during the query execution and can be used by `Session#close()` to rollback the transaction via `DELETE db/data/transaction/XXX`.
1 parent f8d3b8a commit b1ed174

7 files changed

+623
-149
lines changed
+194
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import StreamObserver from '../stream-observer';
21+
import HttpResponseConverter from './http-response-converter';
22+
import {Neo4jError, SERVICE_UNAVAILABLE} from '../../error';
23+
24+
export default class HttpRequestRunner {
25+
26+
constructor(url, authToken) {
27+
this._url = url;
28+
this._authToken = authToken;
29+
this._converter = new HttpResponseConverter();
30+
}
31+
32+
/**
33+
* Send a HTTP request to begin a transaction.
34+
* @return {Promise<number>} promise resolved with the transaction id or rejected with an error.
35+
*/
36+
beginTransaction() {
37+
const url = beginTransactionUrl(this._url);
38+
return sendRequest('POST', url, null, this._authToken).then(responseJson => {
39+
const neo4jError = this._converter.extractError(responseJson);
40+
if (neo4jError) {
41+
throw neo4jError;
42+
}
43+
return this._converter.extractTransactionId(responseJson);
44+
});
45+
}
46+
47+
/**
48+
* Send a HTTP request to commit a transaction.
49+
* @param {number} transactionId id of the transaction to commit.
50+
* @return {Promise<void>} promise resolved if transaction got committed or rejected when commit failed.
51+
*/
52+
commitTransaction(transactionId) {
53+
const url = commitTransactionUrl(this._url, transactionId);
54+
return sendRequest('POST', url, null, this._authToken).then(responseJson => {
55+
const neo4jError = this._converter.extractError(responseJson);
56+
if (neo4jError) {
57+
throw neo4jError;
58+
}
59+
});
60+
}
61+
62+
/**
63+
* Send a HTTP request to rollback a transaction.
64+
* @param {number} transactionId id of the transaction to rollback.
65+
* @return {Promise<void>} promise resolved if transaction got rolled back or rejected when rollback failed.
66+
*/
67+
rollbackTransaction(transactionId) {
68+
const url = transactionUrl(this._url, transactionId);
69+
return sendRequest('DELETE', url, null, this._authToken).then(responseJson => {
70+
const neo4jError = this._converter.extractError(responseJson);
71+
if (neo4jError) {
72+
throw neo4jError;
73+
}
74+
});
75+
}
76+
77+
/**
78+
* Send a HTTP request to execute a query in a transaction with the given id.
79+
* @param {number} transactionId the transaction id.
80+
* @param {string} statement the cypher query.
81+
* @param {object} parameters the cypher query parameters.
82+
* @return {Promise<StreamObserver>} a promise resolved with {@link StreamObserver} containing either records or error.
83+
*/
84+
runQuery(transactionId, statement, parameters) {
85+
const streamObserver = new StreamObserver();
86+
const url = transactionUrl(this._url, transactionId);
87+
const body = createStatementJson(statement, parameters, this._converter, streamObserver);
88+
if (!body) {
89+
// unable to encode given statement and parameters, return a failed stream observer
90+
return Promise.resolve(streamObserver);
91+
}
92+
93+
return sendRequest('POST', url, body, this._authToken).then(responseJson => {
94+
processResponseJson(responseJson, this._converter, streamObserver);
95+
}).catch(error => {
96+
streamObserver.onError(error);
97+
}).then(() => {
98+
return streamObserver;
99+
});
100+
}
101+
}
102+
103+
function sendRequest(method, url, bodyString, authToken) {
104+
try {
105+
const options = {
106+
method: method,
107+
headers: createHttpHeaders(authToken),
108+
body: bodyString
109+
};
110+
111+
return new Promise((resolve, reject) => {
112+
fetch(url, options)
113+
.then(response => response.json())
114+
.then(responseJson => resolve(responseJson))
115+
.catch(error => reject(new Neo4jError(error.message, SERVICE_UNAVAILABLE)));
116+
});
117+
} catch (e) {
118+
return Promise.reject(e);
119+
}
120+
}
121+
122+
function createHttpHeaders(authToken) {
123+
const headers = new Headers();
124+
headers.append('Accept', 'application/json; charset=UTF-8');
125+
headers.append('Content-Type', 'application/json');
126+
headers.append('Authorization', 'Basic ' + btoa(authToken.principal + ':' + authToken.credentials));
127+
return headers;
128+
}
129+
130+
function createStatementJson(statement, parameters, converter, streamObserver) {
131+
try {
132+
return createStatementJsonOrThrow(statement, parameters, converter);
133+
} catch (e) {
134+
streamObserver.onError(e);
135+
return null;
136+
}
137+
}
138+
139+
function createStatementJsonOrThrow(statement, parameters, converter) {
140+
const encodedParameters = converter.encodeStatementParameters(parameters);
141+
return JSON.stringify({
142+
statements: [{
143+
statement: statement,
144+
parameters: encodedParameters,
145+
resultDataContents: ['row', 'graph'],
146+
includeStats: true
147+
}]
148+
});
149+
}
150+
151+
function processResponseJson(responseJson, converter, streamObserver) {
152+
if (!responseJson) {
153+
// request failed and there is no response
154+
return;
155+
}
156+
157+
try {
158+
processResponseJsonOrThrow(responseJson, converter, streamObserver);
159+
} catch (e) {
160+
streamObserver.onError(e);
161+
}
162+
}
163+
164+
function processResponseJsonOrThrow(responseJson, converter, streamObserver) {
165+
const neo4jError = converter.extractError(responseJson);
166+
if (neo4jError) {
167+
streamObserver.onError(neo4jError);
168+
} else {
169+
const recordMetadata = converter.extractRecordMetadata(responseJson);
170+
streamObserver.onCompleted(recordMetadata);
171+
172+
const rawRecords = converter.extractRawRecords(responseJson);
173+
rawRecords.forEach(rawRecord => streamObserver.onNext(rawRecord));
174+
175+
const statementMetadata = converter.extractStatementMetadata(responseJson);
176+
streamObserver.onCompleted(statementMetadata);
177+
}
178+
}
179+
180+
function beginTransactionUrl(baseUrl) {
181+
return createUrl(baseUrl, '/db/data/transaction');
182+
}
183+
184+
function commitTransactionUrl(baseUrl, transactionId) {
185+
return transactionUrl(baseUrl, transactionId) + '/commit';
186+
}
187+
188+
function transactionUrl(baseUrl, transactionId) {
189+
return beginTransactionUrl(baseUrl) + '/' + transactionId;
190+
}
191+
192+
function createUrl(baseUrl, path) {
193+
return `${baseUrl.scheme}://${baseUrl.host}:${baseUrl.port}${path}`;
194+
}

src/v1/internal/http/http-data-converter.js renamed to src/v1/internal/http/http-response-converter.js

+21-11
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,16 @@
1919

2020
import {isInt} from '../../integer';
2121
import {Node, Path, PathSegment, Relationship} from '../../graph-types';
22-
import {Neo4jError, SERVICE_UNAVAILABLE} from '../../error';
22+
import {Neo4jError} from '../../error';
2323

2424
const CREDENTIALS_EXPIRED_CODE = 'Neo.ClientError.Security.CredentialsExpired';
2525

26-
export default class HttpDataConverter {
26+
export default class HttpResponseConverter {
2727

2828
encodeStatementParameters(parameters) {
2929
return encodeQueryParameters(parameters);
3030
}
3131

32-
/**
33-
* Convert network error to a {@link Neo4jError}.
34-
* @param {object} error the error to convert.
35-
* @return {Neo4jError} new driver friendly error.
36-
*/
37-
convertNetworkError(error) {
38-
return new Neo4jError(error.message, SERVICE_UNAVAILABLE);
39-
}
40-
4132
/**
4233
* Attempts to extract error from transactional cypher endpoint response and convert it to {@link Neo4jError}.
4334
* @param {object} response the response.
@@ -59,6 +50,25 @@ export default class HttpDataConverter {
5950
return null;
6051
}
6152

53+
/**
54+
* Extracts transaction id from the db/data/transaction endpoint response.
55+
* @param {object} response the response.
56+
* @return {number} the transaction id.
57+
*/
58+
extractTransactionId(response) {
59+
const commitUrl = response.commit;
60+
if (commitUrl) {
61+
// extract id 42 from commit url like 'http://localhost:7474/db/data/transaction/42/commit'
62+
const url = commitUrl.replace('/commit', '');
63+
const transactionIdString = url.substring(url.lastIndexOf('/') + 1);
64+
const transactionId = parseInt(transactionIdString, 10);
65+
if (transactionId || transactionId === 0) {
66+
return transactionId;
67+
}
68+
}
69+
throw new Neo4jError(`Unable to extract transaction id from the response JSON: ${JSON.stringify(response)}`);
70+
}
71+
6272
/**
6373
* Extracts record metadata (array of column names) from transactional cypher endpoint response.
6474
* @param {object} response the response.

src/v1/internal/http/http-session.js

+50-4
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ import {WRITE} from '../../driver';
2121
import Session from '../../session';
2222
import {assertCypherStatement} from '../util';
2323
import {Neo4jError} from '../../error';
24-
import HttpStatementRunner from './http-statement-runner';
24+
import HttpRequestRunner from './http-request-runner';
25+
import {EMPTY_CONNECTION_HOLDER} from '../connection-holder';
26+
import Result from '../../result';
2527

2628
export default class HttpSession extends Session {
2729

2830
constructor(url, authToken, config) {
2931
super(WRITE, null, null, config);
30-
this._statementRunner = new HttpStatementRunner(url, authToken);
32+
this._ongoingTransactionIds = [];
33+
this._serverInfoSupplier = createServerInfoSupplier(url);
34+
this._requestRunner = new HttpRequestRunner(url, authToken);
3135
}
3236

3337
run(statement, parameters = {}) {
@@ -37,7 +41,21 @@ export default class HttpSession extends Session {
3741
}
3842
assertCypherStatement(statement);
3943

40-
return this._statementRunner.run(statement, parameters);
44+
return this._requestRunner.beginTransaction().then(transactionId => {
45+
this._ongoingTransactionIds.push(transactionId);
46+
const queryPromise = this._requestRunner.runQuery(transactionId, statement, parameters);
47+
48+
return queryPromise.then(streamObserver => {
49+
if (streamObserver.hasFailed()) {
50+
return rollbackTransactionAfterQueryFailure(transactionId, streamObserver, this._requestRunner);
51+
} else {
52+
return commitTransactionAfterQuerySuccess(transactionId, streamObserver, this._requestRunner);
53+
}
54+
}).then(streamObserver => {
55+
this._ongoingTransactionIds = this._ongoingTransactionIds.filter(id => id !== transactionId);
56+
return new Result(streamObserver, statement, parameters, this._serverInfoSupplier, EMPTY_CONNECTION_HOLDER);
57+
});
58+
});
4159
}
4260

4361
beginTransaction() {
@@ -57,10 +75,38 @@ export default class HttpSession extends Session {
5775
}
5876

5977
close(callback = (() => null)) {
60-
callback();
78+
const rollbackAllOngoingTransactions = this._ongoingTransactionIds.map(transactionId =>
79+
rollbackTransactionSilently(transactionId, this._requestRunner)
80+
);
81+
82+
Promise.all(rollbackAllOngoingTransactions).then(() => callback());
6183
}
6284
}
6385

86+
function rollbackTransactionAfterQueryFailure(transactionId, streamObserver, requestRunner) {
87+
return rollbackTransactionSilently(transactionId, requestRunner).then(() => streamObserver);
88+
}
89+
90+
function commitTransactionAfterQuerySuccess(transactionId, streamObserver, requestRunner) {
91+
return requestRunner.commitTransaction(transactionId).catch(error => {
92+
streamObserver.onError(error);
93+
}).then(() => {
94+
return streamObserver;
95+
});
96+
}
97+
98+
function rollbackTransactionSilently(transactionId, requestRunner) {
99+
return requestRunner.rollbackTransaction(transactionId).catch(error => {
100+
// ignore all rollback errors
101+
});
102+
}
103+
104+
function createServerInfoSupplier(url) {
105+
return () => {
106+
return {server: {address: url.hostAndPort}};
107+
};
108+
}
109+
64110
function throwTransactionsNotSupported() {
65111
throw new Neo4jError('Experimental HTTP driver does not support transactions');
66112
}

0 commit comments

Comments
 (0)