Skip to content

Commit 9f579e8

Browse files
committed
Fixing issues, added error messages
1 parent 5f5c774 commit 9f579e8

32 files changed

+11369
-192
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "flume2" # !!!!!!!!!!!!!!!!!!!!!! CHANGE NAME !!!!!!!!!!!!!!!!!!!!!!
2+
name = "flume"
33
version = "0.10.1"
44
authors = ["Joshua Barretto <[email protected]>"]
55
edition = "2018"
@@ -30,13 +30,14 @@ time = ["sync", "futures-timer", "futures-util"]
3030
select = ["std", "futures-util/alloc"]
3131

3232
[dependencies]
33-
spin = "0.7"
33+
spin = { version = "0.7", default-features = false }
3434
pin-project-lite = "0.2"
3535
futures-core = "0.3"
3636
futures-sink = { version = "0.3", optional = true }
3737
futures-util = { version = "0.3", optional = true }
3838
futures-timer = { version = "3.0", optional = true }
3939
pollster = { version = "0.2", optional = true }
40+
#futures-executor = "0.3"
4041

4142
[package.metadata.docs.rs]
4243
all-features = true

examples/select.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#[cfg(feature = "select")]
22
fn main() {
3-
use flume2::Selector;
3+
use flume::{Selector, unbounded};
44
use rand::prelude::*;
55

66
// Create two channels
7-
let (red_tx, red_rx) = flume2::unbounded();
8-
let (blue_tx, blue_rx) = flume2::unbounded();
7+
let (red_tx, red_rx) = unbounded();
8+
let (blue_tx, blue_rx) = unbounded();
99

1010
// To make it fair, randomise the start order
1111
let mut racers = vec![("Red", red_tx), ("Blue", blue_tx)];

src/bounded.rs

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::*;
22

33
pub(crate) struct Bounded<T> {
4+
disconnected: bool,
45
cap: usize,
56
sends: VecDeque<(Waker, Booth<T>)>,
67
queue: VecDeque<T>,
@@ -9,51 +10,61 @@ pub(crate) struct Bounded<T> {
910

1011
impl<T> Bounded<T> {
1112
pub fn new(cap: usize) -> Flavor<T> {
12-
Flavor::Bounded(Lock::new(Self {
13+
Flavor::Bounded(Self {
14+
disconnected: false,
1315
cap,
1416
sends: VecDeque::new(),
1517
queue: VecDeque::new(),
1618
recvs: VecDeque::new(),
17-
}))
19+
})
1820
}
1921

2022
pub fn disconnect(&mut self) {
23+
self.disconnected = true;
2124
self.sends.drain(..).for_each(|(w, _)| w.wake());
2225
self.recvs.drain(..).for_each(|(w, _)| w.wake());
2326
}
2427

25-
pub fn try_send(&mut self, item: T) -> Result<Option<Waker>, T> {
26-
match self.recvs.pop_front() {
27-
Some((waker, booth)) => {
28-
booth.give(item);
29-
Ok(Some(waker))
30-
},
31-
None => if self.queue.len() < self.cap {
32-
self.queue.push_back(item);
33-
Ok(None)
34-
} else {
35-
Err(item)
36-
},
28+
pub fn try_send(&mut self, mut item: T) -> Result<Option<Waker>, (T, bool)> {
29+
if self.disconnected {
30+
return Err((item, true));
31+
}
32+
33+
loop {
34+
item = match self.recvs.pop_front() {
35+
Some((waker, booth)) => match booth.give(item) {
36+
Some(item) => item,
37+
None => break Ok(Some(waker)), // Woke receiver
38+
},
39+
None => break if self.queue.len() < self.cap {
40+
self.queue.push_back(item);
41+
Ok(None) // Pushed to queue
42+
} else {
43+
Err((item, false)) // Queue is full
44+
},
45+
};
3746
}
3847
}
3948

40-
pub fn try_recv(&mut self) -> Option<(Option<Waker>, T)> {
49+
pub fn try_recv(&mut self) -> Result<(Option<Waker>, T), bool> {
4150
self.queue
4251
.pop_front()
4352
.map(|item| {
4453
// We just made some space in the queue so we need to pull the next waiting sender too
45-
loop {
54+
let w = loop {
4655
if let Some((waker, item)) = self.sends.pop_front() {
4756
// Attempt to take the item in the slot. If the item does not exist, it must be because the sender
4857
// cancelled sending (perhaps due to a timeout). In such a case, don't bother to wake the sender (because
4958
// it should already have been woken by the timeout alarm) and skip to the next entry.
5059
if let Some(item) = item.try_take() {
51-
break (Some(waker), item);
60+
self.queue.push_back(item);
61+
break Some(waker);
5262
}
5363
} else {
54-
break (None, item);
64+
break None;
5565
}
56-
}
66+
};
67+
(w, item)
5768
})
5869
.or_else(|| {
5970
while let Some((waker, item)) = self.sends.pop_front() {
@@ -66,23 +77,28 @@ impl<T> Bounded<T> {
6677
}
6778
None
6879
})
80+
.ok_or(self.disconnected)
6981
}
7082

71-
pub fn send(&mut self, item: T, waker: impl FnOnce() -> Waker) -> Result<Option<Waker>, Booth<T>> {
83+
pub fn send(&mut self, item: T, waker: impl FnOnce() -> Waker) -> Result<Option<Waker>, Result<Booth<T>, T>> {
7284
self.try_send(item)
73-
.map_err(|item| {
85+
.map_err(|(item, d)| if d {
86+
Err(item)
87+
} else {
7488
let item = Booth::full(item);
7589
self.sends.push_back((waker(), item.clone()));
76-
item
90+
Ok(item)
7791
})
7892
}
7993

80-
pub fn recv(&mut self, waker: impl FnOnce() -> Waker) -> Result<(Option<Waker>, T), Booth<T>> {
94+
pub fn recv(&mut self, waker: impl FnOnce() -> Waker) -> Result<(Option<Waker>, T), Option<Booth<T>>> {
8195
self.try_recv()
82-
.ok_or_else(|| {
96+
.map_err(|d| if d {
97+
None
98+
} else {
8399
let item = Booth::empty();
84100
self.recvs.push_back((waker(), item.clone()));
85-
item
101+
Some(item)
86102
})
87103
}
88104

0 commit comments

Comments
 (0)