sync::watch misbehaves in multi-threaded context #3168

Closed
tijsvd opened this issue Nov 23, 2020 · 6 comments · Fixed by #3234
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-sync Module: tokio/sync

Comments

@tijsvd
Contributor

tijsvd commented Nov 23, 2020

Version
tokio v0.3.4

Platform
Linux 4.9.0-13-amd64 #1 SMP Debian 4.9.228-1 (2020-07-05) x86_64 GNU/Linux

Description
I'm having some trouble with sync::watch under high-ish load, possibly related to sync::Notify. I think there may be multiple problems.

Situation 1: regular multi-threaded runtime

Minimal example:

use tokio::sync::watch;

async fn sender(snd: watch::Sender<i32>) {
    let mut n = 0;
    loop {
        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
        for _ in 0..1000 {
            n += 1;
            snd.send(n).ok();
        }
        println!("snd {}", n);
    }
}

async fn receiver(mut rcv: watch::Receiver<i32>) {
    loop {
        rcv.changed().await.unwrap();
        let n = *rcv.borrow();
        if n % 1000 == 0 {
            println!("rcv {}", n);
        }
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let (snd, rcv) = watch::channel(0);
    tokio::spawn(sender(snd));
    tokio::spawn(receiver(rcv));
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
    }
}

I expected this to run forever. However, it hangs after a varying number of iterations (typically ~45000 on my machine). Interestingly, it blocks in the sender. If I remove the receiver altogether, it still hangs. If I remove the call to send(), it continues smoothly.

According to gdb, a number of threads are waiting in pthread_cond_wait under what seems like normal idle thread conditions. One thread is in epoll_wait with a timeout of 3407803. No thread seems to be in a timed-wait situation, and it doesn't look like there is a deadlock. But when I interrupt in gdb and then continue, the application continues a bit further before hanging again -- which suggests a timeout is being calculated incorrectly somewhere?

In a single-threaded runtime (flavor="current_thread"), everything appears to work fine.

Situation 2: multiple single-threaded runtimes

Running in an explicit multi-threaded environment with multiple runtimes:

fn spawn_thread<B, F>(task: B)
where
    B: Send + 'static + FnOnce() -> F,
    F: std::future::Future<Output = ()> + 'static,
{
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(task());
    });
}

...
spawn_thread(move || sender(snd));
spawn_thread(move || receiver(rcv));

Results in:

thread '<unnamed>' panicked at '[bug] failed to observe change after notificaton.', <home>/.cargo/registry/src/github.ghproxy.top-1ecc6299db9ec823/tokio-0.3.4/src/sync/watch.rs:256:14

The code at that location indicates spurious notifications. I expected code involving multiple runtimes in different threads to work fine, as it did in tokio 0.2. By the way, the error message itself has a spelling error (it is missing an "i": "notificaton" instead of "notification").

@tijsvd tijsvd added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Nov 23, 2020
@Darksonn Darksonn added the M-sync Module: tokio/sync label Nov 24, 2020
@tijsvd
Contributor Author

tijsvd commented Nov 28, 2020

Ok, I've dug into this a bit further. The first situation (multi-threaded runtime) can be reduced to a timer ordering problem, or a race condition when adding a new timer. This is fairly serious.

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    tokio::spawn(async {
        for i in 0.. {
            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
            println!("slept {}", i);
        }
    });

    loop {
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    }
}

This will print a continuous series of numbers on my terminal, but from time to time it will hang for a few (up to 10) seconds. When I change the second timer to a large number, it will just hang. Perhaps this should be a separate issue?

The second situation (multiple explicit threads) can be reduced to spurious wakeups in Notify::notify_waiters().

use tokio::sync::Notify;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize,Ordering::*};

fn spawn_thread<B, F>(task: B)
where
    B: Send + 'static + FnOnce() -> F,
    F: std::future::Future<Output = ()> + 'static,
{
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            //.enable_all()
            .enable_time()
            .build()
            .unwrap();
        rt.block_on(task());
    });
}

struct Stuff {
    x: AtomicUsize,
    n: Notify,
}

async fn sender(s: Arc<Stuff>) {
    loop {
        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
        let mut prev = 0;
        for _ in 0..1000 {
            prev = s.x.fetch_add(1, SeqCst);
            s.n.notify_waiters();
        }
        println!("snd {}", prev + 1);
    }
}

async fn receiver(s: Arc<Stuff>) {
    let mut last = s.x.load(SeqCst);
    loop {
        let n = s.n.notified();
        let current = s.x.load(SeqCst);
        if current == last {
            n.await;
        }
        let nw = s.x.load(SeqCst);
        assert!(nw != last, "fail nw=last={}", nw);
        last = nw;
        if nw % 1000 == 0 {
            println!("rcv {}", nw);
        }
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let s = Arc::new(Stuff {
        x: AtomicUsize::new(0),
        n: Notify::new(),
    });
    let snd = s.clone();
    let rcv = s;
    spawn_thread(move || sender(snd));
    spawn_thread(move || receiver(rcv));
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
    }
}

This asserts after a few hundred iterations. The logic is the same as what sync::watch does: the sender increases the counter, then notifies, while the receiver registers for a notification, then checks the counter, and awaits if it hasn't changed yet. Reading the code of Notify, it appears it was meant to work correctly this way.

It's interesting how this happens only in the case of two runtimes in different threads. Running both tasks in a single multi-threaded runtime (changing sleep to yield_now to avoid first issue) does not hit the assert.
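
For reference, a sketch of the single-runtime variant just described (sender_yielding is an illustrative name; Stuff and receiver() are reused from the example above):

async fn sender_yielding(s: Arc<Stuff>) {
    loop {
        tokio::task::yield_now().await; // was: sleep(1ms)
        let mut prev = 0;
        for _ in 0..1000 {
            prev = s.x.fetch_add(1, SeqCst);
            s.n.notify_waiters();
        }
        println!("snd {}", prev + 1);
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let s = Arc::new(Stuff {
        x: AtomicUsize::new(0),
        n: Notify::new(),
    });
    tokio::spawn(sender_yielding(s.clone()));
    tokio::spawn(receiver(s));
    std::future::pending::<()>().await; // park main without using timers
}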

@tijsvd
Contributor Author

tijsvd commented Nov 28, 2020

I've reduced the interaction between watch and Notify to a series of atomic updates. The following asserts as well:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize,Ordering::*};

struct Stuff {
    x: AtomicUsize, // watch version
    n: AtomicUsize, // notified counter
}

fn sender(s: Arc<Stuff>) {
    loop {
        std::thread::sleep(std::time::Duration::from_millis(1));
        let mut prev = 0;
        for _ in 0..1000 {
            prev = s.x.fetch_add(1, SeqCst);
            s.n.fetch_add(1, SeqCst);
        }
        println!("snd {}", prev + 1);
    }
}

fn receiver(s: Arc<Stuff>) {
    let mut last = s.x.load(SeqCst);
    loop {
        let n = s.n.load(SeqCst);
        let mut m = n;
        let current = s.x.load(SeqCst);
        if current == last {
            while n == m {
                m = s.n.load(SeqCst);
            }
        }
        let nw = s.x.load(SeqCst);
        assert!(nw != last, "fail nw=last={}, current={}, n={}, m={}", nw, current, n, m);
        last = nw;
        if nw % 1000 == 0 {
            println!("rcv {}", nw);
        }
    }
}

fn main() {
    let s = Arc::new(Stuff {
        x: AtomicUsize::new(0),
        n: AtomicUsize::new(0),
    });
    let snd = s.clone();
    let rcv = s;
    std::thread::spawn(move || sender(snd));
    receiver(rcv);
}

with a message like:

thread 'main' panicked at 'fail nw=last=2066, current=2066, n=2065, m=2066', src/main.rs:33:9

Either there's a problem with the Rust compiler and atomic integers, or the logic is somehow not sound.

@tijsvd
Contributor Author

tijsvd commented Nov 28, 2020

Looking at the reduced case, the problem is actually obvious. Remember that the sender updates the value/version first, then notifies. A first call to Receiver::changed() may observe an update. A subsequent call to changed() may then see that the version is still the same as in the previous observation (watch.rs:249) and await the notifier (watch.rs:253). If the notification for the previous update (which was already observed) only arrives now, the notifier will show ready, and the assertion will fire. Registering the notifier early (watch.rs:247) only ensures that no notification is missed.
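
To make the interleaving concrete, here is one schedule through the reduced atomic example that produces exactly the panic shown above (values match the failure message):

// sender:   s.x.fetch_add(1, SeqCst)  // version: 2065 -> 2066
// receiver: n = s.n.load(SeqCst)      // n = 2065 (notify counter lags)
// receiver: current = s.x.load()      // 2066 != last (2065) -> no wait
// receiver: last = 2066               // change observed without waiting
// receiver: n = s.n.load(SeqCst)      // next iteration: still 2065
// receiver: current = s.x.load()      // 2066 == last -> spin-wait on n
// sender:   s.n.fetch_add(1, SeqCst)  // stale notify for 2065 -> 2066
// receiver: wakes with m = 2066       // but x is still 2066 == last
// receiver: assert!(nw != last)       // fails: nw = last = 2066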

It would be better to write this code as one generally would when waiting on a condition variable, robust against spurious wake-ups:

loop {
    let notified = self.shared.notify_rx.notified();
    if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
        return ret;
    }
    notified.await;
}
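
For illustration, a minimal sketch of what such a maybe_changed helper could look like; the types and field names here (Shared, version, ref_count_tx) are hypothetical stand-ins, not tokio's actual internals:

use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Hypothetical stand-ins for the real watch.rs internals.
struct Shared {
    version: AtomicUsize,      // bumped by Sender::send
    ref_count_tx: AtomicUsize, // number of live senders
}

struct RecvError;

// Returns Some(..) when changed() can resolve, or None when the caller
// should await the pre-registered notification and re-check; a spurious
// or stale wakeup then simply loops back into this check.
fn maybe_changed(shared: &Shared, version: &mut usize) -> Option<Result<(), RecvError>> {
    let new = shared.version.load(SeqCst);
    if *version != new {
        // A new version was published since our last observation.
        *version = new;
        return Some(Ok(()));
    }
    if shared.ref_count_tx.load(SeqCst) == 0 {
        // All senders are gone; the channel is closed.
        return Some(Err(RecvError));
    }
    None
}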

@tijsvd
Contributor Author

tijsvd commented Nov 28, 2020

Timer problem registered as #3190

@carllerche
Member

@tijsvd Thanks for debugging this. You are correct that watch does not correctly handle spurious notifications. Is this something that you can submit as a PR? I'm not sure what would be the best way to write a test, either a "stress" test that loops a bunch or a loom test. What do you think?

@tijsvd
Contributor Author

tijsvd commented Dec 8, 2020

> @tijsvd Thanks for debugging this. You are correct that watch does not correctly handle spurious notifications. Is this something that you can submit as a PR? I'm not sure what would be the best way to write a test, either a "stress" test that loops a bunch or a loom test. What do you think?

I'm not at all familiar with this project's testing framework -- I just noticed the bug. If this were my own project, I'd make sure to have a test that verifies nothing is missed, and then just add the loop in the name of robustness.
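
For what it's worth, a minimal sketch of what a loom test could look like, assuming the loom crate with its futures support (loom::model, loom::thread, loom::future::block_on); tokio's in-tree loom tests would wire this up through cfg(loom) rather than the public crate path:

use loom::future::block_on;
use loom::thread;
use tokio::sync::watch;

#[test]
fn watch_changed_is_not_spurious() {
    loom::model(|| {
        let (tx, mut rx) = watch::channel(0);

        let th = thread::spawn(move || {
            tx.send(1).unwrap();
        });

        // changed() must resolve exactly when a new value is observable.
        block_on(rx.changed()).unwrap();
        assert_eq!(*rx.borrow(), 1);

        th.join().unwrap();
    });
}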
