Skip to content

Commit c6a4b24

Browse files
committed
Avoid forgetting address twice on error in routing driver
Routing driver removes address from the routing table on error. This removal is triggered by a `StreamObserver` that observes the incoming messages. Same observer is used for RUN and PULL_ALL. It used to trigger the forget operation trice on every error. This wasn't harmful but looked weird in logs. This commit fixes the problem by making `StreamObserver` invoke the provided callback only once.
1 parent 442c0dd commit c6a4b24

File tree

3 files changed

+24
-19
lines changed

3 files changed

+24
-19
lines changed

src/v1/internal/stream-observer.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,13 @@ class StreamObserver {
110110
* @param {Object} error - An error object
111111
*/
112112
onError(error) {
113-
const transformedError = this._errorTransformer(error, this._conn);
114113
if(this._hasFailed) {
115114
return;
116115
}
117116
this._hasFailed = true;
117+
118+
const transformedError = this._errorTransformer(error, this._conn);
119+
118120
if( this._observer ) {
119121
if( this._observer.onError ) {
120122
this._observer.onError( transformedError );

src/v1/transaction.js

+8-18
Original file line numberDiff line numberDiff line change
@@ -103,21 +103,14 @@ class Transaction {
103103
}
104104

105105
_onError() {
106-
if (this.isOpen()) {
107-
// attempt to rollback, useful when Transaction#run() failed
108-
return this.rollback().catch(ignoredError => {
109-
// ignore all errors because it is best effort and transaction might already be rolled back
110-
}).then(() => {
111-
// after rollback attempt change this transaction's state to FAILED
112-
this._state = _states.FAILED;
113-
});
114-
} else {
115-
// error happened in in-active transaction, just to the cleanup and change state to FAILED
116-
this._state = _states.FAILED;
117-
this._onClose();
118-
// no async actions needed - return resolved promise
119-
return Promise.resolve();
120-
}
106+
// error will be "acknowledged" by sending a RESET message
107+
// database will then forget about this transaction and cleanup all corresponding resources
108+
// it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it
109+
this._state = _states.FAILED;
110+
this._onClose();
111+
112+
// release connection back to the pool
113+
return this._connectionHolder.releaseConnection();
121114
}
122115
}
123116

@@ -126,15 +119,12 @@ class _TransactionStreamObserver extends StreamObserver {
126119
constructor(tx) {
127120
super(tx._errorTransformer || ((err) => {return err}));
128121
this._tx = tx;
129-
//this is to to avoid multiple calls to onError caused by IGNORED
130-
this._hasFailed = false;
131122
}
132123

133124
onError(error) {
134125
if (!this._hasFailed) {
135126
this._tx._onError().then(() => {
136127
super.onError(error);
137-
this._hasFailed = true;
138128
});
139129
}
140130
}

test/internal/stream-observer.test.js

+13
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@ describe('StreamObserver', () => {
142142
expect(receivedMetaData).toEqual({metaDataField1: 'value1', metaDataField2: 'value2'});
143143
});
144144

145+
it('invokes error transformer only once on error', () => {
146+
const errors = [];
147+
const streamObserver = new StreamObserver(error => errors.push(error));
148+
149+
const error1 = new Error('Hello');
150+
const error2 = new Error('World');
151+
152+
streamObserver.onError(error1);
153+
streamObserver.onError(error2);
154+
155+
expect(errors).toEqual([error1]);
156+
});
157+
145158
});
146159

147160
function newStreamObserver() {

0 commit comments

Comments
 (0)