@@ -83,16 +83,15 @@ Status _applyOperationsForTransaction(OperationContext* opCtx,
8383 * Helper that will read the entire sequence of oplog entries for the transaction and apply each of
8484 * them.
8585 *
86- * Currently used for oplog application of a commitTransaction oplog entry during recovery, rollback
87- * and initial sync .
86+ * Currently used for oplog application of a commitTransaction oplog entry during recovery and
87+ * rollback .
8888 */
8989Status _applyTransactionFromOplogChain (OperationContext* opCtx,
9090 const OplogEntry& entry,
9191 repl::OplogApplication::Mode mode,
9292 Timestamp commitTimestamp,
9393 Timestamp durableTimestamp) {
94- invariant (mode == repl::OplogApplication::Mode::kRecovering ||
95- mode == repl::OplogApplication::Mode::kInitialSync );
94+ invariant (mode == repl::OplogApplication::Mode::kRecovering );
9695
9796 repl::MultiApplier::Operations ops;
9897 {
@@ -151,76 +150,88 @@ const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
151150Status applyCommitTransaction (OperationContext* opCtx,
152151 const OplogEntry& entry,
153152 repl::OplogApplication::Mode mode) {
154- // Return error if run via applyOps command.
155- uassert (50987 ,
156- " commitTransaction is only used internally by secondaries." ,
157- mode != repl::OplogApplication::Mode::kApplyOpsCmd );
158-
159153 IDLParserErrorContext ctx (" commitTransaction" );
160154 auto commitOplogEntryOpTime = entry.getOpTime ();
161155 auto commitCommand = CommitTransactionOplogObject::parse (ctx, entry.getObject ());
162156 invariant (commitCommand.getCommitTimestamp ());
163157
164- if (mode == repl::OplogApplication::Mode::kRecovering ) {
165- return _applyTransactionFromOplogChain (opCtx,
166- entry,
167- mode,
168- *commitCommand.getCommitTimestamp (),
169- commitOplogEntryOpTime.getTimestamp ());
158+ switch (mode) {
159+ case repl::OplogApplication::Mode::kRecovering : {
160+ return _applyTransactionFromOplogChain (opCtx,
161+ entry,
162+ mode,
163+ *commitCommand.getCommitTimestamp (),
164+ commitOplogEntryOpTime.getTimestamp ());
165+ }
166+ case repl::OplogApplication::Mode::kInitialSync : {
167+ // Initial sync should never apply 'commitTransaction' since it unpacks committed
168+ // transactions onto various applier threads.
169+ MONGO_UNREACHABLE;
170+ }
171+ case repl::OplogApplication::Mode::kApplyOpsCmd : {
172+ // Return error if run via applyOps command.
173+ uasserted (50987 , " commitTransaction is only used internally by secondaries." );
174+ }
175+ case repl::OplogApplication::Mode::kSecondary : {
176+ // Transaction operations are in its own batch, so we can modify their opCtx.
177+ invariant (entry.getSessionId ());
178+ invariant (entry.getTxnNumber ());
179+ opCtx->setLogicalSessionId (*entry.getSessionId ());
180+ opCtx->setTxnNumber (*entry.getTxnNumber ());
181+
182+ // The write on transaction table may be applied concurrently, so refreshing state
183+ // from disk may read that write, causing starting a new transaction on an existing
184+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
185+ MongoDOperationContextSessionWithoutRefresh sessionCheckout (opCtx);
186+
187+ auto transaction = TransactionParticipant::get (opCtx);
188+ invariant (transaction);
189+ transaction.unstashTransactionResources (opCtx, " commitTransaction" );
190+ transaction.commitPreparedTransaction (
191+ opCtx, *commitCommand.getCommitTimestamp (), commitOplogEntryOpTime);
192+ return Status::OK ();
193+ }
170194 }
171-
172- invariant (mode == repl::OplogApplication::Mode::kSecondary );
173-
174- // Transaction operations are in its own batch, so we can modify their opCtx.
175- invariant (entry.getSessionId ());
176- invariant (entry.getTxnNumber ());
177- opCtx->setLogicalSessionId (*entry.getSessionId ());
178- opCtx->setTxnNumber (*entry.getTxnNumber ());
179-
180- // The write on transaction table may be applied concurrently, so refreshing state
181- // from disk may read that write, causing starting a new transaction on an existing
182- // txnNumber. Thus, we start a new transaction without refreshing state from disk.
183- MongoDOperationContextSessionWithoutRefresh sessionCheckout (opCtx);
184-
185- auto transaction = TransactionParticipant::get (opCtx);
186- invariant (transaction);
187- transaction.unstashTransactionResources (opCtx, " commitTransaction" );
188- transaction.commitPreparedTransaction (
189- opCtx, *commitCommand.getCommitTimestamp (), commitOplogEntryOpTime);
190- return Status::OK ();
195+ MONGO_UNREACHABLE;
191196}
192197
193198Status applyAbortTransaction (OperationContext* opCtx,
194199 const OplogEntry& entry,
195200 repl::OplogApplication::Mode mode) {
196- // Return error if run via applyOps command.
197- uassert (50972 ,
198- " abortTransaction is only used internally by secondaries." ,
199- mode != repl::OplogApplication::Mode::kApplyOpsCmd );
200-
201- // We don't put transactions into the prepare state until the end of recovery and initial sync,
202- // so there is no transaction to abort.
203- if (mode == repl::OplogApplication::Mode::kRecovering ||
204- mode == repl::OplogApplication::Mode::kInitialSync ) {
205- return Status::OK ();
201+ switch (mode) {
202+ case repl::OplogApplication::Mode::kRecovering : {
203+ // We don't put transactions into the prepare state until the end of recovery,
204+ // so there is no transaction to abort.
205+ return Status::OK ();
206+ }
207+ case repl::OplogApplication::Mode::kInitialSync : {
208+ // We don't put transactions into the prepare state until the end of initial sync,
209+ // so there is no transaction to abort.
210+ return Status::OK ();
211+ }
212+ case repl::OplogApplication::Mode::kApplyOpsCmd : {
213+ // Return error if run via applyOps command.
214+ uasserted (50972 , " abortTransaction is only used internally by secondaries." );
215+ }
216+ case repl::OplogApplication::Mode::kSecondary : {
217+ // Transaction operations are in its own batch, so we can modify their opCtx.
218+ invariant (entry.getSessionId ());
219+ invariant (entry.getTxnNumber ());
220+ opCtx->setLogicalSessionId (*entry.getSessionId ());
221+ opCtx->setTxnNumber (*entry.getTxnNumber ());
222+
223+ // The write on transaction table may be applied concurrently, so refreshing state
224+ // from disk may read that write, causing starting a new transaction on an existing
225+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
226+ MongoDOperationContextSessionWithoutRefresh sessionCheckout (opCtx);
227+
228+ auto transaction = TransactionParticipant::get (opCtx);
229+ transaction.unstashTransactionResources (opCtx, " abortTransaction" );
230+ transaction.abortActiveTransaction (opCtx);
231+ return Status::OK ();
232+ }
206233 }
207-
208- invariant (mode == repl::OplogApplication::Mode::kSecondary );
209-
210- // Transaction operations are in its own batch, so we can modify their opCtx.
211- invariant (entry.getSessionId ());
212- invariant (entry.getTxnNumber ());
213- opCtx->setLogicalSessionId (*entry.getSessionId ());
214- opCtx->setTxnNumber (*entry.getTxnNumber ());
215- // The write on transaction table may be applied concurrently, so refreshing state
216- // from disk may read that write, causing starting a new transaction on an existing
217- // txnNumber. Thus, we start a new transaction without refreshing state from disk.
218- MongoDOperationContextSessionWithoutRefresh sessionCheckout (opCtx);
219-
220- auto transaction = TransactionParticipant::get (opCtx);
221- transaction.unstashTransactionResources (opCtx, " abortTransaction" );
222- transaction.abortActiveTransaction (opCtx);
223- return Status::OK ();
234+ MONGO_UNREACHABLE;
224235}
225236
226237repl::MultiApplier::Operations readTransactionOperationsFromOplogChain (
@@ -291,7 +302,7 @@ namespace {
291302 */
292303Status _applyPrepareTransaction (OperationContext* opCtx,
293304 const OplogEntry& entry,
294- repl::OplogApplication::Mode oplogApplicationMode ) {
305+ repl::OplogApplication::Mode mode ) {
295306
296307 // The operations here are reconstructed at their prepare time. However, that time will
297308 // be ignored because there is an outer write unit of work during their application.
@@ -301,8 +312,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
301312 return readTransactionOperationsFromOplogChain (opCtx, entry, {}, boost::none);
302313 }();
303314
304- if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
305- oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync ) {
315+ if (mode == repl::OplogApplication::Mode::kRecovering ||
316+ mode == repl::OplogApplication::Mode::kInitialSync ) {
306317 // We might replay a prepared transaction behind oldest timestamp. Note that since this is
307318 // scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly
308319 // abandons the storage transaction when it releases the global lock, this must be done
@@ -339,7 +350,7 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
339350 auto transaction = TransactionParticipant::get (opCtx);
340351 transaction.unstashTransactionResources (opCtx, " prepareTransaction" );
341352
342- auto status = _applyOperationsForTransaction (opCtx, ops, oplogApplicationMode );
353+ auto status = _applyOperationsForTransaction (opCtx, ops, mode );
343354 fassert (31137 , status);
344355
345356 if (MONGO_FAIL_POINT (applyOpsHangBeforePreparingTransaction)) {
@@ -355,16 +366,21 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
355366}
356367
357368/* *
358- * Apply a prepared transaction during recovery .
369+ * Apply a prepared transaction when we are reconstructing prepared transactions .
359370 */
360- Status applyRecoveredPrepareTransaction (OperationContext* opCtx,
361- const OplogEntry& entry,
362- repl::OplogApplication::Mode mode) {
363- // Snapshot transactions never conflict with the PBWM lock.
364- invariant (!opCtx->lockState ()->shouldConflictWithSecondaryBatchApplication ());
371+ void _reconstructPreparedTransaction (OperationContext* opCtx,
372+ const OplogEntry& prepareEntry,
373+ repl::OplogApplication::Mode mode) {
374+ repl::UnreplicatedWritesBlock uwb (opCtx);
375+
376+ // Snapshot transaction can never conflict with the PBWM lock.
377+ opCtx->lockState ()->setShouldConflictWithSecondaryBatchApplication (false );
378+
365379 // We might replay a prepared transaction behind oldest timestamp.
366380 opCtx->recoveryUnit ()->setRoundUpPreparedTimestamps (true );
367- return _applyPrepareTransaction (opCtx, entry, mode);
381+
382+ // Checks out the session, applies the operations and prepares the transaction.
383+ uassertStatusOK (_applyPrepareTransaction (opCtx, prepareEntry, mode));
368384}
369385} // namespace
370386
@@ -376,33 +392,37 @@ Status applyRecoveredPrepareTransaction(OperationContext* opCtx,
376392 */
377393Status applyPrepareTransaction (OperationContext* opCtx,
378394 const OplogEntry& entry,
379- repl::OplogApplication::Mode oplogApplicationMode ) {
380- // Don't apply the operations from the prepared transaction until either we see a commit
381- // transaction oplog entry during recovery or are at the end of recovery.
382- if (oplogApplicationMode == repl::OplogApplication::Mode:: kRecovering ) {
383- if (!serverGlobalParams. enableMajorityReadConcern ) {
384- error () << " Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
395+ repl::OplogApplication::Mode mode ) {
396+ switch (mode) {
397+ case repl::OplogApplication::Mode:: kRecovering : {
398+ if (!serverGlobalParams. enableMajorityReadConcern ) {
399+ error ()
400+ << " Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
385401 " set to false. Restart the server with --enableMajorityReadConcern=true "
386402 " to complete recovery." ;
387- }
388- fassert (51146 , serverGlobalParams.enableMajorityReadConcern );
389- return Status::OK ();
390- }
403+ fassertFailed (51146 );
404+ }
391405
392- // Don't apply the operations from the prepared transaction until either we see a commit
393- // transaction oplog entry during the oplog application phase of initial sync or are at the end
394- // of initial sync.
395- if (oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync ) {
396- return Status::OK ();
406+ // Don't apply the operations from the prepared transaction until either we see a commit
407+ // transaction oplog entry during recovery or are at the end of recovery.
408+ return Status::OK ();
409+ }
410+ case repl::OplogApplication::Mode::kInitialSync : {
411+ // Don't apply the operations from the prepared transaction until either we see a commit
412+ // transaction oplog entry during the oplog application phase of initial sync or are at
413+ // the end of initial sync.
414+ return Status::OK ();
415+ }
416+ case repl::OplogApplication::Mode::kApplyOpsCmd : {
417+ // Return error if run via applyOps command.
418+ uasserted (51145 ,
419+ " prepare applyOps oplog entry is only used internally by secondaries." );
420+ }
421+ case repl::OplogApplication::Mode::kSecondary : {
422+ return _applyPrepareTransaction (opCtx, entry, repl::OplogApplication::Mode::kSecondary );
423+ }
397424 }
398-
399- // Return error if run via applyOps command.
400- uassert (51145 ,
401- " prepare applyOps oplog entry is only used internally by secondaries." ,
402- oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd );
403-
404- invariant (oplogApplicationMode == repl::OplogApplication::Mode::kSecondary );
405- return _applyPrepareTransaction (opCtx, entry, oplogApplicationMode);
425+ MONGO_UNREACHABLE;
406426}
407427
408428void reconstructPreparedTransactions (OperationContext* opCtx, repl::OplogApplication::Mode mode) {
@@ -443,14 +463,7 @@ void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplica
443463 opCtx->getServiceContext ()->makeClient (" reconstruct-prepared-transactions" );
444464 AlternativeClientRegion acr (newClient);
445465 const auto newOpCtx = cc ().makeOperationContext ();
446- repl::UnreplicatedWritesBlock uwb (newOpCtx.get ());
447-
448- // Snapshot transaction can never conflict with the PBWM lock.
449- newOpCtx->lockState ()->setShouldConflictWithSecondaryBatchApplication (false );
450-
451- // Checks out the session, applies the operations and prepares the transaction.
452- uassertStatusOK (
453- applyRecoveredPrepareTransaction (newOpCtx.get (), prepareOplogEntry, mode));
466+ _reconstructPreparedTransaction (newOpCtx.get (), prepareOplogEntry, mode);
454467 }
455468 }
456469}
0 commit comments