Skip to content

Commit 4ce2092

Browse files
committed
Merge branch 'master' into mpmc
# Conflicts: # benches/basic.rs # src/lib.rs
2 parents ec26e24 + 02598fe commit 4ce2092

File tree

5 files changed

+170
-74
lines changed

5 files changed

+170
-74
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "flume"
3-
version = "0.7.0"
3+
version = "0.7.1"
44
authors = ["Joshua Barretto <[email protected]>"]
55
edition = "2018"
66
description = "A blazingly fast multi-producer channel"
@@ -21,6 +21,7 @@ futures = { version = "^0.3", default-features = false, optional = true }
2121

2222
[dev-dependencies]
2323
crossbeam-channel = "0.4"
24+
crossbeam-utils = "0.7"
2425
criterion = "0.3.1"
2526
async-std = { version = "1.5", features = ["attributes"] }
2627

benches/basic.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
fmt::Debug,
88
};
99
use criterion::{Criterion, Bencher, black_box};
10+
use std::time::Instant;
1011

1112
trait Sender: Clone + Send + Sized + 'static {
1213
type Item: Debug + Default;
@@ -264,6 +265,70 @@ fn test_robin_b<S: Sender>(b: &mut Bencher, thread_num: usize, msg_num: usize) {
264265
});
265266
}
266267

268+
fn test_mpsc_bounded_no_wait<S: Sender>(b: &mut Bencher, thread_num: u64) {
269+
b.iter_custom(|iters| {
270+
let iters = iters * 1000;
271+
let (mut tx, mut rx) = S::bounded(iters as usize);
272+
let start = Instant::now();
273+
274+
crossbeam_utils::thread::scope(|scope| {
275+
for _ in 0..thread_num {
276+
let tx = tx.clone();
277+
scope.spawn(move |_| {
278+
for i in 0..iters / thread_num {
279+
tx.send(Default::default());
280+
}
281+
});
282+
}
283+
284+
for i in 0..iters - ((iters / thread_num) * thread_num) {
285+
tx.send(Default::default());
286+
}
287+
288+
for _ in 0..iters {
289+
black_box(rx.recv());
290+
}
291+
})
292+
.unwrap();
293+
294+
start.elapsed()
295+
})
296+
}
297+
298+
fn test_mpsc_bounded<S: Sender>(b: &mut Bencher, bound: usize, thread_num: usize) {
299+
b.iter_custom(|iters| {
300+
let (mut tx, mut rx) = S::bounded(bound);
301+
let start = Instant::now();
302+
303+
crossbeam_utils::thread::scope(|scope| {
304+
let msgs = iters as usize * bound;
305+
306+
for _ in 0..thread_num {
307+
let tx = tx.clone();
308+
scope.spawn(move |_| {
309+
for _ in 0..msgs / thread_num as usize {
310+
tx.send(Default::default());
311+
}
312+
});
313+
}
314+
315+
scope.spawn(move |_| {
316+
// Remainder
317+
for _ in 0..msgs - (msgs / thread_num as usize * thread_num) {
318+
tx.send(Default::default());
319+
}
320+
});
321+
322+
for _ in 0..msgs {
323+
black_box(rx.recv());
324+
}
325+
})
326+
.unwrap();
327+
328+
start.elapsed()
329+
})
330+
}
331+
267332
fn create(b: &mut Criterion) {
268333
b.bench_function("create-flume", |b| test_create::<flume::Sender<u32>>(b));
269334
b.bench_function("create-crossbeam", |b| test_create::<crossbeam_channel::Sender<u32>>(b));
@@ -351,11 +416,30 @@ fn robin_b_4t_1000m(b: &mut Criterion) {
351416
b.bench_function("robin-b-4t-1000m-std", |b| test_robin_b::<mpsc::Sender<u32>>(b, 4, 1000));
352417
}
353418

419+
fn mpsc_bounded_no_wait_4t(b: &mut Criterion) {
420+
b.bench_function("mpsc-bounded-no-wait-4t-flume", |b| test_mpsc_bounded_no_wait::<flume::Sender<u32>>(b, 4));
421+
b.bench_function("mpsc-bounded-no-wait-4t-crossbeam", |b| test_mpsc_bounded_no_wait::<crossbeam_channel::Sender<u32>>(b, 4));
422+
b.bench_function("mpsc-bounded-no-wait-4t-std", |b| test_mpsc_bounded_no_wait::<mpsc::Sender<u32>>(b, 4));
423+
}
424+
425+
fn mpsc_bounded_4t(b: &mut Criterion) {
426+
for bound in &[1, 10, 50, 10_000] {
427+
let text = format!("mpsc-bounded-small-4t-{}m-", bound);
428+
let bound = *bound;
429+
430+
b.bench_function(&format!("{}{}", text, "flume"), |b| test_mpsc_bounded::<flume::Sender<u32>>(b, bound, 4));
431+
b.bench_function(&format!("{}{}", text, "crossbeam"), |b| test_mpsc_bounded::<crossbeam_channel::Sender<u32>>(b, bound, 4));
432+
b.bench_function(&format!("{}{}", text, "std"), |b| test_mpsc_bounded::<mpsc::Sender<u32>>(b, bound, 4));
433+
}
434+
}
435+
354436
criterion_group!(
355437
compare,
356438
create,
357439
oneshot,
358440
inout,
441+
mpsc_bounded_no_wait_4t,
442+
mpsc_bounded_4t,
359443
hydra_1t_1000m,
360444
hydra_32t_1m,
361445
hydra_32t_1000m,

0 commit comments

Comments
 (0)