Skip to content

feat: introduction to LeaseRead #1362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ guide/book
target
vendor
.idea
tests/_log

# File Ignores ###############################################################
**/*.rs.bk
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-memstore-network-v2/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::collections::BTreeMap;
use std::collections::BTreeSet;

use openraft::BasicNode;
use openraft::ReadPolicy;

use crate::app::App;
use crate::decode;
Expand All @@ -19,7 +20,7 @@ pub async fn write(app: &mut App, req: String) -> String {
pub async fn read(app: &mut App, req: String) -> String {
let key: String = decode(&req);

let ret = app.raft.ensure_linearizable().await;
let ret = app.raft.ensure_linearizable(ReadPolicy::ReadIndex).await;

let res = match ret {
Ok(_) => {
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-memstore-opendal-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::collections::BTreeMap;
use std::collections::BTreeSet;

use openraft::BasicNode;
use openraft::ReadPolicy;

use crate::app::App;
use crate::decode;
Expand All @@ -19,7 +20,7 @@ pub async fn write(app: &mut App, req: String) -> String {
pub async fn read(app: &mut App, req: String) -> String {
let key: String = decode(&req);

let ret = app.raft.ensure_linearizable().await;
let ret = app.raft.ensure_linearizable(ReadPolicy::ReadIndex).await;

let res = match ret {
Ok(_) => {
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-memstore-singlethreaded/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::BTreeSet;

use openraft::error::Infallible;
use openraft::BasicNode;
use openraft::ReadPolicy;

use crate::app::App;
use crate::decode;
Expand All @@ -20,7 +21,7 @@ pub async fn write(app: &mut App, req: String) -> String {
pub async fn read(app: &mut App, req: String) -> String {
let key: String = decode(&req);

let ret = app.raft.ensure_linearizable().await;
let ret = app.raft.ensure_linearizable(ReadPolicy::ReadIndex).await;

let res = match ret {
Ok(_) => {
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-memstore/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use actix_web::Responder;
use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::error::RaftError;
use openraft::ReadPolicy;
use web::Json;

use crate::app::App;
Expand Down Expand Up @@ -38,7 +39,7 @@ pub async fn read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl R

#[post("/linearizable_read")]
pub async fn linearizable_read(app: Data<App>, req: Json<String>) -> actix_web::Result<impl Responder> {
let ret = app.raft.ensure_linearizable().await;
let ret = app.raft.ensure_linearizable(ReadPolicy::ReadIndex).await;

match ret {
Ok(_) => {
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-rocksdb/src/network/api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use openraft::error::Infallible;
use openraft::ReadPolicy;
use tide::Body;
use tide::Request;
use tide::Response;
Expand Down Expand Up @@ -41,7 +42,7 @@ async fn read(mut req: Request<Arc<App>>) -> tide::Result {
}

async fn linearizable_read(mut req: Request<Arc<App>>) -> tide::Result {
let ret = req.state().raft.ensure_linearizable().await;
let ret = req.state().raft.ensure_linearizable(ReadPolicy::ReadIndex).await;

match ret {
Ok(_) => {
Expand Down
24 changes: 21 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use crate::raft::message::TransferLeaderRequest;
use crate::raft::responder::Responder;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ReadPolicy;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::io_state::io_id::IOId;
Expand Down Expand Up @@ -246,7 +247,7 @@ where
// TODO: the second condition is such a read request can only read from state machine only when the last log it sees
// at `T1` is committed.
#[tracing::instrument(level = "trace", skip(self, tx))]
pub(super) async fn handle_check_is_leader_request(&mut self, tx: ClientReadTx<C>) {
pub(super) async fn handle_check_is_leader_request(&mut self, read_policy: ReadPolicy, tx: ClientReadTx<C>) {
// Setup sentinel values to track when we've received majority confirmation of leadership.

let resp = {
Expand All @@ -268,6 +269,22 @@ where
(read_log_id, applied)
};

if read_policy == ReadPolicy::LeaseRead {
let now = C::now();
// Check if the lease is expired.
if let Some(last_quorum_acked_time) = self.last_quorum_acked_time() {
if now < last_quorum_acked_time + self.engine.config.timer_config.leader_lease {
let _ = tx.send(Ok(resp));
return;
}
}
tracing::debug!("{}: lease expired when do lease read", self.id);
// we may no longer leader so error out early
let err = ForwardToLeader::empty();
let _ = tx.send(Err(err.into()));
return;
}

let my_id = self.id.clone();
let my_vote = self.engine.state.vote_ref().clone();
let ttl = Duration::from_millis(self.config.heartbeat_interval);
Expand All @@ -276,6 +293,7 @@ where

let mut granted = btreeset! {my_id.clone()};

// single-node quorum, fast path, return quickly.
if eff_mem.is_quorum(granted.iter()) {
let _ = tx.send(Ok(resp));
return;
Expand Down Expand Up @@ -1172,8 +1190,8 @@ where
RaftMsg::InstallFullSnapshot { vote, snapshot, tx } => {
self.engine.handle_install_full_snapshot(vote, snapshot, tx);
}
RaftMsg::CheckIsLeaderRequest { tx } => {
self.handle_check_is_leader_request(tx).await;
RaftMsg::CheckIsLeaderRequest { read_policy, tx } => {
self.handle_check_is_leader_request(read_policy, tx).await;
}
RaftMsg::ClientWriteRequest { app_data, tx } => {
self.write_entry(C::Entry::new_normal(LogIdOf::<C>::default(), app_data), Some(tx));
Expand Down
6 changes: 5 additions & 1 deletion openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::error::Infallible;
use crate::error::InitializeError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ReadPolicy;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
Expand Down Expand Up @@ -73,6 +74,7 @@ where C: RaftTypeConfig
},

CheckIsLeaderRequest {
read_policy: ReadPolicy,
tx: ClientReadTx<C>,
},

Expand Down Expand Up @@ -130,7 +132,9 @@ where C: RaftTypeConfig
write!(f, "InstallFullSnapshot: vote: {}, snapshot: {}", vote, snapshot)
}
RaftMsg::ClientWriteRequest { .. } => write!(f, "ClientWriteRequest"),
RaftMsg::CheckIsLeaderRequest { .. } => write!(f, "CheckIsLeaderRequest"),
RaftMsg::CheckIsLeaderRequest { read_policy, .. } => {
write!(f, "CheckIsLeaderRequest with read policy: {}", read_policy)
}
RaftMsg::Initialize { members, .. } => {
// TODO: avoid using Debug
write!(f, "Initialize: {:?}", members)
Expand Down
18 changes: 13 additions & 5 deletions openraft/src/docs/protocol/read.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,37 @@ Openraft also use `read_log_id` instead of `read_index`.

## Ensuring linearizability

To ensure linearizability, read operations must perform a [`get_read_log_id()`] operation on the leader before proceeding.
To ensure linearizability, read operations must perform a [`get_read_log_id(read_policy)`] operation on the leader before proceeding.

This method confirms that this node is the leader at the time of invocation by sending heartbeats to a quorum of followers, and returns `(read_log_id, last_applied_log_id)`:
- `read_log_id` represents the log id up to which the state machine should apply to ensure a
linearizable read,
- `last_applied_log_id` is the last applied log id.

The policy can be one of:
- `ReadPolicy::ReadIndex`: Provides strongest consistency guarantees by confirming
leadership with a quorum before serving reads, but incurs higher latency due to network
communication.
- `ReadPolicy::LeaseRead`: Uses leadership lease to avoid network round-trips, providing
better performance but slightly weaker consistency guarantees (assumes minimal clock drift
between nodes).

The caller then wait for `last_applied_log_id` to catch up `read_log_id`, which can be done by subscribing to [`Raft::metrics`],
and at last, proceed with the state machine read.

The above steps are encapsulated in the [`ensure_linearizable()`] method.
The above steps are encapsulated in the [`ensure_linearizable(read_policy)`] method.

## Examples

```ignore
my_raft.ensure_linearizable().await?;
my_raft.ensure_linearizable(read_policy).await?;
proceed_with_state_machine_read();
```

The above snippet does the same as the following:

```ignore
let (read_log_id, applied) = self.get_read_log_id().await?;
let (read_log_id, applied) = self.get_read_log_id(read_policy).await?;

if read_log_id.index() > applied.index() {
self.wait(None).applied_index_at_least(read_log_id.index(), "").await?
Expand All @@ -60,7 +68,7 @@ the state machine contains all state upto `A`. Therefore, a linearizable read is
when `last_applied_log_id >= read_log_id`.


## Ensuring Linearizability with `read_index`
## Ensuring Linearizability with `read_index` or `lease_read`

And it is also legal by comparing `last_applied_log_id.index() >= read_log_id.index()`
due to the guarantee that committed logs will not be lost.
Expand Down
1 change: 1 addition & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub use crate::node::EmptyNode;
pub use crate::node::Node;
pub use crate::node::NodeId;
pub use crate::raft::Raft;
pub use crate::raft::ReadPolicy;
pub use crate::raft_state::MembershipState;
pub use crate::raft_state::RaftState;
pub use crate::raft_types::SnapshotId;
Expand Down
81 changes: 69 additions & 12 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::sync::Arc;
use std::time::Duration;

use core_state::CoreState;
use derive_more::Display;
pub use message::AppendEntriesRequest;
pub use message::AppendEntriesResponse;
pub use message::ClientWriteResponse;
Expand Down Expand Up @@ -192,6 +193,32 @@ macro_rules! declare_raft_types {
};
}

/// Policy that determines how to handle read operations in a Raft cluster.
///
/// This enum defines strategies for ensuring linearizable reads in distributed systems
/// while balancing between consistency guarantees and performance.
#[derive(Clone, Debug, Display, PartialEq, Eq)]
pub enum ReadPolicy {
/// Uses leader lease to avoid network round-trips for read operations.
///
/// With `LeaseRead`, the leader can serve reads locally without contacting followers
/// as long as it believes its leadership lease is still valid. This provides better
/// performance compared to `ReadIndex` but assumes clock drift between nodes is negligible.
///
/// Note: This offers slightly weaker consistency guarantees than `ReadIndex` in exchange
/// for lower latency.
LeaseRead,

/// Implements the ReadIndex protocol to ensure linearizable reads.
///
/// With `ReadIndex`, the leader confirms its leadership status by contacting a quorum
/// of followers before serving read requests. This ensures strong consistency but incurs
/// the cost of network communication for each read operation.
///
/// This is the safer option that provides the strongest consistency guarantees.
ReadIndex,
}

/// The Raft API.
///
/// This type implements the full Raft spec, and is the interface to a running Raft node.
Expand Down Expand Up @@ -515,19 +542,32 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip(self))]
pub async fn is_leader(&self) -> Result<(), RaftError<C, CheckIsLeaderError<C>>> {
let (tx, rx) = C::oneshot();
let _ = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
let _ = self
.inner
.call_core(
RaftMsg::CheckIsLeaderRequest {
read_policy: ReadPolicy::ReadIndex,
tx,
},
rx,
)
.await?;
Ok(())
}

/// Ensures a read operation performed following this method are linearizable across the
/// cluster.
/// Ensures reads performed after this method are linearizable across the cluster
/// using an explicitly provided policy.
///
/// This method is just a shorthand for calling [`get_read_log_id()`](Raft::get_read_log_id) and
/// then calling [Raft::wait].
///
/// This method confirms the node's leadership at the time of invocation by sending
/// heartbeats to a quorum of followers, and the state machine is up to date.
/// This method blocks until all these conditions are met.
/// The policy can be one of:
/// - `ReadPolicy::ReadIndex`: Provides strongest consistency guarantees by confirming
/// leadership with a quorum before serving reads, but incurs higher latency due to network
/// communication.
/// - `ReadPolicy::LeaseRead`: Uses leadership lease to avoid network round-trips, providing
/// better performance but slightly weaker consistency guarantees (assumes minimal clock drift
/// between nodes).
///
/// Returns:
/// - `Ok(read_log_id)` on successful confirmation that the node is the leader. `read_log_id`
Expand All @@ -538,13 +578,19 @@ where C: RaftTypeConfig
///
/// # Examples
/// ```ignore
/// my_raft.ensure_linearizable().await?;
/// // Proceed with the state machine read
/// // Use a strict policy for this specific critical read
/// my_raft.ensure_linearizable_with_policy(ReadPolicy::ReadIndex).await?;
/// // Or use a more performant policy when consistency requirements are less strict
/// my_raft.ensure_linearizable_with_policy(ReadPolicy::LeaseRead).await?;
/// // Then proceed with the state machine read
/// ```
/// Read more about how it works: [Read Operation](crate::docs::protocol::read)
#[tracing::instrument(level = "debug", skip(self))]
pub async fn ensure_linearizable(&self) -> Result<Option<LogIdOf<C>>, RaftError<C, CheckIsLeaderError<C>>> {
let (read_log_id, applied) = self.get_read_log_id().await?;
pub async fn ensure_linearizable(
&self,
read_policy: ReadPolicy,
) -> Result<Option<LogIdOf<C>>, RaftError<C, CheckIsLeaderError<C>>> {
let (read_log_id, applied) = self.get_read_log_id(read_policy).await?;

if read_log_id.index() > applied.index() {
self.wait(None)
Expand All @@ -561,12 +607,21 @@ where C: RaftTypeConfig
}

/// Ensures this node is leader and returns the log id up to which the state machine should
/// apply to ensure a read can be linearizable across the cluster.
/// apply to ensure a read can be linearizable across the cluster using an explicitly provided
/// policy.
///
/// The leadership is ensured by sending heartbeats to a quorum of followers.
/// Note that this is just the first step for linearizable read. The second step is to wait for
/// state machine to reach the returned `read_log_id`.
///
/// The policy can be one of:
/// - `ReadPolicy::ReadIndex`: Provides strongest consistency guarantees by confirming
/// leadership with a quorum before serving reads, but incurs higher latency due to network
/// communication.
/// - `ReadPolicy::LeaseRead`: Uses leadership lease to avoid network round-trips, providing
/// better performance but slightly weaker consistency guarantees (assumes minimal clock drift
/// between nodes).
///
/// Returns:
/// - `Ok((read_log_id, last_applied_log_id))` on successful confirmation that the node is the
/// leader. `read_log_id` represents the log id up to which the state machine should apply to
Expand Down Expand Up @@ -595,9 +650,11 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_read_log_id(
&self,
read_policy: ReadPolicy,
) -> Result<(Option<LogIdOf<C>>, Option<LogIdOf<C>>), RaftError<C, CheckIsLeaderError<C>>> {
let (tx, rx) = C::oneshot();
let (read_log_id, applied) = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
let (read_log_id, applied) =
self.inner.call_core(RaftMsg::CheckIsLeaderRequest { read_policy, tx }, rx).await?;
Ok((read_log_id, applied))
}

Expand Down
Loading
Loading