Skip to content

Add command timeout #3008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/command-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ try {
}
```


## Timeout

This option is similar to the Abort Signal one, but provides an easier way to set timeout for commands. Again, this applies to commands that haven't been written to the socket yet.

```javascript
const client = createClient({
commandOptions: {
timeout: 1000
}
})
```

## ASAP

Commands that are executed in the "asap" mode are added to the beginning of the "to sent" queue.
Expand Down
56 changes: 46 additions & 10 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { AbortError, ErrorReply, TimeoutError } from '../errors';
import { MonitorCallback } from '.';

export interface CommandOptions<T = TypeMapping> {
Expand All @@ -14,6 +14,10 @@ export interface CommandOptions<T = TypeMapping> {
* Maps between RESP and JavaScript types
*/
typeMapping?: T;
/**
* Timeout for the command in milliseconds
*/
timeout?: number;
}

export interface CommandToWrite extends CommandWaitingForReply {
Expand All @@ -23,6 +27,10 @@ export interface CommandToWrite extends CommandWaitingForReply {
signal: AbortSignal;
listener: () => unknown;
} | undefined;
timeout: {
signal: AbortSignal;
listener: () => unknown;
} | undefined;
}

interface CommandWaitingForReply {
Expand Down Expand Up @@ -80,7 +88,7 @@ export default class RedisCommandsQueue {
#onPush(push: Array<any>) {
// TODO: type
if (this.#pubSub.handleMessageReply(push)) return true;

const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
const channel = push[1].toString();
Expand Down Expand Up @@ -153,12 +161,26 @@ export default class RedisCommandsQueue {
args,
chainId: options?.chainId,
abort: undefined,
timeout: undefined,
resolve,
reject,
channelsCounter: undefined,
typeMapping: options?.typeMapping
};

const timeout = options?.timeout;
if (timeout) {
const signal = AbortSignal.timeout(timeout);
value.timeout = {
signal,
listener: () => {
this.#toWrite.remove(node);
value.reject(new TimeoutError());
}
};
signal.addEventListener('abort', value.timeout.listener, { once: true });
}

const signal = options?.abortSignal;
if (signal) {
value.abort = {
Expand All @@ -181,6 +203,7 @@ export default class RedisCommandsQueue {
args: command.args,
chainId,
abort: undefined,
timeout: undefined,
resolve() {
command.resolve();
resolve();
Expand All @@ -202,7 +225,7 @@ export default class RedisCommandsQueue {
this.decoder.onReply = (reply => {
if (Array.isArray(reply)) {
if (this.#onPush(reply)) return;

if (PONG.equals(reply[0] as Buffer)) {
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
Expand Down Expand Up @@ -250,7 +273,7 @@ export default class RedisCommandsQueue {
if (!this.#pubSub.isActive) {
this.#resetDecoderCallbacks();
}

resolve();
};
}
Expand Down Expand Up @@ -299,6 +322,7 @@ export default class RedisCommandsQueue {
args: ['MONITOR'],
chainId: options?.chainId,
abort: undefined,
timeout: undefined,
// using `resolve` instead of using `.then`/`await` to make sure it'll be called before processing the next reply
resolve: () => {
// after running `MONITOR` only `MONITOR` and `RESET` replies are expected
Expand All @@ -317,7 +341,7 @@ export default class RedisCommandsQueue {
reject,
channelsCounter: undefined,
typeMapping
}, options?.asap);
}, options?.asap);
});
}

Expand All @@ -340,18 +364,19 @@ export default class RedisCommandsQueue {
this.#resetDecoderCallbacks();
this.#resetFallbackOnReply = undefined;
this.#pubSub.reset();

this.#waitingForReply.shift()!.resolve(reply);
return;
}

this.#resetFallbackOnReply!(reply);
}) as Decoder['onReply'];

this.#toWrite.push({
args: ['RESET'],
chainId,
abort: undefined,
timeout: undefined,
resolve,
reject,
channelsCounter: undefined,
Expand All @@ -376,16 +401,20 @@ export default class RedisCommandsQueue {
continue;
}

// TODO reuse `toSend` or create new object?
// TODO reuse `toSend` or create new object?
(toSend as any).args = undefined;
if (toSend.abort) {
RedisCommandsQueue.#removeAbortListener(toSend);
toSend.abort = undefined;
}
if (toSend.timeout) {
RedisCommandsQueue.#removeTimeoutListener(toSend);
toSend.timeout = undefined;
}
this.#chainInExecution = toSend.chainId;
toSend.chainId = undefined;
this.#waitingForReply.push(toSend);

yield encoded;
toSend = this.#toWrite.shift();
}
Expand All @@ -402,11 +431,18 @@ export default class RedisCommandsQueue {
command.abort!.signal.removeEventListener('abort', command.abort!.listener);
}

static #removeTimeoutListener(command: CommandToWrite) {
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
}

static #flushToWrite(toBeSent: CommandToWrite, err: Error) {
if (toBeSent.abort) {
RedisCommandsQueue.#removeAbortListener(toBeSent);
}

if (toBeSent.timeout) {
RedisCommandsQueue.#removeTimeoutListener(toBeSent);
}

toBeSent.reject(err);
}

Expand Down
118 changes: 96 additions & 22 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisClient, { RedisClientOptions, RedisClientType } from '.';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, TimeoutError, WatchError } from '../errors';
import { defineScript } from '../lua-script';
import { spy } from 'sinon';
import { spy, stub } from 'sinon';
import { once } from 'node:events';
import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec';
import { RESP_TYPES } from '../RESP/decoder';
Expand Down Expand Up @@ -239,30 +239,84 @@ describe('Client', () => {
assert.equal(await client.sendCommand(['PING']), 'PONG');
}, GLOBAL.SERVERS.OPEN);

describe('AbortController', () => {
before(function () {
if (!global.AbortController) {
this.skip();
}
testUtils.testWithClient('Unactivated AbortController should not abort', async client => {
await client.sendCommand(['PING'], {
abortSignal: new AbortController().signal
});
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('success', async client => {
await client.sendCommand(['PING'], {
abortSignal: new AbortController().signal
});
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('AbortError', async client => {
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
abortSignal: AbortSignal.timeout(5)
}), AbortError);
})
}, GLOBAL.SERVERS.OPEN);

testUtils.testWithClient('AbortError', client => {
const controller = new AbortController();
controller.abort();
testUtils.testWithClient('Timeout with custom timeout config', async client => {
await blockSetImmediate(async () => {
await assert.rejects(client.sendCommand(['PING'], {
timeout: 5
}), TimeoutError);
})
}, GLOBAL.SERVERS.OPEN);

return assert.rejects(
client.sendCommand(['PING'], {
abortSignal: controller.signal
}),
AbortError
);
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithCluster('Timeout with custom timeout config (cluster)', async cluster => {
await blockSetImmediate(async () => {
await assert.rejects(cluster.sendCommand(undefined, true, ['PING'], {
timeout: 5
}), TimeoutError);
})
}, GLOBAL.CLUSTERS.OPEN);

testUtils.testWithClientSentinel('Timeout with custom timeout config (sentinel)', async sentinel => {
await blockSetImmediate(async () => {
await assert.rejects(sentinel.sendCommand(true, ['PING'], {
timeout: 5
}), TimeoutError);
})
}, GLOBAL.CLUSTERS.OPEN);

testUtils.testWithClient('Timeout with global timeout config', async client => {
await blockSetImmediate(async () => {
await assert.rejects(client.ping(), TimeoutError);
await assert.rejects(client.sendCommand(['PING']), TimeoutError);
});
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
commandOptions: {
timeout: 5
}
}
});

testUtils.testWithCluster('Timeout with global timeout config (cluster)', async cluster => {
await blockSetImmediate(async () => {
await assert.rejects(cluster.HSET('key', 'foo', 'value'), TimeoutError);
await assert.rejects(cluster.sendCommand(undefined, true, ['PING']), TimeoutError);
});
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
commandOptions: {
timeout: 5
}
}
});

testUtils.testWithClientSentinel('Timeout with global timeout config (sentinel)', async sentinel => {
await blockSetImmediate(async () => {
await assert.rejects(sentinel.HSET('key', 'foo', 'value'), TimeoutError);
await assert.rejects(sentinel.sendCommand(true, ['PING']), TimeoutError);
});
}, {
...GLOBAL.SENTINEL.OPEN,
clientOptions: {
commandOptions: {
timeout: 5
}
}
});

testUtils.testWithClient('undefined and null should not break the client', async client => {
Expand Down Expand Up @@ -900,3 +954,23 @@ describe('Client', () => {
}, GLOBAL.SERVERS.OPEN);
});
});

/**
* Executes the provided function in a context where setImmediate is stubbed to not do anything.
* This blocks setImmediate callbacks from executing
*/
async function blockSetImmediate(fn: () => Promise<unknown>) {
let setImmediateStub: any;

try {
setImmediateStub = stub(global, 'setImmediate');
setImmediateStub.callsFake(() => {
//Dont call the callback, effectively blocking execution
});
await fn();
} finally {
if (setImmediateStub) {
setImmediateStub.restore();
}
}
}
12 changes: 9 additions & 3 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ export default class RedisClient<
async #handshake(chainId: symbol, asap: boolean) {
const promises = [];
const commandsWithErrorHandlers = await this.#getHandshakeCommands();

if (asap) commandsWithErrorHandlers.reverse()

for (const { cmd, errorHandler } of commandsWithErrorHandlers) {
Expand Down Expand Up @@ -632,7 +632,7 @@ export default class RedisClient<
// since they could be connected to an older version that doesn't support them.
}
});

commands.push({
cmd: [
'CLIENT',
Expand Down Expand Up @@ -889,7 +889,13 @@ export default class RedisClient<
return Promise.reject(new ClientOfflineError());
}

const promise = this._self.#queue.addCommand<T>(args, options);
// Merge global options with provided options
const opts = {
...this._self._commandOptions,
...options
}

const promise = this._self.#queue.addCommand<T>(args, opts);
this._self.#scheduleWrite();
return promise;
}
Expand Down
Loading
Loading