Skip to content

Commit 7216e4c

Browse files
George-Miaodrmingdrmer
authored andcommitted
feat: add compio runtime support
Implement runtime support for compio asynchronous framework with core components: - Add CompioRuntime implementation of AsyncRuntime trait - Implement MPSC channels, oneshot, watch, and mutex primitives - Integrate with GitHub Actions workflow for CI testing - Add std::time::Instant implementation of Instant trait
1 parent 9bc9885 commit 7216e4c

File tree

11 files changed

+570
-100
lines changed

11 files changed

+570
-100
lines changed

.github/workflows/ci.yaml

Lines changed: 74 additions & 100 deletions
Large diffs are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,5 @@ exclude = [
7272
"examples/raft-kv-memstore-opendal-snapshot-data",
7373
"examples/raft-kv-rocksdb",
7474
"rt-monoio",
75+
"rt-compio"
7576
]

openraft/src/instant.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,9 @@ impl Instant for tokio::time::Instant {
6969
tokio::time::Instant::now()
7070
}
7171
}
72+
73+
impl Instant for std::time::Instant {
74+
fn now() -> Self {
75+
std::time::Instant::now()
76+
}
77+
}

rt-compio/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "openraft-rt-compio"
3+
description = "compio AsyncRuntime support for Openraft"
4+
documentation = "https://docs.rs/openraft-rt-compio"
5+
readme = "README.md"
6+
version = "0.10.0"
7+
edition = "2021"
8+
authors = ["Databend Authors <[email protected]>"]
9+
categories = ["algorithms", "asynchronous", "data-structures"]
10+
homepage = "https://github.com/databendlabs/openraft"
11+
keywords = ["consensus", "raft"]
12+
license = "MIT OR Apache-2.0"
13+
repository = "https://github.com/databendlabs/openraft"
14+
15+
[dependencies]
16+
openraft = { path = "../openraft", version = "0.10.0", default-features = false, features = ["singlethreaded"] }
17+
compio = { version = "0.14.0", features = ["runtime", "time"] }
18+
tokio = { version = "1.22", features = ["sync"], default-features = false }
19+
rand = "0.9"
20+
futures = "0.3"
21+
pin-project-lite = "0.2.16"
22+
23+
[dev-dependencies]
24+
compio = { version = "0.14.0", features = ["macros"] }

rt-compio/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# openraft-rt-compio
2+
3+
compio [`AsyncRuntime`][rt_link] support for Openraft.
4+
5+
[rt_link]: https://docs.rs/openraft/latest/openraft/async_runtime/trait.AsyncRuntime.html

rt-compio/src/lib.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use std::any::Any;
2+
use std::fmt::Debug;
3+
use std::fmt::Display;
4+
use std::fmt::Error;
5+
use std::fmt::Formatter;
6+
use std::future::Future;
7+
use std::pin::Pin;
8+
use std::task::Context;
9+
use std::task::Poll;
10+
11+
pub use compio;
12+
pub use futures;
13+
use futures::future::FusedFuture;
14+
use futures::FutureExt;
15+
pub use openraft;
16+
use openraft::AsyncRuntime;
17+
use openraft::OptionalSend;
18+
pub use rand;
19+
use rand::rngs::ThreadRng;
20+
21+
mod mpsc;
22+
mod mpsc_unbounded;
23+
mod mutex;
24+
mod oneshot;
25+
mod watch;
26+
27+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
28+
pub struct CompioRuntime;
29+
30+
#[derive(Debug)]
31+
pub struct CompioJoinError(Box<dyn Any + Send>);
32+
33+
impl Display for CompioJoinError {
34+
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
35+
write!(f, "Spawned task panicked")
36+
}
37+
}
38+
39+
pub struct CompioJoinHandle<T>(Option<compio::runtime::JoinHandle<T>>);
40+
41+
impl<T> Drop for CompioJoinHandle<T> {
42+
fn drop(&mut self) {
43+
let Some(j) = self.0.take() else {
44+
return;
45+
}
46+
j.detach();
47+
}
48+
}
49+
50+
impl<T> Future for CompioJoinHandle<T> {
51+
type Output = Result<T, CompioJoinError>;
52+
53+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54+
let this = self.get_mut();
55+
let task = this.0.as_mut().expect("Task has been cancelled");
56+
match task.poll_unpin(cx) {
57+
Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
58+
Poll::Ready(Err(e)) => Poll::Ready(Err(CompioJoinError(e))),
59+
Poll::Pending => Poll::Pending,
60+
}
61+
}
62+
}
63+
64+
pub type BoxedFuture<T> = Pin<Box<dyn Future<Output = T>>>;
65+
66+
pin_project_lite::pin_project! {
67+
pub struct CompioTimeout<F> {
68+
#[pin]
69+
future: F,
70+
delay: BoxedFuture<()>
71+
}
72+
}
73+
74+
impl<F: Future> Future for CompioTimeout<F> {
75+
type Output = Result<F::Output, compio::time::Elapsed>;
76+
77+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78+
let this = self.project();
79+
match this.delay.poll_unpin(cx) {
80+
Poll::Ready(()) => {
81+
// The delay has elapsed, so we return an error.
82+
Poll::Ready(Err(compio::time::Elapsed))
83+
}
84+
Poll::Pending => {
85+
// The delay has not yet elapsed, so we poll the future.
86+
match this.future.poll(cx) {
87+
Poll::Ready(v) => Poll::Ready(Ok(v)),
88+
Poll::Pending => Poll::Pending,
89+
}
90+
}
91+
}
92+
}
93+
}
94+
95+
impl AsyncRuntime for CompioRuntime {
96+
type JoinError = CompioJoinError;
97+
type JoinHandle<T: OptionalSend + 'static> = CompioJoinHandle<T>;
98+
type Sleep = BoxedFuture<()>;
99+
type Instant = std::time::Instant;
100+
type TimeoutError = compio::time::Elapsed;
101+
type Timeout<R, T: Future<Output = R> + OptionalSend> = CompioTimeout<T>;
102+
type ThreadLocalRng = ThreadRng;
103+
type Mpsc = mpsc::CompioMpsc;
104+
type MpscUnbounded = mpsc_unbounded::TokioMpscUnbounded;
105+
type Watch = watch::TokioWatch;
106+
type Oneshot = oneshot::FuturesOneshot;
107+
type Mutex<T: OptionalSend + 'static> = mutex::TokioMutex<T>;
108+
109+
fn spawn<T>(fut: T) -> Self::JoinHandle<T::Output>
110+
where
111+
T: Future + OptionalSend + 'static,
112+
T::Output: OptionalSend + 'static,
113+
{
114+
CompioJoinHandle(Some(compio::runtime::spawn(fut)))
115+
}
116+
117+
fn sleep(duration: std::time::Duration) -> Self::Sleep {
118+
Box::pin(compio::time::sleep(duration))
119+
}
120+
121+
fn sleep_until(deadline: Self::Instant) -> Self::Sleep {
122+
Box::pin(compio::time::sleep_until(deadline))
123+
}
124+
125+
fn timeout<R, F: Future<Output = R> + OptionalSend>(
126+
duration: std::time::Duration,
127+
future: F,
128+
) -> Self::Timeout<R, F> {
129+
let delay = Box::pin(compio::time::sleep(duration));
130+
CompioTimeout { future, delay }
131+
}
132+
133+
fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F> {
134+
let delay = Box::pin(compio::time::sleep_until(deadline));
135+
CompioTimeout { future, delay }
136+
}
137+
138+
fn is_panic(_: &Self::JoinError) -> bool {
139+
// Task only returns `JoinError` if the spawned future panics.
140+
true
141+
}
142+
143+
fn thread_rng() -> Self::ThreadLocalRng {
144+
rand::rng()
145+
}
146+
}
147+
148+
#[cfg(test)]
149+
mod tests {
150+
use openraft::testing::runtime::Suite;
151+
152+
use super::*;
153+
154+
#[test]
155+
fn test_compio_rt() {
156+
let rt = compio::runtime::Runtime::new().unwrap();
157+
rt.block_on(Suite::<CompioRuntime>::test_all());
158+
}
159+
}

rt-compio/src/mpsc.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::future::Future;
2+
3+
use futures::TryFutureExt;
4+
use openraft::async_runtime::Mpsc;
5+
use openraft::async_runtime::MpscReceiver;
6+
use openraft::async_runtime::MpscSender;
7+
use openraft::async_runtime::MpscWeakSender;
8+
use openraft::async_runtime::SendError;
9+
use openraft::async_runtime::TryRecvError;
10+
use openraft::OptionalSend;
11+
use tokio::sync::mpsc as tokio_mpsc;
12+
13+
pub struct CompioMpsc;
14+
15+
pub struct CompioMpscSender<T>(tokio_mpsc::Sender<T>);
16+
17+
impl<T> Clone for CompioMpscSender<T> {
18+
#[inline]
19+
fn clone(&self) -> Self {
20+
Self(self.0.clone())
21+
}
22+
}
23+
24+
pub struct CompioMpscReceiver<T>(tokio_mpsc::Receiver<T>);
25+
26+
pub struct CompioMpscWeakSender<T>(tokio_mpsc::WeakSender<T>);
27+
28+
impl<T> Clone for CompioMpscWeakSender<T> {
29+
#[inline]
30+
fn clone(&self) -> Self {
31+
Self(self.0.clone())
32+
}
33+
}
34+
35+
impl Mpsc for CompioMpsc {
36+
type Sender<T: OptionalSend> = CompioMpscSender<T>;
37+
type Receiver<T: OptionalSend> = CompioMpscReceiver<T>;
38+
type WeakSender<T: OptionalSend> = CompioMpscWeakSender<T>;
39+
40+
#[inline]
41+
fn channel<T: OptionalSend>(buffer: usize) -> (Self::Sender<T>, Self::Receiver<T>) {
42+
let (tx, rx) = tokio_mpsc::channel(buffer);
43+
let tx_wrapper = CompioMpscSender(tx);
44+
let rx_wrapper = CompioMpscReceiver(rx);
45+
46+
(tx_wrapper, rx_wrapper)
47+
}
48+
}
49+
50+
impl<T> MpscSender<CompioMpsc, T> for CompioMpscSender<T>
51+
where T: OptionalSend
52+
{
53+
#[inline]
54+
fn send(&self, msg: T) -> impl Future<Output = Result<(), SendError<T>>> {
55+
self.0.send(msg).map_err(|e| SendError(e.0))
56+
}
57+
58+
#[inline]
59+
fn downgrade(&self) -> <CompioMpsc as Mpsc>::WeakSender<T> {
60+
let inner = self.0.downgrade();
61+
CompioMpscWeakSender(inner)
62+
}
63+
}
64+
65+
impl<T> MpscReceiver<T> for CompioMpscReceiver<T> {
66+
#[inline]
67+
fn recv(&mut self) -> impl Future<Output = Option<T>> {
68+
self.0.recv()
69+
}
70+
71+
#[inline]
72+
fn try_recv(&mut self) -> Result<T, TryRecvError> {
73+
self.0.try_recv().map_err(|e| match e {
74+
tokio_mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
75+
tokio_mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
76+
})
77+
}
78+
}
79+
80+
impl<T> MpscWeakSender<CompioMpsc, T> for CompioMpscWeakSender<T>
81+
where T: OptionalSend
82+
{
83+
#[inline]
84+
fn upgrade(&self) -> Option<<CompioMpsc as Mpsc>::Sender<T>> {
85+
self.0.upgrade().map(CompioMpscSender)
86+
}
87+
}

rt-compio/src/mpsc_unbounded.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//! Unbounded MPSC channel wrapper types and their trait impl.
2+
3+
use openraft::type_config::async_runtime::mpsc_unbounded;
4+
use openraft::OptionalSend;
5+
use tokio::sync::mpsc as tokio_mpsc;
6+
7+
pub struct TokioMpscUnbounded;
8+
9+
pub struct TokioMpscUnboundedSender<T>(tokio_mpsc::UnboundedSender<T>);
10+
11+
impl<T> Clone for TokioMpscUnboundedSender<T> {
12+
#[inline]
13+
fn clone(&self) -> Self {
14+
Self(self.0.clone())
15+
}
16+
}
17+
18+
pub struct TokioMpscUnboundedReceiver<T>(tokio_mpsc::UnboundedReceiver<T>);
19+
20+
pub struct TokioMpscUnboundedWeakSender<T>(tokio_mpsc::WeakUnboundedSender<T>);
21+
22+
impl<T> Clone for TokioMpscUnboundedWeakSender<T> {
23+
#[inline]
24+
fn clone(&self) -> Self {
25+
Self(self.0.clone())
26+
}
27+
}
28+
29+
impl mpsc_unbounded::MpscUnbounded for TokioMpscUnbounded {
30+
type Sender<T: OptionalSend> = TokioMpscUnboundedSender<T>;
31+
type Receiver<T: OptionalSend> = TokioMpscUnboundedReceiver<T>;
32+
type WeakSender<T: OptionalSend> = TokioMpscUnboundedWeakSender<T>;
33+
34+
#[inline]
35+
fn channel<T: OptionalSend>() -> (Self::Sender<T>, Self::Receiver<T>) {
36+
let (tx, rx) = tokio_mpsc::unbounded_channel();
37+
let tx_wrapper = TokioMpscUnboundedSender(tx);
38+
let rx_wrapper = TokioMpscUnboundedReceiver(rx);
39+
40+
(tx_wrapper, rx_wrapper)
41+
}
42+
}
43+
44+
impl<T> mpsc_unbounded::MpscUnboundedSender<TokioMpscUnbounded, T> for TokioMpscUnboundedSender<T>
45+
where T: OptionalSend
46+
{
47+
#[inline]
48+
fn send(&self, msg: T) -> Result<(), mpsc_unbounded::SendError<T>> {
49+
self.0.send(msg).map_err(|e| mpsc_unbounded::SendError(e.0))
50+
}
51+
52+
#[inline]
53+
fn downgrade(&self) -> <TokioMpscUnbounded as mpsc_unbounded::MpscUnbounded>::WeakSender<T> {
54+
let inner = self.0.downgrade();
55+
TokioMpscUnboundedWeakSender(inner)
56+
}
57+
}
58+
59+
impl<T> mpsc_unbounded::MpscUnboundedReceiver<T> for TokioMpscUnboundedReceiver<T> {
60+
#[inline]
61+
async fn recv(&mut self) -> Option<T> {
62+
self.0.recv().await
63+
}
64+
65+
#[inline]
66+
fn try_recv(&mut self) -> Result<T, mpsc_unbounded::TryRecvError> {
67+
self.0.try_recv().map_err(|e| match e {
68+
tokio_mpsc::error::TryRecvError::Empty => mpsc_unbounded::TryRecvError::Empty,
69+
tokio_mpsc::error::TryRecvError::Disconnected => mpsc_unbounded::TryRecvError::Disconnected,
70+
})
71+
}
72+
}
73+
74+
impl<T> mpsc_unbounded::MpscUnboundedWeakSender<TokioMpscUnbounded, T> for TokioMpscUnboundedWeakSender<T>
75+
where T: OptionalSend
76+
{
77+
#[inline]
78+
fn upgrade(&self) -> Option<<TokioMpscUnbounded as mpsc_unbounded::MpscUnbounded>::Sender<T>> {
79+
self.0.upgrade().map(TokioMpscUnboundedSender)
80+
}
81+
}

rt-compio/src/mutex.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use std::future::Future;
2+
3+
use openraft::type_config::async_runtime::mutex;
4+
use openraft::OptionalSend;
5+
6+
pub struct TokioMutex<T>(tokio::sync::Mutex<T>);
7+
8+
impl<T> mutex::Mutex<T> for TokioMutex<T>
9+
where T: OptionalSend + 'static
10+
{
11+
type Guard<'a> = tokio::sync::MutexGuard<'a, T>;
12+
13+
#[inline]
14+
fn new(value: T) -> Self {
15+
TokioMutex(tokio::sync::Mutex::new(value))
16+
}
17+
18+
#[inline]
19+
fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend {
20+
self.0.lock()
21+
}
22+
}

0 commit comments

Comments
 (0)