Skip to content

Commit 4da926e

Browse files
authored
feat(NODE-5019): add runCursorCommand API (#3655)
1 parent bf413e5 commit 4da926e

File tree

20 files changed

+3146
-32
lines changed

20 files changed

+3146
-32
lines changed

.evergreen/run-serverless-tests.sh

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ npx mocha \
2525
test/integration/transactions/transactions.test.ts \
2626
test/integration/versioned-api/versioned_api.spec.test.js \
2727
test/integration/load-balancers/load_balancers.spec.test.js \
28-
test/integration/client-side-encryption/client_side_encryption.spec.test.ts
28+
test/integration/client-side-encryption/client_side_encryption.spec.test.ts \
29+
test/integration/run-command/run_command.spec.test.ts

src/cursor/abstract_cursor.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ export abstract class AbstractCursor<
608608
abstract clone(): AbstractCursor<TSchema>;
609609

610610
/** @internal */
611-
abstract _initialize(
611+
protected abstract _initialize(
612612
session: ClientSession | undefined,
613613
callback: Callback<ExecutionResult>
614614
): void;

src/cursor/run_command_cursor.ts

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import type { BSONSerializeOptions, Document, Long } from '../bson';
2+
import type { Db } from '../db';
3+
import { MongoAPIError, MongoUnexpectedServerResponseError } from '../error';
4+
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
5+
import { GetMoreOperation } from '../operations/get_more';
6+
import { RunCommandOperation } from '../operations/run_command';
7+
import type { ReadConcernLike } from '../read_concern';
8+
import type { ReadPreferenceLike } from '../read_preference';
9+
import type { ClientSession } from '../sessions';
10+
import { Callback, ns } from '../utils';
11+
import { AbstractCursor } from './abstract_cursor';
12+
13+
/** @public */
14+
export type RunCursorCommandOptions = {
15+
readPreference?: ReadPreferenceLike;
16+
session?: ClientSession;
17+
} & BSONSerializeOptions;
18+
19+
/** @internal */
20+
type RunCursorCommandResponse = {
21+
cursor: { id: bigint | Long | number; ns: string; firstBatch: Document[] };
22+
ok: 1;
23+
};
24+
25+
/** @public */
26+
export class RunCommandCursor extends AbstractCursor {
27+
public readonly command: Readonly<Record<string, any>>;
28+
public readonly getMoreOptions: {
29+
comment?: any;
30+
maxAwaitTimeMS?: number;
31+
batchSize?: number;
32+
} = {};
33+
34+
/**
35+
* Controls the `getMore.comment` field
36+
* @param comment - any BSON value
37+
*/
38+
public setComment(comment: any): this {
39+
this.getMoreOptions.comment = comment;
40+
return this;
41+
}
42+
43+
/**
44+
* Controls the `getMore.maxTimeMS` field. Only valid when cursor is tailable await
45+
* @param maxTimeMS - the number of milliseconds to wait for new data
46+
*/
47+
public setMaxTimeMS(maxTimeMS: number): this {
48+
this.getMoreOptions.maxAwaitTimeMS = maxTimeMS;
49+
return this;
50+
}
51+
52+
/**
53+
* Controls the `getMore.batchSize` field
54+
* @param maxTimeMS - the number documents to return in the `nextBatch`
55+
*/
56+
public setBatchSize(batchSize: number): this {
57+
this.getMoreOptions.batchSize = batchSize;
58+
return this;
59+
}
60+
61+
/** Unsupported for RunCommandCursor */
62+
public override clone(): never {
63+
throw new MongoAPIError('Clone not supported, create a new cursor with db.runCursorCommand');
64+
}
65+
66+
/** Unsupported for RunCommandCursor: readConcern must be configured directly on command document */
67+
public override withReadConcern(_: ReadConcernLike): never {
68+
throw new MongoAPIError(
69+
'RunCommandCursor does not support readConcern it must be attached to the command being run'
70+
);
71+
}
72+
73+
/** Unsupported for RunCommandCursor: various cursor flags must be configured directly on command document */
74+
public override addCursorFlag(_: string, __: boolean): never {
75+
throw new MongoAPIError(
76+
'RunCommandCursor does not support cursor flags, they must be attached to the command being run'
77+
);
78+
}
79+
80+
/** Unsupported for RunCommandCursor: maxTimeMS must be configured directly on command document */
81+
public override maxTimeMS(_: number): never {
82+
throw new MongoAPIError(
83+
'maxTimeMS must be configured on the command document directly, to configure getMore.maxTimeMS use cursor.setMaxTimeMS()'
84+
);
85+
}
86+
87+
/** Unsupported for RunCommandCursor: batchSize must be configured directly on command document */
88+
public override batchSize(_: number): never {
89+
throw new MongoAPIError(
90+
'batchSize must be configured on the command document directly, to configure getMore.batchSize use cursor.setBatchSize()'
91+
);
92+
}
93+
94+
/** @internal */
95+
private db: Db;
96+
97+
/** @internal */
98+
constructor(db: Db, command: Document, options: RunCursorCommandOptions = {}) {
99+
super(db.s.client, ns(db.namespace), options);
100+
this.db = db;
101+
this.command = Object.freeze({ ...command });
102+
}
103+
104+
/** @internal */
105+
protected _initialize(session: ClientSession, callback: Callback<ExecutionResult>) {
106+
const operation = new RunCommandOperation<RunCursorCommandResponse>(this.db, this.command, {
107+
...this.cursorOptions,
108+
session: session,
109+
readPreference: this.cursorOptions.readPreference
110+
});
111+
executeOperation(this.client, operation).then(
112+
response => {
113+
if (response.cursor == null) {
114+
callback(
115+
new MongoUnexpectedServerResponseError('Expected server to respond with cursor')
116+
);
117+
return;
118+
}
119+
callback(undefined, {
120+
server: operation.server,
121+
session,
122+
response
123+
});
124+
},
125+
err => callback(err)
126+
);
127+
}
128+
129+
/** @internal */
130+
override _getMore(_batchSize: number, callback: Callback<Document>) {
131+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
132+
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
133+
...this.cursorOptions,
134+
session: this.session,
135+
...this.getMoreOptions
136+
});
137+
138+
executeOperation(this.client, getMoreOperation, callback);
139+
}
140+
}

src/db.ts

+14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { Collection, CollectionOptions } from './collection';
55
import * as CONSTANTS from './constants';
66
import { AggregationCursor } from './cursor/aggregation_cursor';
77
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
8+
import { RunCommandCursor, type RunCursorCommandOptions } from './cursor/run_command_cursor';
89
import { MongoAPIError, MongoInvalidArgumentError } from './error';
910
import type { MongoClient, PkFactory } from './mongo_client';
1011
import type { TODO_NODE_3286 } from './mongo_types';
@@ -523,6 +524,19 @@ export class Db {
523524

524525
return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
525526
}
527+
528+
/**
529+
* A low level cursor API providing basic driver functionality:
530+
* - ClientSession management
531+
* - ReadPreference for server selection
532+
* - Running getMores automatically when a local batch is exhausted
533+
*
534+
* @param command - The command that will start a cursor on the server.
535+
* @param options - Configurations for running the command, bson options will apply to getMores
536+
*/
537+
runCursorCommand(command: Document, options?: RunCursorCommandOptions): RunCommandCursor {
538+
return new RunCommandCursor(this, command, options);
539+
}
526540
}
527541

528542
// TODO(NODE-3484): Refactor into MongoDBNamespace

src/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { AggregationCursor } from './cursor/aggregation_cursor';
88
import { FindCursor } from './cursor/find_cursor';
99
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
1010
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
11+
import type { RunCommandCursor } from './cursor/run_command_cursor';
1112
import { Db } from './db';
1213
import { GridFSBucket } from './gridfs';
1314
import { GridFSBucketReadStream } from './gridfs/download';
@@ -87,6 +88,7 @@ export {
8788
ListIndexesCursor,
8889
MongoClient,
8990
OrderedBulkOperation,
91+
RunCommandCursor,
9092
UnorderedBulkOperation
9193
};
9294

@@ -275,6 +277,7 @@ export type {
275277
ChangeStreamAggregateRawResult,
276278
ChangeStreamCursorOptions
277279
} from './cursor/change_stream_cursor';
280+
export type { RunCursorCommandOptions } from './cursor/run_command_cursor';
278281
export type { DbOptions, DbPrivate } from './db';
279282
export type { AutoEncrypter, AutoEncryptionOptions, AutoEncryptionTlsOptions } from './deps';
280283
export type { Encrypter, EncrypterOptions } from './encrypter';

test/integration/client-side-operations-timeout/.gitkeep

Whitespace-only changes.

test/integration/run-command/run_command.spec.test.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,10 @@ import { loadSpecTests } from '../../spec';
22
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
33

44
describe('RunCommand spec', () => {
5-
runUnifiedSuite(loadSpecTests('run-command'));
5+
runUnifiedSuite(loadSpecTests('run-command'), test => {
6+
if (test.description === 'does not attach $readPreference to given command on standalone') {
7+
return 'TODO(NODE-5263): Do not send $readPreference to standalone servers';
8+
}
9+
return false;
10+
});
611
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { expect } from 'chai';
2+
3+
import { Db, MongoClient } from '../../mongodb';
4+
5+
describe('runCursorCommand API', () => {
6+
let client: MongoClient;
7+
let db: Db;
8+
9+
beforeEach(async function () {
10+
client = this.configuration.newClient({}, { monitorCommands: true });
11+
db = client.db();
12+
await db.dropDatabase().catch(() => null);
13+
await db
14+
.collection<{ _id: number }>('collection')
15+
.insertMany([{ _id: 0 }, { _id: 1 }, { _id: 2 }]);
16+
});
17+
18+
afterEach(async function () {
19+
await client.close();
20+
});
21+
22+
it('returns each document only once across multiple iterators', async () => {
23+
const cursor = db.runCursorCommand({ find: 'collection', filter: {}, batchSize: 1 });
24+
cursor.setBatchSize(1);
25+
26+
const a = cursor[Symbol.asyncIterator]();
27+
const b = cursor[Symbol.asyncIterator]();
28+
29+
// Interleaving calls to A and B
30+
const results = [
31+
await a.next(), // find, first doc
32+
await b.next(), // getMore, second doc
33+
34+
await a.next(), // getMore, third doc
35+
await b.next(), // getMore, no doc & exhausted id, a.k.a. done
36+
37+
await a.next(), // done
38+
await b.next() // done
39+
];
40+
41+
expect(results).to.deep.equal([
42+
{ value: { _id: 0 }, done: false },
43+
{ value: { _id: 1 }, done: false },
44+
{ value: { _id: 2 }, done: false },
45+
{ value: undefined, done: true },
46+
{ value: undefined, done: true },
47+
{ value: undefined, done: true }
48+
]);
49+
});
50+
});

test/mongodb.ts

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ export * from '../src/cursor/change_stream_cursor';
138138
export * from '../src/cursor/find_cursor';
139139
export * from '../src/cursor/list_collections_cursor';
140140
export * from '../src/cursor/list_indexes_cursor';
141+
export * from '../src/cursor/run_command_cursor';
141142
export * from '../src/db';
142143
export * from '../src/deps';
143144
export * from '../src/encrypter';

0 commit comments

Comments
 (0)