Skip to content

Commit 65a12d5

Browse files
feat(client): add command timeout option (redis#3008)
Co-authored-by: Florian Schunk <[email protected]>
1 parent 79749f2 commit 65a12d5

File tree

8 files changed

+212
-76
lines changed

8 files changed

+212
-76
lines changed

docs/command-options.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,19 @@ try {
3737
}
3838
```
3939

40+
41+
## Timeout
42+
43+
This option is similar to the Abort Signal one, but provides an easier way to set timeout for commands. Again, this applies to commands that haven't been written to the socket yet.
44+
45+
```javascript
46+
const client = createClient({
47+
commandOptions: {
48+
timeout: 1000
49+
}
50+
})
51+
```
52+
4053
## ASAP
4154

4255
Commands that are executed in the "asap" mode are added to the beginning of the "to sent" queue.

packages/client/lib/client/commands-queue.ts

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
6-
import { AbortError, ErrorReply } from '../errors';
6+
import { AbortError, ErrorReply, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
88

99
export interface CommandOptions<T = TypeMapping> {
@@ -14,6 +14,10 @@ export interface CommandOptions<T = TypeMapping> {
1414
* Maps between RESP and JavaScript types
1515
*/
1616
typeMapping?: T;
17+
/**
18+
* Timeout for the command in milliseconds
19+
*/
20+
timeout?: number;
1721
}
1822

1923
export interface CommandToWrite extends CommandWaitingForReply {
@@ -23,6 +27,10 @@ export interface CommandToWrite extends CommandWaitingForReply {
2327
signal: AbortSignal;
2428
listener: () => unknown;
2529
} | undefined;
30+
timeout: {
31+
signal: AbortSignal;
32+
listener: () => unknown;
33+
} | undefined;
2634
}
2735

2836
interface CommandWaitingForReply {
@@ -80,7 +88,7 @@ export default class RedisCommandsQueue {
8088
#onPush(push: Array<any>) {
8189
// TODO: type
8290
if (this.#pubSub.handleMessageReply(push)) return true;
83-
91+
8492
const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
8593
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
8694
const channel = push[1].toString();
@@ -153,12 +161,26 @@ export default class RedisCommandsQueue {
153161
args,
154162
chainId: options?.chainId,
155163
abort: undefined,
164+
timeout: undefined,
156165
resolve,
157166
reject,
158167
channelsCounter: undefined,
159168
typeMapping: options?.typeMapping
160169
};
161170

171+
const timeout = options?.timeout;
172+
if (timeout) {
173+
const signal = AbortSignal.timeout(timeout);
174+
value.timeout = {
175+
signal,
176+
listener: () => {
177+
this.#toWrite.remove(node);
178+
value.reject(new TimeoutError());
179+
}
180+
};
181+
signal.addEventListener('abort', value.timeout.listener, { once: true });
182+
}
183+
162184
const signal = options?.abortSignal;
163185
if (signal) {
164186
value.abort = {
@@ -181,6 +203,7 @@ export default class RedisCommandsQueue {
181203
args: command.args,
182204
chainId,
183205
abort: undefined,
206+
timeout: undefined,
184207
resolve() {
185208
command.resolve();
186209
resolve();
@@ -202,7 +225,7 @@ export default class RedisCommandsQueue {
202225
this.decoder.onReply = (reply => {
203226
if (Array.isArray(reply)) {
204227
if (this.#onPush(reply)) return;
205-
228+
206229
if (PONG.equals(reply[0] as Buffer)) {
207230
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
208231
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
@@ -250,7 +273,7 @@ export default class RedisCommandsQueue {
250273
if (!this.#pubSub.isActive) {
251274
this.#resetDecoderCallbacks();
252275
}
253-
276+
254277
resolve();
255278
};
256279
}
@@ -299,6 +322,7 @@ export default class RedisCommandsQueue {
299322
args: ['MONITOR'],
300323
chainId: options?.chainId,
301324
abort: undefined,
325+
timeout: undefined,
302326
// using `resolve` instead of using `.then`/`await` to make sure it'll be called before processing the next reply
303327
resolve: () => {
304328
// after running `MONITOR` only `MONITOR` and `RESET` replies are expected
@@ -317,7 +341,7 @@ export default class RedisCommandsQueue {
317341
reject,
318342
channelsCounter: undefined,
319343
typeMapping
320-
}, options?.asap);
344+
}, options?.asap);
321345
});
322346
}
323347

@@ -340,18 +364,19 @@ export default class RedisCommandsQueue {
340364
this.#resetDecoderCallbacks();
341365
this.#resetFallbackOnReply = undefined;
342366
this.#pubSub.reset();
343-
367+
344368
this.#waitingForReply.shift()!.resolve(reply);
345369
return;
346370
}
347-
371+
348372
this.#resetFallbackOnReply!(reply);
349373
}) as Decoder['onReply'];
350374

351375
this.#toWrite.push({
352376
args: ['RESET'],
353377
chainId,
354378
abort: undefined,
379+
timeout: undefined,
355380
resolve,
356381
reject,
357382
channelsCounter: undefined,
@@ -376,16 +401,20 @@ export default class RedisCommandsQueue {
376401
continue;
377402
}
378403

379-
// TODO reuse `toSend` or create new object?
404+
// TODO reuse `toSend` or create new object?
380405
(toSend as any).args = undefined;
381406
if (toSend.abort) {
382407
RedisCommandsQueue.#removeAbortListener(toSend);
383408
toSend.abort = undefined;
384409
}
410+
if (toSend.timeout) {
411+
RedisCommandsQueue.#removeTimeoutListener(toSend);
412+
toSend.timeout = undefined;
413+
}
385414
this.#chainInExecution = toSend.chainId;
386415
toSend.chainId = undefined;
387416
this.#waitingForReply.push(toSend);
388-
417+
389418
yield encoded;
390419
toSend = this.#toWrite.shift();
391420
}
@@ -402,11 +431,18 @@ export default class RedisCommandsQueue {
402431
command.abort!.signal.removeEventListener('abort', command.abort!.listener);
403432
}
404433

434+
static #removeTimeoutListener(command: CommandToWrite) {
435+
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
436+
}
437+
405438
static #flushToWrite(toBeSent: CommandToWrite, err: Error) {
406439
if (toBeSent.abort) {
407440
RedisCommandsQueue.#removeAbortListener(toBeSent);
408441
}
409-
442+
if (toBeSent.timeout) {
443+
RedisCommandsQueue.#removeTimeoutListener(toBeSent);
444+
}
445+
410446
toBeSent.reject(err);
411447
}
412448

packages/client/lib/client/index.spec.ts

Lines changed: 96 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { strict as assert } from 'node:assert';
22
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
33
import RedisClient, { RedisClientOptions, RedisClientType } from '.';
4-
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors';
4+
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, TimeoutError, WatchError } from '../errors';
55
import { defineScript } from '../lua-script';
6-
import { spy } from 'sinon';
6+
import { spy, stub } from 'sinon';
77
import { once } from 'node:events';
88
import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec';
99
import { RESP_TYPES } from '../RESP/decoder';
@@ -239,30 +239,84 @@ describe('Client', () => {
239239
assert.equal(await client.sendCommand(['PING']), 'PONG');
240240
}, GLOBAL.SERVERS.OPEN);
241241

242-
describe('AbortController', () => {
243-
before(function () {
244-
if (!global.AbortController) {
245-
this.skip();
246-
}
242+
testUtils.testWithClient('Unactivated AbortController should not abort', async client => {
243+
await client.sendCommand(['PING'], {
244+
abortSignal: new AbortController().signal
247245
});
246+
}, GLOBAL.SERVERS.OPEN);
248247

249-
testUtils.testWithClient('success', async client => {
250-
await client.sendCommand(['PING'], {
251-
abortSignal: new AbortController().signal
252-
});
253-
}, GLOBAL.SERVERS.OPEN);
248+
testUtils.testWithClient('AbortError', async client => {
249+
await blockSetImmediate(async () => {
250+
await assert.rejects(client.sendCommand(['PING'], {
251+
abortSignal: AbortSignal.timeout(5)
252+
}), AbortError);
253+
})
254+
}, GLOBAL.SERVERS.OPEN);
254255

255-
testUtils.testWithClient('AbortError', client => {
256-
const controller = new AbortController();
257-
controller.abort();
256+
testUtils.testWithClient('Timeout with custom timeout config', async client => {
257+
await blockSetImmediate(async () => {
258+
await assert.rejects(client.sendCommand(['PING'], {
259+
timeout: 5
260+
}), TimeoutError);
261+
})
262+
}, GLOBAL.SERVERS.OPEN);
258263

259-
return assert.rejects(
260-
client.sendCommand(['PING'], {
261-
abortSignal: controller.signal
262-
}),
263-
AbortError
264-
);
265-
}, GLOBAL.SERVERS.OPEN);
264+
testUtils.testWithCluster('Timeout with custom timeout config (cluster)', async cluster => {
265+
await blockSetImmediate(async () => {
266+
await assert.rejects(cluster.sendCommand(undefined, true, ['PING'], {
267+
timeout: 5
268+
}), TimeoutError);
269+
})
270+
}, GLOBAL.CLUSTERS.OPEN);
271+
272+
testUtils.testWithClientSentinel('Timeout with custom timeout config (sentinel)', async sentinel => {
273+
await blockSetImmediate(async () => {
274+
await assert.rejects(sentinel.sendCommand(true, ['PING'], {
275+
timeout: 5
276+
}), TimeoutError);
277+
})
278+
}, GLOBAL.CLUSTERS.OPEN);
279+
280+
testUtils.testWithClient('Timeout with global timeout config', async client => {
281+
await blockSetImmediate(async () => {
282+
await assert.rejects(client.ping(), TimeoutError);
283+
await assert.rejects(client.sendCommand(['PING']), TimeoutError);
284+
});
285+
}, {
286+
...GLOBAL.SERVERS.OPEN,
287+
clientOptions: {
288+
commandOptions: {
289+
timeout: 5
290+
}
291+
}
292+
});
293+
294+
testUtils.testWithCluster('Timeout with global timeout config (cluster)', async cluster => {
295+
await blockSetImmediate(async () => {
296+
await assert.rejects(cluster.HSET('key', 'foo', 'value'), TimeoutError);
297+
await assert.rejects(cluster.sendCommand(undefined, true, ['PING']), TimeoutError);
298+
});
299+
}, {
300+
...GLOBAL.CLUSTERS.OPEN,
301+
clusterConfiguration: {
302+
commandOptions: {
303+
timeout: 5
304+
}
305+
}
306+
});
307+
308+
testUtils.testWithClientSentinel('Timeout with global timeout config (sentinel)', async sentinel => {
309+
await blockSetImmediate(async () => {
310+
await assert.rejects(sentinel.HSET('key', 'foo', 'value'), TimeoutError);
311+
await assert.rejects(sentinel.sendCommand(true, ['PING']), TimeoutError);
312+
});
313+
}, {
314+
...GLOBAL.SENTINEL.OPEN,
315+
clientOptions: {
316+
commandOptions: {
317+
timeout: 5
318+
}
319+
}
266320
});
267321

268322
testUtils.testWithClient('undefined and null should not break the client', async client => {
@@ -900,3 +954,23 @@ describe('Client', () => {
900954
}, GLOBAL.SERVERS.OPEN);
901955
});
902956
});
957+
958+
/**
959+
* Executes the provided function in a context where setImmediate is stubbed to not do anything.
960+
* This blocks setImmediate callbacks from executing
961+
*/
962+
async function blockSetImmediate(fn: () => Promise<unknown>) {
963+
let setImmediateStub: any;
964+
965+
try {
966+
setImmediateStub = stub(global, 'setImmediate');
967+
setImmediateStub.callsFake(() => {
968+
//Dont call the callback, effectively blocking execution
969+
});
970+
await fn();
971+
} finally {
972+
if (setImmediateStub) {
973+
setImmediateStub.restore();
974+
}
975+
}
976+
}

packages/client/lib/client/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ export default class RedisClient<
526526
async #handshake(chainId: symbol, asap: boolean) {
527527
const promises = [];
528528
const commandsWithErrorHandlers = await this.#getHandshakeCommands();
529-
529+
530530
if (asap) commandsWithErrorHandlers.reverse()
531531

532532
for (const { cmd, errorHandler } of commandsWithErrorHandlers) {
@@ -632,7 +632,7 @@ export default class RedisClient<
632632
// since they could be connected to an older version that doesn't support them.
633633
}
634634
});
635-
635+
636636
commands.push({
637637
cmd: [
638638
'CLIENT',
@@ -889,7 +889,13 @@ export default class RedisClient<
889889
return Promise.reject(new ClientOfflineError());
890890
}
891891

892-
const promise = this._self.#queue.addCommand<T>(args, options);
892+
// Merge global options with provided options
893+
const opts = {
894+
...this._self._commandOptions,
895+
...options
896+
}
897+
898+
const promise = this._self.#queue.addCommand<T>(args, opts);
893899
this._self.#scheduleWrite();
894900
return promise;
895901
}

0 commit comments

Comments
 (0)