upgrade to Tokio v1.x ecosystem #11475
This compiles but has some tests failing in |
Prerequisite is a release of the |
    use ouroboros::self_referencing;

    #[self_referencing]
    struct OwnedChild {
      child: Box<Child>,
      #[borrows(mut child)]
      #[not_covariant]
      exit_stream: BoxStream<'this, Result<ChildOutput, std::io::Error>>,
    }

    impl Stream for OwnedChild {
      type Item = Result<ChildOutput, std::io::Error>;

      fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
      ) -> Poll<Option<Self::Item>> {
        self.with_exit_stream_mut(|es| Pin::new(es).poll_next(cx))
      }
    }
`tokio::process::Child` no longer implements `Future` in Tokio 1.0. Instead, `Child` exposes a `wait` instance method. The problem is that `wait` takes `&mut self`, so the original version of `exit_stream` mutably borrows the `Child` and cannot be returned from `run_in_workdir`: the borrow checker rejects the code because the `Child` does not survive the call to `run_in_workdir`.

The solution is to have `Child` survive that call. This code uses the `ouroboros` crate, whose proc macro allows a limited form of self-referential structs. The `Child` is owned by this struct, and this struct forms part of the stream returned from `run_in_workdir`. The `exit_stream` part of the return value from `run_in_workdir` is also embedded in this struct via the `ouroboros` crate.

Finally, the solution implements `Stream` on `OwnedChild` so that it exposes the same stream as `exit_stream`, but as a first-class owner of the embedded `Child`.
cc @stuhood
So, afaik, the suggested way to do this is to call `child.stdout.take()` (which is identical to how you do this with the stdlib `Child` type). See the `stdout`/`stderr` fields in the stdlib docs https://doc.rust-lang.org/std/process/struct.Child.html for more info. You can then call `wait` safely.
The issue is not with `child.stdout` nor with `child.stderr`. The `stdout_stream` and `stderr_stream` streams take ownership of those respective values and do not trigger the borrow checker. See the construction of `stdout_stream` and `stderr_stream`, where the code already calls `child.stdout.take().unwrap()` and `child.stderr.take().unwrap()` respectively.

The issue is with `exit_stream`. In tokio v0.2.x, `Child` implements `Future` and can be used directly to await the exit status of the child process. The borrow checker was not triggered in `run_in_workdir` previously because `exit_stream` took ownership of the underlying `Future`, a/k/a the `Child` struct. Thus returning `exit_stream` from `run_in_workdir` implicitly passed ownership of `Child` out of `run_in_workdir`, because `exit_stream` owned it.

In tokio v1.x, that is not the case: `Child` no longer implements `Future`, and one must call `.wait` to obtain a Future that can be used to await the child's exit status. `.wait` takes `&mut self` and thus borrows `Child` mutably within the call to `run_in_workdir`. Unlike in tokio v0.2.x, the borrow checker triggers in this case because `exit_stream` now only has the `&mut self` reference to `Child` (via `.wait`) and not ownership of `Child` like it did previously. The borrow checker rejects this because the `Child` is dropped when `run_in_workdir` returns.

The solution is to let `Child` escape `run_in_workdir`. However, the code should also keep the Future returned from `.wait` alive (and its `&mut self` reference) so that we only construct `exit_stream` once. This is accomplished by using the `ouroboros` crate to safely maintain the self-reference between `Child` and `exit_stream`.

And to make things easier, the `OwnedChild` struct implements Stream so that the details of how `exit_stream` and the `Child` struct are stored are hidden.
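To make the ownership argument concrete, here is a minimal std-only sketch of the same borrow. It is a simplified model, not the PR's code: `Child`, `Wait`, `finish`, and `owned_exit` are hypothetical stand-ins (the real types are `tokio::process::Child` and the future returned by its `wait`), and a closure stands in for the returned stream.

```rust
// Simplified stand-ins: these are NOT the tokio types, only a model of the borrow.
struct Child {
    exit_code: i32,
}

// Plays the role of the future returned by `wait`: it holds `&mut Child`.
struct Wait<'a> {
    child: &'a mut Child,
}

impl Child {
    // Like the real `wait`, this takes `&mut self`, so the result borrows the child.
    fn wait(&mut self) -> Wait<'_> {
        Wait { child: self }
    }
}

impl<'a> Wait<'a> {
    // Stands in for awaiting the future to completion.
    fn finish(self) -> i32 {
        self.child.exit_code
    }
}

// Rejected by the borrow checker if uncommented: the returned `Wait` would
// borrow `child`, which is dropped when the function returns.
//
// fn borrowed_exit<'a>() -> Wait<'a> {
//     let mut child = Child { exit_code: 0 };
//     child.wait()
// }

// Accepted: the `Child` is moved into the returned closure, so the value that
// leaves the function owns the `Child`, and the borrow made by `wait` lives
// only inside each call.
fn owned_exit(mut child: Child) -> impl FnMut() -> i32 {
    move || child.wait().finish()
}
```

Calling `owned_exit(Child { exit_code: 3 })` yields a closure that returns `3` each time it is invoked. Note one difference from the approach described above: this sketch re-creates the `wait` borrow on every call, whereas the `ouroboros` struct keeps a single `exit_stream` alive next to the `Child` it borrows.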
Hm. Ok. I expect that moving forward we should move away from the "unified stream of stdout and exit code" interface here, as TODO'd in: https://github.com/pantsbuild/pants/pull/11370/files#r547623793. `nails` did that to ease lifetime issues, and it seemed to work fine. Happy to land this as is if we preserve the TODO though.
Fine by me. I'll preserve the TODO.
    // TODO(tokio-1.0): Figure out thread count configuration in Pants.
    pub fn new_owned(_core_threads: usize, num_threads: usize) -> Result<Executor, String> {
      let runtime = Builder::new_multi_thread()
        .worker_threads(num_threads)
        .max_blocking_threads(num_threads)
The thread configuration for the multi-thread runtime changed in v1.0. Tokio no longer dynamically scales the number of threads as it does in prior versions. Instead, it will just spin up `worker_threads` threads and ensure that only `max_blocking_threads` are used for blocking work. (At least that is my naive take on the matter.)

The questions are:
- What API should we be exposing for PyExecutor and related types?
- The default number of threads now is the number of available cores. Should we just trust the Tokio defaults now and not perform manual configuration in the general case?
According to the discussion on tokio-rs/tokio#2802 this is now `worker_threads` + `max_blocking_threads`? Commented to confirm though: tokio-rs/tokio#2802 (comment)
Your understanding was confirmed: tokio-rs/tokio#2802 (comment)
We should probably change the API here to just pass-through the underlying concepts. Although we might want to make setting the number of threads optional so that we can allow Tokio to just use the number of cores.
Thoughts?
Possibly. I think that one issue with passing these concepts through directly is that we may shift "what" is blocking vs async over time, and so I don't know how we would explain what needs blocking threads.

A punt for now might be to set `worker_threads` to our existing "core threads", and compute `max_blocking_threads` from the difference between our `core` and `max`, with a minimum value of 1 blocking thread. Not ideal, but.
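A minimal sketch of that punt, for illustration only. `ThreadConfig`, `thread_config`, and `thread_upper_bound` are hypothetical names, not part of the PR or of Tokio; the upper bound reflects the `worker_threads` + `max_blocking_threads` reading confirmed on tokio-rs/tokio#2802.

```rust
/// Hypothetical holder for the two values that would be handed to the runtime builder.
#[derive(Debug, PartialEq, Eq)]
struct ThreadConfig {
    worker_threads: usize,
    max_blocking_threads: usize,
}

/// Maps the existing "core"/"max" pair onto the two Tokio v1.x settings.
fn thread_config(core_threads: usize, max_threads: usize) -> ThreadConfig {
    ThreadConfig {
        // The existing "core threads" value becomes the fixed worker count.
        worker_threads: core_threads,
        // Blocking threads are the difference between max and core.
        // `saturating_sub` avoids underflow when max < core, and `.max(1)`
        // enforces the minimum of one blocking thread.
        max_blocking_threads: max_threads.saturating_sub(core_threads).max(1),
    }
}

impl ThreadConfig {
    /// Upper bound on runtime threads: workers plus the blocking-thread limit.
    fn thread_upper_bound(&self) -> usize {
        self.worker_threads + self.max_blocking_threads
    }
}
```

With `core = 4` and `max = 16` this gives 4 worker threads and a limit of 12 blocking threads; with `core = max = 8` the blocking limit is clamped to 1, so the upper bound becomes 9 rather than 8. The two fields would then be passed to `Builder::new_multi_thread()` via `.worker_threads(..)` and `.max_blocking_threads(..)`, as in the diff under review.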
How should we express the concept of letting Tokio choose the `worker_threads` value while still retaining the ability to configure the number of blocking threads? Just do max-core and only use that value?
> Tokio no longer dynamically scales the number of threads as it does in prior versions.

To be clear, the only thing that has changed here is how you set the upper limit on blocking threads. Just like in the previous versions, the number of core threads is fixed, and the number of blocking threads is dynamically scaled until it reaches the limit.
cc @stuhood @Eric-Arellano @illicitonion Low priority. The PR has some questions to be answered in the coming week, which will require some thought on our part, so not marking for review yet.
Thanks a lot!
@stuhood: Regarding the use of I'm trying to see if there is a way to work around the lack of |
Rebased and update PR for latest |
Rebased again and updated Tokio to be a minimum of v1.4 which includes the change to add |
Thanks! One `block_on` to restore, but pre-shipping for once that is fixed.
[ci skip-build-wheels]
Thanks again!
Problem

Stay current on Tokio / Tonic / Prost / Hyper ecosystem.

Solution

Upgrade to the Tokio v1.x ecosystem. A minimum of Tokio v1.4 is required in order to have `Handle::block_on` available for use by the `task_executor` crate.

Result

Existing tests pass.