Skip to content

Commit 6e990eb

Browse files
authored
rt: fix nesting block_in_place with block_on (#5837)
This patch fixes a bug where nesting `block_in_place` with a `block_on` between could lead to a panic. This happened because the nested `block_in_place` would try to acquire a core on return when it should not attempt to do so. The `block_on` between the two nested `block_in_place` altered the thread-local state to lead to the incorrect behavior. The fix is for each call to `block_in_place` to track if it needs to try to steal a core back. Fixes #5239
1 parent b573adc commit 6e990eb

File tree

2 files changed

+48
-7
lines changed

2 files changed

+48
-7
lines changed

tokio/src/runtime/scheduler/multi_thread/worker.rs

+20-7
Original file line numberDiff line numberDiff line change
@@ -323,26 +323,32 @@ where
323323
F: FnOnce() -> R,
324324
{
325325
// Try to steal the worker core back
326-
struct Reset(coop::Budget);
326+
struct Reset {
327+
take_core: bool,
328+
budget: coop::Budget,
329+
}
327330

328331
impl Drop for Reset {
329332
fn drop(&mut self) {
330333
with_current(|maybe_cx| {
331334
if let Some(cx) = maybe_cx {
332-
let core = cx.worker.core.take();
333-
let mut cx_core = cx.core.borrow_mut();
334-
assert!(cx_core.is_none());
335-
*cx_core = core;
335+
if self.take_core {
336+
let core = cx.worker.core.take();
337+
let mut cx_core = cx.core.borrow_mut();
338+
assert!(cx_core.is_none());
339+
*cx_core = core;
340+
}
336341

337342
// Reset the task budget as we are re-entering the
338343
// runtime.
339-
coop::set(self.0);
344+
coop::set(self.budget);
340345
}
341346
});
342347
}
343348
}
344349

345350
let mut had_entered = false;
351+
let mut take_core = false;
346352

347353
let setup_result = with_current(|maybe_cx| {
348354
match (
@@ -394,6 +400,10 @@ where
394400
None => return Ok(()),
395401
};
396402

403+
// We are taking the core from the context and sending it to another
404+
// thread.
405+
take_core = true;
406+
397407
// The parker should be set here
398408
assert!(core.park.is_some());
399409

@@ -420,7 +430,10 @@ where
420430
if had_entered {
421431
// Unset the current task's budget. Blocking sections are not
422432
// constrained by task budgets.
423-
let _reset = Reset(coop::stop());
433+
let _reset = Reset {
434+
take_core,
435+
budget: coop::stop(),
436+
};
424437

425438
crate::runtime::context::exit_runtime(f)
426439
} else {

tokio/tests/rt_threaded.rs

+28
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,34 @@ async fn test_block_in_place4() {
588588
tokio::task::block_in_place(|| {});
589589
}
590590

591+
// Repro for tokio-rs/tokio#5239
592+
#[test]
593+
fn test_nested_block_in_place_with_block_on_between() {
594+
let rt = runtime::Builder::new_multi_thread()
595+
.worker_threads(1)
596+
// Needs to be more than 0
597+
.max_blocking_threads(1)
598+
.build()
599+
.unwrap();
600+
601+
// Triggered by a race condition, so run a few times to make sure it is OK.
602+
for _ in 0..100 {
603+
let h = rt.handle().clone();
604+
605+
rt.block_on(async move {
606+
tokio::spawn(async move {
607+
tokio::task::block_in_place(|| {
608+
h.block_on(async {
609+
tokio::task::block_in_place(|| {});
610+
});
611+
})
612+
})
613+
.await
614+
.unwrap()
615+
});
616+
}
617+
}
618+
591619
// Testing the tuning logic is tricky as it is inherently timing based, and more
592620
// of a heuristic than an exact behavior. This test checks that the interval
593621
// changes over time based on load factors. There are no assertions, completion

0 commit comments

Comments
 (0)