Skip to content

refactor: no more C::Responder = OneshotResponder for change_membership() #1368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::error::InitializeError;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
use crate::impls::OneshotResponder;
use crate::log_id::option_raft_log_id_ext::OptionRaftLogIdExt;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::RaftDataMetrics;
Expand All @@ -66,6 +67,7 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::quorum::QuorumSet;
use crate::raft::message::TransferLeaderRequest;
use crate::raft::responder::either::OneshotOrUserDefined;
use crate::raft::responder::Responder;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
Expand Down Expand Up @@ -159,7 +161,7 @@ where
pub(crate) engine: Engine<C>,

/// Channels to send result back to client when logs are applied.
pub(crate) client_resp_channels: BTreeMap<u64, ResponderOf<C>>,
pub(crate) client_resp_channels: BTreeMap<u64, OneshotOrUserDefined<C>>,

/// A mapping of node IDs the replication state of the target node.
pub(crate) replications: BTreeMap<C::NodeId, ReplicationHandle<C>>,
Expand Down Expand Up @@ -442,7 +444,7 @@ where
// membership logs. And it does not need to wait for the previous membership log to commit
// to propose the new membership log.
#[tracing::instrument(level = "debug", skip(self, tx))]
pub(super) fn change_membership(&mut self, changes: ChangeMembers<C>, retain: bool, tx: ResponderOf<C>) {
pub(super) fn change_membership(&mut self, changes: ChangeMembers<C>, retain: bool, tx: OneshotResponder<C>) {
let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
let new_membership = match res {
Ok(x) => x,
Expand All @@ -453,7 +455,7 @@ where
};

let ent = C::Entry::new_membership(LogIdOf::<C>::default(), new_membership);
self.write_entry(ent, Some(tx));
self.write_entry(ent, Some(OneshotOrUserDefined::Oneshot(tx)));
}

/// Write a log entry to the cluster through raft protocol.
Expand All @@ -463,8 +465,14 @@ where
///
/// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`.
/// The calling side may not receive a result from `resp_tx`, if raft is shut down.
///
/// The responder `R` for `resp_tx` is either [`C::Responder`] (application-defined) or
/// [`OneshotResponder`] (general-purpose); the former is for application-defined
/// entries like user data, the latter is for membership configuration changes.
///
/// [`OneshotResponder`]: crate::raft::responder::OneshotResponder
#[tracing::instrument(level = "debug", skip_all, fields(id = display(&self.id)))]
pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<ResponderOf<C>>) {
pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<OneshotOrUserDefined<C>>) {
tracing::debug!(payload = display(&entry), "write_entry");

let Some((mut lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) else {
Expand Down Expand Up @@ -497,7 +505,7 @@ where
pub(crate) fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool {
tracing::debug!(now = display(C::now().display()), "send_heartbeat");

let Some((mut lh, _)) = self.engine.get_leader_handler_or_reject(None) else {
let Some((mut lh, _)) = self.engine.get_leader_handler_or_reject(None::<ResponderOf<C>>) else {
tracing::debug!(
now = display(C::now().display()),
"{} failed to send heartbeat, not a Leader",
Expand Down Expand Up @@ -1194,7 +1202,10 @@ where
self.handle_check_is_leader_request(read_policy, tx).await;
}
RaftMsg::ClientWriteRequest { app_data, tx } => {
self.write_entry(C::Entry::new_normal(LogIdOf::<C>::default(), app_data), Some(tx));
self.write_entry(
C::Entry::new_normal(LogIdOf::<C>::default(), app_data),
Some(OneshotOrUserDefined::UserDefined(tx)),
);
}
RaftMsg::Initialize { members, tx } => {
tracing::info!(
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::core::raft_msg::external_command::ExternalCommand;
use crate::error::CheckIsLeaderError;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::impls::OneshotResponder;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ReadPolicy;
Expand Down Expand Up @@ -90,7 +91,7 @@ where C: RaftTypeConfig
/// config will be converted into learners, otherwise they will be removed.
retain: bool,

tx: ResponderOf<C>,
tx: OneshotResponder<C>,
},

ExternalCoreRequest {
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use std::fmt::Debug;
use std::fmt::Formatter;

use crate::base::BoxAny;
use crate::raft::responder::either::OneshotOrUserDefined;
use crate::raft_state::IOId;
use crate::storage::Snapshot;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::RaftTypeConfig;

Expand Down Expand Up @@ -45,7 +45,7 @@ where C: RaftTypeConfig
/// The last log id to apply, inclusive.
last: LogIdOf<C>,

client_resp_channels: BTreeMap<u64, ResponderOf<C>>,
client_resp_channels: BTreeMap<u64, OneshotOrUserDefined<C>>,
},

/// Apply a custom function to the state machine.
Expand Down Expand Up @@ -83,7 +83,7 @@ where C: RaftTypeConfig
pub(crate) fn apply(
first: LogIdOf<C>,
last: LogIdOf<C>,
client_resp_channels: BTreeMap<u64, ResponderOf<C>>,
client_resp_channels: BTreeMap<u64, OneshotOrUserDefined<C>>,
) -> Self {
Command::Apply {
first,
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySliceExt;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::raft::responder::either::OneshotOrUserDefined;
use crate::raft::responder::Responder;
use crate::raft::ClientWriteResponse;
#[cfg(doc)]
Expand All @@ -28,7 +29,6 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::TypeConfigExt;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
Expand Down Expand Up @@ -173,7 +173,7 @@ where
&mut self,
first: LogIdOf<C>,
last: LogIdOf<C>,
client_resp_channels: &mut BTreeMap<u64, ResponderOf<C>>,
client_resp_channels: &mut BTreeMap<u64, OneshotOrUserDefined<C>>,
) -> Result<ApplyResult<C>, StorageError<C>> {
// TODO: prepare response before apply,
// so that an Entry does not need to be Clone,
Expand Down
16 changes: 10 additions & 6 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,16 @@ where C: RaftTypeConfig
/// Get a LeaderHandler for handling leader's operation. If it is not a leader, it sends back a
/// ForwardToLeader error through the tx.
///
/// If tx is None, no response will be sent.
/// If `tx` is None, no response will be sent.
///
/// The `tx` is a [`Responder`] instance, but it does not have to be the [`C::Responder`].
/// The generic `R` allows any responder type to be used, while [`C::Responder`] is specifically
/// designed for client write operations.
///
/// [`C::Responder`]: RaftTypeConfig::Responder
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn get_leader_handler_or_reject(
&mut self,
tx: Option<ResponderOf<C>>,
) -> Option<(LeaderHandler<C>, Option<ResponderOf<C>>)> {
pub(crate) fn get_leader_handler_or_reject<R>(&mut self, tx: Option<R>) -> Option<(LeaderHandler<C>, Option<R>)>
where R: Responder<C> {
let res = self.leader_handler();
let forward_err = match res {
Ok(lh) => {
Expand Down Expand Up @@ -609,7 +613,7 @@ where C: RaftTypeConfig
pub(crate) fn trigger_transfer_leader(&mut self, to: C::NodeId) {
tracing::info!(to = display(&to), "{}", func_name!());

let Some((mut lh, _)) = self.get_leader_handler_or_reject(None) else {
let Some((mut lh, _)) = self.get_leader_handler_or_reject(None::<ResponderOf<C>>) else {
tracing::info!(
to = display(to),
"{}: this node is not a Leader, ignore transfer Leader",
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft/api/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::ReadPolicy;

/// Provides application-facing APIs for interacting with the Raft system.
///
/// This struct contains methods for client operations such as linearizable reads
/// and writes.
#[since(version = "0.10.0")]
pub(crate) struct AppApi<'a, C>
where C: RaftTypeConfig
Expand Down
14 changes: 6 additions & 8 deletions openraft/src/raft/api/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use crate::LogIdOptionExt;
use crate::RaftMetrics;
use crate::RaftTypeConfig;

/// Provides management APIs for the Raft system.
///
/// This struct contains methods for managing the Raft cluster, including
/// membership changes and node additions.
#[since(version = "0.10.0")]
pub(crate) struct ManagementApi<'a, C>
where C: RaftTypeConfig
Expand Down Expand Up @@ -57,10 +61,7 @@ where C: RaftTypeConfig
&self,
members: impl Into<ChangeMembers<C>>,
retain: bool,
) -> Result<ClientWriteResult<C>, Fatal<C>>
where
C: RaftTypeConfig<Responder = OneshotResponder<C>>,
{
) -> Result<ClientWriteResult<C>, Fatal<C>> {
let changes: ChangeMembers<C> = members.into();

tracing::info!(
Expand Down Expand Up @@ -127,10 +128,7 @@ where C: RaftTypeConfig
id: C::NodeId,
node: C::Node,
blocking: bool,
) -> Result<ClientWriteResult<C>, Fatal<C>>
where
C: RaftTypeConfig<Responder = OneshotResponder<C>>,
{
) -> Result<ClientWriteResult<C>, Fatal<C>> {
let (tx, rx) = oneshot_channel::<C>();

let msg = RaftMsg::ChangeMembership {
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/raft/impl_raft_blocking_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use crate::error::into_raft_result::IntoRaftResult;
use crate::error::ClientWriteError;
use crate::error::RaftError;
use crate::raft::responder::OneshotResponder;
use crate::raft::ClientWriteResponse;
#[cfg(doc)]
use crate::raft::ManagementApi;
Expand All @@ -16,7 +15,7 @@ use crate::RaftTypeConfig;
/// Implement blocking mode write operations those reply on oneshot channel for communication
/// between Raft core and client.
impl<C> Raft<C>
where C: RaftTypeConfig<Responder = OneshotResponder<C>>
where C: RaftTypeConfig
{
/// Propose a cluster configuration change.
///
Expand Down Expand Up @@ -67,7 +66,7 @@ where C: RaftTypeConfig<Responder = OneshotResponder<C>>
///
/// If the node to add is already a voter or learner, it will still re-add it.
///
/// A `node` is able to store the network address of a node. Thus an application does not
/// A `node` is able to store the network address of a node. Thus, an application does not
/// need another store for mapping node-id to ip-addr when implementing the RaftNetwork.
#[tracing::instrument(level = "debug", skip(self, id), fields(target=display(&id)))]
pub async fn add_learner(
Expand Down
32 changes: 32 additions & 0 deletions openraft/src/raft/responder/either.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use crate::impls::OneshotResponder;
use crate::raft::responder::Responder;
use crate::raft::ClientWriteResult;
use crate::RaftTypeConfig;

/// Either an oneshot responder or a user-defined responder.
///
/// It is used in RaftCore to enqueue responder to client.
pub(crate) enum OneshotOrUserDefined<C>
where C: RaftTypeConfig
{
Oneshot(OneshotResponder<C>),
UserDefined(C::Responder),
}

impl<C> Responder<C> for OneshotOrUserDefined<C>
where C: RaftTypeConfig
{
fn send(self, res: ClientWriteResult<C>) {
match self {
Self::Oneshot(responder) => responder.send(res),
Self::UserDefined(responder) => responder.send(res),
}
}

type Receiver = ();

fn from_app_data(_app_data: <C as RaftTypeConfig>::D) -> (<C as RaftTypeConfig>::D, Self, Self::Receiver)
where Self: Sized {
unimplemented!("OneshotOrUserDefined is just a wrapper and does not support building from app_data")
}
}
1 change: 1 addition & 0 deletions openraft/src/raft/responder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! API to consumer a response when a client write request is completed.

pub(crate) mod either;
pub(crate) mod impls;
pub use impls::OneshotResponder;

Expand Down
Loading