Skip to content

Commit

Permalink
notifications/libp2p: Terminate the outbound notification substream o…
Browse files Browse the repository at this point in the history
…n `std::io::Errors` (#7724)

This PR handles a case where we called the `poll_next` on an outbound
substream notification to check if the stream is closed. It is entirely
possible that the `poll_next` would return an `io::error`, for example
end of file.

This PR ensures that we make the distinction between unexpected incoming
data, and error originated from `poll_next`.

While at it, the bulk of the PR change propagates the PeerID from the
network behavior, through the notification handler, to the notification
outbound stream for logging purposes.

cc @paritytech/networking 

Part of: #7722

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Feb 28, 2025
1 parent 9adb8d2 commit 1bc6ca6
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 26 deletions.
13 changes: 13 additions & 0 deletions prdoc/pr_7724.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
title: Terminate libp2p the outbound notification substream on io errors

doc:
- audience: [Node Dev, Node Operator]
description: |
This PR handles a case where we called the poll_next on an outbound substream notification to check if the stream is closed.
It is entirely possible that the poll_next would return an io::error, for example end of file.
This PR ensures that we make the distinction between unexpected incoming data, and error originated from poll_next.
While at it, the bulk of the PR change propagates the PeerID from the network behavior, through the notification handler, to the notification outbound stream for logging purposes.

crates:
- name: sc-network
bump: patch
24 changes: 18 additions & 6 deletions substrate/client/network/src/protocol/notifications/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ impl Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
});
*connec_state = ConnectionState::Opening;
*occ_entry.into_mut() = PeerState::Enabled { connections };
Expand Down Expand Up @@ -1062,7 +1062,10 @@ impl Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: incoming.peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: incoming.set_id.into(),
peer_id: incoming.peer_id,
},
});
*connec_state = ConnectionState::Opening;
}
Expand Down Expand Up @@ -1260,7 +1263,10 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: set_id.into(),
peer_id,
},
});

let mut connections = SmallVec::new();
Expand Down Expand Up @@ -1762,7 +1768,10 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: set_id.into(),
peer_id,
},
});
*connec_state = ConnectionState::Opening;
} else {
Expand Down Expand Up @@ -1849,7 +1858,10 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open {
protocol_index: set_id.into(),
peer_id,
},
});
*connec_state = ConnectionState::Opening;

Expand Down Expand Up @@ -2336,7 +2348,7 @@ impl NetworkBehaviour for Notifications {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connec_id),
event: NotifsHandlerIn::Open { protocol_index: set_id.into() },
event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id },
});
*connec_state = ConnectionState::Opening;
*peer_state = PeerState::Enabled { connections: mem::take(connections) };
Expand Down
37 changes: 30 additions & 7 deletions substrate/client/network/src/protocol/notifications/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ pub enum NotifsHandlerIn {
Open {
/// Index of the protocol in the list of protocols passed at initialization.
protocol_index: usize,

/// The peer id of the remote.
peer_id: PeerId,
},

/// Instruct the handler to close the notification substreams, or reject any pending incoming
Expand Down Expand Up @@ -632,7 +635,7 @@ impl ConnectionHandler for NotifsHandler {

fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
match message {
NotifsHandlerIn::Open { protocol_index } => {
NotifsHandlerIn::Open { protocol_index, peer_id } => {
let protocol_info = &mut self.protocols[protocol_index];
match &mut protocol_info.state {
State::Closed { pending_opening } => {
Expand All @@ -642,6 +645,7 @@ impl ConnectionHandler for NotifsHandler {
protocol_info.config.fallback_names.clone(),
protocol_info.config.handshake.read().clone(),
protocol_info.config.max_notification_size,
peer_id,
);

self.events_queue.push_back(
Expand All @@ -663,6 +667,7 @@ impl ConnectionHandler for NotifsHandler {
protocol_info.config.fallback_names.clone(),
handshake_message.clone(),
protocol_info.config.max_notification_size,
peer_id,
);

self.events_queue.push_back(
Expand Down Expand Up @@ -1202,7 +1207,10 @@ pub mod tests {
.await;

// move the handler state to 'Opening'
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1273,7 +1281,10 @@ pub mod tests {
.await;

// move the handler state to 'Opening'
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1362,7 +1373,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1421,7 +1435,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1502,7 +1519,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down Expand Up @@ -1550,7 +1570,10 @@ pub mod tests {

// first instruct the handler to open a connection and then close it right after
// so the handler is in state `Closed { pending_opening: true }`
handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 });
handler.on_behaviour_event(NotifsHandlerIn::Open {
protocol_index: 0,
peer_id: PeerId::random(),
});
assert!(std::matches!(
handler.protocols[0].state,
State::Opening { in_substream: Some(_), .. }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ use crate::types::ProtocolName;
use asynchronous_codec::Framed;
use bytes::BytesMut;
use futures::prelude::*;
use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use log::{error, warn};
use libp2p::{
core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
PeerId,
};
use log::{debug, error, warn};
use unsigned_varint::codec::UviBytes;

use std::{
Expand Down Expand Up @@ -78,6 +81,8 @@ pub struct NotificationsOut {
initial_message: Vec<u8>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
/// The peerID of the remote.
peer_id: PeerId,
}

/// A substream for incoming notification messages.
Expand Down Expand Up @@ -114,12 +119,15 @@ pub struct NotificationsOutSubstream<TSubstream> {
/// Substream where to send messages.
#[pin]
socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>,

/// The remote peer.
peer_id: PeerId,
}

#[cfg(test)]
impl<TSubstream> NotificationsOutSubstream<TSubstream> {
pub fn new(socket: Framed<TSubstream, UviBytes<io::Cursor<Vec<u8>>>>) -> Self {
Self { socket }
Self { socket, peer_id: PeerId::random() }
}
}

Expand Down Expand Up @@ -349,6 +357,7 @@ impl NotificationsOut {
fallback_names: Vec<ProtocolName>,
initial_message: impl Into<Vec<u8>>,
max_notification_size: u64,
peer_id: PeerId,
) -> Self {
let initial_message = initial_message.into();
if initial_message.len() > MAX_HANDSHAKE_SIZE {
Expand All @@ -358,7 +367,7 @@ impl NotificationsOut {
let mut protocol_names = fallback_names;
protocol_names.insert(0, main_protocol_name.into());

Self { protocol_names, initial_message, max_notification_size }
Self { protocol_names, initial_message, max_notification_size, peer_id }
}
}

Expand Down Expand Up @@ -414,7 +423,10 @@ where
} else {
Some(negotiated_name)
},
substream: NotificationsOutSubstream { socket: Framed::new(socket, codec) },
substream: NotificationsOutSubstream {
socket: Framed::new(socket, codec),
peer_id: self.peer_id,
},
})
})
}
Expand Down Expand Up @@ -465,11 +477,25 @@ where
// even if we don't write anything into it.
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Pending => {},
Poll::Ready(Some(_)) => {
error!(
target: LOG_TARGET,
"Unexpected incoming data in `NotificationsOutSubstream`",
);
Poll::Ready(Some(result)) => match result {
Ok(_) => {
error!(
target: "sub-libp2p",
"Unexpected incoming data in `NotificationsOutSubstream` peer={:?}",
this.peer_id
);
},
Err(error) => {
debug!(
target: "sub-libp2p",
"Error while reading from `NotificationsOutSubstream` peer={:?} error={error:?}",
this.peer_id
);

// The expectation is that the remote has closed the substream.
// This is similar to the `Poll::Ready(None)` branch below.
return Poll::Ready(Err(NotificationsOutError::Terminated));
},
},
Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)),
}
Expand Down Expand Up @@ -537,7 +563,10 @@ mod tests {
NotificationsOutSubstream,
};
use futures::{channel::oneshot, future, prelude::*, SinkExt, StreamExt};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p::{
core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo},
PeerId,
};
use std::{pin::Pin, task::Poll};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::TokioAsyncReadCompatExt;
Expand All @@ -557,7 +586,13 @@ mod tests {
NotificationsHandshakeError,
> {
let socket = TcpStream::connect(addr).await.unwrap();
let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024);
let notifs_out = NotificationsOut::new(
"/test/proto/1",
Vec::new(),
handshake,
1024 * 1024,
PeerId::random(),
);
let (_, substream) = multistream_select::dialer_select_proto(
socket.compat(),
notifs_out.protocol_info(),
Expand Down Expand Up @@ -721,7 +756,13 @@ mod tests {
let client = tokio::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let NotificationsOutOpen { handshake, .. } = OutboundUpgrade::upgrade_outbound(
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
NotificationsOut::new(
PROTO_NAME,
Vec::new(),
&b"initial message"[..],
1024 * 1024,
PeerId::random(),
),
socket.compat(),
ProtocolName::Static(PROTO_NAME),
)
Expand Down Expand Up @@ -766,6 +807,7 @@ mod tests {
Vec::new(),
&b"initial message"[..],
1024 * 1024,
PeerId::random(),
),
socket.compat(),
ProtocolName::Static(PROTO_NAME),
Expand Down

0 comments on commit 1bc6ca6

Please sign in to comment.