Skip to content

Commit a8fe1d8

Browse files
committed
SERVER-36299 Add integration test for exhaust with OP_MSG
This reverts commit 2d79aaf.
1 parent 6144771 commit a8fe1d8

File tree

1 file changed

+131
-0
lines changed

1 file changed

+131
-0
lines changed

src/mongo/rpc/op_msg_integration_test.cpp

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "mongo/platform/basic.h"
3030

3131
#include "mongo/client/dbclient_connection.h"
32+
#include "mongo/db/query/getmore_request.h"
3233
#include "mongo/rpc/get_status_from_command_result.h"
3334
#include "mongo/rpc/op_msg.h"
3435
#include "mongo/unittest/integration_test.h"
@@ -197,4 +198,134 @@ TEST(OpMsg, DocumentSequenceReturnsWork) {
197198
<< "admin"));
198199
}
199200

201+
TEST(OpMsg, ServerHandlesExhaustCorrectly) {
202+
std::string errMsg;
203+
auto conn = std::unique_ptr<DBClientBase>(
204+
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
205+
uassert(ErrorCodes::SocketException, errMsg, conn);
206+
207+
// Only test exhaust against a single server.
208+
if (conn->isReplicaSetMember() || conn->isMongos()) {
209+
return;
210+
}
211+
212+
NamespaceString nss("test", "coll");
213+
214+
conn->dropCollection(nss.toString());
215+
216+
// Insert a few documents.
217+
for (int i = 0; i < 5; i++) {
218+
conn->insert(nss.toString(), BSON("_id" << i), 0);
219+
}
220+
221+
// Issue a find request to open a cursor but return 0 documents.
222+
auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0 << "sort" << BSON("_id" << 1));
223+
auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd);
224+
auto request = opMsgRequest.serialize();
225+
226+
Message reply;
227+
ASSERT(conn->call(request, reply));
228+
auto res = OpMsg::parse(reply).body;
229+
const long long cursorId = res["cursor"]["id"].numberLong();
230+
ASSERT(res["cursor"]["firstBatch"].Array().empty());
231+
ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
232+
233+
// Construct getMore request with exhaust flag. Set batch size so we will need multiple batches
234+
// to exhaust the cursor.
235+
int batchSize = 2;
236+
GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none);
237+
opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON());
238+
request = opMsgRequest.serialize();
239+
OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
240+
241+
// Run getMore to initiate the exhaust stream.
242+
ASSERT(conn->call(request, reply));
243+
auto lastRequestId = reply.header().getId();
244+
ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
245+
res = OpMsg::parse(reply).body;
246+
ASSERT_OK(getStatusFromCommandResult(res));
247+
ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId);
248+
std::vector<BSONElement> nextBatch = res["cursor"]["nextBatch"].Array();
249+
ASSERT_EQ(nextBatch.size(), 2U);
250+
ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 0));
251+
ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 1));
252+
253+
// Receive next exhaust batch.
254+
conn->recv(reply, lastRequestId);
255+
lastRequestId = reply.header().getId();
256+
ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
257+
res = OpMsg::parse(reply).body;
258+
ASSERT_OK(getStatusFromCommandResult(res));
259+
ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId);
260+
nextBatch = res["cursor"]["nextBatch"].Array();
261+
ASSERT_EQ(nextBatch.size(), 2U);
262+
ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 2));
263+
ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 3));
264+
265+
// Receive terminal batch.
266+
ASSERT(conn->recv(reply, lastRequestId));
267+
ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
268+
res = OpMsg::parse(reply).body;
269+
ASSERT_OK(getStatusFromCommandResult(res));
270+
ASSERT_EQ(res["cursor"]["id"].numberLong(), 0);
271+
nextBatch = res["cursor"]["nextBatch"].Array();
272+
ASSERT_EQ(nextBatch.size(), 1U);
273+
ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 4));
274+
}
275+
276+
TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) {
277+
// This test simply tries to verify that using the exhaust option with DBClientCursor works
278+
// correctly. The externally visible behavior should technically be the same as a non-exhaust
279+
// cursor. The exhaust cursor should ideally provide a performance win over non-exhaust, but we
280+
// don't measure that here.
281+
std::string errMsg;
282+
auto conn = std::unique_ptr<DBClientBase>(
283+
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
284+
uassert(ErrorCodes::SocketException, errMsg, conn);
285+
286+
// Only test exhaust against a single server.
287+
if (conn->isReplicaSetMember() || conn->isMongos()) {
288+
return;
289+
}
290+
291+
NamespaceString nss("test", "coll");
292+
conn->dropCollection(nss.toString());
293+
294+
const int nDocs = 5;
295+
unittest::log() << "Inserting " << nDocs << " documents.";
296+
for (int i = 0; i < nDocs; i++) {
297+
auto doc = BSON("_id" << i);
298+
conn->insert(nss.toString(), doc, 0);
299+
}
300+
301+
ASSERT_EQ(conn->count(nss.toString()), size_t(nDocs));
302+
unittest::log() << "Finished document insertion.";
303+
304+
// Open an exhaust cursor.
305+
int batchSize = 2;
306+
auto cursor =
307+
conn->query(nss, Query().sort("_id", 1), 0, 0, nullptr, QueryOption_Exhaust, batchSize);
308+
309+
// Verify that the documents are returned properly. Exhaust cursors should still receive results
310+
// in batches, so we check that these batches correspond to the given specified batch size.
311+
ASSERT(cursor->more());
312+
ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 0));
313+
ASSERT(cursor->more());
314+
ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 1));
315+
ASSERT_EQ(cursor->objsLeftInBatch(), 0);
316+
317+
ASSERT(cursor->more());
318+
ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 2));
319+
ASSERT(cursor->more());
320+
ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 3));
321+
ASSERT_EQ(cursor->objsLeftInBatch(), 0);
322+
323+
ASSERT(cursor->more());
324+
ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 4));
325+
ASSERT_EQ(cursor->objsLeftInBatch(), 0);
326+
327+
// Should have consumed all documents at this point.
328+
ASSERT(!cursor->more());
329+
ASSERT(cursor->isDead());
330+
}
200331
} // namespace mongo

0 commit comments

Comments
 (0)