Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(node): Allow syncing from header-sub as soon as node is connected #324

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 50 additions & 31 deletions node/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,8 @@ where
let network_head = p2p.get_head_header().await?;
let network_head_height = network_head.height().value();

// If store is empty, intialize it with network head
if store.head_height().await.is_err() {
store.insert(network_head.clone()).await?;
}

// Insert HEAD to the store and initialize header-sub
store.insert(network_head.clone()).await?;
p2p.init_header_sub(network_head).await?;

Ok(network_head_height)
Expand Down Expand Up @@ -795,7 +792,7 @@ mod tests {
let store = Arc::new(store);

let mut headers = gen.next_many(520);
let network_head = headers.last().cloned().unwrap();
let network_head = gen.next(); // height 546

let syncer = Syncer::start(SyncerArgs {
p2p: Arc::new(p2p),
Expand All @@ -816,9 +813,9 @@ mod tests {
let head_from_syncer = p2p_mock.expect_init_header_sub().await;
assert_eq!(head_from_syncer, network_head);

assert_syncing(&syncer, &store, &[1..=25], 545).await;
assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;

// Syncer requested the first batch ([37, 545])
// Syncer requested the first batch ([34, 545])
handle_session_batch(
&mut p2p_mock,
&headers,
Expand All @@ -834,7 +831,7 @@ mod tests {
],
)
.await;
assert_syncing(&syncer, &store, &[1..=25, 34..=545], 545).await;
assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;

// Syncer requested the remaining batch ([26, 33])
let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
Expand All @@ -844,7 +841,7 @@ mod tests {
.send(Ok(headers.drain(..8).collect()))
.map_err(|_| "headers [538, 545]")
.unwrap();
assert_syncing(&syncer, &store, &[1..=545], 545).await;
assert_syncing(&syncer, &store, &[1..=546], 546).await;

// Syncer is fulling synced and awaiting for events
p2p_mock.expect_no_cmd().await;
Expand Down Expand Up @@ -872,45 +869,64 @@ mod tests {
#[async_test]
async fn all_peers_disconnected() {
let mut gen = ExtendedHeaderGenerator::new();
let headers = gen.next_many(26);

// Start Syncer and report height 25 as HEAD
let (syncer, store, mut p2p_mock) = initialized_syncer(headers[25].clone()).await;
let _gap = gen.next_many(24);
let header25 = gen.next();
let _gap = gen.next_many(4);
let header30 = gen.next();
let _gap = gen.next_many(4);
let header35 = gen.next();

// Start Syncer and report height 30 as HEAD
let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

// Wait for the request but do not reply to it
// Wait for the request but do not reply to it.
let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 25);
assert_eq!(amount, 29);

p2p_mock.announce_all_peers_disconnected();
// Syncer is now back to `connecting_event_loop`.
p2p_mock.expect_no_cmd().await;

// Accounce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
// if a trusted peer is connected.
p2p_mock.announce_peer_connected();
p2p_mock.expect_no_cmd().await;

// Accounce a trusted peer.
p2p_mock.announce_trusted_peer_connected();

// Syncer is now back to `connecting_event_loop`, so we expect a request for HEAD
// Now syncer will send request for HEAD.
let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 0);
assert_eq!(amount, 1);
// Now HEAD is height 26
respond_to.send(Ok(vec![headers[25].clone()])).unwrap();

// Syncer initializes HeaderSub with the latest HEAD
let head_from_syncer = p2p_mock.expect_init_header_sub().await;
assert_eq!(&head_from_syncer, headers.last().unwrap());
// Report an older head. Syncer should not accept it.
respond_to.send(Ok(vec![header25])).unwrap();
assert_syncing(&syncer, &store, &[30..=30], 30).await;

// Syncer now moved to `connected_event_loop`
// Syncer will request HEAD again after some time.
sleep(Duration::from_secs(1)).await;
let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 25);
respond_to
.send(Ok(headers[0..25].to_vec()))
// Mapping to avoid spamming error message on failure
.map_err(|_| "headers [1, 26]")
.unwrap();
assert_eq!(height, 0);
assert_eq!(amount, 1);

assert_syncing(&syncer, &store, &[1..=26], 26).await;
// Report newer HEAD than before.
respond_to.send(Ok(vec![header35.clone()])).unwrap();
assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

// Node is fully synced, so nothing else is produced.
// Syncer initializes HeaderSub with the latest HEAD.
let head_from_syncer = p2p_mock.expect_init_header_sub().await;
assert_eq!(head_from_syncer, header35);

// Syncer now is in `connected_event_loop` and will try to sync the gap
// that is closer to HEAD.
let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 31);
assert_eq!(amount, 4);

p2p_mock.announce_all_peers_disconnected();
p2p_mock.expect_no_cmd().await;
}

Expand Down Expand Up @@ -1039,6 +1055,9 @@ mod tests {
let head_from_syncer = handle.expect_init_header_sub().await;
assert_eq!(head_from_syncer, head);

let head_height = head.height().value();
assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;

(syncer, store, handle)
}

Expand Down
Loading