diff --git a/node/src/events.rs b/node/src/events.rs index 146b09fc..d173a92e 100644 --- a/node/src/events.rs +++ b/node/src/events.rs @@ -5,6 +5,7 @@ use std::panic::Location; use std::time::Duration; use instant::SystemTime; +use libp2p::PeerId; use serde::Serialize; use tokio::sync::broadcast; @@ -161,6 +162,24 @@ pub struct NodeEventInfo { #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum NodeEvent { + /// Peer just connected + PeerConnected { + #[serde(serialize_with = "serialize_as_string")] + /// The ID of the peer. + id: PeerId, + /// Whether peer was in the trusted list or not. + trusted: bool, + }, + + /// Peer just disconnected + PeerDisconnected { + #[serde(serialize_with = "serialize_as_string")] + /// The ID of the peer. + id: PeerId, + /// Whether peer was in the trusted list or not. + trusted: bool, + }, + /// Sampling just started. SamplingStarted { /// The block height that will be sampled. @@ -205,6 +224,20 @@ pub enum NodeEvent { impl fmt::Display for NodeEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + NodeEvent::PeerConnected { id, trusted } => { + if *trusted { + write!(f, "Trusted peer connected: {id}") + } else { + write!(f, "Peer connected: {id}") + } + } + NodeEvent::PeerDisconnected { id, trusted } => { + if *trusted { + write!(f, "Trusted peer disconnected: {id}") + } else { + write!(f, "Peer disconnected: {id}") + } + } NodeEvent::SamplingStarted { height, square_width, @@ -243,6 +276,14 @@ impl fmt::Display for NodeEvent { } } +fn serialize_as_string(value: &T, serializer: S) -> Result +where + T: ToString, + S: serde::ser::Serializer, +{ + value.to_string().serialize(serializer) +} + #[cfg(target_arch = "wasm32")] fn serialize_system_time(value: &SystemTime, serializer: S) -> Result where diff --git a/node/src/node.rs b/node/src/node.rs index 061f77a1..415f848c 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -105,6 +105,7 @@ where listen_on: config.p2p_listen_on, blockstore: config.blockstore, store: store.clone(), + event_pub: event_channel.publisher(), })?); let syncer = Arc::new(Syncer::start(SyncerArgs { diff --git a/node/src/p2p.rs b/node/src/p2p.rs index d7106a95..a818cbd4 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -55,6 +55,7 @@ mod header_session; pub(crate) mod shwap; mod swarm; +use crate::events::EventPublisher; use crate::executor::{self, spawn, Interval}; use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig}; use crate::p2p::header_session::HeaderSession; @@ -187,6 +188,8 @@ where pub blockstore: B, /// The store for headers. pub store: Arc, + /// Event publisher. + pub event_pub: EventPublisher, } #[derive(Debug)] @@ -234,7 +237,7 @@ impl P2p { let (cmd_tx, cmd_rx) = mpsc::channel(16); let (header_sub_tx, header_sub_rx) = watch::channel(None); - let peer_tracker = Arc::new(PeerTracker::new()); + let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone())); let peer_tracker_info_watcher = peer_tracker.info_watcher(); let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?; diff --git a/node/src/p2p/header_ex/client.rs b/node/src/p2p/header_ex/client.rs index 3020187a..302308dd 100644 --- a/node/src/p2p/header_ex/client.rs +++ b/node/src/p2p/header_ex/client.rs @@ -303,6 +303,7 @@ async fn decode_and_verify_responses( #[cfg(test)] mod tests { use super::*; + use crate::events::EventChannel; use crate::p2p::header_ex::utils::ExtendedHeaderExt; use crate::test_utils::async_test; use celestia_proto::p2p::pb::StatusCode; @@ -1105,7 +1106,8 @@ mod tests { } fn peer_tracker_with_n_peers(amount: usize) -> Arc { - let peers = Arc::new(PeerTracker::new()); + let event_channel = EventChannel::new(); + let peers = Arc::new(PeerTracker::new(event_channel.publisher())); for i in 0..amount { let peer = PeerId::random(); diff --git a/node/src/peer_tracker.rs b/node/src/peer_tracker.rs index 773d3713..efc34c5e 100644 --- a/node/src/peer_tracker.rs +++ b/node/src/peer_tracker.rs @@ -11,11 +11,14 @@ use serde::Serialize; use smallvec::SmallVec; use tokio::sync::watch; +use crate::events::{EventPublisher, NodeEvent}; + /// Keeps track various information about peers. #[derive(Debug)] pub struct PeerTracker { peers: DashMap, info_tx: watch::Sender, + event_pub: EventPublisher, } /// Statistics of the connected peers @@ -51,10 +54,11 @@ impl PeerInfo { impl PeerTracker { /// Constructs an empty PeerTracker. - pub fn new() -> Self { + pub fn new(event_pub: EventPublisher) -> Self { PeerTracker { peers: DashMap::new(), info_tx: watch::channel(PeerTrackerInfo::default()).0, + event_pub, } } @@ -164,7 +168,13 @@ impl PeerTracker { // If peer was not already connected from before if !peer_info.is_connected() { peer_info.state = PeerState::Connected; + increment_connected_peers(&self.info_tx, peer_info.trusted); + + self.event_pub.send(NodeEvent::PeerConnected { + id: peer, + trusted: peer_info.trusted, + }); } } @@ -185,6 +195,12 @@ impl PeerTracker { } decrement_connected_peers(&self.info_tx, peer_info.trusted); + + self.event_pub.send(NodeEvent::PeerDisconnected { + id: peer, + trusted: peer_info.trusted, + }); + true } else { false @@ -270,12 +286,6 @@ impl PeerTracker { } } -impl Default for PeerTracker { - fn default() -> Self { - PeerTracker::new() - } -} - fn increment_connected_peers(info_tx: &watch::Sender, trusted: bool) { info_tx.send_modify(|tracker_info| { tracker_info.num_connected_peers += 1; @@ -298,11 +308,14 @@ fn decrement_connected_peers(info_tx: &watch::Sender, trusted: #[cfg(test)] mod tests { + use crate::events::EventChannel; + use super::*; #[test] fn trust_before_connect() { - let tracker = PeerTracker::new(); + let event_channel = EventChannel::new(); + let tracker = PeerTracker::new(event_channel.publisher()); let mut watcher = tracker.info_watcher(); let peer = PeerId::random(); @@ -320,7 +333,8 @@ mod tests { #[test] fn trust_after_connect() { - let tracker = PeerTracker::new(); + let event_channel = EventChannel::new(); + let tracker = PeerTracker::new(event_channel.publisher()); let mut watcher = tracker.info_watcher(); let peer = PeerId::random(); @@ -341,7 +355,8 @@ mod tests { #[test] fn untrust_after_connect() { - let tracker = PeerTracker::new(); + let event_channel = EventChannel::new(); + let tracker = PeerTracker::new(event_channel.publisher()); let mut watcher = tracker.info_watcher(); let peer = PeerId::random();