Skip to content

Commit acc7ade

Browse files
committed
Make remaining core channel spinlock opt-in
Signed-off-by: Johannes Löthberg <[email protected]>
1 parent 80d19c4 commit acc7ade

File tree

3 files changed

+52
-18
lines changed

3 files changed

+52
-18
lines changed

src/async.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
use crate::*;
1111
use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture};
1212
use futures_sink::Sink;
13+
use spin1::Mutex as Spinlock;
1314

1415
struct AsyncSignal {
1516
waker: Spinlock<Waker>,

src/lib.rs

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use std::{
4747
fmt,
4848
};
4949

50+
#[cfg(feature = "spin")]
5051
use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
5152
use crate::signal::{Signal, SyncSignal};
5253

@@ -256,55 +257,86 @@ enum TryRecvTimeoutError {
256257
}
257258

258259
// TODO: Investigate some sort of invalidation flag for timeouts
260+
#[cfg(feature = "spin")]
259261
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);
260262

263+
#[cfg(not(feature = "spin"))]
264+
struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);
265+
266+
#[cfg(feature = "spin")]
261267
impl<T, S: ?Sized + Signal> Hook<T, S> {
262-
pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> where S: Sized {
268+
pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
269+
where
270+
S: Sized,
271+
{
263272
Arc::new(Self(Some(Spinlock::new(msg)), signal))
264273
}
265274

266-
pub fn trigger(signal: S) -> Arc<Self> where S: Sized {
267-
Arc::new(Self(None, signal))
275+
fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
276+
self.0.as_ref().map(|s| s.lock())
268277
}
278+
}
269279

270-
pub fn signal(&self) -> &S {
271-
&self.1
280+
#[cfg(not(feature = "spin"))]
281+
impl<T, S: ?Sized + Signal> Hook<T, S> {
282+
pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
283+
where
284+
S: Sized,
285+
{
286+
Arc::new(Self(Some(Mutex::new(msg)), signal))
272287
}
273288

274-
pub fn fire_nothing(&self) -> bool {
275-
self.signal().fire()
289+
fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
290+
self.0.as_ref().map(|s| s.lock().unwrap())
276291
}
292+
}
277293

294+
impl<T, S: ?Sized + Signal> Hook<T, S> {
278295
pub fn fire_recv(&self) -> (T, &S) {
279-
let msg = self.0.as_ref().unwrap().lock().take().unwrap();
296+
let msg = self.lock().unwrap().take().unwrap();
280297
(msg, self.signal())
281298
}
282299

283300
pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
284-
let ret = match &self.0 {
285-
Some(hook) => {
286-
*hook.lock() = Some(msg);
301+
let ret = match self.lock() {
302+
Some(mut lock) => {
303+
*lock = Some(msg);
287304
None
288-
},
305+
}
289306
None => Some(msg),
290307
};
291308
(ret, self.signal())
292309
}
293310

294311
pub fn is_empty(&self) -> bool {
295-
self.0.as_ref().map(|s| s.lock().is_none()).unwrap_or(true)
312+
self.lock().map(|s| s.is_none()).unwrap_or(true)
296313
}
297314

298315
pub fn try_take(&self) -> Option<T> {
299-
self.0.as_ref().and_then(|s| s.lock().take())
316+
self.lock().unwrap().take()
317+
}
318+
319+
pub fn trigger(signal: S) -> Arc<Self>
320+
where
321+
S: Sized,
322+
{
323+
Arc::new(Self(None, signal))
324+
}
325+
326+
pub fn signal(&self) -> &S {
327+
&self.1
328+
}
329+
330+
pub fn fire_nothing(&self) -> bool {
331+
self.signal().fire()
300332
}
301333
}
302334

303335
impl<T> Hook<T, SyncSignal> {
304336
pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
305337
loop {
306338
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
307-
let msg = self.0.as_ref().unwrap().lock().take();
339+
let msg = self.lock().unwrap().take();
308340
if let Some(msg) = msg {
309341
break Some(msg);
310342
} else if disconnected {
@@ -319,7 +351,7 @@ impl<T> Hook<T, SyncSignal> {
319351
pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
320352
loop {
321353
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
322-
let msg = self.0.as_ref().unwrap().lock().take();
354+
let msg = self.lock().unwrap().take();
323355
if let Some(msg) = msg {
324356
break Ok(msg);
325357
} else if disconnected {
@@ -335,7 +367,7 @@ impl<T> Hook<T, SyncSignal> {
335367
pub fn wait_send(&self, abort: &AtomicBool) {
336368
loop {
337369
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
338-
if disconnected || self.0.as_ref().unwrap().lock().is_none() {
370+
if disconnected || self.lock().unwrap().is_none() {
339371
break;
340372
}
341373

@@ -347,7 +379,7 @@ impl<T> Hook<T, SyncSignal> {
347379
pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
348380
loop {
349381
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
350-
if self.0.as_ref().unwrap().lock().is_none() {
382+
if self.lock().unwrap().is_none() {
351383
break Ok(());
352384
} else if disconnected {
353385
break Err(false);

src/select.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface.
22
33
use crate::*;
4+
use spin1::Mutex as Spinlock;
45
use std::{any::Any, marker::PhantomData};
56

67
#[cfg(feature = "eventual-fairness")]

0 commit comments

Comments
 (0)