Skip to content

Commit b727b77

Browse files
committed
Merge branch 'master' into mpmc
# Conflicts: # src/async.rs # src/lib.rs
2 parents 39092e2 + 619e25d commit b727b77

File tree

3 files changed

+82
-30
lines changed

3 files changed

+82
-30
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ readme = "README.md"
1212

1313
[features]
1414
select = []
15-
async = []
15+
async = ["futures"]
1616
default = ["select", "async"]
1717

1818
[dependencies]
1919
spin = "0.5"
20+
futures = { version = "^0.3", default-features = false, optional = true }
2021

2122
[dev-dependencies]
2223
crossbeam-channel = "0.4"

src/async.rs

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,40 +6,26 @@ use std::{
66
task::{Context, Poll},
77
};
88
use crate::*;
9+
use futures::{Stream, stream::FusedStream, future::FusedFuture};
910

10-
/// A future used to receive a value from the channel.
11-
pub struct RecvFuture<'a, T> {
12-
recv: &'a mut Receiver<T>,
13-
}
14-
15-
impl<'a, T> RecvFuture<'a, T> {
16-
pub(crate) fn new(recv: &mut Receiver<T>) -> RecvFuture<T> {
17-
RecvFuture { recv }
18-
}
19-
}
20-
21-
impl<'a, T> Future for RecvFuture<'a, T> {
22-
type Output = Result<T, RecvError>;
23-
24-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
25-
// On success, set the waker to none to avoid it being woken again in case that is wrong
26-
// TODO: `poll_recv` instead to prevent even spinning?
27-
let mut buf = self.recv.buffer.borrow_mut();
11+
impl<T> Receiver<T> {
12+
#[inline]
13+
fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
14+
let mut buf = self.buffer.borrow_mut();
2815

2916
let res = if let Some(msg) = buf.pop_front() {
3017
return Poll::Ready(Ok(msg));
3118
} else {
3219
self
33-
.recv
3420
.shared
3521
.poll_inner()
3622
.map(|inner| self
37-
.recv
3823
.shared
3924
.try_recv(
4025
move || inner,
4126
&mut buf,
42-
self.recv.mpmc_mode.get()
27+
&self.finished,
28+
self.mpmc_mode.get(),
4329
)
4430
)
4531
};
@@ -62,3 +48,46 @@ impl<'a, T> Future for RecvFuture<'a, T> {
6248
poll
6349
}
6450
}
51+
52+
/// A future used to receive a value from the channel.
53+
pub struct RecvFuture<'a, T> {
54+
recv: &'a mut Receiver<T>,
55+
}
56+
57+
impl<'a, T> RecvFuture<'a, T> {
58+
pub(crate) fn new(recv: &mut Receiver<T>) -> RecvFuture<T> {
59+
RecvFuture { recv }
60+
}
61+
}
62+
63+
impl<'a, T> Future for RecvFuture<'a, T> {
64+
type Output = Result<T, RecvError>;
65+
66+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
67+
self.recv.poll(cx)
68+
}
69+
}
70+
71+
impl<'a, T> FusedFuture for RecvFuture<'a, T> {
72+
fn is_terminated(&self) -> bool {
73+
self.recv.finished.get()
74+
}
75+
}
76+
77+
impl<T> Stream for Receiver<T> {
78+
type Item = T;
79+
80+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81+
self.poll(cx).map(|ready| ready.ok())
82+
}
83+
84+
fn size_hint(&self) -> (usize, Option<usize>) {
85+
(self.buffer.borrow().len(), None)
86+
}
87+
}
88+
89+
impl<T> FusedStream for Receiver<T> {
90+
fn is_terminated(&self) -> bool {
91+
self.finished.get()
92+
}
93+
}

src/lib.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ impl<T> Shared<T> {
252252
}
253253

254254
#[inline]
255+
#[cfg(feature = "async")]
255256
fn poll_inner(&self) -> Option<MutexGuard<'_, Inner<T>>> {
256257
#[cfg(windows)] { self.inner.try_lock().ok() }
257258
#[cfg(not(windows))] { self.inner.try_lock() }
@@ -356,6 +357,7 @@ impl<T> Shared<T> {
356357
&'a self,
357358
take_inner: impl FnOnce() -> MutexGuard<'a, Inner<T>>,
358359
buf: &mut VecDeque<T>,
360+
finished: &Cell<bool>,
359361
mpmc_mode: bool,
360362
) -> Result<T, (MutexGuard<Inner<T>>, TryRecvError)> {
361363
// Eagerly check the buffer
@@ -374,8 +376,10 @@ impl<T> Shared<T> {
374376
msg
375377
},
376378
// If there's nothing more in the queue, this might be because there are no senders
377-
None if inner.sender_count == 0 =>
378-
return Err((inner, TryRecvError::Disconnected)),
379+
None if inner.sender_count == 0 => {
380+
finished.set(true);
381+
return Err((inner, TryRecvError::Disconnected));
382+
},
379383
None => return Err((inner, TryRecvError::Empty)),
380384
};
381385

@@ -412,13 +416,14 @@ impl<T> Shared<T> {
412416
fn recv(
413417
&self,
414418
buf: &mut VecDeque<T>,
419+
finished: &Cell<bool>,
415420
mpmc_mode: bool,
416421
) -> Result<T, RecvError> {
417422
loop {
418423
// Attempt to receive a message
419424
let mut i = 0;
420425
let inner = loop {
421-
match self.try_recv(|| self.wait_inner(), buf, mpmc_mode) {
426+
match self.try_recv(|| self.wait_inner(), buf, finished, mpmc_mode) {
422427
Ok(msg) => return Ok(msg),
423428
Err((_, TryRecvError::Disconnected)) => return Err(RecvError::Disconnected),
424429
Err((inner, TryRecvError::Empty)) if i == 3 => break inner,
@@ -439,10 +444,11 @@ impl<T> Shared<T> {
439444
&self,
440445
deadline: Instant,
441446
buf: &mut VecDeque<T>,
447+
finished: &Cell<bool>,
442448
mpmc_mode: bool,
443449
) -> Result<T, RecvTimeoutError> {
444450
// Attempt a speculative recv. If we are lucky there might be a message in the queue!
445-
let mut inner = match self.try_recv(|| self.wait_inner(), buf, mpmc_mode) {
451+
let mut inner = match self.try_recv(|| self.wait_inner(), buf, finished, mpmc_mode) {
446452
Ok(msg) => return Ok(msg),
447453
Err((_, TryRecvError::Disconnected)) => return Err(RecvTimeoutError::Disconnected),
448454
Err((inner, TryRecvError::Empty)) => inner,
@@ -468,7 +474,7 @@ impl<T> Shared<T> {
468474
}
469475

470476
// Attempt to receive a message from the queue
471-
inner = match self.try_recv(|| self.wait_inner(), buf, mpmc_mode) {
477+
inner = match self.try_recv(|| self.wait_inner(), buf, finished, mpmc_mode) {
472478
Ok(msg) => return Ok(msg),
473479
Err((inner, TryRecvError::Empty)) => inner,
474480
Err((_, TryRecvError::Disconnected)) => return Err(RecvTimeoutError::Disconnected),
@@ -555,13 +561,15 @@ pub struct Receiver<T> {
555561
mpmc_mode: Cell<bool>,
556562
/// Buffer for messages (only uses when mpmc_mode is false)
557563
buffer: RefCell<VecDeque<T>>,
564+
/// Whether all receivers have disconnected and there are no messages in any buffer
565+
finished: Cell<bool>,
558566
}
559567

560568
impl<T> Receiver<T> {
561569
/// Wait for an incoming value from the channel associated with this receiver, returning an
562570
/// error if all channel senders have been dropped.
563571
pub fn recv(&self) -> Result<T, RecvError> {
564-
self.shared.recv(&mut self.buffer.borrow_mut(), self.mpmc_mode.get())
572+
self.shared.recv(&mut self.buffer.borrow_mut(), &self.finished, self.mpmc_mode.get())
565573
}
566574

567575
/// Wait for an incoming value from the channel associated with this receiver, returning an
@@ -570,14 +578,20 @@ impl<T> Receiver<T> {
570578
self.shared.recv_deadline(
571579
Instant::now().checked_add(timeout).unwrap(),
572580
&mut self.buffer.borrow_mut(),
581+
&self.finished,
573582
self.mpmc_mode.get(),
574583
)
575584
}
576585

577586
/// Wait for an incoming value from the channel associated with this receiver, returning an
578587
/// error if all channel senders have been dropped or the deadline has passed.
579588
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
580-
self.shared.recv_deadline(deadline, &mut self.buffer.borrow_mut(), self.mpmc_mode.get())
589+
self.shared.recv_deadline(
590+
deadline,
591+
&mut self.buffer.borrow_mut(),
592+
&self.finished,
593+
self.mpmc_mode.get()
594+
)
581595
}
582596

583597
// Takes `&mut self` to avoid >1 task waiting on this channel
@@ -594,7 +608,12 @@ impl<T> Receiver<T> {
594608
pub fn try_recv(&self) -> Result<T, TryRecvError> {
595609
self
596610
.shared
597-
.try_recv(|| self.shared.wait_inner(), &mut self.buffer.borrow_mut(), self.mpmc_mode.get())
611+
.try_recv(
612+
|| self.shared.wait_inner(),
613+
&mut self.buffer.borrow_mut(),
614+
&self.finished,
615+
self.mpmc_mode.get()
616+
)
598617
.map_err(|(_, err)| err)
599618
}
600619

@@ -645,6 +664,7 @@ impl<T> Clone for Receiver<T> {
645664
shared: self.shared.clone(),
646665
mpmc_mode: Cell::new(true),
647666
buffer: RefCell::new(VecDeque::new()),
667+
finished: Cell::new(false),
648668
}
649669
}
650670
}
@@ -745,6 +765,7 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
745765
shared,
746766
mpmc_mode: Cell::new(false),
747767
buffer: RefCell::new(VecDeque::new()),
768+
finished: Cell::new(false),
748769
},
749770
)
750771
}
@@ -779,6 +800,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
779800
shared,
780801
mpmc_mode: Cell::new(false),
781802
buffer: RefCell::new(VecDeque::new()),
803+
finished: Cell::new(false),
782804
},
783805
)
784806
}

0 commit comments

Comments
 (0)