Skip to content

Commit f12bb6c

Browse files
committed
feat: add compio runtime support
Implement runtime support for compio asynchronous framework with core components: - Add CompioRuntime implementation of AsyncRuntime trait - Implement MPSC channels, oneshot, watch, and mutex primitives - Integrate with GitHub Actions workflow for CI testing - Add std::time::Instant implementation of Instant trait
1 parent 9bc9885 commit f12bb6c

File tree

11 files changed

+569
-100
lines changed

11 files changed

+569
-100
lines changed

.github/workflows/ci.yaml

Lines changed: 74 additions & 100 deletions
Large diffs are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,5 @@ exclude = [
7272
"examples/raft-kv-memstore-opendal-snapshot-data",
7373
"examples/raft-kv-rocksdb",
7474
"rt-monoio",
75+
"rt-compio"
7576
]

openraft/src/instant.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,9 @@ impl Instant for tokio::time::Instant {
6969
tokio::time::Instant::now()
7070
}
7171
}
72+
73+
impl Instant for std::time::Instant {
74+
fn now() -> Self {
75+
std::time::Instant::now()
76+
}
77+
}

rt-compio/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "openraft-rt-compio"
3+
description = "compio AsyncRuntime support for Openraft"
4+
documentation = "https://docs.rs/openraft-rt-compio"
5+
readme = "README.md"
6+
version = "0.10.0"
7+
edition = "2021"
8+
authors = ["Databend Authors <[email protected]>"]
9+
categories = ["algorithms", "asynchronous", "data-structures"]
10+
homepage = "https://github.com/databendlabs/openraft"
11+
keywords = ["consensus", "raft"]
12+
license = "MIT OR Apache-2.0"
13+
repository = "https://github.com/databendlabs/openraft"
14+
15+
[dependencies]
16+
openraft = { path = "../openraft", version = "0.10.0", default-features = false, features = ["singlethreaded"] }
17+
compio = { version = "0.14.0", features = ["runtime", "time"] }
18+
tokio = { version = "1.22", features = ["sync"], default-features = false }
19+
rand = "0.9"
20+
futures = "0.3"
21+
pin-project-lite = "0.2.16"
22+
23+
[dev-dependencies]
24+
compio = { version = "0.14.0", features = ["macros"] }

rt-compio/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# openraft-rt-compio
2+
3+
compio [`AsyncRuntime`][rt_link] support for Openraft.
4+
5+
[rt_link]: https://docs.rs/openraft/latest/openraft/async_runtime/trait.AsyncRuntime.html

rt-compio/src/lib.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
use std::any::Any;
2+
use std::fmt::Debug;
3+
use std::fmt::Display;
4+
use std::fmt::Error;
5+
use std::fmt::Formatter;
6+
use std::future::Future;
7+
use std::pin::Pin;
8+
use std::task::Context;
9+
use std::task::Poll;
10+
11+
pub use compio;
12+
pub use futures;
13+
use futures::FutureExt;
14+
pub use openraft;
15+
use openraft::AsyncRuntime;
16+
use openraft::OptionalSend;
17+
pub use rand;
18+
use rand::rngs::ThreadRng;
19+
20+
mod mpsc;
21+
mod mpsc_unbounded;
22+
mod mutex;
23+
mod oneshot;
24+
mod watch;
25+
26+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
27+
pub struct CompioRuntime;
28+
29+
#[derive(Debug)]
30+
pub struct CompioJoinError(Box<dyn Any + Send>);
31+
32+
impl Display for CompioJoinError {
33+
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
34+
write!(f, "Spawned task panicked")
35+
}
36+
}
37+
38+
pub struct CompioJoinHandle<T>(Option<compio::runtime::JoinHandle<T>>);
39+
40+
impl<T> Drop for CompioJoinHandle<T> {
41+
fn drop(&mut self) {
42+
let Some(j) = self.0.take() else {
43+
return;
44+
};
45+
j.detach();
46+
}
47+
}
48+
49+
impl<T> Future for CompioJoinHandle<T> {
50+
type Output = Result<T, CompioJoinError>;
51+
52+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
53+
let this = self.get_mut();
54+
let task = this.0.as_mut().expect("Task has been cancelled");
55+
match task.poll_unpin(cx) {
56+
Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
57+
Poll::Ready(Err(e)) => Poll::Ready(Err(CompioJoinError(e))),
58+
Poll::Pending => Poll::Pending,
59+
}
60+
}
61+
}
62+
63+
pub type BoxedFuture<T> = Pin<Box<dyn Future<Output = T>>>;
64+
65+
pin_project_lite::pin_project! {
66+
pub struct CompioTimeout<F> {
67+
#[pin]
68+
future: F,
69+
delay: BoxedFuture<()>
70+
}
71+
}
72+
73+
impl<F: Future> Future for CompioTimeout<F> {
74+
type Output = Result<F::Output, compio::time::Elapsed>;
75+
76+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77+
let this = self.project();
78+
match this.delay.poll_unpin(cx) {
79+
Poll::Ready(()) => {
80+
// The delay has elapsed, so we return an error.
81+
Poll::Ready(Err(compio::time::Elapsed))
82+
}
83+
Poll::Pending => {
84+
// The delay has not yet elapsed, so we poll the future.
85+
match this.future.poll(cx) {
86+
Poll::Ready(v) => Poll::Ready(Ok(v)),
87+
Poll::Pending => Poll::Pending,
88+
}
89+
}
90+
}
91+
}
92+
}
93+
94+
impl AsyncRuntime for CompioRuntime {
95+
type JoinError = CompioJoinError;
96+
type JoinHandle<T: OptionalSend + 'static> = CompioJoinHandle<T>;
97+
type Sleep = BoxedFuture<()>;
98+
type Instant = std::time::Instant;
99+
type TimeoutError = compio::time::Elapsed;
100+
type Timeout<R, T: Future<Output = R> + OptionalSend> = CompioTimeout<T>;
101+
type ThreadLocalRng = ThreadRng;
102+
type Mpsc = mpsc::CompioMpsc;
103+
type MpscUnbounded = mpsc_unbounded::TokioMpscUnbounded;
104+
type Watch = watch::TokioWatch;
105+
type Oneshot = oneshot::FuturesOneshot;
106+
type Mutex<T: OptionalSend + 'static> = mutex::TokioMutex<T>;
107+
108+
fn spawn<T>(fut: T) -> Self::JoinHandle<T::Output>
109+
where
110+
T: Future + OptionalSend + 'static,
111+
T::Output: OptionalSend + 'static,
112+
{
113+
CompioJoinHandle(Some(compio::runtime::spawn(fut)))
114+
}
115+
116+
fn sleep(duration: std::time::Duration) -> Self::Sleep {
117+
Box::pin(compio::time::sleep(duration))
118+
}
119+
120+
fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
121+
Box::pin(compio::time::sleep_until(deadline))
122+
}
123+
124+
fn timeout<R, F: Future<Output = R> + OptionalSend>(
125+
duration: std::time::Duration,
126+
future: F,
127+
) -> Self::Timeout<R, F> {
128+
let delay = Box::pin(compio::time::sleep(duration));
129+
CompioTimeout { future, delay }
130+
}
131+
132+
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
133+
let delay = Box::pin(compio::time::sleep_until(deadline));
134+
CompioTimeout { future, delay }
135+
}
136+
137+
fn is_panic(_: &Self::JoinError) -> bool {
138+
// Task only returns `JoinError` if the spawned future panics.
139+
true
140+
}
141+
142+
fn thread_rng() -> Self::ThreadLocalRng {
143+
rand::rng()
144+
}
145+
}
146+
147+
#[cfg(test)]
148+
mod tests {
149+
use openraft::testing::runtime::Suite;
150+
151+
use super::*;
152+
153+
#[test]
154+
fn test_compio_rt() {
155+
let rt = compio::runtime::Runtime::new().unwrap();
156+
rt.block_on(Suite::<CompioRuntime>::test_all());
157+
}
158+
}

rt-compio/src/mpsc.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::future::Future;
2+
3+
use futures::TryFutureExt;
4+
use openraft::async_runtime::Mpsc;
5+
use openraft::async_runtime::MpscReceiver;
6+
use openraft::async_runtime::MpscSender;
7+
use openraft::async_runtime::MpscWeakSender;
8+
use openraft::async_runtime::SendError;
9+
use openraft::async_runtime::TryRecvError;
10+
use openraft::OptionalSend;
11+
use tokio::sync::mpsc as tokio_mpsc;
12+
13+
pub struct CompioMpsc;
14+
15+
pub struct CompioMpscSender<T>(tokio_mpsc::Sender<T>);
16+
17+
impl<T> Clone for CompioMpscSender<T> {
18+
#[inline]
19+
fn clone(&self) -> Self {
20+
Self(self.0.clone())
21+
}
22+
}
23+
24+
pub struct CompioMpscReceiver<T>(tokio_mpsc::Receiver<T>);
25+
26+
pub struct CompioMpscWeakSender<T>(tokio_mpsc::WeakSender<T>);
27+
28+
impl<T> Clone for CompioMpscWeakSender<T> {
29+
#[inline]
30+
fn clone(&self) -> Self {
31+
Self(self.0.clone())
32+
}
33+
}
34+
35+
impl Mpsc for CompioMpsc {
36+
type Sender<T: OptionalSend> = CompioMpscSender<T>;
37+
type Receiver<T: OptionalSend> = CompioMpscReceiver<T>;
38+
type WeakSender<T: OptionalSend> = CompioMpscWeakSender<T>;
39+
40+
#[inline]
41+
fn channel<T: OptionalSend>(buffer: usize) -> (Self::Sender<T>, Self::Receiver<T>) {
42+
let (tx, rx) = tokio_mpsc::channel(buffer);
43+
let tx_wrapper = CompioMpscSender(tx);
44+
let rx_wrapper = CompioMpscReceiver(rx);
45+
46+
(tx_wrapper, rx_wrapper)
47+
}
48+
}
49+
50+
impl<T> MpscSender<CompioMpsc, T> for CompioMpscSender<T>
51+
where T: OptionalSend
52+
{
53+
#[inline]
54+
fn send(&self, msg: T) -> impl Future<Output = Result<(), SendError<T>>> {
55+
self.0.send(msg).map_err(|e| SendError(e.0))
56+
}
57+
58+
#[inline]
59+
fn downgrade(&self) -> <CompioMpsc as Mpsc>::WeakSender<T> {
60+
let inner = self.0.downgrade();
61+
CompioMpscWeakSender(inner)
62+
}
63+
}
64+
65+
impl<T> MpscReceiver<T> for CompioMpscReceiver<T> {
66+
#[inline]
67+
fn recv(&mut self) -> impl Future<Output = Option<T>> {
68+
self.0.recv()
69+
}
70+
71+
#[inline]
72+
fn try_recv(&mut self) -> Result<T, TryRecvError> {
73+
self.0.try_recv().map_err(|e| match e {
74+
tokio_mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
75+
tokio_mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
76+
})
77+
}
78+
}
79+
80+
impl<T> MpscWeakSender<CompioMpsc, T> for CompioMpscWeakSender<T>
81+
where T: OptionalSend
82+
{
83+
#[inline]
84+
fn upgrade(&self) -> Option<<CompioMpsc as Mpsc>::Sender<T>> {
85+
self.0.upgrade().map(CompioMpscSender)
86+
}
87+
}

rt-compio/src/mpsc_unbounded.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//! Unbounded MPSC channel wrapper types and their trait impl.
2+
3+
use openraft::type_config::async_runtime::mpsc_unbounded;
4+
use openraft::OptionalSend;
5+
use tokio::sync::mpsc as tokio_mpsc;
6+
7+
pub struct TokioMpscUnbounded;
8+
9+
pub struct TokioMpscUnboundedSender<T>(tokio_mpsc::UnboundedSender<T>);
10+
11+
impl<T> Clone for TokioMpscUnboundedSender<T> {
12+
#[inline]
13+
fn clone(&self) -> Self {
14+
Self(self.0.clone())
15+
}
16+
}
17+
18+
pub struct TokioMpscUnboundedReceiver<T>(tokio_mpsc::UnboundedReceiver<T>);
19+
20+
pub struct TokioMpscUnboundedWeakSender<T>(tokio_mpsc::WeakUnboundedSender<T>);
21+
22+
impl<T> Clone for TokioMpscUnboundedWeakSender<T> {
23+
#[inline]
24+
fn clone(&self) -> Self {
25+
Self(self.0.clone())
26+
}
27+
}
28+
29+
impl mpsc_unbounded::MpscUnbounded for TokioMpscUnbounded {
30+
type Sender<T: OptionalSend> = TokioMpscUnboundedSender<T>;
31+
type Receiver<T: OptionalSend> = TokioMpscUnboundedReceiver<T>;
32+
type WeakSender<T: OptionalSend> = TokioMpscUnboundedWeakSender<T>;
33+
34+
#[inline]
35+
fn channel<T: OptionalSend>() -> (Self::Sender<T>, Self::Receiver<T>) {
36+
let (tx, rx) = tokio_mpsc::unbounded_channel();
37+
let tx_wrapper = TokioMpscUnboundedSender(tx);
38+
let rx_wrapper = TokioMpscUnboundedReceiver(rx);
39+
40+
(tx_wrapper, rx_wrapper)
41+
}
42+
}
43+
44+
impl<T> mpsc_unbounded::MpscUnboundedSender<TokioMpscUnbounded, T> for TokioMpscUnboundedSender<T>
45+
where T: OptionalSend
46+
{
47+
#[inline]
48+
fn send(&self, msg: T) -> Result<(), mpsc_unbounded::SendError<T>> {
49+
self.0.send(msg).map_err(|e| mpsc_unbounded::SendError(e.0))
50+
}
51+
52+
#[inline]
53+
fn downgrade(&self) -> <TokioMpscUnbounded as mpsc_unbounded::MpscUnbounded>::WeakSender<T> {
54+
let inner = self.0.downgrade();
55+
TokioMpscUnboundedWeakSender(inner)
56+
}
57+
}
58+
59+
impl<T> mpsc_unbounded::MpscUnboundedReceiver<T> for TokioMpscUnboundedReceiver<T> {
60+
#[inline]
61+
async fn recv(&mut self) -> Option<T> {
62+
self.0.recv().await
63+
}
64+
65+
#[inline]
66+
fn try_recv(&mut self) -> Result<T, mpsc_unbounded::TryRecvError> {
67+
self.0.try_recv().map_err(|e| match e {
68+
tokio_mpsc::error::TryRecvError::Empty => mpsc_unbounded::TryRecvError::Empty,
69+
tokio_mpsc::error::TryRecvError::Disconnected => mpsc_unbounded::TryRecvError::Disconnected,
70+
})
71+
}
72+
}
73+
74+
impl<T> mpsc_unbounded::MpscUnboundedWeakSender<TokioMpscUnbounded, T> for TokioMpscUnboundedWeakSender<T>
75+
where T: OptionalSend
76+
{
77+
#[inline]
78+
fn upgrade(&self) -> Option<<TokioMpscUnbounded as mpsc_unbounded::MpscUnbounded>::Sender<T>> {
79+
self.0.upgrade().map(TokioMpscUnboundedSender)
80+
}
81+
}

rt-compio/src/mutex.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use std::future::Future;
2+
3+
use openraft::type_config::async_runtime::mutex;
4+
use openraft::OptionalSend;
5+
6+
pub struct TokioMutex<T>(tokio::sync::Mutex<T>);
7+
8+
impl<T> mutex::Mutex<T> for TokioMutex<T>
9+
where T: OptionalSend + 'static
10+
{
11+
type Guard<'a> = tokio::sync::MutexGuard<'a, T>;
12+
13+
#[inline]
14+
fn new(value: T) -> Self {
15+
TokioMutex(tokio::sync::Mutex::new(value))
16+
}
17+
18+
#[inline]
19+
fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend {
20+
self.0.lock()
21+
}
22+
}

0 commit comments

Comments
 (0)