Timeout support via Context #392
Conversation
Thanks for working on this! I think this is a good starting point to discuss how we want this feature to work, but I do have major concerns that make me think we shouldn't merge this:
- introducing the `async-std` dependency
- the timeout mechanism is simpler than what has been described as the case for other SDKs. We might want to look more into those timeout mechanisms and decide what to do.
```rust
/// Pipeline execution context.
#[derive(Clone, Debug, Default)]
pub struct Context {
    _priv: (),
    timeout: Option<DateTime<Utc>>,
}
```
I think this does not really resemble how timeouts work in other SDKs where timeouts form a tree and users or policies can add timeouts to the timeout tree.
I am still not clear on what we are supposed to achieve 🤷. As it is proposed here, every pipeline policy can change the timeout passed down the pipeline simply by passing a new context to the following policy.
Maybe I am still not grasping the entire concept of "timeout tree" since our pipeline is linear (there are no branches): either pass down to the next policy or not...
I'm not sure what @JeffreyRichter was referring to specifically in our last meeting, but timeouts in most other (all?) track 2 SDKs are built into the given context. For example, in .NET all methods take a `CancellationToken`, which can be configured via a few mechanisms like a `CancellationTokenSource` to time out after a given interval. By default, we don't time out.
Is there a general runtime standard / common practice for either HTTP clients we can abstract, or futures we can use instead? Those same timeout mechanisms we use in other languages are also the same mechanisms for manual cancellation, like hooking up to a Ctrl+C handler for console apps. How that is done may vary from one language/runtime to another.
Please read this
I've been watching literally hours of videos on YouTube today about Rust async programming and I think I understand things much better now and I think some terms you've been using I have been misinterpreting.
In Rust (like in C#), when a developer implements an async method, the compiler turns that method into a state machine. The state machine yields at each ".await" point in the code. An async function returns a Future. The Future trait is described here. Yes, the Poll method continues the async method's state machine from the last yield point to the next yield point. If the state machine advances to the end, then Poll returns with the final result; else Poll returns Pending and when the async I/O that executed just before the yield point completes, the OS calls the executor's wake function to indicate that the state machine is ready to be advanced.
So, I think my first misunderstanding is that Poll isn't really polling; at least not the way I usually interpret this term. Because Poll advances the state machine, it does more than just polling. Plus, I now understand about wake() and how it is actually the thing that prevents the polling by notifying the executor when the I/O has completed.
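To make this concrete, here is a tiny hand-written future; it is a sketch for illustration only, not part of the PR, and the Context in its poll signature is std::task::Context, unrelated to the SDK's Context type. The first poll returns Pending and immediately wakes itself, so the executor polls it again and it completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A future that returns Pending once, then completes on the next poll.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // With real I/O, the reactor would call wake() when the resource is
            // ready; here we wake immediately so the executor polls us again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    // block_on drives the state machine: poll -> Pending -> wake -> poll -> Ready.
    futures::executor::block_on(YieldOnce { yielded: false });
    println!("completed after one Pending");
}
```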
The next confusion is about the term cancellation. I do NOT consider dropping a Future as cancelling the async function. My understanding (which may still be wrong but I don't think so) is that dropping the Future stops the async function's state machine from advancing. However, I think this would be a terrible thing to do and should NEVER be done. Here's an example as to why: The state machine opens a file, starts reading from the file, and then .awaits (yields). If the Future is now dropped, then the file will NEVER be closed. Or the state machine takes a lock and then .awaits; if the Future is dropped, then the lock is never released. Not letting a state machine run to completion can easily have disastrous effects and this is why C# doesn't even offer a way to do this!
To me, the term cancellation implies a cooperative effort between the cancelor and the cancelee. The cancelor tells the state machine code when to cancel and the state machine's code decides when and how to honor this; for example, after closing a file or after releasing a lock. Without cooperative cancellation, we have the disastrous effects. In addition, if the state machine is executing a long compute-bound loop, then you need cooperative cancellation here to tell the state machine to prematurely stop the loop. Again, this is easily accomplished with Context (by having the loop poll it) but is impossible to do if the state machine can only be stopped at the next yield point.
So, to enable cooperative cancellation, I'm back to passing the Context around to all the methods. The caller/cancelor passes it to the cancelee, and the cancelee checks it when it feels it can, does any proper cleanup (or prematurely terminates a loop), and then completes the state machine. Again, this is how C#, Go, and other languages handle cancellation, and it works extremely well.
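As a rough illustration of that cooperative pattern in Rust (a sketch only, not the PR's design: a shared AtomicBool stands in for whatever flag an SDK Context might expose, and `serialize_entities` is a made-up function):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical cancelee: it checks the flag at points where it is safe to stop,
// cleans up, and finishes its state machine instead of being killed mid-operation.
async fn serialize_entities(
    cancel: Arc<AtomicBool>,
    entities: Vec<String>,
) -> Result<usize, &'static str> {
    let mut written = 0;
    for entity in entities {
        if cancel.load(Ordering::Relaxed) {
            // Safe point: nothing is half-written, so stop cooperatively.
            return Err("cancelled before completion");
        }
        // Stand-in for compute-bound work (e.g. serializing to JSON).
        let _json = format!("{{\"value\":\"{}\"}}", entity);
        written += 1;
    }
    Ok(written)
}

#[tokio::main]
async fn main() {
    let cancel = Arc::new(AtomicBool::new(false));
    // The cancelor flips the flag; the cancelee honors it at its next check.
    cancel.store(true, Ordering::Relaxed);
    let result = serialize_entities(cancel, vec!["a".into(), "b".into()]).await;
    println!("{:?}", result); // Err("cancelled before completion")
}
```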
I have tried to learn more about .until() but I am unable to. When I search for it, I get no hits whatsoever. This actually makes sense to me because I do not think it's a useful construct due to the lack of cooperative cancellation.
Also, we never discussed what the customer code looks like to determine if cancellation had occurred. So, if .until() actually does exist, how does the customer know whether the timeout occurred or the operation completed successfully?
If any of my understanding above is incorrect, please clear things up for me. I think it's critical that we all understand this and that we're all on the same page.
To further this conversation, I think we now need a full code example that compiles & runs, exercises all this machinery, and allows us to experiment with different scenarios to really understand the behavior. Specifically, we need a function that is compute-bound that can be cancelled; we need a function that is I/O bound that can be cancelled before or after the I/O (not during); and we need one async function calling another, allowing the topmost function to cancel any of the state machines in the tree and percolate this up. We need customer code that calls all of this so it can determine if ALL the state machines (in the tree) ran to completion or if cancellation occurred, so that customer code can take different actions (code paths) based on what happened. The customer does not have to know how much success actually occurred (which specific state machine got cancelled). We do have to ensure that any cancelled state machine(s) do proper cleanup and do not leak any resources.
Can someone build this? I would do it but I don't know Rust well enough so it would take me much longer than any of you. After it's built, show it to me because I'm sure I'll have questions and additional experiments to try and then we can discuss it as a group so that we can make progress on this and move on to other things.
> The next confusion is about the term cancellation. I do NOT consider dropping a Future as cancelling the async function. My understanding (which may still be wrong but I don't think so) is that dropping the Future stops the async function's state machine from advancing. However, I think this would be a terrible thing to do and should NEVER be done. Here's an example as to why: The state machine opens a file, starts reading from the file, and then .awaits (yields). If the Future is now dropped, then the file will NEVER be closed. Or the state machine takes a lock and then .awaits; if the Future is dropped, then the lock is never released. Not letting a state machine run to completion can easily have disastrous effects and this is why C# doesn't even offer a way to do this!
You touched on a very controversial point in the Rust community. Let me try to explain why (for the sake of simplicity I'm going to simplify a bit).
In Rust, everything must have an owner, and at any time there is always exactly one owner: no more, no less. When an entity has no owner it gets destroyed (in Rust parlance it is "dropped").
This is tracked by Rust via lifetimes. That is, the compiler always knows who the owner of an entity is, so it can call its destructor (drop) whenever necessary.
The drop (destruction) happens immediately. This is different from GC languages (such as C# or Go), where the destructor will run sometime in the future. If you need to control when the destructor gets called you have to explicitly say so (for example, C# has the `using` statement and the `IDisposable` interface).
Of course, Rust dropping an entity means its inner entities will remain without an owner. That in turn cascades the drop to the inner entities. This ensures nothing gets left behind [1].
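For illustration only (not part of the PR), a tiny example of that cascade: dropping the outer value runs its destructor and then the destructors of everything it owns.

```rust
struct Inner;
impl Drop for Inner {
    fn drop(&mut self) {
        println!("Inner dropped");
    }
}

struct Outer {
    _inner: Inner,
}
impl Drop for Outer {
    fn drop(&mut self) {
        println!("Outer dropped");
    }
}

fn main() {
    let _outer = Outer { _inner: Inner };
    println!("end of main");
    // Prints "end of main", then "Outer dropped", then "Inner dropped":
    // the drop cascades from the owner to everything it owns.
}
```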
This approach is pervasive in the whole Rust ecosystem. You mentioned locks: in Rust you do not explicitly "release" locks, you just make sure no one owns them and they get dropped (ie released) for you. Look at this example of a mutex: https://doc.rust-lang.org/std/sync/struct.Mutex.html#examples-6. The line:

`*mutex.get_mut().unwrap() = 10;`

- Locks the mutex (`get_mut().unwrap()`)
- Assigns a value (`= 10`)
- Releases the lock (this is not immediately evident).

The last part happens as soon as the assignment is over. That's why the example can lock the mutex immediately in the following line.
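A small self-contained variant of the same RAII idea, using `lock()` so the guard's scope is visible (for illustration only, not taken from the linked docs): the mutex is released when the guard is dropped, not by an explicit call.

```rust
use std::sync::Mutex;

fn main() {
    let mutex = Mutex::new(0);
    {
        // lock() returns a guard that owns the lock for as long as it lives.
        let mut guard = mutex.lock().unwrap();
        *guard = 10;
        // The guard is dropped here, at the end of the scope, releasing the lock.
    }
    // Locking again succeeds because the previous guard was dropped.
    assert_eq!(*mutex.lock().unwrap(), 10);
    println!("lock acquired twice without an explicit release");
}
```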
So, in theory, the problem you mentioned should not exist in Rust. As soon as the future's lifetime ends (in other words, nothing references it anymore) all its resources should get dropped (including sockets, file handles, locks and so on).
But - there's always a but - for threads and, by extension, Tokio futures, Rust allows "runaway" entities, in other words entities whose lifetime is not tied to a reference. These entities can proceed even if the code no longer has a means to stop them [2]!
There was a discussion on this [3] some time ago but (unfortunately, IMHO) this is the outcome. @yoshuawuyts' clever code wanted to rectify this.
Moral of the story: you are right, we should take care to cancel the outstanding hyper/reqwest HTTP request on drop. What we should decide is whether we want to adhere to Rust's "cancel on drop" approach or not.
> Can someone build this? I would do it but I don't know Rust well enough so it would take me much longer than any of you. After it's built, show it to me because I'm sure I'll have questions and additional experiments to try and then we can discuss it as a group so that we can make progress on this and move on to other things.
As an example, look at this code (playground link):
```rust
use futures::future::{AbortHandle, Abortable};
use std::{future::Future, time::Duration};

// Race `f` against a timer: whichever finishes first decides the result.
async fn race_timeout<F>(duration: Duration, f: F) -> Result<(), ()>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let (abort_handle, abort_registration) = AbortHandle::new_pair();
    let spawned = Abortable::new(tokio::spawn(f), abort_registration);
    tokio::pin!(spawned);

    tokio::select! {
        _ = tokio::time::sleep(duration) => {
            // Timer fired first: signal the abort handle and report a timeout.
            abort_handle.abort();
            println!("timeout!");
            Err(())
        }
        _ = spawned => {
            println!("f done");
            Ok(())
        }
    }
}

async fn other_func() {
    // spinlock
    loop {
        println!("in other_func loop");
        tokio::time::sleep(Duration::from_millis(500)).await;
        //std::thread::sleep(Duration::from_millis(500)); // <-- try this one instead of the above!
    }
}

async fn some_func() -> Result<(), ()> {
    race_timeout(Duration::from_secs(2), other_func()).await?;
    println!("some_func done!");
    Ok(())
}

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    some_func().await.unwrap();
}
```
- It uses explicit cancellation or the async loop will never end (even if the timeout happens). Is this similar to the Context you are referring to?
- If you replace the `tokio::time::sleep(Duration::from_millis(500)).await;` with `std::thread::sleep(Duration::from_millis(500));` the loop will never stop even if we have cancelled it properly 😢.
This PR has a naive (and probably buggy) implementation, please have a look at it (the `select` part). My main purpose was to drive the conversation about the topic (on that I think I've succeeded 😄).
Footnotes

1. Unfortunately this approach does not work with circular references. You can read more about it here: https://doc.rust-lang.org/std/rc/struct.Weak.html.
2. https://docs.rs/tokio/1.12.0/tokio/task/struct.JoinHandle.html
I finally got around to reading all this. Great explanation, thank you. And thanks for the code too.
I tried the thread sleep and I see that the app never quits - I do consider this problematic. Also, if you're not using Tokio (another thread), then I suspect the loop never ends causing no more code to run at all.
I never see this line run: println!("some_func done!");
This implies to me that proper cleanup is not happening but maybe I'm still misunderstanding something?
I see why tokio::time::sleep is better than std::thread::sleep. But, if we had a compute-bound loop in the code somewhere (like serializing a bunch of entity objects to JSON to add them to an Azure Table), then it would be great if we had a way to prematurely cancel this loop. So, I still think something like Context is good here (plus Context does additional stuff).
I also tried passing 10 seconds to tokio::time::sleep hoping that it would cancel after 2 seconds, but it did not. So, it seems to me that sleep is still "running". I think the same thing would happen if a network call didn't complete. I'm not sure how to deal with this even if we have Context, unless there is a way to tell the HTTP stack to cancel after some period of time.
@JeffreyRichter Using `std::thread::sleep` inside of async code is not recommended and can break things. This is because it sleeps the entire OS thread the async runtime might be running on, preventing it from making any progress.
As for compute-bound workloads, async runtimes typically provide a way to yield back to the runtime for cooperative tasks. Tokio, for example, has `yield_now`.
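For example, a compute-bound loop can be kept cooperative by yielding periodically; this sketch (not from the PR, and `compute_bound_work` is a made-up function) shows how a surrounding timeout can then actually interrupt it at an .await point:

```rust
use std::time::Duration;

// Hypothetical compute-bound work that stays cooperative by periodically
// yielding back to the Tokio runtime.
async fn compute_bound_work(iterations: u64) -> u64 {
    let mut acc: u64 = 0;
    for i in 0..iterations {
        acc = acc.wrapping_add(i);
        // Every so often, hand control back to the runtime so other tasks
        // (including a timeout) get a chance to run.
        if i % 1_000 == 0 {
            tokio::task::yield_now().await;
        }
    }
    acc
}

#[tokio::main]
async fn main() {
    // Because the loop reaches .await points, the timeout can drop it there;
    // without the yields it would run to completion before anything else happened.
    match tokio::time::timeout(Duration::from_millis(50), compute_bound_work(u64::MAX)).await {
        Ok(result) => println!("finished: {}", result),
        Err(_) => println!("timed out"),
    }
}
```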
I believe #457 should hopefully provide good examples of how cancellation should be handled. Perhaps we can use that as a starting point for future discussion.
OK, I think yield_now makes me comfortable - thanks.
So now, if we're not using Context for cancellation, then its importance is greatly diminished, but we still use it for the key/value thing. I wish we could come up with another way to accomplish this and get Context out of the method signatures entirely. Or, if Rust had default args, then we'd probably use that, but it doesn't.
I'm going to close this because we will use @yoshuawuyts's approach detailed here: https://hackmd.io/@yoshuawuyts/rkRFxhprY.
Intro
This PR allows specifying a time-out for every pipeline operation. This is done using the `Context` structure in the core crate. The default behavior is to not have any timeout (as it was before). The SDK user can, however, specify a time-out (either a specific date time or a duration from now) that will be loosely enforced by the pipeline.
Specification details
- Operations returning a stream are the caller's responsibility to time out.

Implementation details
For this to work, the changes involve:
- `Context` now stores the optional time-out date time.
- Policies can fail with `Timeout` even before the specified time, to avoid needlessly waiting for an operation that would time out anyway.
- `HTTPClient::execute_request2` (called by the transport policy) now accepts a `Context` to inspect the optional time-out. If present, it will wrap the HTTP client future with `async_std::future::timeout` to ensure proper timeout signaling for the awaited network call (see the sketch below). Since the returned error can now be a timeout, a new simple `HttpExecutionError` has been defined.
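For reference, a minimal sketch of that wrapping (not the PR's actual code: `fake_network_call`, `execute_with_timeout`, and the error variant are placeholders), assuming the deadline has already been converted into a remaining Duration:

```rust
use std::time::Duration;

// Placeholder error type standing in for the PR's HttpExecutionError.
#[derive(Debug)]
enum HttpExecutionError {
    Timeout,
}

// Placeholder for the awaited network call made by the transport policy.
async fn fake_network_call() -> Result<String, HttpExecutionError> {
    async_std::task::sleep(Duration::from_secs(5)).await;
    Ok("response body".to_string())
}

// If a time-out is present, wrap the future with async_std::future::timeout
// and map the elapsed-time error into the domain error; otherwise just await.
async fn execute_with_timeout(
    timeout: Option<Duration>,
) -> Result<String, HttpExecutionError> {
    match timeout {
        Some(duration) => async_std::future::timeout(duration, fake_network_call())
            .await
            .map_err(|_elapsed| HttpExecutionError::Timeout)?,
        None => fake_network_call().await,
    }
}

fn main() {
    let result = async_std::task::block_on(execute_with_timeout(Some(Duration::from_secs(1))));
    println!("{:?}", result); // Err(Timeout): the 5-second call exceeds the 1-second budget.
}
```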