-
Notifications
You must be signed in to change notification settings - Fork 654
Current thread executor #639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This commit extracts the scheduling logic from FuturesUnordered and moves it to a private `scheduler` module. This module will be used by the CurrentThread executor from the Tokio reform RFC.
The `CurrentThread` executor is an event loop that allows the user to spawn tasks that are guaranteed to run on the current thread. This is based on the Tokio reform RFC, but also includes some API tweaks.
e34ee3f
to
f7eeb61
Compare
src/executor/current_thread.rs
Outdated
_p: ::std::marker::PhantomData<Rc<()>>, | ||
} | ||
|
||
/// Executes dameonized tasks on the current thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and elsewhere it would be good to clarify what it means for a task to be a "daemon"-- i.e. it executes indefinitely, and it does not prevent functions like block_on_all from returning.
(nit: also, spelling)
src/executor/current_thread.rs
Outdated
} | ||
|
||
/// Execute the given future *synchronously* on the current thread, blocking | ||
/// until it (and all spawned tasks) completes and returning its result. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*all non-daemon tasks.
src/executor/current_thread.rs
Outdated
/// Execute the given closure, then block until all spawned tasks complete. | ||
/// | ||
/// In more detail, this function will block until: | ||
/// - All spawned tasks are complete, or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*non-daemon
src/executor/current_thread.rs
Outdated
}) | ||
} | ||
|
||
/// Spawns a daemon, which does *not* block the pending `block_on_all` call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...which is a task that does not block....
|
||
pub struct Notify<'a, T: 'a, W: 'a>(&'a Arc<Node<T, W>>); | ||
|
||
/// Wakeup a sleeper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably worth clarifying what a "sleeper" is. My understanding is its something that is in charge of polling the scheduler (either a Task in the FuturesUnordered case or a thread in the current-thread executor).
use std::sync::{Arc, Weak}; | ||
use std::usize; | ||
|
||
/// A generic task-aware scheduler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a more in-depth comment? Something to the effect of: Scheduler is used to run many child tasks inside of another thing that can be woken up via a "Wakeup".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would definitely appreciate a patch that improves on the docs 👍
} | ||
|
||
unsafe fn hide_lt<T, W: Wakeup>(p: *mut ArcNode<T, W>) -> *mut UnsafeNotify { | ||
mem::transmute(p as *mut UnsafeNotify) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this transmute
call isn't actually doing anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is hiding the lifetime :)
That said, I think that W
needs to be 'static
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR largely looks good, although there's at least one significant (but easy to fix) bug.
I was impressed with the way you got a modicum of safety checking around re-entrancy with the scheduler data structure; would definitely like a bit more docs laying out how this works, as it took me some time to glean it from the code.
src/scheduler.rs
Outdated
queued: AtomicBool, | ||
} | ||
|
||
pub enum Tick<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to have a comment saying this is an enum for the return of tick
below.
src/scheduler.rs
Outdated
Inconsistent, | ||
} | ||
|
||
enum Dequeue<T, W> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar: short comment would be great.
@@ -0,0 +1,361 @@ | |||
//! Execute tasks on the current thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docs here and elsewhere need significant expansion. We can do that in a follow-up PR if you prefer.
src/executor/current_thread.rs
Outdated
} | ||
|
||
let res = future.as_mut() | ||
.map(|f| f.poll_future_notify(thread_notify, 0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This future appears to be polled outside of the scheduler being active, meaning that attempts to spawn etc. will panic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would maybe be best to provide a helper to use her and below in poll_all
.
src/executor/current_thread.rs
Outdated
let mut result = None; | ||
let mut future = Some(executor::spawn(future)); | ||
|
||
while future.is_some() || self.non_daemons > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could rewrite this as a loop
and break at the end (removing the replicated condition).
src/executor/current_thread.rs
Outdated
#[derive(Debug)] | ||
struct TaskRunner { | ||
/// Number of non-daemon tasks being executed by this `TaskRunner`. | ||
non_daemons: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell, this field is never actually updated. It also can't be accessed as part of CurrentRunner
, which is what you really need.
src/executor/current_thread.rs
Outdated
} | ||
|
||
/// Enter a new `TaskRunner` context | ||
fn enter<F, A>(f: F) -> Result<A::Item, A::Error> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would appreciate a bit more descriptive comments on this and the next couple top-level functions.
src/executor/current_thread.rs
Outdated
// TODO: This probably can be improved | ||
current.cancel.set(false); | ||
|
||
debug_assert!(current.scheduler.get().is_null()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assert initially caught me by surprise, because I hadn't caught that the scheduler was only temporarily established in enter
. Can we make this flow more clear?
src/scheduler.rs
Outdated
/// This function should be called whenever the caller is notified via a | ||
/// wakeup. | ||
pub fn tick<F, R>(&mut self, mut f: F) -> Tick<R> | ||
where F: FnMut(&mut Self, &mut T) -> Async<R> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very clever to thread the &mut
issue by passing it back to the callback.
src/executor/current_thread.rs
Outdated
loop { | ||
let res = self.scheduler.tick(|scheduler, spawned, notify| { | ||
current.set_scheduler(scheduler, || { | ||
match spawned.inner.0.poll_future_notify(notify, 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd love some commentary here about the fact that tick
gives us &mut
access back to the scheduler, which provides some amount of safety checking against re-entrancy issues.
@aturon thanks for the review. I’m currently out of town but will try to address shortly. Docs should definitely be improved before merging. I just wanted to get the code up ASAP for initial review. I should have included a doc checkbox in the original Pr. |
The current thread executor should be set to the thread-local variable before polling any futures.
Only pull in the files when the std feature is set.
444e29f
to
78e27af
Compare
I'm going to remove The original reason for Either way, it can be added back later. |
I don't love the name |
In general, we have "executor" and "execute", but |
If I saw |
@cramertj That is basically what it is. |
I have made a number of API changes in this PR compared to the RFC:
The PR just proposes a bucket of free functions. Instead, I put all of these functions on I assume that at some point, there will also be Naming polish I renamed **Merged Now there is only one blocking function. The original split makes more sense without the **Pass The RFC calls for passing I personally just picked |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes (especially docs) largely look good!
I'm not sure about the "execute" vs "spawn" terminology. Spawn is more evocative of what's happening -- you're creating a new, independent task that will be concurrently executed, and that should be true of all executors. I'm not sure we should give much weight to the naming within the futures crate (and if anything that should change, IMO).
benches/current_thread_executor.rs
Outdated
|
||
#[bench] | ||
#[ignore] | ||
fn spawn_daisy(b: &mut Bencher) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should finish or remove before landing.
src/executor/current_thread.rs
Outdated
@@ -1,4 +1,69 @@ | |||
//! Execute tasks on the current thread | |||
//! | |||
//! [`CurrentThread`] provides an executor that keeps spawned futures keeps |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Repeated phrase "keeps spawned futures".
Maybe rephrase as: "is an executor that runs all spawned futures on a single thread".
src/executor/current_thread.rs
Outdated
//! [`block_with_init`]: struct.CurrentThread.html#method.block_with_init | ||
//! [`block_on_all`]: struct.CurrentThread.html#method.block_on_all | ||
//! [`CurrentThread::spawn`]: struct.CurrentThread.html#method.spawn | ||
//! [`CurrentThread::spawn_daemon`]: struct.CurrentThread.html#method.spawn_daemon |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docs look great 👍
src/executor/current_thread.rs
Outdated
@@ -14,13 +79,25 @@ use std::rc::Rc; | |||
use std::sync::Arc; | |||
|
|||
/// Executes tasks on the current thread. | |||
/// | |||
/// All tasks spawned using this executor will be executed on the current thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is confusing given the inherent methods (which can spawn daemons). Maybe you want to say "when used as an Executor
?
Other than the spawn/execute issue, the other departures from the RFC are largely cosmetic and are fine by me. |
@aturon OK, this PR should be good to go (assuming CI passes). Please provide final PR review. |
The `CurrentThread` executor is an event loop that allows the user to spawn tasks that are guaranteed to run on the current thread. This is done by extracting the scheduling logic from FuturesUnordered and moving it to a `scheduler` module that can be used by both `FuturesUnordered` and `CurrentThread` executor. This is based on the Tokio reform RFC, but also includes some API tweaks.
Provides a current thread executor allowing users to spawn tasks that are guaranteed to run on the current thread.
This is based on the Tokio reform RFC but made some API tweaks.
Remaining