[WIP] Add the ability to spawn futures #679

Draft
wants to merge 2 commits into
base: main
from


cuviper
Member

@cuviper cuviper commented Aug 2, 2019

For Rust 1.36+ with std::future::Future, add a way to spawn tasks with
a returned Future. The task is immediately queued for the thread pool
to execute.
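As a rough illustration of the closure-to-future shape described here, the following is a minimal sketch that uses only the standard library. It assumes `std::thread::spawn` as a stand-in for the Rayon pool, and the names `SpawnFuture`, `Shared`, and `block_on` are hypothetical helpers, not part of this PR. Panics in the closure are not handled in this sketch.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// State shared between the worker and the returned future.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Hypothetical named future type (the PR itself returns `impl Future`).
pub struct SpawnFuture<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Future for SpawnFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.shared.lock().unwrap();
        match guard.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not finished yet: remember whom to notify.
                guard.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Assumption: a plain thread plays the role of the Rayon thread pool.
/// If `func` panics, this sketch never completes the future.
pub fn spawn_future<F, T>(func: F) -> SpawnFuture<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let worker_side = Arc::clone(&shared);
    thread::spawn(move || {
        let value = func();
        let waker = {
            let mut guard = worker_side.lock().unwrap();
            guard.result = Some(value);
            guard.waker.take()
        };
        // Wake after releasing the lock so the poller can lock right away.
        if let Some(waker) = waker {
            waker.wake();
        }
    });
    SpawnFuture { shared }
}

/// Waker that unparks the thread that is blocked in `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, included only so the sketch can be driven.
pub fn block_on<F: Future + Unpin>(mut future: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match Pin::new(&mut future).poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

With these helpers, `block_on(spawn_future(|| 6 * 7))` drives the future until the worker thread has stored its result.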

JobResult::Ok(x) => Poll::Ready(x),
JobResult::Panic(p) => {
    drop(guard); // don't poison the lock
    unwind::resume_unwinding(p);
Member Author

This is usually how we propagate panics, but maybe the future should yield Result<T, ...> instead?

I think it would be best to let Rayon's panic_handler handle the actual panic, but also panic here with something like panic!("the spawned task has panicked") rather than resuming with the original one.

If one were to retrieve the result of a spawned task without using futures, they would probably create a channel and send the result through it. Then, if the task panics, the sender side of the channel gets dropped, thus disconnecting it. If one attempts to receive the result from the receiver side of the channel, the receiver.recv() call returns an error because the channel is disconnected, and unwrapping that error panics.

So that way spawn_future would closely match the behavior of spawn + using a channel to retrieve the result.

Over the past few months I did a lot of exploration with panic propagation strategies in asynchronous contexts and talked to people about their use cases. In the end, I figured the generally best way of handling panics is to pass them to the panic handler and raise a new panic whenever the result of a failed task is polled.
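The channel comparison above can be sketched with the standard library alone. This assumes `std::thread::spawn` as a stand-in for `rayon::spawn`, and `run_and_receive` is a hypothetical helper name. It shows that a panicking task disconnects the channel, so the receiver sees a fresh error rather than the original panic payload.

```rust
use std::sync::mpsc::{self, RecvError};
use std::thread;

/// Runs `task` on another thread and waits for its result over a channel.
/// Assumption: `std::thread::spawn` stands in for `rayon::spawn`.
pub fn run_and_receive<T, F>(task: F) -> Result<T, RecvError>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // If `task` panics, `sender` is dropped during unwinding without
        // ever sending, which disconnects the channel.
        let value = task();
        let _ = sender.send(value);
    });
    // `recv` reports the disconnect as `Err(RecvError)`; calling
    // `.unwrap()` or `.expect(..)` on it is what raises the new panic.
    receiver.recv()
}
```

The original panic message still goes to the panic hook of the worker thread; the caller only learns that the task did not produce a value.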

Member Author

I'd be OK with that too. I was thinking of Result as an analogy from std::thread::spawn + JoinHandle::join to rayon::spawn_future + Future::poll.
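For reference, the `std::thread::spawn` + `JoinHandle::join` side of that analogy looks roughly like this. It is a small sketch: `join_result` is a hypothetical helper, and turning the payload into a `String` is just one possible choice.

```rust
use std::thread;

/// Runs `task` on a fresh thread and turns a panic into an `Err` holding
/// the panic message, based on what `JoinHandle::join` returns.
pub fn join_result<T, F>(task: F) -> Result<T, String>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(task).join().map_err(|payload| {
        // The payload is a `Box<dyn Any + Send>`. `panic!` with a string
        // literal yields `&'static str`; with format arguments, a `String`.
        if let Some(msg) = payload.downcast_ref::<&'static str>() {
            (*msg).to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "non-string panic payload".to_string()
        }
    })
}
```

A future yielding `Result<T, ...>` would hand the caller the same kind of payload instead of unwinding through `poll`.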

@cuviper
Member Author

cuviper commented Aug 2, 2019

The CI failure is that my new inherent methods interrupted access to the extension trait methods in rayon-future's tests -- meh.

}
}

pub fn spawn_future<F, T>(func: F) -> impl Future<Output = T>
Member

Hmm, I think I expected spawn_future to take a future as argument, so that you would do something like:

let future = rayon::spawn_future(async move {
    ...
});

where the returned future could then be awaited from other futures. This would be somewhat analogous to the spawn function from async.rs.

Member

One thing I was wondering is if async-task could be useful to us here -- I still haven't fully grokked that crate. :)

Member Author

I'm no expert here, but I think the difference is whether we want rayon to be a full executor, or just a new source of asynchronous events, and I was thinking more of the latter.

My intention was that you could still use tokio, async-std, or whatever with all of their abstractions working with file/network IO and such, and Rayon would just add something like an abstract CPU-IO. Other executors are usually latency-oriented, but Rayon is throughput-oriented with its greedy task stealing.

Anyway, I just tried async-task a bit, and it looks fine for the simple case:

pub fn spawn_future<F, T>(future: F) -> impl Future<Output = Option<T>>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |task| crate::spawn(|| task.run()), ());
    task.schedule();
    handle
}

It's OK for ThreadPool::spawn_future too, just tagged to a particular Registry. But we run into trouble with Scope, since async_task::spawn is all 'static. I don't think we can safely erase lifetimes here when we don't control the implementation behind it.

Member

I don't think we can safely erase lifetimes here when we don't control the implementation behind it.

I think this was exactly the case where I introduced a bit of unsafety into the prior implementation.

Member Author

Well yes, erasing lifetimes requires unsafe, but I meant that I'm not sure we can do that safely even in the "I know better than the compiler" sense. In the prior implementation, we kept complete control, which we wouldn't have under async-task.

Member

OK, I see what you meant by this:

I don't think we can safely erase lifetimes here when we don't control the implementation behind it.

I'm not sure I totally agree, though, but definitely we would want to be very explicit about what we are assuming and to have async-task commit (in a semver sense) to preserving those invariants.

@cuviper
Member Author

cuviper commented Aug 29, 2019

@alexcrichton, @fitzgen, maybe you have some perspective from the WASM side? Would either of these signatures be better or worse for integrating with wasm/js futures?

  • spawn_future(impl FnOnce) -> impl Future
  • spawn_future(impl Future) -> impl Future

I suspect the latter might be problematic, having to wait for unknown input futures from rayon threads, since we can't really unwind the entire thread out of wasm.

But is the simpler FnOnce->Future feasible in that environment? e.g. Will it work for the rayon thread to call a Waker that ultimately notifies the javascript stuff?

@fitzgen

fitzgen commented Aug 30, 2019

I believe that should work and there aren't any technical restrictions on our side (although Alex knows more about our executor in a multithreaded context).

My one concern, unrelated to wasm, is using -> impl Future means that we can't put the result in a struct without boxing into a trait object. Is there a technical reason why it can't be a named type?

@nikomatsakis
Member

@cuviper

Definitely rayon is "throughput optimized" and may not be the best choice to use for all your futures. But where I thought the impl Future -> impl Future signature could be useful is creating a DAG of computation nodes that are interdependent. Under this setup, one could e.g. have a matrix of tasks where each task is able to .await other tasks. (Although I guess it'd be a bit of a pain to set it up -- interesting problem.)

In any case, I also don't think it causes a big problem if you block on I/O events in rayon. The future would simply not be in our thread-pools until the I/O event was "wakened". It's not like it would block in the normal sense. (You might not want to combine that with rayon::scope, I suppose.)

@cuviper
Member Author

cuviper commented Aug 30, 2019

@fitzgen

My one concern, unrelated to wasm, is using -> impl Future means that we can't put the result in a struct without boxing into a trait object. Is there a technical reason why it can't be a named type?

Just that it's a smaller API surface to consider in this PR if we don't name it. But the type is simple enough that it shouldn't be a problem to return the concrete type.

@nikomatsakis

In any case, I also don't think it causes a big problem if you block on I/O events in rayon. The future would simply not be in our thread-pools until the I/O event was "wakened".

Yeah, so this is a case where I'm not certain how these things would work. I guess if the async work leading up to our part isn't ready, we'd just return NotReady too and let some outer executor deal with it? AIUI in async-task, we'd just end up queuing an underlying spawn for each step of the future's progress.

I brought up the WASM case because I thought there were some big caveats about notifying WASM threads about javascript futures. But maybe if the actual task suspension still looks like it happens outside of the threadpool, and we're respawning to get back in, it may work OK? Not sure.

We should probably play with some real examples before we commit to anything.

@alexcrichton

As to the effect on wasm I think the main question here would be what extra APIs rayon needs from the host. For example in our demos we have a Pool structure which only supports a run method to run a closure on the pool, and then we use rayon's ThreadPoolBuilder to use that which allows us to schedule rayon's work on our own thread pool we manage.

If impl Future is the argument here then rayon would probably need to add another API along the lines of "please schedule this future work to happen on the rayon thread pool" or something like that? If not though then I don't think this really impacts wasm all that much! If you've got a demo of the API though I can poke around the code and see if anything looks like it'd break on wasm (or try to run the demo on wasm!)

@nikomatsakis
Member

An update:

So we talked at some length to @alexcrichton and @fitzgen on Discord and came to the conclusion that "WASM interop is not a major issue here".

We left the meeting with the conclusion that I would spend a bit of time looking into what it would take to implement a spawn_future(impl Future) -> impl Future approach. I wanted to give a brief update on that.

(Side note: I know this will shock absolutely no one, but I think this is worth moving to an RFC. There are a number of smaller details concerning the APIs worth talking over and documenting.)

The set of APIs I think we want to support are something like this:

  • spawn a future into the global scope -- spawn_future(impl Future + 'static) -> impl Future
  • spawn a future into a Rayon scope -- spawn_future(impl Future + 'scope) -> impl Future

(And probably that impl Future in the return type is really going to be RayonFuture.)

Anyway, I dug into what an implementation would take. I think if we were to reimplement everything ourselves, the primary thing we need to create is a Waker. The idea would roughly be like this:

  • When you spawn a future, we create an Arc to represent that job
    • This Arc implements the Rayon Job trait, so it can be enqueued in our thread-pools
    • This Arc also serves as the Waker
    • This Arc also serves as the resulting future
    • If launched inside of a Rayon scope, this Arc also starts out holding a ref-count on that scope, which prevents the scope from completing until the future has fully executed. This is what allows it to hide the 'scope lifetime.
  • The arc has an internal state machine that looks something like this:
    • Waiting(F) -- initial state. We have the future but are not enqueued in Rayon task pool.
    • Enqueued(F) -- enqueued in Rayon task pool.
    • Executing -- currently executing the future
    • PendingWake -- if a wake occurred while we were executing
    • Complete(F::Output) -- future completed
  • There are three ways to interact with this arc:
    • As a waker: when the "wake" method is called, we can be in any state. We transition as follows:
      • If Waiting, go to Enqueued state and enqueue ourselves into a Rayon thread-pool.
      • Otherwise, ignore.
    • As a Rayon job: when the rayon job executes, we must be in the enqueued state.
      • Move to Executing state and execute future.
      • If the result is "NotReady", move to the Waiting state, unless we were moved to PendingWake, in which case we can immediately re-execute (or re-enqueue ourselves)
      • If the result is Ready, move to the Complete state and store the result
    • As a future, we can be polled:
      • If not in the complete state, we return NotReady, and stash the waker
      • Else we return the result ("take"ing it)
      • Upon entering the complete state, we wake the waker (this actually occurs in the "job" code)
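The state machine in the list above can be sketched as a plain enum with transition functions. This is a simplified model with the payloads (the future and its output) left out, and the `Action` type is a hypothetical addition for illustration. One assumption goes beyond the list: a wake that arrives while `Executing` is recorded as `PendingWake`, which is how that state appears to be reached.

```rust
/// States from the list above, without their payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Waiting,
    Enqueued,
    Executing,
    PendingWake,
    Complete,
}

/// Hypothetical side effect the caller should perform after a transition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    None,
    /// Push the job onto the thread pool.
    Enqueue,
    /// Wake whoever is awaiting the result.
    WakeAwaiter,
}

impl State {
    /// Waker path: `wake` may be called in any state.
    pub fn on_wake(self) -> (State, Action) {
        match self {
            State::Waiting => (State::Enqueued, Action::Enqueue),
            // Assumption: a wake during execution is remembered.
            State::Executing => (State::PendingWake, Action::None),
            other => (other, Action::None),
        }
    }

    /// Job path, step 1: the pool picked the job up.
    pub fn on_job_start(self) -> State {
        assert_eq!(self, State::Enqueued, "job ran without being enqueued");
        State::Executing
    }

    /// Job path, step 2: polling the inner future returned.
    pub fn on_poll_done(self, ready: bool) -> (State, Action) {
        match (self, ready) {
            (State::Executing, true) | (State::PendingWake, true) => {
                (State::Complete, Action::WakeAwaiter)
            }
            (State::Executing, false) => (State::Waiting, Action::None),
            // A wake arrived while executing: go straight back to the pool.
            (State::PendingWake, false) => (State::Enqueued, Action::Enqueue),
            (other, _) => panic!("poll finished in unexpected state {:?}", other),
        }
    }
}
```

A real implementation would store this state atomically inside the shared allocation; the sketch only captures which transitions are legal.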

I started writing this code, but it turns out that the async-task crate basically implements all of this logic already, so it really makes sense I think to build on that.

The simplest integration (at the static level, and not "peak efficiency") just looks like this:

pub fn spawn_future<R>(future: impl Future<Output = R> + Send + 'static) -> impl Future<Output = R>
where
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(
        future,
        move |task| rayon_core::spawn(move || task.run()),
        (),
    );
    task.schedule();
    async move { handle.await.unwrap() }
}

I say "not peak efficiency" because each call to rayon_core::spawn is going to create a new allocation, but the async_task::spawn already created one behind the scenes.

I discussed with @stjepang -- if async_task::spawn offered an API for converting from a async_task::Task to some pointer (analogous to Arc::into_raw), then we could do something like this:

  • convert the task to a raw pointer *const TaskRaw or whatever
  • implement rayon::Job for TaskRaw (Job is a private trait, so we can do this without the outside world knowing about it, but it does have to be in rayon-core presently)
  • enqueue the task directly onto the rayon thread-pool; when invoked, it'll use from_raw to convert the raw pointer back and then invoke run as above

To handle the 'scope API, we need to ensure that async-task meets some basic sanity requirements. But if we are doing this development within rayon-core, we don't need to expose any new APIs publicly. We would basically follow the approach I suggested above, I think -- we would grab a ref count and then augment the future we are spawning to decrement the ref count only when it's completed its work. I didn't get so far as to prototype this, in part because I was doing this work in a separate crate. I realize now that I could prototype this better if I did the work directly in rayon-core (though we should discuss if this is a feature we would want in rayon-core, as it would up our minimum rust version -- perhaps gated by a cargo feature?).
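The ref-count idea can be illustrated in isolation. In this minimal sketch, `ScopeCount`, `CountGuard`, `ScopedFuture`, and `hold_scope` are all hypothetical names, and a simple atomic counter stands in for the scope's internal bookkeeping. It only shows the counting; it does not by itself make erasing the 'scope lifetime sound.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

/// Hypothetical stand-in for a scope's count of outstanding jobs.
pub struct ScopeCount {
    pending: AtomicUsize,
}

impl ScopeCount {
    pub fn new() -> Self {
        ScopeCount { pending: AtomicUsize::new(0) }
    }

    pub fn pending(&self) -> usize {
        self.pending.load(Ordering::SeqCst)
    }
}

/// Decrements the count when dropped.
struct CountGuard(Arc<ScopeCount>);

impl Drop for CountGuard {
    fn drop(&mut self) {
        self.0.pending.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Wraps a future so the count is released once it completes or is dropped.
/// The inner future is boxed to avoid unsafe pin projection in this sketch.
pub struct ScopedFuture<F> {
    inner: Pin<Box<F>>,
    guard: Option<CountGuard>,
}

impl<F: Future> Future for ScopedFuture<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let this = &mut *self;
        match this.inner.as_mut().poll(cx) {
            Poll::Ready(value) => {
                // Work is done: release the hold on the scope.
                this.guard.take();
                Poll::Ready(value)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Takes a ref count on `scope` and ties its release to `future`.
pub fn hold_scope<F: Future>(scope: Arc<ScopeCount>, future: F) -> ScopedFuture<F> {
    scope.pending.fetch_add(1, Ordering::SeqCst);
    ScopedFuture {
        inner: Box::pin(future),
        guard: Some(CountGuard(scope)),
    }
}
```

Because the guard also fires on drop, a future that is cancelled before completion still releases its hold on the scope.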

@cuviper
Member Author

cuviper commented Sep 5, 2019

(though we should discuss if this is a feature we would want in rayon-core, as it would up our minimum rust version -- perhaps gated by a cargo feature?)

I think it should be in core. It doesn't have to affect the general minimum rust if we gate the new functionality, whether by a cargo feature or just autocfg detection, as I did here for Future.

@jClaireCodesStuff

I'm about halfway through a straightforward port of the existing rayon_futures to 0.3, without any thought into changing the API design beyond removing Future::wait.

The library builds and feels approximately correct but I haven't ported the tests yet, thus about halfway. I think the most interesting part of that work has been how much simpler ArcWake is than the previous Notify stuff.

I don't have too many opinions about what the API should be. I do think that dropping a Future should be considered a perfectly normal way to cancel it. That likely means we need to take a bit more care to propagate panics in a useful way.
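One way to picture drop-as-cancellation is a flag that the handle sets when it is dropped and that the running task can check. This is a minimal sketch under that assumption; `CancelOnDrop`, `CancelToken`, and `cancel_pair` are hypothetical names and are not taken from rayon_futures.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Held by the caller (for example inside the returned future).
pub struct CancelOnDrop {
    cancelled: Arc<AtomicBool>,
}

/// Held by the spawned task, which checks it between units of work.
pub struct CancelToken {
    cancelled: Arc<AtomicBool>,
}

/// Creates a linked handle/token pair.
pub fn cancel_pair() -> (CancelOnDrop, CancelToken) {
    let flag = Arc::new(AtomicBool::new(false));
    (
        CancelOnDrop { cancelled: Arc::clone(&flag) },
        CancelToken { cancelled: flag },
    )
}

impl Drop for CancelOnDrop {
    fn drop(&mut self) {
        // Dropping the handle is the cancellation signal.
        self.cancelled.store(true, Ordering::SeqCst);
    }
}

impl CancelToken {
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}
```

With this shape, a task that panics after its handle was dropped has nobody left to report to, which is part of why panic propagation needs extra care.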

@jannickj

jannickj commented Dec 6, 2019

Any progress on this? For someone with a mix of CPU / IO bound work, what's the recommended path?

@cuviper
Member Author

cuviper commented Dec 14, 2019

I haven't had the time lately to work on the more advanced ideas proposed, nor do I really have any expertise in async code.

I still think the very simple idea in this PR, FnOnce to Future, would be useful for simple use cases though. @jannickj, could you try this branch in your code and see how that works for you?

@ethanpailes

@cuviper I'm also interested in being able to ergonomically offload compute-bound work (or possibly work that just still has to be done with blocking IO) in an async context. Thanks for working on this!

My feeling is that the API that @nikomatsakis proposed is probably the right one because compute bound work might want to interleave some async IO operations and a future captures that sort of computation better than a closure. That said, for now a closure-based API is good enough for me.

To hold me over while you all take the time to design a nice interface, I just threw together https://github.com/ethanpailes/rayon-future. Maybe it will be useful to someone else in the same boat.

@cuviper
Member Author

cuviper commented Jul 5, 2020

I noticed that the new gtk-rs release includes a threadpool with push and push_future methods. The latter is like what I proposed here, just mapping a closure to a future.

pub fn push_future<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
    &self, 
    func: F
) -> Result<impl Future<Output = T>, Error>

I still like the simplicity of not trying to await future input on our part. It's also possible for us to add that in the short term, and maybe add a separate method that maps futures in&out later.

@cuviper
Member Author

cuviper commented Apr 7, 2023

Hmm, I think I expected spawn_future to take a future as argument,

Revisiting this -- maybe the answer is to give a different name to this fn(impl FnOnce) -> impl Future?

The tokio-rayon crate provides this with a top-level spawn function, but they also have an extension trait for ThreadPool where they call it spawn_async, which seems like a decent name to use in rayon-core too. Then we can leave the door open for a spawn_future that takes a future as input.

Add a way to spawn tasks with a returned `Future`. The task is
immediately queued for the thread pool to execute.
@james7132

Gentle poke here, there's renewed interest on Bevy's side in (re-)adopting rayon for thread pool management and task execution, particularly due to its better thread sleep management than our current use of async_executor. One of the biggest reasons we initially moved away from rayon is because of the inability to execute both async and non-async tasks in the same thread pool to try to minimize OS context switch costs where possible.

Is there still interest in supporting this use case? If so, what's needed to advance this forward? There was mention of needing an RFC, but there doesn't seem to be a PR for one in the RFCs repo, open or closed.

Right now we can complete a migration to using rayon-core and just use @nikomatsakis's non-"peak efficiency" implementation mentioned above, but it'd be desirable to eliminate the extra allocation on re-scheduling the runnable.

@cuviper
Member Author

cuviper commented Feb 21, 2024

I think it could be built on something like this, if you want to experiment with it:

impl JobRef {
    pub(super) fn new_async(runnable: async_task::Runnable<()>) -> JobRef {
        // TODO: call registry.increment_terminate_count()?
        JobRef {
            pointer: runnable.into_raw().as_ptr(),
            execute_fn: |this| unsafe {
                // TODO: handle panics with registry.catch_unwind(..)
                // TODO: should we always use NonNull for `Job`?
                // TODO: call registry.terminate()?
                let this = NonNull::new_unchecked(this as *mut ());
                async_task::Runnable::<()>::from_raw(this).run();
            },
        }
    }
}

(I don't think there's any value in literally implementing Job for this.)
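The pointer-plus-function shape used above can be tried out without rayon-core or async-task. The following is a simplified analog, not Rayon's actual `JobRef`: `ErasedJob` is a hypothetical type, and a boxed closure takes the place of `async_task::Runnable`, with `Box::into_raw` / `Box::from_raw` playing the role of `into_raw` / `from_raw`.

```rust
/// Simplified analog of a type-erased job reference: a raw pointer plus
/// the function that knows how to run what it points to.
pub struct ErasedJob {
    pointer: *const (),
    execute_fn: unsafe fn(*const ()),
}

// Safety: `new` only accepts `Send` closures, so moving the erased
// pointer to another thread is sound.
unsafe impl Send for ErasedJob {}

impl ErasedJob {
    pub fn new<F>(func: F) -> ErasedJob
    where
        F: FnOnce() + Send + 'static,
    {
        // Monomorphized per closure type, so it can recover `F` later.
        unsafe fn execute<F: FnOnce()>(this: *const ()) {
            // Rebuild the box leaked in `new`, then run and free it.
            let func = unsafe { Box::from_raw(this as *mut F) };
            func();
        }

        ErasedJob {
            pointer: Box::into_raw(Box::new(func)) as *const (),
            execute_fn: execute::<F>,
        }
    }

    /// Runs the job exactly once; consuming `self` prevents a double free.
    /// A job that is dropped without being executed leaks its closure.
    pub fn execute(self) {
        unsafe { (self.execute_fn)(self.pointer) }
    }
}
```

The point of the erased form is that the queue stores two words per job and needs no extra allocation beyond the one that already holds the task.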

8 participants