Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node)!: Generate events on peer connection/disconnection #291

Merged
merged 2 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::panic::Location;
use std::time::Duration;

use instant::SystemTime;
use libp2p::PeerId;
use serde::Serialize;
use tokio::sync::broadcast;

Expand Down Expand Up @@ -161,6 +162,24 @@ pub struct NodeEventInfo {
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum NodeEvent {
/// Peer just connected
PeerConnected {
#[serde(serialize_with = "serialize_as_string")]
/// The ID of the peer.
id: PeerId,
/// Whether peer was in the trusted list or not.
trusted: bool,
},

/// Peer just disconnected
PeerDisconnected {
#[serde(serialize_with = "serialize_as_string")]
/// The ID of the peer.
id: PeerId,
/// Whether peer was in the trusted list or not.
trusted: bool,
},

/// Sampling just started.
SamplingStarted {
/// The block height that will be sampled.
Expand Down Expand Up @@ -205,6 +224,20 @@ pub enum NodeEvent {
impl fmt::Display for NodeEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
NodeEvent::PeerConnected { id, trusted } => {
if *trusted {
write!(f, "Trusted peer connected: {id}")
} else {
write!(f, "Peer connected: {id}")
}
}
NodeEvent::PeerDisconnected { id, trusted } => {
if *trusted {
write!(f, "Trusted peer disconnected: {id}")
} else {
write!(f, "Peer disconnected: {id}")
}
}
NodeEvent::SamplingStarted {
height,
square_width,
Expand Down Expand Up @@ -243,6 +276,14 @@ impl fmt::Display for NodeEvent {
}
}

fn serialize_as_string<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: ToString,
S: serde::ser::Serializer,
{
value.to_string().serialize(serializer)
}

#[cfg(target_arch = "wasm32")]
fn serialize_system_time<S>(value: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down
1 change: 1 addition & 0 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ where
listen_on: config.p2p_listen_on,
blockstore: config.blockstore,
store: store.clone(),
event_pub: event_channel.publisher(),
})?);

let syncer = Arc::new(Syncer::start(SyncerArgs {
Expand Down
5 changes: 4 additions & 1 deletion node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ mod header_session;
pub(crate) mod shwap;
mod swarm;

use crate::events::EventPublisher;
use crate::executor::{self, spawn, Interval};
use crate::p2p::header_ex::{HeaderExBehaviour, HeaderExConfig};
use crate::p2p::header_session::HeaderSession;
Expand Down Expand Up @@ -187,6 +188,8 @@ where
pub blockstore: B,
/// The store for headers.
pub store: Arc<S>,
/// Event publisher.
pub event_pub: EventPublisher,
}

#[derive(Debug)]
Expand Down Expand Up @@ -234,7 +237,7 @@ impl P2p {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (header_sub_tx, header_sub_rx) = watch::channel(None);

let peer_tracker = Arc::new(PeerTracker::new());
let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
let peer_tracker_info_watcher = peer_tracker.info_watcher();

let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?;
Expand Down
4 changes: 3 additions & 1 deletion node/src/p2p/header_ex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ async fn decode_and_verify_responses(
#[cfg(test)]
mod tests {
use super::*;
use crate::events::EventChannel;
use crate::p2p::header_ex::utils::ExtendedHeaderExt;
use crate::test_utils::async_test;
use celestia_proto::p2p::pb::StatusCode;
Expand Down Expand Up @@ -1105,7 +1106,8 @@ mod tests {
}

fn peer_tracker_with_n_peers(amount: usize) -> Arc<PeerTracker> {
let peers = Arc::new(PeerTracker::new());
let event_channel = EventChannel::new();
let peers = Arc::new(PeerTracker::new(event_channel.publisher()));

for i in 0..amount {
let peer = PeerId::random();
Expand Down
35 changes: 25 additions & 10 deletions node/src/peer_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ use serde::Serialize;
use smallvec::SmallVec;
use tokio::sync::watch;

use crate::events::{EventPublisher, NodeEvent};

/// Keeps track various information about peers.
#[derive(Debug)]
pub struct PeerTracker {
peers: DashMap<PeerId, PeerInfo>,
info_tx: watch::Sender<PeerTrackerInfo>,
event_pub: EventPublisher,
}

/// Statistics of the connected peers
Expand Down Expand Up @@ -51,10 +54,11 @@ impl PeerInfo {

impl PeerTracker {
/// Constructs an empty PeerTracker.
pub fn new() -> Self {
pub fn new(event_pub: EventPublisher) -> Self {
PeerTracker {
peers: DashMap::new(),
info_tx: watch::channel(PeerTrackerInfo::default()).0,
event_pub,
}
}

Expand Down Expand Up @@ -164,7 +168,13 @@ impl PeerTracker {
// If peer was not already connected from before
if !peer_info.is_connected() {
peer_info.state = PeerState::Connected;

increment_connected_peers(&self.info_tx, peer_info.trusted);

self.event_pub.send(NodeEvent::PeerConnected {
id: peer,
trusted: peer_info.trusted,
});
}
}

Expand All @@ -185,6 +195,12 @@ impl PeerTracker {
}

decrement_connected_peers(&self.info_tx, peer_info.trusted);

self.event_pub.send(NodeEvent::PeerDisconnected {
id: peer,
trusted: peer_info.trusted,
});

true
} else {
false
Expand Down Expand Up @@ -270,12 +286,6 @@ impl PeerTracker {
}
}

impl Default for PeerTracker {
fn default() -> Self {
PeerTracker::new()
}
}

fn increment_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
info_tx.send_modify(|tracker_info| {
tracker_info.num_connected_peers += 1;
Expand All @@ -298,11 +308,14 @@ fn decrement_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted:

#[cfg(test)]
mod tests {
use crate::events::EventChannel;

use super::*;

#[test]
fn trust_before_connect() {
let tracker = PeerTracker::new();
let event_channel = EventChannel::new();
let tracker = PeerTracker::new(event_channel.publisher());
let mut watcher = tracker.info_watcher();
let peer = PeerId::random();

Expand All @@ -320,7 +333,8 @@ mod tests {

#[test]
fn trust_after_connect() {
let tracker = PeerTracker::new();
let event_channel = EventChannel::new();
let tracker = PeerTracker::new(event_channel.publisher());
let mut watcher = tracker.info_watcher();
let peer = PeerId::random();

Expand All @@ -341,7 +355,8 @@ mod tests {

#[test]
fn untrust_after_connect() {
let tracker = PeerTracker::new();
let event_channel = EventChannel::new();
let tracker = PeerTracker::new(event_channel.publisher());
let mut watcher = tracker.info_watcher();
let peer = PeerId::random();

Expand Down
Loading