
Commit d000511

SERVER-29127 Improve behavior of tailable awaitData cursors with readConcern:majority
Fixes a bug with awaitData cursors under readConcern: majority: they were not awakened when a new snapshot was marked as committed. Also removes a related comment that was no longer accurate. Additionally fixes a bug in PlanExecutor that could prevent a visibility-restricted cursor from being awakened when oplog visibility changed.
1 parent c1e7921 commit d000511

5 files changed: +49 -21 lines changed
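
The mechanism this commit adjusts is, roughly, a version-stamped notifier: a tailable/awaitData cursor that reaches EOF yields its locks and sleeps until the notifier's version changes, and events that can make new documents visible are supposed to bump the version and wake the sleeper. Per the commit message, committing a new snapshot under readConcern: majority was one such event that failed to wake waiters. Below is a minimal, self-contained sketch of the general wait-on-version pattern only; it is not MongoDB source, and SimpleNotifier plus every other name in it are illustrative stand-ins (the actual wake-up paths touched by this commit are shown in the diffs that follow).

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

// A waiter remembers the version it saw at its last EOF and sleeps only while
// that version is still current; any producer-side event bumps the version and
// wakes all waiters.
class SimpleNotifier {
public:
    void notifyAll() {
        std::lock_guard<std::mutex> lk(_mutex);
        ++_version;
        _cond.notify_all();
    }

    std::uint64_t getVersion() const {
        std::lock_guard<std::mutex> lk(_mutex);
        return _version;
    }

    // Sleep only if nothing has happened since 'prevVersion' was observed.
    void wait(std::uint64_t prevVersion, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(_mutex);
        _cond.wait_for(lk, timeout, [&] { return _version != prevVersion; });
    }

private:
    mutable std::mutex _mutex;
    std::condition_variable _cond;
    std::uint64_t _version = 0;
};

int main() {
    SimpleNotifier notifier;
    const auto versionAtEOF = notifier.getVersion();

    // Producer: pretend an insert lands, or a new snapshot becomes committed,
    // shortly after the consumer reaches EOF. Without this notification the
    // consumer would sleep until its timeout, which is the behavior this commit
    // fixes for the committed-snapshot case.
    std::thread producer([&notifier] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        notifier.notifyAll();
    });

    // Consumer: awaitData-style wait after seeing EOF.
    notifier.wait(versionAtEOF, std::chrono::seconds(5));
    std::cout << "woke up; notifier version is now " << notifier.getVersion() << "\n";

    producer.join();
    return 0;
}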

buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml

Lines changed: 0 additions & 6 deletions

@@ -17,12 +17,6 @@ selector:
                                                         # secondaryThrottle is not set.
   - jstests/aggregation/sources/facet/use_cases.js      # Cannot specify write concern when
                                                         # secondaryThrottle is not set.
-
-  # TODO (SERVER-29127) The CappedInsertNotifier wakes up the change stream's collection scan before
-  # it can see the new insert, so it will immediately see EOF and go back to sleep. This will cause
-  # the test to fail, since it is expecting an insert to immediately wake up the change stream and
-  # return the new result.
-  - jstests/aggregation/sources/changeStream/only_wake_getmore_for_relevant_changes.js
   - jstests/aggregation/testSlave.js                     # Majority read on secondary requires afterOpTime.
   - jstests/aggregation/sources/out/*.js                 # Cannot specify write concern when secondaryThrottle is not set.
 

src/mongo/db/query/plan_executor.cpp

Lines changed: 37 additions & 10 deletions

@@ -65,6 +65,11 @@ const OperationContext::Decoration<bool> shouldWaitForInserts =
 const OperationContext::Decoration<repl::OpTime> clientsLastKnownCommittedOpTime =
     OperationContext::declareDecoration<repl::OpTime>();
 
+struct CappedInsertNotifierData {
+    shared_ptr<CappedInsertNotifier> notifier;
+    uint64_t lastEOFVersion = ~0;
+};
+
 namespace {
 
 namespace {

@@ -407,30 +412,45 @@ bool PlanExecutor::shouldWaitForInserts() {
     return false;
 }
 
-bool PlanExecutor::waitForInserts() {
-    // If we cannot yield, we should retry immediately.
+std::shared_ptr<CappedInsertNotifier> PlanExecutor::getCappedInsertNotifier() {
+    // If we cannot yield, we should retry immediately when we hit EOF, so do not get
+    // a CappedInsertNotifier.
     if (!_yieldPolicy->canReleaseLocksDuringExecution())
-        return true;
+        return nullptr;
 
-    // We can only wait if we have a collection; otherwise retry immediately.
+    // We can only wait if we have a collection; otherwise we should retry immediately when
+    // we hit EOF.
     dassert(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IS));
     auto db = dbHolder().get(_opCtx, _nss.db());
     if (!db)
-        return true;
+        return nullptr;
     auto collection = db->getCollection(_opCtx, _nss);
     if (!collection)
+        return nullptr;
+
+    return collection->getCappedInsertNotifier();
+}
+
+bool PlanExecutor::waitForInserts(CappedInsertNotifierData* notifierData) {
+    // We tested to see if we could wait when getting the CappedInsertNotifier.
+    if (!notifierData->notifier)
         return true;
 
-    auto notifier = collection->getCappedInsertNotifier();
-    uint64_t notifierVersion = notifier->getVersion();
+    // The notifier wait() method will not wait unless the version passed to it matches the
+    // current version of the notifier. Since the version passed to it is the current version
+    // of the notifier at the time of the previous EOF, we require two EOFs in a row with no
+    // notifier version change in order to wait. This is sufficient to ensure we never wait
+    // when data is available.
    auto curOp = CurOp::get(_opCtx);
     curOp->pauseTimer();
     ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
     auto opCtx = _opCtx;
-    bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifier, notifierVersion] {
+    uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
+    bool yieldResult = _yieldPolicy->yield(nullptr, [opCtx, notifierData] {
         const auto timeout = opCtx->getRemainingMaxTimeMicros();
-        notifier->wait(notifierVersion, timeout);
+        notifierData->notifier->wait(notifierData->lastEOFVersion, timeout);
     });
+    notifierData->lastEOFVersion = currentNotifierVersion;
     return yieldResult;
 }
 

@@ -471,6 +491,13 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
     // Incremented on every writeConflict, reset to 0 on any successful call to _root->work.
     size_t writeConflictsInARow = 0;
 
+    // Capped insert data; declared outside the loop so we hold a shared pointer to the capped
+    // insert notifier the entire time we are in the loop. Holding a shared pointer to the capped
+    // insert notifier is necessary for the notifierVersion to advance.
+    CappedInsertNotifierData cappedInsertNotifierData;
+    if (shouldWaitForInserts()) {
+        cappedInsertNotifierData.notifier = getCappedInsertNotifier();
+    }
     for (;;) {
         // These are the conditions which can cause us to yield:
         //   1) The yield policy's timer elapsed, or

@@ -563,7 +590,7 @@ PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut,
             // Fall through to yield check at end of large conditional.
         } else if (PlanStage::IS_EOF == code) {
             if (shouldWaitForInserts()) {
-                const bool locksReacquiredAfterYield = waitForInserts();
+                const bool locksReacquiredAfterYield = waitForInserts(&cappedInsertNotifierData);
                 if (locksReacquiredAfterYield) {
                     // There may be more results, try to get more data.
                     continue;
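
The new comment in waitForInserts() above describes a "two EOFs in a row" rule: lastEOFVersion starts at ~0, so the first EOF never blocks (the version handed to wait() is stale), and only a second EOF with no intervening notifier-version change actually sleeps; any insert in between bumps the version and the wait is skipped. A minimal sketch of just that bookkeeping follows; Notifier, NotifierData, and onEOF are illustrative stand-ins, not MongoDB types.

#include <cstdint>
#include <iostream>

// Stand-in for CappedInsertNotifier: the version advances on every insert
// (and on any other event that can make new data visible).
struct Notifier {
    std::uint64_t version = 0;
    // Pretend-wait: a real wait blocks only if 'prevVersion' is still current.
    bool wouldBlock(std::uint64_t prevVersion) const {
        return prevVersion == version;
    }
};

struct NotifierData {
    const Notifier* notifier = nullptr;
    std::uint64_t lastEOFVersion = ~0ULL;  // sentinel: the first EOF never blocks
};

// Mirrors the bookkeeping in PlanExecutor::waitForInserts(): hand the previous
// EOF's version to wait(), then record the version seen at this EOF.
bool onEOF(NotifierData* data) {
    const std::uint64_t current = data->notifier->version;
    const bool blocked = data->notifier->wouldBlock(data->lastEOFVersion);
    data->lastEOFVersion = current;
    return blocked;
}

int main() {
    Notifier notifier;
    NotifierData data;
    data.notifier = &notifier;

    std::cout << std::boolalpha;
    std::cout << "first EOF blocks?        " << onEOF(&data) << "\n";  // false
    std::cout << "second EOF blocks?       " << onEOF(&data) << "\n";  // true
    ++notifier.version;  // an insert happened between EOFs
    std::cout << "EOF after insert blocks? " << onEOF(&data) << "\n";  // false
    return 0;
}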

src/mongo/db/query/plan_executor.h

Lines changed: 7 additions & 1 deletion

@@ -41,6 +41,8 @@
 namespace mongo {
 
 class BSONObj;
+class CappedInsertNotifier;
+struct CappedInsertNotifierData;
 class Collection;
 class CursorManager;
 class PlanExecutor;

@@ -447,10 +449,14 @@ class PlanExecutor {
     // should be returned immediately.
     bool shouldWaitForInserts();
 
+    // Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
+    // is not capable of yielding based on a notifier.
+    std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier();
+
     // Yields locks and waits for inserts to the collection. Returns true if there may be new
     // inserts, false if there is a timeout or an interrupt. If this planExecutor cannot yield,
     // returns true immediately.
-    bool waitForInserts();
+    bool waitForInserts(CappedInsertNotifierData* notifierData);
 
     ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
 
src/mongo/db/repl/replication_coordinator_external_state_impl.cpp

Lines changed: 1 addition & 0 deletions

@@ -799,6 +799,7 @@ void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotIn
     auto manager = _service->getGlobalStorageEngine()->getSnapshotManager();
     invariant(manager);  // This should never be called if there is no SnapshotManager.
     manager->setCommittedSnapshot(newCommitPoint.name, newCommitPoint.opTime.getTimestamp());
+    notifyOplogMetadataWaiters(newCommitPoint.opTime);
 }
 
 void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx,

src/mongo/db/repl/replication_coordinator_impl.cpp

Lines changed: 4 additions & 4 deletions

@@ -3039,7 +3039,6 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& commit
 
 void ReplicationCoordinatorImpl::_updateCommitPoint_inlock() {
     auto committedOpTime = _topCoord->getLastCommittedOpTime();
-    _externalState->notifyOplogMetadataWaiters(committedOpTime);
 
     // Update the stable timestamp.
     _setStableTimestampForStorage_inlock();

@@ -3060,10 +3059,11 @@
 
             // Update committed snapshot and wake up any threads waiting on read concern or
             // write concern.
-            //
-            // This function is only called on secondaries, so only threads waiting for
-            // committed snapshot need to be woken up.
             _updateCommittedSnapshot_inlock(newSnapshot);
+        } else {
+            // Even if we have no new snapshot, we need to notify waiters that the commit point
+            // moved.
+            _externalState->notifyOplogMetadataWaiters(committedOpTime);
         }
     }
 
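
Taken together, the last two files move the waiter wake-up so it happens on both exits from the commit-point update: when a new committed snapshot is installed, updateCommittedSnapshot() now notifies oplog metadata waiters itself, and when there is no new snapshot the coordinator notifies them directly in the new else branch. A minimal sketch of that control flow, using illustrative stand-in types rather than the real ReplicationCoordinator classes:

#include <iostream>
#include <optional>

struct OpTime {
    long long ts = 0;
};

struct ExternalState {
    // Stands in for waking threads blocked on oplog visibility.
    void notifyOplogMetadataWaiters(const OpTime& committed) {
        std::cout << "notify waiters up to ts " << committed.ts << "\n";
    }

    // After this commit, installing a committed snapshot also notifies waiters.
    void updateCommittedSnapshot(const OpTime& newSnapshot) {
        std::cout << "set committed snapshot to ts " << newSnapshot.ts << "\n";
        notifyOplogMetadataWaiters(newSnapshot);
    }
};

// Mirrors the shape of the fixed _updateCommitPoint_inlock(): notify on both branches.
void updateCommitPoint(ExternalState& ext,
                       const OpTime& committedOpTime,
                       const std::optional<OpTime>& newSnapshot) {
    if (newSnapshot) {
        // New snapshot available: waiters are notified inside updateCommittedSnapshot().
        ext.updateCommittedSnapshot(*newSnapshot);
    } else {
        // Even with no new snapshot, waiters must learn that the commit point moved.
        ext.notifyOplogMetadataWaiters(committedOpTime);
    }
}

int main() {
    ExternalState ext;
    updateCommitPoint(ext, OpTime{12}, OpTime{12});    // snapshot case
    updateCommitPoint(ext, OpTime{15}, std::nullopt);  // commit point moved only
    return 0;
}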
