From 7dc3e43fe25ac7fa179c91620d1730b68d21e75b Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 29 Jun 2023 19:05:21 +0000 Subject: [PATCH 1/2] rt: fix nesting block_in_place with block_on This patch fixes a bug where nesting `block_in_place` with a `block_on` between could lead to a panic. This happened because the nested `block_in_place` would try to acquire a core on return when it should not attempt to do so. The `block_on` between the two nested `block_in_place` altered the thread-local state to lead to the incorrect behavior. The fix is for each call to `block_in_place` to track if it needs to try to steal a core back. Fixes #5239 --- .../runtime/scheduler/multi_thread/worker.rs | 27 ++++++++++++++----- tokio/tests/rt_threaded.rs | 26 ++++++++++++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 7fc335f5165..6ae11463373 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -323,26 +323,32 @@ where F: FnOnce() -> R, { // Try to steal the worker core back - struct Reset(coop::Budget); + struct Reset { + take_core: bool, + budget: coop::Budget, + } impl Drop for Reset { fn drop(&mut self) { with_current(|maybe_cx| { if let Some(cx) = maybe_cx { - let core = cx.worker.core.take(); - let mut cx_core = cx.core.borrow_mut(); - assert!(cx_core.is_none()); - *cx_core = core; + if self.take_core { + let core = cx.worker.core.take(); + let mut cx_core = cx.core.borrow_mut(); + assert!(cx_core.is_none()); + *cx_core = core; + } // Reset the task budget as we are re-entering the // runtime. - coop::set(self.0); + coop::set(self.budget); } }); } } let mut had_entered = false; + let mut take_core = false; let setup_result = with_current(|maybe_cx| { match ( @@ -394,6 +400,10 @@ where None => return Ok(()), }; + // We are taking the core from the context and sending it to another + // thread. + take_core = true; + // The parker should be set here assert!(core.park.is_some()); @@ -420,7 +430,10 @@ where if had_entered { // Unset the current task's budget. Blocking sections are not // constrained by task budgets. - let _reset = Reset(coop::stop()); + let _reset = Reset { + take_core, + budget: coop::stop(), + }; crate::runtime::context::exit_runtime(f) } else { diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 6631768c35e..82bd413e544 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -588,6 +588,32 @@ async fn test_block_in_place4() { tokio::task::block_in_place(|| {}); } +// Repro for tokio-rs/tokio#5239 +#[test] +fn test_nested_block_in_place_with_block_on_between() { + let rt = runtime::Builder::new_multi_thread() + .worker_threads(1) + // Needs to be more than 0 + .max_blocking_threads(1) + .build() + .unwrap(); + + // Triggered by a race condition, so run a few times to make sure it is OK. + for _ in 0..100 { + let h = rt.handle().clone(); + + rt.block_on(async move { + tokio::spawn(async move { + tokio::task::block_in_place(|| { + h.block_on(async { + tokio::task::block_in_place(|| {}); + }); + }) + }).await.unwrap() + }); + } +} + // Testing the tuning logic is tricky as it is inherently timing based, and more // of a heuristic than an exact behavior. This test checks that the interval // changes over time based on load factors. There are no assertions, completion From 73136d72c3611971031528a6f854e01e51d11216 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 29 Jun 2023 19:09:21 +0000 Subject: [PATCH 2/2] fmt --- tokio/tests/rt_threaded.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 82bd413e544..69b186947bd 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -609,7 +609,9 @@ fn test_nested_block_in_place_with_block_on_between() { tokio::task::block_in_place(|| {}); }); }) - }).await.unwrap() + }) + .await + .unwrap() }); } }