Skip to content

Commit bd1f78c

Browse files
committed
fix(node)!: Do not skip header-sub reports when store writes are slow
1 parent faa2533 commit bd1f78c

File tree

8 files changed

+124
-91
lines changed

8 files changed

+124
-91
lines changed

node-wasm/src/node.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ impl NodeDriver {
266266
let response = self.client.exec(command).await?;
267267
let header = response.into_last_seen_network_head().check_variant()?;
268268

269-
Ok(header)
269+
header.into()
270270
}
271271

272272
/// Get the latest locally synced header.

node-wasm/src/worker.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,11 @@ impl NodeWorker {
164164
.context("could not serialise fetched headers")
165165
}
166166

167-
async fn get_last_seen_network_head(&mut self) -> JsValue {
168-
// JS interface returns `undefined`, if node haven't received any headers from HeaderSub yet
169-
to_value(&self.node.get_network_head_header()).unwrap_or(JsValue::UNDEFINED)
167+
async fn get_last_seen_network_head(&mut self) -> Result<JsValue> {
168+
match self.node.get_network_head_header().await? {
169+
Some(header) => to_value(&header).context("could not serialise head header"),
170+
None => Ok(JsValue::UNDEFINED),
171+
}
170172
}
171173

172174
async fn get_sampling_metadata(&mut self, height: u64) -> Result<Option<SamplingMetadata>> {
@@ -222,7 +224,7 @@ impl NodeWorker {
222224
.into(),
223225
),
224226
NodeCommand::LastSeenNetworkHead => {
225-
WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await)
227+
WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await.into())
226228
}
227229
NodeCommand::GetSamplingMetadata { height } => {
228230
WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await)

node-wasm/src/worker/commands.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ pub(crate) enum WorkerResponse {
7777
Listeners(Result<Vec<Multiaddr>>),
7878
Header(JsResult<JsValue, Error>),
7979
Headers(JsResult<Array, Error>),
80-
#[serde(with = "serde_wasm_bindgen::preserve")]
81-
LastSeenNetworkHead(JsValue),
80+
LastSeenNetworkHead(JsResult<JsValue, Error>),
8281
SamplingMetadata(Result<Option<SamplingMetadata>>),
8382
WorkerClosed(()),
8483
}

node/src/node.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,8 @@ where
300300
}
301301

302302
/// Get the latest header announced in the network.
303-
pub fn get_network_head_header(&self) -> Option<ExtendedHeader> {
304-
self.p2p.header_sub_watcher().borrow().clone()
303+
pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
304+
Ok(self.p2p.get_network_head().await?)
305305
}
306306

307307
/// Get the latest locally synced header.

node/src/p2p.rs

+73-57
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use celestia_types::nmt::Namespace;
2727
use celestia_types::row::Row;
2828
use celestia_types::sample::Sample;
2929
use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash};
30-
use celestia_types::{ExtendedHeader, FraudProof, Height};
30+
use celestia_types::{ExtendedHeader, FraudProof};
3131
use cid::Cid;
3232
use futures::StreamExt;
3333
use libp2p::{
@@ -165,7 +165,6 @@ impl From<oneshot::error::RecvError> for P2pError {
165165
#[derive(Debug)]
166166
pub struct P2p {
167167
cmd_tx: mpsc::Sender<P2pCmd>,
168-
header_sub_watcher: watch::Receiver<Option<ExtendedHeader>>,
169168
peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
170169
local_peer_id: PeerId,
171170
}
@@ -209,6 +208,8 @@ pub(crate) enum P2pCmd {
209208
},
210209
InitHeaderSub {
211210
head: Box<ExtendedHeader>,
211+
/// Any valid headers received by header-sub will be send to this channel.
212+
channel: mpsc::Sender<ExtendedHeader>,
212213
},
213214
SetPeerTrust {
214215
peer_id: PeerId,
@@ -221,6 +222,9 @@ pub(crate) enum P2pCmd {
221222
GetNetworkCompromisedToken {
222223
respond_to: oneshot::Sender<CancellationToken>,
223224
},
225+
GetNetworkHead {
226+
respond_to: oneshot::Sender<Option<ExtendedHeader>>,
227+
},
224228
}
225229

226230
impl P2p {
@@ -234,21 +238,18 @@ impl P2p {
234238

235239
let local_peer_id = PeerId::from(args.local_keypair.public());
236240

237-
let (cmd_tx, cmd_rx) = mpsc::channel(16);
238-
let (header_sub_tx, header_sub_rx) = watch::channel(None);
239-
240241
let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
241242
let peer_tracker_info_watcher = peer_tracker.info_watcher();
242243

243-
let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?;
244+
let (cmd_tx, cmd_rx) = mpsc::channel(16);
245+
let mut worker = Worker::new(args, cmd_rx, peer_tracker)?;
244246

245247
spawn(async move {
246248
worker.run().await;
247249
});
248250

249251
Ok(P2p {
250252
cmd_tx,
251-
header_sub_watcher: header_sub_rx,
252253
peer_tracker_info_watcher,
253254
local_peer_id,
254255
})
@@ -258,20 +259,18 @@ impl P2p {
258259
#[cfg(any(test, feature = "test-utils"))]
259260
pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
260261
let (cmd_tx, cmd_rx) = mpsc::channel(16);
261-
let (header_sub_tx, header_sub_rx) = watch::channel(None);
262262
let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
263263

264264
let p2p = P2p {
265265
cmd_tx: cmd_tx.clone(),
266-
header_sub_watcher: header_sub_rx,
267266
peer_tracker_info_watcher: peer_tracker_rx,
268267
local_peer_id: PeerId::random(),
269268
};
270269

271270
let handle = crate::test_utils::MockP2pHandle {
272271
cmd_tx,
273272
cmd_rx,
274-
header_sub_tx,
273+
header_sub_tx: None,
275274
peer_tracker_tx,
276275
};
277276

@@ -296,11 +295,6 @@ impl P2p {
296295
.map_err(|_| P2pError::WorkerDied)
297296
}
298297

299-
/// Watcher for the latest verified network head headers announced on `header-sub`.
300-
pub fn header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>> {
301-
self.header_sub_watcher.clone()
302-
}
303-
304298
/// Watcher for the current [`PeerTrackerInfo`].
305299
pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
306300
self.peer_tracker_info_watcher.clone()
@@ -312,9 +306,14 @@ impl P2p {
312306
}
313307

314308
/// Initializes `header-sub` protocol with a given `subjective_head`.
315-
pub async fn init_header_sub(&self, head: ExtendedHeader) -> Result<()> {
309+
pub async fn init_header_sub(
310+
&self,
311+
head: ExtendedHeader,
312+
channel: mpsc::Sender<ExtendedHeader>,
313+
) -> Result<()> {
316314
self.send_command(P2pCmd::InitHeaderSub {
317315
head: Box::new(head),
316+
channel,
318317
})
319318
.await
320319
}
@@ -545,6 +544,16 @@ impl P2p {
545544

546545
Ok(rx.await?)
547546
}
547+
548+
/// Get the latest header announced in the network.
549+
pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
550+
let (tx, rx) = oneshot::channel();
551+
552+
self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
553+
.await?;
554+
555+
Ok(rx.await?)
556+
}
548557
}
549558

550559
/// Our network behaviour.
@@ -573,12 +582,17 @@ where
573582
bad_encoding_fraud_sub_topic: TopicHash,
574583
cmd_rx: mpsc::Receiver<P2pCmd>,
575584
peer_tracker: Arc<PeerTracker>,
576-
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
585+
header_sub_state: Option<HeaderSubState>,
577586
bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
578587
network_compromised_token: CancellationToken,
579588
store: Arc<S>,
580589
}
581590

591+
struct HeaderSubState {
592+
known_head: ExtendedHeader,
593+
channel: mpsc::Sender<ExtendedHeader>,
594+
}
595+
582596
impl<B, S> Worker<B, S>
583597
where
584598
B: Blockstore,
@@ -587,7 +601,6 @@ where
587601
fn new(
588602
args: P2pArgs<B, S>,
589603
cmd_rx: mpsc::Receiver<P2pCmd>,
590-
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
591604
peer_tracker: Arc<PeerTracker>,
592605
) -> Result<Self, P2pError> {
593606
let local_peer_id = PeerId::from(args.local_keypair.public());
@@ -649,7 +662,7 @@ where
649662
bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
650663
header_sub_topic_hash: header_sub_topic.hash(),
651664
peer_tracker,
652-
header_sub_watcher,
665+
header_sub_state: None,
653666
bitswap_queries: HashMap::new(),
654667
network_compromised_token: CancellationToken::new(),
655668
store: args.store,
@@ -771,8 +784,8 @@ where
771784
P2pCmd::ConnectedPeers { respond_to } => {
772785
respond_to.maybe_send(self.peer_tracker.connected_peers());
773786
}
774-
P2pCmd::InitHeaderSub { head } => {
775-
self.on_init_header_sub(*head);
787+
P2pCmd::InitHeaderSub { head, channel } => {
788+
self.on_init_header_sub(*head, channel);
776789
}
777790
P2pCmd::SetPeerTrust {
778791
peer_id,
@@ -786,7 +799,14 @@ where
786799
self.on_get_shwap_cid(cid, respond_to);
787800
}
788801
P2pCmd::GetNetworkCompromisedToken { respond_to } => {
789-
respond_to.maybe_send(self.network_compromised_token.child_token())
802+
respond_to.maybe_send(self.network_compromised_token.child_token());
803+
}
804+
P2pCmd::GetNetworkHead { respond_to } => {
805+
let head = self
806+
.header_sub_state
807+
.as_ref()
808+
.map(|state| state.known_head.clone());
809+
respond_to.maybe_send(head);
790810
}
791811
}
792812

@@ -836,7 +856,7 @@ where
836856
};
837857

838858
let acceptance = if message.topic == self.header_sub_topic_hash {
839-
self.on_header_sub_message(&message.data[..]).await
859+
self.on_header_sub_message(&message.data[..])
840860
} else if message.topic == self.bad_encoding_fraud_sub_topic {
841861
self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
842862
.await
@@ -961,41 +981,41 @@ where
961981
}
962982

963983
#[instrument(skip_all, fields(header = %head))]
964-
fn on_init_header_sub(&mut self, head: ExtendedHeader) {
965-
self.header_sub_watcher.send_replace(Some(head));
984+
fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
985+
self.header_sub_state = Some(HeaderSubState {
986+
known_head: head,
987+
channel,
988+
});
966989
trace!("HeaderSub initialized");
967990
}
968991

969992
#[instrument(skip_all)]
970-
async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
993+
fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
971994
let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
972995
trace!("Malformed or invalid header from header-sub");
973996
return gossipsub::MessageAcceptance::Reject;
974997
};
975998

976999
trace!("Received header from header-sub ({header})");
9771000

978-
let updated = self.header_sub_watcher.send_if_modified(move |state| {
979-
let Some(known_header) = state else {
980-
debug!("HeaderSub not initialized yet");
981-
return false;
982-
};
1001+
let Some(ref mut state) = self.header_sub_state else {
1002+
debug!("header-sub not initialized yet");
1003+
return gossipsub::MessageAcceptance::Ignore;
1004+
};
9831005

984-
if known_header.verify(&header).is_err() {
985-
trace!("Failed to verify HeaderSub header. Ignoring {header}");
986-
return false;
987-
}
1006+
if state.known_head.verify(&header).is_err() {
1007+
trace!("Failed to verify HeaderSub header. Ignoring {header}");
1008+
return gossipsub::MessageAcceptance::Ignore;
1009+
}
9881010

989-
debug!("New header from header-sub ({header})");
990-
*state = Some(header);
991-
true
992-
});
1011+
trace!("New header from header-sub ({header})");
9931012

994-
if updated {
995-
gossipsub::MessageAcceptance::Accept
996-
} else {
997-
gossipsub::MessageAcceptance::Ignore
998-
}
1013+
state.known_head = header.clone();
1014+
// We intentionally do not `send().await` to avoid blocking `P2p`
1015+
// in case `Syncer` enters some weird state.
1016+
let _ = state.channel.try_send(header);
1017+
1018+
gossipsub::MessageAcceptance::Accept
9991019
}
10001020

10011021
#[instrument(skip_all)]
@@ -1011,15 +1031,15 @@ where
10111031
};
10121032

10131033
let height = befp.height().value();
1014-
let current_height =
1015-
if let Some(network_height) = network_head_height(&self.header_sub_watcher) {
1016-
network_height.value()
1017-
} else if let Ok(local_head) = self.store.get_head().await {
1018-
local_head.height().value()
1019-
} else {
1020-
// we aren't tracking the network and have uninitialized store
1021-
return gossipsub::MessageAcceptance::Ignore;
1022-
};
1034+
1035+
let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
1036+
header_sub_state.known_head.height().value()
1037+
} else if let Ok(local_head) = self.store.get_head().await {
1038+
local_head.height().value()
1039+
} else {
1040+
// we aren't tracking the network and have uninitialized store
1041+
return gossipsub::MessageAcceptance::Ignore;
1042+
};
10231043

10241044
if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
10251045
// does this threshold make any sense if we're gonna ignore it anyway
@@ -1156,7 +1176,3 @@ where
11561176
.client_set_send_dont_have(false)
11571177
.build())
11581178
}
1159-
1160-
fn network_head_height(watcher: &watch::Sender<Option<ExtendedHeader>>) -> Option<Height> {
1161-
watcher.borrow().as_ref().map(|header| header.height())
1162-
}

0 commit comments

Comments
 (0)