Skip to content

Current thread executor #639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Nov 16, 2017
Merged

Current thread executor #639

merged 17 commits into from
Nov 16, 2017

Conversation

carllerche
Copy link
Member

@carllerche carllerche commented Nov 10, 2017

Provides a current thread executor allowing users to spawn tasks that are guaranteed to run on the current thread.

This is based on the Tokio reform RFC but made some API tweaks.

Remaining

  • Get CI passing.
  • More tests.
    • Fairness
    • Cancellation
  • Benchmark.
  • Docs
  • API bikeshedding.

@carllerche carllerche requested a review from aturon November 10, 2017 21:08
@carllerche carllerche changed the base branch from master to tokio-reform November 10, 2017 21:08
This commit extracts the scheduling logic from FuturesUnordered and
moves it to a private `scheduler` module. This module will be used by
the CurrentThread executor from the Tokio reform RFC.
The `CurrentThread` executor is an event loop that allows the user to
spawn tasks that are guaranteed to run on the current thread.

This is based on the Tokio reform RFC, but also includes some API
tweaks.
@carllerche carllerche force-pushed the current-thread-executor branch from e34ee3f to f7eeb61 Compare November 10, 2017 21:18
_p: ::std::marker::PhantomData<Rc<()>>,
}

/// Executes dameonized tasks on the current thread.
Copy link
Member

@cramertj cramertj Nov 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere it would be good to clarify what it means for a task to be a "daemon"-- i.e. it executes indefinitely, and it does not prevent functions like block_on_all from returning.

(nit: also, spelling)

}

/// Execute the given future *synchronously* on the current thread, blocking
/// until it (and all spawned tasks) completes and returning its result.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*all non-daemon tasks.

/// Execute the given closure, then block until all spawned tasks complete.
///
/// In more detail, this function will block until:
/// - All spawned tasks are complete, or
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*non-daemon

})
}

/// Spawns a daemon, which does *not* block the pending `block_on_all` call.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...which is a task that does not block....


pub struct Notify<'a, T: 'a, W: 'a>(&'a Arc<Node<T, W>>);

/// Wakeup a sleeper
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth clarifying what a "sleeper" is. My understanding is its something that is in charge of polling the scheduler (either a Task in the FuturesUnordered case or a thread in the current-thread executor).

use std::sync::{Arc, Weak};
use std::usize;

/// A generic task-aware scheduler.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a more in-depth comment? Something to the effect of: Scheduler is used to run many child tasks inside of another thing that can be woken up via a "Wakeup".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would definitely appreciate a patch that improves on the docs 👍

}

unsafe fn hide_lt<T, W: Wakeup>(p: *mut ArcNode<T, W>) -> *mut UnsafeNotify {
mem::transmute(p as *mut UnsafeNotify)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this transmute call isn't actually doing anything?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hiding the lifetime :)

That said, I think that W needs to be 'static.

Copy link
Member

@aturon aturon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR largely looks good, although there's at least one significant (but easy to fix) bug.

I was impressed with the way you got a modicum of safety checking around re-entrancy with the scheduler data structure; would definitely like a bit more docs laying out how this works, as it took me some time to glean it from the code.

src/scheduler.rs Outdated
queued: AtomicBool,
}

pub enum Tick<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have a comment saying this is an enum for the return of tick below.

src/scheduler.rs Outdated
Inconsistent,
}

enum Dequeue<T, W> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar: short comment would be great.

@@ -0,0 +1,361 @@
//! Execute tasks on the current thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs here and elsewhere need significant expansion. We can do that in a follow-up PR if you prefer.

}

let res = future.as_mut()
.map(|f| f.poll_future_notify(thread_notify, 0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This future appears to be polled outside of the scheduler being active, meaning that attempts to spawn etc. will panic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would maybe be best to provide a helper to use her and below in poll_all.

let mut result = None;
let mut future = Some(executor::spawn(future));

while future.is_some() || self.non_daemons > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could rewrite this as a loop and break at the end (removing the replicated condition).

#[derive(Debug)]
struct TaskRunner {
/// Number of non-daemon tasks being executed by this `TaskRunner`.
non_daemons: usize,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, this field is never actually updated. It also can't be accessed as part of CurrentRunner, which is what you really need.

}

/// Enter a new `TaskRunner` context
fn enter<F, A>(f: F) -> Result<A::Item, A::Error>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would appreciate a bit more descriptive comments on this and the next couple top-level functions.

// TODO: This probably can be improved
current.cancel.set(false);

debug_assert!(current.scheduler.get().is_null());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert initially caught me by surprise, because I hadn't caught that the scheduler was only temporarily established in enter. Can we make this flow more clear?

src/scheduler.rs Outdated
/// This function should be called whenever the caller is notified via a
/// wakeup.
pub fn tick<F, R>(&mut self, mut f: F) -> Tick<R>
where F: FnMut(&mut Self, &mut T) -> Async<R>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very clever to thread the &mut issue by passing it back to the callback.

loop {
let res = self.scheduler.tick(|scheduler, spawned, notify| {
current.set_scheduler(scheduler, || {
match spawned.inner.0.poll_future_notify(notify, 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love some commentary here about the fact that tick gives us &mut access back to the scheduler, which provides some amount of safety checking against re-entrancy issues.

@carllerche
Copy link
Member Author

@aturon thanks for the review. I’m currently out of town but will try to address shortly. Docs should definitely be improved before merging. I just wanted to get the code up ASAP for initial review. I should have included a doc checkbox in the original Pr.

@carllerche carllerche force-pushed the current-thread-executor branch from 444e29f to 78e27af Compare November 15, 2017 21:31
@carllerche
Copy link
Member Author

I'm going to remove CurrentThread::block_on_all for now. It is pretty similar to block_with_init, so I think we should be conservative for now with the API.

The original reason for block_on_all was to drive a single future to completion, but that has been handled with future::blocking(fut).wait().

Either way, it can be added back later.

@carllerche
Copy link
Member Author

I don't love the name cancel_all_spawned as I think we have mixed up our terminology here.

@carllerche
Copy link
Member Author

In general, we have "executor" and "execute", but CurrentThread::spawn. We probably should converge to execute and move away from spawn

@cramertj
Copy link
Member

In general, we have "executor" and "execute", but CurrentThread::spawn. We probably should converge to execute and move away from spawn

If I saw execute i'd personally expect it to be a call to the method on the Executor trait.

@carllerche
Copy link
Member Author

@cramertj That is basically what it is.

@carllerche
Copy link
Member Author

I have made a number of API changes in this PR compared to the RFC:

CurrentThread instead of free functions

The PR just proposes a bucket of free functions. Instead, I put all of these functions on executor::CurrentThread. This type also is the implementor of the Executor trait.

I assume that at some point, there will also be executor::Pool as a facade over a customizable thread pool implementation.

Naming polish

I renamed spawn -> execute to bring it inline with the Executor trait. There are probably more tweaks besides this.

**Merged block_for_all and block_with_init.

Now there is only one blocking function. The original split makes more sense without the future::blocking helper.

**Pass &mut current_thread::Context to the block_with_init closure.

The RFC calls for passing &mut executor::Enter directly, however, this isn't very future proof. Instead, I define a new type that will eventually contain &mut executor::Enter. This way, if the current thread executor wants to eventually provide more context, it is able to do so without breaking changes.

I personally just picked Context without putting much thought into it, so if there are other ideas for naming, please post them.

Copy link
Member

@aturon aturon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes (especially docs) largely look good!

I'm not sure about the "execute" vs "spawn" terminology. Spawn is more evocative of what's happening -- you're creating a new, independent task that will be concurrently executed, and that should be true of all executors. I'm not sure we should give much weight to the naming within the futures crate (and if anything that should change, IMO).


#[bench]
#[ignore]
fn spawn_daisy(b: &mut Bencher) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should finish or remove before landing.

@@ -1,4 +1,69 @@
//! Execute tasks on the current thread
//!
//! [`CurrentThread`] provides an executor that keeps spawned futures keeps
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeated phrase "keeps spawned futures".

Maybe rephrase as: "is an executor that runs all spawned futures on a single thread".

//! [`block_with_init`]: struct.CurrentThread.html#method.block_with_init
//! [`block_on_all`]: struct.CurrentThread.html#method.block_on_all
//! [`CurrentThread::spawn`]: struct.CurrentThread.html#method.spawn
//! [`CurrentThread::spawn_daemon`]: struct.CurrentThread.html#method.spawn_daemon
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs look great 👍

@@ -14,13 +79,25 @@ use std::rc::Rc;
use std::sync::Arc;

/// Executes tasks on the current thread.
///
/// All tasks spawned using this executor will be executed on the current thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing given the inherent methods (which can spawn daemons). Maybe you want to say "when used as an Executor?

@aturon
Copy link
Member

aturon commented Nov 16, 2017

Other than the spawn/execute issue, the other departures from the RFC are largely cosmetic and are fine by me.

@carllerche carllerche mentioned this pull request Nov 16, 2017
3 tasks
@carllerche
Copy link
Member Author

@aturon I personally prefer the spawn terminology as well, however I think that having disjoint terminology is worse. Precise and consistent terminology is valuable. I made note of the naming issue in a tracking PR (#645).

@carllerche
Copy link
Member Author

@aturon OK, this PR should be good to go (assuming CI passes). Please provide final PR review.

@aturon aturon merged commit 1a2f97f into tokio-reform Nov 16, 2017
carllerche added a commit that referenced this pull request Nov 16, 2017
The `CurrentThread` executor is an event loop that allows the user to
spawn tasks that are guaranteed to run on the current thread.

This is done by extracting the scheduling logic from FuturesUnordered
and moving it to a `scheduler` module that can be used by both
`FuturesUnordered` and `CurrentThread` executor.

This is based on the Tokio reform RFC, but also includes some API
tweaks.
@cramertj cramertj mentioned this pull request Nov 16, 2017
@carllerche carllerche deleted the current-thread-executor branch November 29, 2017 21:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants