Skip to content

Commit 292caef

Browse files
committed
auto merge of rust-lang#15995 : Ryman/rust/sync_spsc_peek, r=alexcrichton
The current spsc implementation doesn't enforce single-producer single-consumer usage and also allows unsafe memory use through peek & pop. For safer usage, `new` now returns a pair of owned objects which only allow consumer or producer behaviors through an `Arc`. Through restricting the mutability of the receiver to `mut` the peek and pop behavior becomes safe again, with the compiler complaining about usage which could lead to problems. To fix code broken from this, update: Queue::new(x) -> unsafe { Queue::unchecked_new(x) } [breaking-change] For an example of broken behavior, check the added test which uses the unchecked constructor.
2 parents 55b2405 + 7b817b6 commit 292caef

File tree

2 files changed

+143
-36
lines changed

2 files changed

+143
-36
lines changed

src/libsync/comm/stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ enum Message<T> {
7474
impl<T: Send> Packet<T> {
7575
pub fn new() -> Packet<T> {
7676
Packet {
77-
queue: spsc::Queue::new(128),
77+
queue: unsafe { spsc::Queue::new(128) },
7878

7979
cnt: atomics::AtomicInt::new(0),
8080
steals: 0,

src/libsync/spsc_queue.rs

+142-35
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use core::prelude::*;
4040
use alloc::boxed::Box;
4141
use core::mem;
4242
use core::cell::UnsafeCell;
43+
use alloc::arc::Arc;
4344

4445
use atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
4546

@@ -73,6 +74,39 @@ pub struct Queue<T> {
7374
cache_subtractions: AtomicUint,
7475
}
7576

77+
/// A safe abstraction for the consumer in a single-producer single-consumer
78+
/// queue.
79+
pub struct Consumer<T> {
80+
inner: Arc<Queue<T>>
81+
}
82+
83+
impl<T: Send> Consumer<T> {
84+
/// Attempts to pop the value from the head of the queue, returning `None`
85+
/// if the queue is empty.
86+
pub fn pop(&mut self) -> Option<T> {
87+
self.inner.pop()
88+
}
89+
90+
/// Attempts to peek at the head of the queue, returning `None` if the queue
91+
/// is empty.
92+
pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
93+
self.inner.peek()
94+
}
95+
}
96+
97+
/// A safe abstraction for the producer in a single-producer single-consumer
98+
/// queue.
99+
pub struct Producer<T> {
100+
inner: Arc<Queue<T>>
101+
}
102+
103+
impl<T: Send> Producer<T> {
104+
/// Pushes a new value onto the queue.
105+
pub fn push(&mut self, t: T) {
106+
self.inner.push(t)
107+
}
108+
}
109+
76110
impl<T: Send> Node<T> {
77111
fn new() -> *mut Node<T> {
78112
unsafe {
@@ -84,9 +118,37 @@ impl<T: Send> Node<T> {
84118
}
85119
}
86120

121+
/// Creates a new queue with a consumer-producer pair.
122+
///
123+
/// The producer returned is connected to the consumer to push all data to
124+
/// the consumer.
125+
///
126+
/// # Arguments
127+
///
128+
/// * `bound` - This queue implementation is implemented with a linked
129+
/// list, and this means that a push is always a malloc. In
130+
/// order to amortize this cost, an internal cache of nodes is
131+
/// maintained to prevent a malloc from always being
132+
/// necessary. This bound is the limit on the size of the
133+
/// cache (if desired). If the value is 0, then the cache has
134+
/// no bound. Otherwise, the cache will never grow larger than
135+
/// `bound` (although the queue itself could be much larger.
136+
pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) {
137+
let q = unsafe { Queue::new(bound) };
138+
let arc = Arc::new(q);
139+
let consumer = Consumer { inner: arc.clone() };
140+
let producer = Producer { inner: arc };
141+
142+
(consumer, producer)
143+
}
144+
87145
impl<T: Send> Queue<T> {
88-
/// Creates a new queue. The producer returned is connected to the consumer
89-
/// to push all data to the consumer.
146+
/// Creates a new queue.
147+
///
148+
/// This is unsafe as the type system doesn't enforce a single
149+
/// consumer-producer relationship. It also allows the consumer to `pop`
150+
/// items while there is a `peek` active due to all methods having a
151+
/// non-mutable receiver.
90152
///
91153
/// # Arguments
92154
///
@@ -98,10 +160,10 @@ impl<T: Send> Queue<T> {
98160
/// cache (if desired). If the value is 0, then the cache has
99161
/// no bound. Otherwise, the cache will never grow larger than
100162
/// `bound` (although the queue itself could be much larger.
101-
pub fn new(bound: uint) -> Queue<T> {
163+
pub unsafe fn new(bound: uint) -> Queue<T> {
102164
let n1 = Node::new();
103165
let n2 = Node::new();
104-
unsafe { (*n1).next.store(n2, Relaxed) }
166+
(*n1).next.store(n2, Relaxed);
105167
Queue {
106168
tail: UnsafeCell::new(n2),
107169
tail_prev: AtomicPtr::new(n1),
@@ -199,6 +261,11 @@ impl<T: Send> Queue<T> {
199261

200262
/// Attempts to peek at the head of the queue, returning `None` if the queue
201263
/// has no data currently
264+
///
265+
/// # Warning
266+
/// The reference returned is invalid if it is not used before the consumer
267+
/// pops the value off the queue. If the producer then pushes another value
268+
/// onto the queue, it will overwrite the value pointed to by the reference.
202269
pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
203270
// This is essentially the same as above with all the popping bits
204271
// stripped out.
@@ -229,46 +296,84 @@ impl<T: Send> Drop for Queue<T> {
229296
mod test {
230297
use std::prelude::*;
231298

232-
use alloc::arc::Arc;
233299
use native;
234300

235-
use super::Queue;
301+
use super::{queue, Queue};
236302

237303
#[test]
238304
fn smoke() {
239-
let q = Queue::new(0);
240-
q.push(1i);
241-
q.push(2);
242-
assert_eq!(q.pop(), Some(1));
243-
assert_eq!(q.pop(), Some(2));
244-
assert_eq!(q.pop(), None);
245-
q.push(3);
246-
q.push(4);
247-
assert_eq!(q.pop(), Some(3));
248-
assert_eq!(q.pop(), Some(4));
249-
assert_eq!(q.pop(), None);
305+
let (mut consumer, mut producer) = queue(0);
306+
producer.push(1i);
307+
producer.push(2);
308+
assert_eq!(consumer.pop(), Some(1i));
309+
assert_eq!(consumer.pop(), Some(2));
310+
assert_eq!(consumer.pop(), None);
311+
producer.push(3);
312+
producer.push(4);
313+
assert_eq!(consumer.pop(), Some(3));
314+
assert_eq!(consumer.pop(), Some(4));
315+
assert_eq!(consumer.pop(), None);
316+
}
317+
318+
// This behaviour is blocked by the type system if using the safe constructor
319+
#[test]
320+
fn pop_peeked_unchecked() {
321+
let q = unsafe { Queue::new(0) };
322+
q.push(vec![1i]);
323+
q.push(vec![2]);
324+
let peeked = q.peek().unwrap();
325+
326+
assert_eq!(*peeked, vec![1]);
327+
assert_eq!(q.pop(), Some(vec![1]));
328+
329+
assert_eq!(*peeked, vec![1]);
330+
q.push(vec![7]);
331+
332+
// Note: This should actually expect 1, but this test is to highlight
333+
// the unsafety allowed by the unchecked usage. A Rust user would not
334+
// expect their peeked value to mutate like this without the type system
335+
// complaining.
336+
assert_eq!(*peeked, vec![7]);
337+
}
338+
339+
#[test]
340+
fn peek() {
341+
let (mut consumer, mut producer) = queue(0);
342+
producer.push(vec![1i]);
343+
344+
// Ensure the borrowchecker works
345+
match consumer.peek() {
346+
Some(vec) => match vec.as_slice() {
347+
// Note that `pop` is not allowed here due to borrow
348+
[1] => {}
349+
_ => return
350+
},
351+
None => unreachable!()
352+
}
353+
354+
consumer.pop();
250355
}
251356

252357
#[test]
253358
fn drop_full() {
254-
let q = Queue::new(0);
255-
q.push(box 1i);
256-
q.push(box 2i);
359+
let (_, mut producer) = queue(0);
360+
producer.push(box 1i);
361+
producer.push(box 2i);
257362
}
258363

259364
#[test]
260365
fn smoke_bound() {
261-
let q = Queue::new(1);
262-
q.push(1i);
263-
q.push(2);
264-
assert_eq!(q.pop(), Some(1));
265-
assert_eq!(q.pop(), Some(2));
266-
assert_eq!(q.pop(), None);
267-
q.push(3);
268-
q.push(4);
269-
assert_eq!(q.pop(), Some(3));
270-
assert_eq!(q.pop(), Some(4));
271-
assert_eq!(q.pop(), None);
366+
let (mut consumer, mut producer) = queue(1);
367+
producer.push(1i);
368+
producer.push(2);
369+
assert_eq!(consumer.pop(), Some(1));
370+
assert_eq!(consumer.pop(), Some(2));
371+
assert_eq!(consumer.pop(), None);
372+
producer.push(3);
373+
producer.push(4);
374+
assert_eq!(consumer.pop(), Some(3));
375+
assert_eq!(consumer.pop(), Some(4));
376+
assert_eq!(consumer.pop(), None);
272377
}
273378

274379
#[test]
@@ -277,13 +382,15 @@ mod test {
277382
stress_bound(1);
278383

279384
fn stress_bound(bound: uint) {
280-
let a = Arc::new(Queue::new(bound));
281-
let b = a.clone();
385+
let (consumer, mut producer) = queue(bound);
386+
282387
let (tx, rx) = channel();
283388
native::task::spawn(proc() {
389+
// Move the consumer to a local mutable slot
390+
let mut consumer = consumer;
284391
for _ in range(0u, 100000) {
285392
loop {
286-
match b.pop() {
393+
match consumer.pop() {
287394
Some(1i) => break,
288395
Some(_) => fail!(),
289396
None => {}
@@ -293,7 +400,7 @@ mod test {
293400
tx.send(());
294401
});
295402
for _ in range(0i, 100000) {
296-
a.push(1);
403+
producer.push(1);
297404
}
298405
rx.recv();
299406
}

0 commit comments

Comments
 (0)