|
38 | 38 | #include "mongo/db/op_observer.h" |
39 | 39 | #include "mongo/db/operation_context.h" |
40 | 40 | #include "mongo/db/repl/repl_client_info.h" |
| 41 | +#include "mongo/db/repl/speculative_majority_read_info.h" |
41 | 42 | #include "mongo/db/s/sharding_state.h" |
42 | 43 | #include "mongo/db/server_options.h" |
43 | 44 | #include "mongo/db/server_parameters.h" |
@@ -295,6 +296,17 @@ MONGO_REGISTER_SHIM(waitForReadConcern) |
295 | 296 | // It is not used for atClusterTime because waitUntilOpTimeForRead handles waiting for |
296 | 297 | // the majority snapshot in that case. |
297 | 298 |
|
| 299 | + // Handle speculative majority reads. |
| 300 | + if (readConcernArgs.getMajorityReadMechanism() == |
| 301 | + repl::ReadConcernArgs::MajorityReadMechanism::kSpeculative) { |
| 302 | + // We read from a local snapshot, so there is no need to set an explicit read source. |
| 303 | + // Mark down that we need to block after the command is done to satisfy majority read |
| 304 | + // concern, though. |
| 305 | + auto& speculativeReadInfo = repl::SpeculativeMajorityReadInfo::get(opCtx); |
| 306 | + speculativeReadInfo.setIsSpeculativeRead(); |
| 307 | + return Status::OK(); |
| 308 | + } |
| 309 | + |
298 | 310 | const int debugLevel = serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 1 : 2; |
299 | 311 |
|
300 | 312 | LOG(debugLevel) << "Waiting for 'committed' snapshot to be available for reading: " |
@@ -365,4 +377,44 @@ MONGO_REGISTER_SHIM(waitForLinearizableReadConcern)(OperationContext* opCtx)->St |
365 | 377 | return awaitReplResult.status; |
366 | 378 | } |
367 | 379 |
|
| 380 | +MONGO_REGISTER_SHIM(waitForSpeculativeMajorityReadConcern) |
| 381 | +(OperationContext* opCtx, repl::SpeculativeMajorityReadInfo speculativeReadInfo)->Status { |
| 382 | + invariant(speculativeReadInfo.isSpeculativeRead()); |
| 383 | + |
| 384 | + // Select the optime to wait on. A command may have selected a specific optime to wait on. If |
| 385 | + // not, then we just wait on the most recent optime written on this node i.e. lastApplied. |
| 386 | + auto replCoord = repl::ReplicationCoordinator::get(opCtx); |
| 387 | + repl::OpTime waitOpTime; |
| 388 | + auto lastApplied = replCoord->getMyLastAppliedOpTime(); |
| 389 | + auto speculativeReadOpTime = speculativeReadInfo.getSpeculativeReadOpTime(); |
| 390 | + if (speculativeReadOpTime) { |
| 391 | + // The optime provided must not be greater than the current lastApplied. |
| 392 | + invariant(*speculativeReadOpTime <= lastApplied); |
| 393 | + waitOpTime = *speculativeReadOpTime; |
| 394 | + } else { |
| 395 | + waitOpTime = lastApplied; |
| 396 | + } |
| 397 | + |
| 398 | + // Block to make sure returned data is majority committed. |
| 399 | + LOG(1) << "Servicing speculative majority read, waiting for optime " << waitOpTime |
| 400 | + << " to become committed, current commit point: " << replCoord->getLastCommittedOpTime(); |
| 401 | + |
| 402 | + if (!opCtx->hasDeadline()) { |
| 403 | + // TODO (SERVER-38727): Replace this with a user specified timeout value, to address the |
| 404 | + // fact that getMore commands do not respect maxTimeMS properly. Currently, this hard-coded |
| 405 | + // value represents the maximum time we are ever willing to wait for an optime to majority |
| 406 | + // commit when doing a speculative majority read. We make this value rather conservative. |
| 407 | + auto timeout = Seconds(15); |
| 408 | + opCtx->setDeadlineAfterNowBy(timeout, ErrorCodes::MaxTimeMSExpired); |
| 409 | + } |
| 410 | + Timer t; |
| 411 | + auto waitStatus = replCoord->awaitOpTimeCommitted(opCtx, waitOpTime); |
| 412 | + if (waitStatus.isOK()) { |
| 413 | + LOG(1) << "Optime " << waitOpTime << " became majority committed, waited " << t.millis() |
| 414 | + << "ms for speculative majority read to be satisfied."; |
| 415 | + } |
| 416 | + return waitStatus; |
| 417 | +} |
| 418 | + |
| 419 | + |
368 | 420 | } // namespace mongo |
0 commit comments