Skip to content

Commit 619e25d

Browse files
authored
Merge pull request #11 from Restioson/async_stream
Add stream impl to receiver
2 parents 55a9b77 + 4af72a2 commit 619e25d

File tree

3 files changed

+91
-37
lines changed

3 files changed

+91
-37
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ readme = "README.md"
1212

1313
[features]
1414
select = []
15-
async = []
15+
async = ["futures"]
1616
default = ["select", "async"]
1717

1818
[dependencies]
1919
spin = "0.5"
20+
futures = { version = "^0.3", default-features = false, optional = true }
2021

2122
[dev-dependencies]
2223
crossbeam-channel = "0.4"

src/async.rs

Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,33 @@ use std::{
66
task::{Context, Poll},
77
};
88
use crate::*;
9+
use futures::{Stream, stream::FusedStream, future::FusedFuture};
910

10-
/// A future used to receive a value from the channel.
11-
pub struct RecvFuture<'a, T> {
12-
recv: &'a mut Receiver<T>,
13-
}
14-
15-
impl<'a, T> RecvFuture<'a, T> {
16-
pub(crate) fn new(recv: &mut Receiver<T>) -> RecvFuture<T> {
17-
RecvFuture { recv }
18-
}
19-
}
20-
21-
impl<'a, T> Future for RecvFuture<'a, T> {
22-
type Output = Result<T, RecvError>;
23-
24-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
25-
// On success, set the waker to none to avoid it being woken again in case that is wrong
26-
// TODO: `poll_recv` instead to prevent even spinning?
27-
let mut buf = self.recv.buffer.borrow_mut();
11+
impl<T> Receiver<T> {
12+
#[inline]
13+
fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
14+
let mut buf = self.buffer.borrow_mut();
2815

2916
let res = if let Some(msg) = buf.pop_front() {
3017
return Poll::Ready(Ok(msg));
3118
} else {
3219
self
33-
.recv
3420
.shared
3521
.poll_inner()
3622
.map(|mut inner| self
37-
.recv
3823
.shared
39-
.try_recv(move || {
40-
// Detach the waker
41-
inner.recv_waker = None;
42-
// Inform the sender that we no longer need waking
43-
inner.listen_mode = 1;
44-
inner
45-
}, &mut buf))
24+
.try_recv(
25+
move || {
26+
// Detach the waker
27+
inner.recv_waker = None;
28+
// Inform the sender that we no longer need waking
29+
inner.listen_mode = 1;
30+
inner
31+
},
32+
&mut buf,
33+
&self.finished,
34+
)
35+
)
4636
};
4737

4838
let poll = match res {
@@ -64,3 +54,46 @@ impl<'a, T> Future for RecvFuture<'a, T> {
6454
poll
6555
}
6656
}
57+
58+
/// A future used to receive a value from the channel.
59+
pub struct RecvFuture<'a, T> {
60+
recv: &'a mut Receiver<T>,
61+
}
62+
63+
impl<'a, T> RecvFuture<'a, T> {
64+
pub(crate) fn new(recv: &mut Receiver<T>) -> RecvFuture<T> {
65+
RecvFuture { recv }
66+
}
67+
}
68+
69+
impl<'a, T> Future for RecvFuture<'a, T> {
70+
type Output = Result<T, RecvError>;
71+
72+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
73+
self.recv.poll(cx)
74+
}
75+
}
76+
77+
impl<'a, T> FusedFuture for RecvFuture<'a, T> {
78+
fn is_terminated(&self) -> bool {
79+
self.recv.finished.get()
80+
}
81+
}
82+
83+
impl<T> Stream for Receiver<T> {
84+
type Item = T;
85+
86+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87+
self.poll(cx).map(|ready| ready.ok())
88+
}
89+
90+
fn size_hint(&self) -> (usize, Option<usize>) {
91+
(self.buffer.borrow().len(), None)
92+
}
93+
}
94+
95+
impl<T> FusedStream for Receiver<T> {
96+
fn is_terminated(&self) -> bool {
97+
self.finished.get()
98+
}
99+
}

src/lib.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use std::task::Waker;
4141
use crate::select::Token;
4242
#[cfg(feature = "async")]
4343
use crate::r#async::RecvFuture;
44+
use std::cell::Cell;
4445

4546
/// An error that may be emitted when attempting to send a value into a channel on a sender.
4647
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -260,6 +261,7 @@ impl<T> Shared<T> {
260261
}
261262

262263
#[inline]
264+
#[cfg(feature = "async")]
263265
fn poll_inner(&self) -> Option<MutexGuard<'_, Inner<T>>> {
264266
#[cfg(windows)] { self.inner.try_lock().ok() }
265267
#[cfg(not(windows))] { self.inner.try_lock() }
@@ -363,6 +365,7 @@ impl<T> Shared<T> {
363365
&'a self,
364366
take_inner: impl FnOnce() -> MutexGuard<'a, Inner<T>>,
365367
buf: &mut VecDeque<T>,
368+
finished: &Cell<bool>,
366369
) -> Result<T, (MutexGuard<Inner<T>>, TryRecvError)> {
367370
// Eagerly check the buffer
368371
if let Some(msg) = buf.pop_front() {
@@ -380,8 +383,10 @@ impl<T> Shared<T> {
380383
msg
381384
},
382385
// If there's nothing more in the queue, this might be because there are no senders
383-
None if inner.sender_count == 0 =>
384-
return Err((inner, TryRecvError::Disconnected)),
386+
None if inner.sender_count == 0 => {
387+
finished.set(true);
388+
return Err((inner, TryRecvError::Disconnected));
389+
},
385390
None => return Err((inner, TryRecvError::Empty)),
386391
};
387392

@@ -412,12 +417,13 @@ impl<T> Shared<T> {
412417
fn recv(
413418
&self,
414419
buf: &mut VecDeque<T>,
420+
finished: &Cell<bool>,
415421
) -> Result<T, RecvError> {
416422
loop {
417423
// Attempt to receive a message
418424
let mut i = 0;
419425
let inner = loop {
420-
match self.try_recv(|| self.wait_inner(), buf) {
426+
match self.try_recv(|| self.wait_inner(), buf, finished) {
421427
Ok(msg) => return Ok(msg),
422428
Err((_, TryRecvError::Disconnected)) => return Err(RecvError::Disconnected),
423429
Err((inner, TryRecvError::Empty)) if i == 3 => break inner,
@@ -438,9 +444,10 @@ impl<T> Shared<T> {
438444
&self,
439445
deadline: Instant,
440446
buf: &mut VecDeque<T>,
447+
finished: &Cell<bool>,
441448
) -> Result<T, RecvTimeoutError> {
442449
// Attempt a speculative recv. If we are lucky there might be a message in the queue!
443-
let mut inner = match self.try_recv(|| self.wait_inner(), buf) {
450+
let mut inner = match self.try_recv(|| self.wait_inner(), buf, finished) {
444451
Ok(msg) => return Ok(msg),
445452
Err((_, TryRecvError::Disconnected)) => return Err(RecvTimeoutError::Disconnected),
446453
Err((inner, TryRecvError::Empty)) => inner,
@@ -466,7 +473,7 @@ impl<T> Shared<T> {
466473
}
467474

468475
// Attempt to receive a message from the queue
469-
inner = match self.try_recv(|| self.wait_inner(), buf) {
476+
inner = match self.try_recv(|| self.wait_inner(), buf, finished) {
470477
Ok(msg) => return Ok(msg),
471478
Err((inner, TryRecvError::Empty)) => inner,
472479
Err((_, TryRecvError::Disconnected)) => return Err(RecvTimeoutError::Disconnected),
@@ -555,28 +562,35 @@ pub struct Receiver<T> {
555562
/// Used to prevent Sync being implemented for this type - we never actually use it!
556563
/// TODO: impl<T> !Sync for Receiver<T> {} when negative traits are stable
557564
_phantom_cell: UnsafeCell<()>,
565+
/// Whether all receivers have disconnected and there are no messages in any buffer
566+
finished: Cell<bool>,
558567
}
559568

560569
impl<T> Receiver<T> {
561570
/// Wait for an incoming value from the channel associated with this receiver, returning an
562571
/// error if all channel senders have been dropped.
563572
pub fn recv(&self) -> Result<T, RecvError> {
564-
self.shared.recv(&mut self.buffer.borrow_mut())
573+
self.shared.recv(&mut self.buffer.borrow_mut(), &self.finished)
565574
}
566575

567576
/// Wait for an incoming value from the channel associated with this receiver, returning an
568577
/// error if all channel senders have been dropped or the timeout has expired.
569578
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
570579
self.shared.recv_deadline(
571580
Instant::now().checked_add(timeout).unwrap(),
572-
&mut self.buffer.borrow_mut()
581+
&mut self.buffer.borrow_mut(),
582+
&self.finished
573583
)
574584
}
575585

576586
/// Wait for an incoming value from the channel associated with this receiver, returning an
577587
/// error if all channel senders have been dropped or the deadline has passed.
578588
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
579-
self.shared.recv_deadline(deadline, &mut self.buffer.borrow_mut())
589+
self.shared.recv_deadline(
590+
deadline,
591+
&mut self.buffer.borrow_mut(),
592+
&self.finished
593+
)
580594
}
581595

582596
// Takes `&mut self` to avoid >1 task waiting on this channel
@@ -593,7 +607,11 @@ impl<T> Receiver<T> {
593607
pub fn try_recv(&self) -> Result<T, TryRecvError> {
594608
self
595609
.shared
596-
.try_recv(|| self.shared.wait_inner(), &mut self.buffer.borrow_mut())
610+
.try_recv(
611+
|| self.shared.wait_inner(),
612+
&mut self.buffer.borrow_mut(),
613+
&self.finished
614+
)
597615
.map_err(|(_, err)| err)
598616
}
599617

@@ -716,6 +734,7 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
716734
Receiver {
717735
shared,
718736
buffer: RefCell::new(VecDeque::new()),
737+
finished: Cell::new(false),
719738
_phantom_cell: UnsafeCell::new(())
720739
},
721740
)
@@ -750,6 +769,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
750769
Receiver {
751770
shared,
752771
buffer: RefCell::new(VecDeque::new()),
772+
finished: Cell::new(false),
753773
_phantom_cell: UnsafeCell::new(())
754774
},
755775
)

0 commit comments

Comments
 (0)