diff --git a/prdoc/pr_7724.prdoc b/prdoc/pr_7724.prdoc new file mode 100644 index 0000000000000..30c1da3fc1553 --- /dev/null +++ b/prdoc/pr_7724.prdoc @@ -0,0 +1,13 @@ +title: Terminate libp2p the outbound notification substream on io errors + +doc: + - audience: [Node Dev, Node Operator] + description: | + This PR handles a case where we called the poll_next on an outbound substream notification to check if the stream is closed. + It is entirely possible that the poll_next would return an io::error, for example end of file. + This PR ensures that we make the distinction between unexpected incoming data, and error originated from poll_next. + While at it, the bulk of the PR change propagates the PeerID from the network behavior, through the notification handler, to the notification outbound stream for logging purposes. + +crates: + - name: sc-network + bump: patch diff --git a/substrate/client/network/src/protocol/notifications/behaviour.rs b/substrate/client/network/src/protocol/notifications/behaviour.rs index 217ef304bd0fc..f524d3f0d3d1d 100644 --- a/substrate/client/network/src/protocol/notifications/behaviour.rs +++ b/substrate/client/network/src/protocol/notifications/behaviour.rs @@ -703,7 +703,7 @@ impl Notifications { self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(*connec_id), - event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, + event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id }, }); *connec_state = ConnectionState::Opening; *occ_entry.into_mut() = PeerState::Enabled { connections }; @@ -1062,7 +1062,10 @@ impl Notifications { self.events.push_back(ToSwarm::NotifyHandler { peer_id: incoming.peer_id, handler: NotifyHandler::One(*connec_id), - event: NotifsHandlerIn::Open { protocol_index: incoming.set_id.into() }, + event: NotifsHandlerIn::Open { + protocol_index: incoming.set_id.into(), + peer_id: incoming.peer_id, + }, }); *connec_state = ConnectionState::Opening; } @@ -1260,7 +1263,10 @@ impl NetworkBehaviour for Notifications { self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), - event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, + event: NotifsHandlerIn::Open { + protocol_index: set_id.into(), + peer_id, + }, }); let mut connections = SmallVec::new(); @@ -1762,7 +1768,10 @@ impl NetworkBehaviour for Notifications { self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), - event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, + event: NotifsHandlerIn::Open { + protocol_index: set_id.into(), + peer_id, + }, }); *connec_state = ConnectionState::Opening; } else { @@ -1849,7 +1858,10 @@ impl NetworkBehaviour for Notifications { self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), - event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, + event: NotifsHandlerIn::Open { + protocol_index: set_id.into(), + peer_id, + }, }); *connec_state = ConnectionState::Opening; @@ -2336,7 +2348,7 @@ impl NetworkBehaviour for Notifications { self.events.push_back(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(*connec_id), - event: NotifsHandlerIn::Open { protocol_index: set_id.into() }, + event: NotifsHandlerIn::Open { protocol_index: set_id.into(), peer_id }, }); *connec_state = ConnectionState::Opening; *peer_state = PeerState::Enabled { connections: mem::take(connections) }; diff --git a/substrate/client/network/src/protocol/notifications/handler.rs b/substrate/client/network/src/protocol/notifications/handler.rs index 416a35ad88c9a..c5f0de9991c71 100644 --- a/substrate/client/network/src/protocol/notifications/handler.rs +++ b/substrate/client/network/src/protocol/notifications/handler.rs @@ -264,6 +264,9 @@ pub enum NotifsHandlerIn { Open { /// Index of the protocol in the list of protocols passed at initialization. protocol_index: usize, + + /// The peer id of the remote. + peer_id: PeerId, }, /// Instruct the handler to close the notification substreams, or reject any pending incoming @@ -632,7 +635,7 @@ impl ConnectionHandler for NotifsHandler { fn on_behaviour_event(&mut self, message: NotifsHandlerIn) { match message { - NotifsHandlerIn::Open { protocol_index } => { + NotifsHandlerIn::Open { protocol_index, peer_id } => { let protocol_info = &mut self.protocols[protocol_index]; match &mut protocol_info.state { State::Closed { pending_opening } => { @@ -642,6 +645,7 @@ impl ConnectionHandler for NotifsHandler { protocol_info.config.fallback_names.clone(), protocol_info.config.handshake.read().clone(), protocol_info.config.max_notification_size, + peer_id, ); self.events_queue.push_back( @@ -663,6 +667,7 @@ impl ConnectionHandler for NotifsHandler { protocol_info.config.fallback_names.clone(), handshake_message.clone(), protocol_info.config.max_notification_size, + peer_id, ); self.events_queue.push_back( @@ -1202,7 +1207,10 @@ pub mod tests { .await; // move the handler state to 'Opening' - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + handler.on_behaviour_event(NotifsHandlerIn::Open { + protocol_index: 0, + peer_id: PeerId::random(), + }); assert!(std::matches!( handler.protocols[0].state, State::Opening { in_substream: Some(_), .. } @@ -1273,7 +1281,10 @@ pub mod tests { .await; // move the handler state to 'Opening' - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + handler.on_behaviour_event(NotifsHandlerIn::Open { + protocol_index: 0, + peer_id: PeerId::random(), + }); assert!(std::matches!( handler.protocols[0].state, State::Opening { in_substream: Some(_), .. } @@ -1362,7 +1373,10 @@ pub mod tests { // first instruct the handler to open a connection and then close it right after // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + handler.on_behaviour_event(NotifsHandlerIn::Open { + protocol_index: 0, + peer_id: PeerId::random(), + }); assert!(std::matches!( handler.protocols[0].state, State::Opening { in_substream: Some(_), .. } @@ -1421,7 +1435,10 @@ pub mod tests { // first instruct the handler to open a connection and then close it right after // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + handler.on_behaviour_event(NotifsHandlerIn::Open { + protocol_index: 0, + peer_id: PeerId::random(), + }); assert!(std::matches!( handler.protocols[0].state, State::Opening { in_substream: Some(_), .. } @@ -1502,7 +1519,10 @@ pub mod tests { // first instruct the handler to open a connection and then close it right after // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + handler.on_behaviour_event(NotifsHandlerIn::Open { + protocol_index: 0, + peer_id: PeerId::random(), + }); assert!(std::matches!( handler.protocols[0].state, State::Opening { in_substream: Some(_), .. } @@ -1550,7 +1570,10 @@ pub mod tests { // first instruct the handler to open a connection and then close it right after // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + handler.on_behaviour_event(NotifsHandlerIn::Open { + protocol_index: 0, + peer_id: PeerId::random(), + }); assert!(std::matches!( handler.protocols[0].state, State::Opening { in_substream: Some(_), .. } diff --git a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs index b4d0de171a183..dba6a7b598acf 100644 --- a/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/notifications/upgrade/notifications.rs @@ -39,8 +39,11 @@ use crate::types::ProtocolName; use asynchronous_codec::Framed; use bytes::BytesMut; use futures::prelude::*; -use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use log::{error, warn}; +use libp2p::{ + core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, + PeerId, +}; +use log::{debug, error, warn}; use unsigned_varint::codec::UviBytes; use std::{ @@ -78,6 +81,8 @@ pub struct NotificationsOut { initial_message: Vec, /// Maximum allowed size for a single notification. max_notification_size: u64, + /// The peerID of the remote. + peer_id: PeerId, } /// A substream for incoming notification messages. @@ -114,12 +119,15 @@ pub struct NotificationsOutSubstream { /// Substream where to send messages. #[pin] socket: Framed>>>, + + /// The remote peer. + peer_id: PeerId, } #[cfg(test)] impl NotificationsOutSubstream { pub fn new(socket: Framed>>>) -> Self { - Self { socket } + Self { socket, peer_id: PeerId::random() } } } @@ -349,6 +357,7 @@ impl NotificationsOut { fallback_names: Vec, initial_message: impl Into>, max_notification_size: u64, + peer_id: PeerId, ) -> Self { let initial_message = initial_message.into(); if initial_message.len() > MAX_HANDSHAKE_SIZE { @@ -358,7 +367,7 @@ impl NotificationsOut { let mut protocol_names = fallback_names; protocol_names.insert(0, main_protocol_name.into()); - Self { protocol_names, initial_message, max_notification_size } + Self { protocol_names, initial_message, max_notification_size, peer_id } } } @@ -414,7 +423,10 @@ where } else { Some(negotiated_name) }, - substream: NotificationsOutSubstream { socket: Framed::new(socket, codec) }, + substream: NotificationsOutSubstream { + socket: Framed::new(socket, codec), + peer_id: self.peer_id, + }, }) }) } @@ -465,11 +477,25 @@ where // even if we don't write anything into it. match Stream::poll_next(this.socket.as_mut(), cx) { Poll::Pending => {}, - Poll::Ready(Some(_)) => { - error!( - target: LOG_TARGET, - "Unexpected incoming data in `NotificationsOutSubstream`", - ); + Poll::Ready(Some(result)) => match result { + Ok(_) => { + error!( + target: "sub-libp2p", + "Unexpected incoming data in `NotificationsOutSubstream` peer={:?}", + this.peer_id + ); + }, + Err(error) => { + debug!( + target: "sub-libp2p", + "Error while reading from `NotificationsOutSubstream` peer={:?} error={error:?}", + this.peer_id + ); + + // The expectation is that the remote has closed the substream. + // This is similar to the `Poll::Ready(None)` branch below. + return Poll::Ready(Err(NotificationsOutError::Terminated)); + }, }, Poll::Ready(None) => return Poll::Ready(Err(NotificationsOutError::Terminated)), } @@ -537,7 +563,10 @@ mod tests { NotificationsOutSubstream, }; use futures::{channel::oneshot, future, prelude::*, SinkExt, StreamExt}; - use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; + use libp2p::{ + core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}, + PeerId, + }; use std::{pin::Pin, task::Poll}; use tokio::net::{TcpListener, TcpStream}; use tokio_util::compat::TokioAsyncReadCompatExt; @@ -557,7 +586,13 @@ mod tests { NotificationsHandshakeError, > { let socket = TcpStream::connect(addr).await.unwrap(); - let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024); + let notifs_out = NotificationsOut::new( + "/test/proto/1", + Vec::new(), + handshake, + 1024 * 1024, + PeerId::random(), + ); let (_, substream) = multistream_select::dialer_select_proto( socket.compat(), notifs_out.protocol_info(), @@ -721,7 +756,13 @@ mod tests { let client = tokio::spawn(async move { let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); let NotificationsOutOpen { handshake, .. } = OutboundUpgrade::upgrade_outbound( - NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024), + NotificationsOut::new( + PROTO_NAME, + Vec::new(), + &b"initial message"[..], + 1024 * 1024, + PeerId::random(), + ), socket.compat(), ProtocolName::Static(PROTO_NAME), ) @@ -766,6 +807,7 @@ mod tests { Vec::new(), &b"initial message"[..], 1024 * 1024, + PeerId::random(), ), socket.compat(), ProtocolName::Static(PROTO_NAME),