diff --git a/Cargo.lock b/Cargo.lock index 848c22927bc6..f23539212dc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6288,8 +6288,11 @@ dependencies = [ "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", + "rand 0.8.5", + "rand_chacha 0.3.1", "rand_core 0.5.1", "schnorrkel", + "sp-authority-discovery", "sp-core", "tracing-gum", ] @@ -6542,6 +6545,7 @@ dependencies = [ "rand_chacha 0.3.1", "sc-network", "sp-application-crypto", + "sp-authority-discovery", "sp-consensus-babe", "sp-core", "sp-keyring", @@ -7569,6 +7573,7 @@ dependencies = [ "sc-keystore", "sc-network", "sp-application-crypto", + "sp-authority-discovery", "sp-core", "sp-keyring", "sp-keystore", diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 7dac12026254..6999eb2bf1e9 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -574,6 +574,7 @@ pub(crate) async fn handle_new_head( parent_hash: block_header.parent_hash, candidates: included_candidates.iter().map(|(hash, _, _, _)| *hash).collect(), slot, + session: session_index, }); imported_candidates.push(BlockImportedCandidates { diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 6037abd2a66a..a4fd49636ed6 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -1006,6 +1006,7 @@ fn distribution_messages_for_activation( parent_hash: block_entry.parent_hash(), candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(), slot: block_entry.slot(), + session: block_entry.session(), }); for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() { diff --git a/node/network/approval-distribution/Cargo.toml b/node/network/approval-distribution/Cargo.toml index 7b1556cf890c..638e63a3eb2a 100644 --- a/node/network/approval-distribution/Cargo.toml +++ b/node/network/approval-distribution/Cargo.toml @@ -10,11 +10,13 @@ polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-primitives = { path = "../../../primitives" } +rand = "0.8" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../gum" } [dev-dependencies] +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } polkadot-node-subsystem-util = { path = "../../subsystem-util" } @@ -23,5 +25,6 @@ polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } assert_matches = "1.4.0" schnorrkel = { version = "0.9.1", default-features = false } rand_core = "0.5.1" # should match schnorrkel +rand_chacha = "0.3.1" env_logger = "0.9.0" log = "0.4.16" diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index a231fe0b3472..1b54ce21bd56 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -29,16 +29,16 @@ use polkadot_node_primitives::approval::{ }; use polkadot_node_subsystem::{ messages::{ - ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage, - AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeMessage, + network_bridge_event, ApprovalCheckResult, ApprovalDistributionMessage, + ApprovalVotingMessage, AssignmentCheckResult, NetworkBridgeEvent, 
NetworkBridgeMessage, }, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError, }; -use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS}; use polkadot_primitives::v2::{ - BlockNumber, CandidateIndex, Hash, ValidatorIndex, ValidatorSignature, + BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature, }; +use rand::{CryptoRng, Rng, SeedableRng}; use std::collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque}; use self::metrics::Metrics; @@ -61,6 +61,14 @@ const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message" const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::BenefitMinorFirst("Valid message with new information"); +/// The number of peers to randomly propagate messages to. +const RANDOM_CIRCULATION: usize = 4; +/// The sample rate for randomly propagating messages. This +/// reduces the left tail of the binomial distribution but also +/// introduces a bias towards peers who we sample before others +/// (i.e. those who get a block before others). +const RANDOM_SAMPLE_RATE: usize = polkadot_node_subsystem_util::MIN_GOSSIP_PEERS; + /// The Approval Distribution subsystem. pub struct ApprovalDistribution { metrics: Metrics, @@ -89,6 +97,138 @@ impl RecentlyOutdated { } } +struct SessionTopology { + peers_x: HashSet, + validator_indices_x: HashSet, + peers_y: HashSet, + validator_indices_y: HashSet, +} + +impl SessionTopology { + // Given the originator of a message, indicates the part of the topology + // we're meant to send the message to. + fn required_routing_for(&self, originator: ValidatorIndex, local: bool) -> RequiredRouting { + if local { + return RequiredRouting::GridXY + } + + let grid_x = self.validator_indices_x.contains(&originator); + let grid_y = self.validator_indices_y.contains(&originator); + + match (grid_x, grid_y) { + (false, false) => RequiredRouting::None, + (true, false) => RequiredRouting::GridY, // messages from X go to Y + (false, true) => RequiredRouting::GridX, // messages from Y go to X + (true, true) => RequiredRouting::GridXY, // if the grid works as expected, this shouldn't happen. + } + } + + // Get a filter function based on this topology and the required routing + // which returns `true` for peers that are within the required routing set + // and false otherwise. 
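+ // For example, `RequiredRouting::GridX` admits only peers in `peers_x`, while
+ // `RequiredRouting::GridXY` admits peers in either grid dimension.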
+ fn route_to_peer(&self, required_routing: RequiredRouting, peer: &PeerId) -> bool { + match required_routing { + RequiredRouting::All => true, + RequiredRouting::GridX => self.peers_x.contains(peer), + RequiredRouting::GridY => self.peers_y.contains(peer), + RequiredRouting::GridXY => self.peers_x.contains(peer) || self.peers_y.contains(peer), + RequiredRouting::None | RequiredRouting::PendingTopology => false, + } + } +} + +impl From for SessionTopology { + fn from(topology: network_bridge_event::NewGossipTopology) -> Self { + let peers_x = + topology.our_neighbors_x.values().flat_map(|p| &p.peer_ids).cloned().collect(); + let peers_y = + topology.our_neighbors_y.values().flat_map(|p| &p.peer_ids).cloned().collect(); + + let validator_indices_x = + topology.our_neighbors_x.values().map(|p| p.validator_index.clone()).collect(); + let validator_indices_y = + topology.our_neighbors_y.values().map(|p| p.validator_index.clone()).collect(); + + SessionTopology { peers_x, peers_y, validator_indices_x, validator_indices_y } + } +} + +#[derive(Default)] +struct SessionTopologies { + inner: HashMap, usize)>, +} + +impl SessionTopologies { + fn get_topology(&self, session: SessionIndex) -> Option<&SessionTopology> { + self.inner.get(&session).and_then(|val| val.0.as_ref()) + } + + fn inc_session_refs(&mut self, session: SessionIndex) { + self.inner.entry(session).or_insert((None, 0)).1 += 1; + } + + fn dec_session_refs(&mut self, session: SessionIndex) { + if let hash_map::Entry::Occupied(mut occupied) = self.inner.entry(session) { + occupied.get_mut().1 = occupied.get().1.saturating_sub(1); + if occupied.get().1 == 0 { + let _ = occupied.remove(); + } + } + } + + // No-op if already present. + fn insert_topology(&mut self, session: SessionIndex, topology: SessionTopology) { + let entry = self.inner.entry(session).or_insert((None, 0)); + if entry.0.is_none() { + entry.0 = Some(topology); + } + } +} + +// A note on aggression thresholds: changes in propagation apply only to blocks which are the +// _direct descendants_ of the finalized block which are older than the given threshold, +// not to all blocks older than the threshold. Most likely, a few assignments struggle to +// be propagated in a single block and this holds up all of its descendants blocks. +// Accordingly, we only step on the gas for the block which is most obviously holding up finality. +#[derive(Clone)] +struct AggressionConfig { + /// Aggression level 1: all validators send all their own messages to all peers. + l1_threshold: Option, + /// Aggression level 2: level 1 + all validators send all messages to all peers in the X and Y dimensions. + l2_threshold: Option, + /// How often to re-send messages to all targeted recipients. + /// This applies to all unfinalized blocks. + resend_unfinalized_period: Option, +} + +impl AggressionConfig { + fn is_age_relevant(&self, block_age: BlockNumber) -> bool { + if let Some(t) = self.l1_threshold { + block_age >= t + } else if let Some(t) = self.resend_unfinalized_period { + block_age > 0 && block_age % t == 0 + } else { + false + } + } +} + +impl Default for AggressionConfig { + fn default() -> Self { + AggressionConfig { + l1_threshold: Some(10), + l2_threshold: Some(25), + resend_unfinalized_period: Some(5), + } + } +} + +#[derive(PartialEq)] +enum Resend { + Yes, + No, +} + /// The [`State`] struct is responsible for tracking the overall state of the subsystem. 
/// /// It tracks metadata about our view of the unfinalized chain, @@ -107,43 +247,65 @@ struct State { /// also a race that occurs typically on local networks. pending_known: HashMap>, - /// Peer view data is partially stored here, and partially inline within the [`BlockEntry`]s + /// Peer data is partially stored here, and partially inline within the [`BlockEntry`]s peer_views: HashMap, - /// Track all our neighbors in the current gossip topology. - /// We're not necessarily connected to all of them. - gossip_peers: HashSet, + /// Keeps a topology for various different sessions. + topologies: SessionTopologies, /// Tracks recently finalized blocks. recent_outdated_blocks: RecentlyOutdated, + + /// Config for aggression. + aggression_config: AggressionConfig, } -/// A short description of a validator's assignment or approval. -#[derive(Debug, Clone, Hash, PartialEq, Eq)] -enum MessageFingerprint { - Assignment(Hash, CandidateIndex, ValidatorIndex), - Approval(Hash, CandidateIndex, ValidatorIndex), +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum MessageKind { + Assignment, + Approval, } +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +struct MessageSubject(Hash, CandidateIndex, ValidatorIndex); + #[derive(Debug, Clone, Default)] struct Knowledge { - known_messages: HashSet, + // When there is no entry, this means the message is unknown + // When there is an entry with `MessageKind::Assignment`, the assignment is known. + // When there is an entry with `MessageKind::Approval`, the assignment and approval are known. + known_messages: HashMap, } impl Knowledge { - fn contains(&self, fingerprint: &MessageFingerprint) -> bool { - self.known_messages.contains(fingerprint) + fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool { + match (kind, self.known_messages.get(message)) { + (_, None) => false, + (MessageKind::Assignment, Some(_)) => true, + (MessageKind::Approval, Some(MessageKind::Assignment)) => false, + (MessageKind::Approval, Some(MessageKind::Approval)) => true, + } } - fn insert(&mut self, fingerprint: MessageFingerprint) -> bool { - self.known_messages.insert(fingerprint) + fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool { + match self.known_messages.entry(message) { + hash_map::Entry::Vacant(vacant) => { + vacant.insert(kind); + true + }, + hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) { + (MessageKind::Assignment, MessageKind::Assignment) => false, + (MessageKind::Approval, MessageKind::Approval) => false, + (MessageKind::Approval, MessageKind::Assignment) => false, + (MessageKind::Assignment, MessageKind::Approval) => { + *occupied.get_mut() = MessageKind::Approval; + true + }, + }, + } } } -/// The difference of our knowledge and peer's knowledge -/// that is used to send the missing information. -type MissingKnowledge = HashSet; - /// Information that has been circulated to and from a peer. #[derive(Debug, Clone, Default)] struct PeerKnowledge { @@ -154,8 +316,8 @@ struct PeerKnowledge { } impl PeerKnowledge { - fn contains(&self, fingerprint: &MessageFingerprint) -> bool { - self.sent.contains(fingerprint) || self.received.contains(fingerprint) + fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool { + self.sent.contains(message, kind) || self.received.contains(message, kind) } } @@ -172,6 +334,8 @@ struct BlockEntry { knowledge: Knowledge, /// A votes entry for each candidate indexed by [`CandidateIndex`]. candidates: Vec, + /// The session index of this block. 
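+ /// Used to look up the matching `SessionTopology` when routing this block's
+ /// assignments and approvals.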
+ session: SessionIndex, } #[derive(Debug)] @@ -180,10 +344,83 @@ enum ApprovalState { Approved(AssignmentCert, ValidatorSignature), } -#[derive(Debug, Clone, Copy)] -enum LocalSource { - Yes, - No, +impl ApprovalState { + fn assignment_cert(&self) -> &AssignmentCert { + match *self { + ApprovalState::Assigned(ref cert) => cert, + ApprovalState::Approved(ref cert, _) => cert, + } + } + + fn approval_signature(&self) -> Option { + match *self { + ApprovalState::Assigned(_) => None, + ApprovalState::Approved(_, ref sig) => Some(sig.clone()), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +enum RequiredRouting { + /// We don't know yet, because we're waiting for topology info + /// (race condition between learning about the first blocks in a new session + /// and getting the topology for that session) + PendingTopology, + /// Propagate to all peers of any kind. + All, + /// Propagate to all peers sharing either the X or Y dimension of the grid. + GridXY, + /// Propagate to all peers sharing the X dimension of the grid. + GridX, + /// Propagate to all peers sharing the Y dimension of the grid. + GridY, + /// No required propagation. + None, +} + +impl RequiredRouting { + // Whether the required routing set is definitely empty. + fn is_empty(self) -> bool { + match self { + RequiredRouting::PendingTopology | RequiredRouting::None => true, + _ => false, + } + } +} + +#[derive(Debug, Default, Clone, Copy)] +struct RandomRouting { + // The number of peers to target. + target: usize, + // The number of peers this has been sent to. + sent: usize, +} + +impl RandomRouting { + fn sample(&self, n_peers_total: usize, rng: &mut (impl CryptoRng + Rng)) -> bool { + if n_peers_total == 0 || self.sent >= self.target { + false + } else if RANDOM_SAMPLE_RATE > n_peers_total { + true + } else { + rng.gen_ratio(RANDOM_SAMPLE_RATE as _, n_peers_total as _) + } + } + + fn inc_sent(&mut self) { + self.sent += 1 + } +} + +// routing state bundled with messages for the candidate. Corresponding assignments +// and approvals are stored together and should be routed in the same way, with +// assignments preceding approvals in all cases. +#[derive(Debug)] +struct MessageState { + required_routing: RequiredRouting, + local: bool, + random_routing: RandomRouting, + approval_state: ApprovalState, } /// Information about candidates in the context of a particular block they are included in. @@ -191,10 +428,10 @@ enum LocalSource { /// if it is included by multiple blocks - this is likely the case when there are forks. 
#[derive(Debug, Default)] struct CandidateEntry { - approvals: HashMap, + messages: HashMap, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] enum MessageSource { Peer(PeerId), Local, @@ -207,13 +444,6 @@ impl MessageSource { Self::Local => None, } } - - fn as_local_source(&self) -> LocalSource { - match self { - Self::Local => LocalSource::Yes, - _ => LocalSource::No, - } - } } enum PendingMessage { @@ -228,6 +458,7 @@ impl State { + overseer::SubsystemContext), metrics: &Metrics, event: NetworkBridgeEvent, + rng: &mut (impl CryptoRng + Rng), ) { match event { NetworkBridgeEvent::PeerConnected(peer_id, role, _) => { @@ -242,18 +473,13 @@ impl State { entry.known_by.remove(&peer_id); }) }, - NetworkBridgeEvent::NewGossipTopology(peers) => { - let newly_added: Vec = - peers.difference(&self.gossip_peers).cloned().collect(); - self.gossip_peers = peers; - for peer_id in newly_added { - if let Some(view) = self.peer_views.remove(&peer_id) { - self.handle_peer_view_change(ctx, metrics, peer_id, view).await; - } - } + NetworkBridgeEvent::NewGossipTopology(topology) => { + let session = topology.session; + self.handle_new_session_topology(ctx, session, SessionTopology::from(topology)) + .await; }, NetworkBridgeEvent::PeerViewChange(peer_id, view) => { - self.handle_peer_view_change(ctx, metrics, peer_id, view).await; + self.handle_peer_view_change(ctx, metrics, peer_id, view, rng).await; }, NetworkBridgeEvent::OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); @@ -276,7 +502,7 @@ impl State { }); }, NetworkBridgeEvent::PeerMessage(peer_id, msg) => { - self.process_incoming_peer_message(ctx, metrics, peer_id, msg).await; + self.process_incoming_peer_message(ctx, metrics, peer_id, msg, rng).await; }, } } @@ -287,6 +513,7 @@ impl State { + overseer::SubsystemContext), metrics: &Metrics, metas: Vec, + rng: &mut (impl CryptoRng + Rng), ) { let mut new_hashes = HashSet::new(); for meta in &metas { @@ -302,7 +529,11 @@ impl State { parent_hash: meta.parent_hash.clone(), knowledge: Knowledge::default(), candidates, + session: meta.session, }); + + self.topologies.inc_session_refs(meta.session); + new_hashes.insert(meta.hash.clone()); // In case there are duplicates, we should only set this if the entry @@ -325,11 +556,13 @@ impl State { let view_intersection = View::new(intersection.cloned(), view.finalized_number); Self::unify_with_peer( ctx, - &self.gossip_peers, metrics, &mut self.blocks, + &self.topologies, + self.peer_views.len(), peer_id.clone(), view_intersection, + rng, ) .await; } @@ -372,6 +605,7 @@ impl State { MessageSource::Peer(peer_id), assignment, claimed_index, + rng, ) .await; }, @@ -388,6 +622,32 @@ impl State { } } } + + self.enable_aggression(ctx, Resend::Yes, metrics).await; + } + + async fn handle_new_session_topology( + &mut self, + ctx: &mut (impl SubsystemContext + + overseer::SubsystemContext), + session: SessionIndex, + topology: SessionTopology, + ) { + self.topologies.insert_topology(session, topology); + let topology = self.topologies.get_topology(session).expect("just inserted above; qed"); + + adjust_required_routing_and_propagate( + ctx, + &mut self.blocks, + &self.topologies, + |block_entry| block_entry.session == session, + |required_routing, local, validator_index| { + if *required_routing == RequiredRouting::PendingTopology { + *required_routing = topology.required_routing_for(*validator_index, local); + } + }, + ) + .await; } async fn process_incoming_peer_message( @@ -397,6 +657,7 @@ impl State { metrics: &Metrics, 
peer_id: PeerId, msg: protocol_v1::ApprovalDistributionMessage, + rng: &mut (impl CryptoRng + Rng), ) { match msg { protocol_v1::ApprovalDistributionMessage::Assignments(assignments) => { @@ -408,7 +669,7 @@ impl State { ); for (assignment, claimed_index) in assignments.into_iter() { if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) { - let fingerprint = MessageFingerprint::Assignment( + let message_subject = MessageSubject( assignment.block_hash, claimed_index, assignment.validator, @@ -417,7 +678,7 @@ impl State { gum::trace!( target: LOG_TARGET, %peer_id, - ?fingerprint, + ?message_subject, "Pending assignment", ); @@ -435,6 +696,7 @@ impl State { MessageSource::Peer(peer_id.clone()), assignment, claimed_index, + rng, ) .await; } @@ -448,7 +710,7 @@ impl State { ); for approval_vote in approvals.into_iter() { if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) { - let fingerprint = MessageFingerprint::Approval( + let message_subject = MessageSubject( approval_vote.block_hash, approval_vote.candidate_index, approval_vote.validator, @@ -457,7 +719,7 @@ impl State { gum::trace!( target: LOG_TARGET, %peer_id, - ?fingerprint, + ?message_subject, "Pending approval", ); @@ -478,6 +740,8 @@ impl State { } } + // handle a peer view change: requires that the peer is already connected + // and has an entry in the `PeerData` struct. async fn handle_peer_view_change( &mut self, ctx: &mut (impl SubsystemContext @@ -485,10 +749,12 @@ impl State { metrics: &Metrics, peer_id: PeerId, view: View, + rng: &mut (impl CryptoRng + Rng), ) { gum::trace!(target: LOG_TARGET, ?view, "Peer view change"); let finalized_number = view.finalized_number; - let old_view = self.peer_views.insert(peer_id.clone(), view.clone()); + let old_view = + self.peer_views.get_mut(&peer_id).map(|d| std::mem::replace(d, view.clone())); let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0); // we want to prune every block known_by peer up to (including) view.finalized_number @@ -511,16 +777,24 @@ impl State { Self::unify_with_peer( ctx, - &self.gossip_peers, metrics, &mut self.blocks, + &self.topologies, + self.peer_views.len(), peer_id.clone(), view, + rng, ) .await; } - fn handle_block_finalized(&mut self, finalized_number: BlockNumber) { + async fn handle_block_finalized( + &mut self, + ctx: &mut (impl SubsystemContext + + overseer::SubsystemContext), + metrics: &Metrics, + finalized_number: BlockNumber, + ) { // we want to prune every block up to (including) finalized_number // why +1 here? // split_off returns everything after the given key, including the key @@ -533,8 +807,14 @@ impl State { // now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too old_blocks.values().flatten().for_each(|relay_block| { self.recent_outdated_blocks.note_outdated(*relay_block); - self.blocks.remove(relay_block); + if let Some(block_entry) = self.blocks.remove(relay_block) { + self.topologies.dec_session_refs(block_entry.session); + } }); + + // If a block was finalized, this means we may need to move our aggression + // forward to the now oldest block(s). 
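+ // `Resend::No`: finalization only re-evaluates which block(s) are now oldest and
+ // may need wider routing; it does not force re-sending messages peers already know.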
+ self.enable_aggression(ctx, Resend::No, metrics).await; } async fn import_and_circulate_assignment( @@ -545,6 +825,7 @@ impl State { source: MessageSource, assignment: IndirectAssignmentCert, claimed_candidate_index: CandidateIndex, + rng: &mut (impl CryptoRng + Rng), ) { let block_hash = assignment.block_hash.clone(); let validator_index = assignment.validator; @@ -568,23 +849,22 @@ impl State { }, }; - // compute a fingerprint of the assignment - let fingerprint = - MessageFingerprint::Assignment(block_hash, claimed_candidate_index, validator_index); + // compute metadata on the assignment. + let message_subject = MessageSubject(block_hash, claimed_candidate_index, validator_index); + let message_kind = MessageKind::Assignment; if let Some(peer_id) = source.peer_id() { // check if our knowledge of the peer already contains this assignment match entry.known_by.entry(peer_id.clone()) { hash_map::Entry::Occupied(mut peer_knowledge) => { let peer_knowledge = peer_knowledge.get_mut(); - if peer_knowledge.contains(&fingerprint) { + if peer_knowledge.contains(&message_subject, message_kind) { // wasn't included before - if !peer_knowledge.received.insert(fingerprint.clone()) { + if !peer_knowledge.received.insert(message_subject.clone(), message_kind) { gum::debug!( target: LOG_TARGET, ?peer_id, - hash = ?block_hash, - ?fingerprint, + ?message_subject, "Duplicate assignment", ); modify_reputation(ctx, peer_id, COST_DUPLICATE_MESSAGE).await; @@ -596,8 +876,7 @@ impl State { gum::debug!( target: LOG_TARGET, ?peer_id, - hash = ?block_hash, - ?fingerprint, + ?message_subject, "Assignment from a peer is out of view", ); modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await; @@ -605,11 +884,11 @@ impl State { } // if the assignment is known to be valid, reward the peer - if entry.knowledge.known_messages.contains(&fingerprint) { + if entry.knowledge.contains(&message_subject, message_kind) { modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await; if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - gum::trace!(target: LOG_TARGET, ?peer_id, ?fingerprint, "Known assignment"); - peer_knowledge.received.insert(fingerprint.clone()); + gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment"); + peer_knowledge.received.insert(message_subject, message_kind); } return } @@ -633,13 +912,19 @@ impl State { }; drop(timer); - gum::trace!(target: LOG_TARGET, hash = ?block_hash, ?source, ?fingerprint, ?result, "Checked assignment",); + gum::trace!( + target: LOG_TARGET, + ?source, + ?message_subject, + ?result, + "Checked assignment", + ); match result { AssignmentCheckResult::Accepted => { modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; - entry.knowledge.known_messages.insert(fingerprint.clone()); + entry.knowledge.known_messages.insert(message_subject.clone(), message_kind); if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - peer_knowledge.received.insert(fingerprint.clone()); + peer_knowledge.received.insert(message_subject.clone(), message_kind); } }, AssignmentCheckResult::AcceptedDuplicate => { @@ -647,7 +932,7 @@ impl State { // There is more than one way each validator can be assigned to each core. // cf. 
https://github.com/paritytech/polkadot/pull/2160#discussion_r557628699 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - peer_knowledge.received.insert(fingerprint); + peer_knowledge.received.insert(message_subject.clone(), message_kind); } gum::debug!( target: LOG_TARGET, @@ -680,31 +965,43 @@ impl State { }, } } else { - if !entry.knowledge.known_messages.insert(fingerprint.clone()) { + if !entry.knowledge.insert(message_subject.clone(), message_kind) { // if we already imported an assignment, there is no need to distribute it again gum::warn!( target: LOG_TARGET, - ?fingerprint, + ?message_subject, "Importing locally an already known assignment", ); return } else { - gum::debug!(target: LOG_TARGET, ?fingerprint, "Importing locally a new assignment",); + gum::debug!( + target: LOG_TARGET, + ?message_subject, + "Importing locally a new assignment", + ); } } - let local_source = source.as_local_source(); - - // Invariant: none of the peers except for the `source` know about the assignment. + // Invariant: to our knowledge, none of the peers except for the `source` know about the assignment. metrics.on_assignment_imported(); - match entry.candidates.get_mut(claimed_candidate_index as usize) { + let topology = self.topologies.get_topology(entry.session); + let local = source == MessageSource::Local; + + let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| { + t.required_routing_for(validator_index, local) + }); + + let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) { Some(candidate_entry) => { // set the approval state for validator_index to Assigned // unless the approval state is set already - candidate_entry.approvals.entry(validator_index).or_insert_with(|| { - (ApprovalState::Assigned(assignment.cert.clone()), local_source) - }); + candidate_entry.messages.entry(validator_index).or_insert_with(|| MessageState { + required_routing, + local, + random_routing: RandomRouting { target: RANDOM_CIRCULATION, sent: 0 }, + approval_state: ApprovalState::Assigned(assignment.cert.clone()), + }) }, None => { gum::warn!( @@ -713,29 +1010,49 @@ impl State { ?claimed_candidate_index, "Expected a candidate entry on import_and_circulate_assignment", ); + + return }, - } + }; - // Dispatch a ApprovalDistributionV1Message::Assignment(assignment, candidate_index) - // to all peers in the BlockEntry's known_by set who know about the block, - // excluding the peer in the source, if source has kind MessageSource::Peer. - let maybe_peer_id = source.peer_id(); - let mut peers = entry - .known_by - .keys() - .cloned() - .filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key)) - .collect::>(); + // Dispatch the message to all peers in the routing set which + // know the block. + // + // If the topology isn't known yet (race with networking subsystems) + // then messages will be sent when we get it. 
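+ // In addition to the required routing set, up to `RANDOM_CIRCULATION` (4) peers are
+ // selected at random: each remaining known peer passes with probability
+ // `RANDOM_SAMPLE_RATE / n_peers_total` until that target is reached.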
let assignments = vec![(assignment, claimed_candidate_index)]; - let gossip_peers = &self.gossip_peers; - util::choose_random_subset(|e| gossip_peers.contains(e), &mut peers, MIN_GOSSIP_PEERS); + let n_peers_total = self.peer_views.len(); + let source_peer = source.peer_id(); + + let mut peer_filter = move |peer| { + if Some(peer) == source_peer.as_ref() { + return false + } + + if let Some(true) = topology.as_ref().map(|t| t.route_to_peer(required_routing, peer)) { + return true + } + + // Note: at this point, we haven't received the message from any peers + // other than the source peer, and we just got it, so we haven't sent it + // to any peers either. + let route_random = message_state.random_routing.sample(n_peers_total, rng); + + if route_random { + message_state.random_routing.inc_sent(); + } + + route_random + }; - // Add the fingerprint of the assignment to the knowledge of each peer. + let peers = entry.known_by.keys().filter(|p| peer_filter(p)).cloned().collect::>(); + + // Add the metadata of the assignment to the knowledge of each peer. for peer in peers.iter() { // we already filtered peers above, so this should always be Some if let Some(peer_knowledge) = entry.known_by.get_mut(peer) { - peer_knowledge.sent.insert(fingerprint.clone()); + peer_knowledge.sent.insert(message_subject.clone(), message_kind); } } @@ -744,7 +1061,7 @@ impl State { target: LOG_TARGET, ?block_hash, ?claimed_candidate_index, - ?local_source, + local = source.peer_id().is_none(), num_peers = peers.len(), "Sending an assignment to peers", ); @@ -783,22 +1100,16 @@ impl State { }, }; - // compute a fingerprint of the approval - let fingerprint = - MessageFingerprint::Approval(block_hash.clone(), candidate_index, validator_index); + // compute metadata on the assignment. 
+ let message_subject = MessageSubject(block_hash, candidate_index, validator_index); + let message_kind = MessageKind::Approval; if let Some(peer_id) = source.peer_id() { - let assignment_fingerprint = MessageFingerprint::Assignment( - block_hash.clone(), - candidate_index, - validator_index, - ); - - if !entry.knowledge.known_messages.contains(&assignment_fingerprint) { + if !entry.knowledge.contains(&message_subject, MessageKind::Assignment) { gum::debug!( target: LOG_TARGET, ?peer_id, - ?fingerprint, + ?message_subject, "Unknown approval assignment", ); modify_reputation(ctx, peer_id, COST_UNEXPECTED_MESSAGE).await; @@ -809,12 +1120,12 @@ impl State { match entry.known_by.entry(peer_id.clone()) { hash_map::Entry::Occupied(mut knowledge) => { let peer_knowledge = knowledge.get_mut(); - if peer_knowledge.contains(&fingerprint) { - if !peer_knowledge.received.insert(fingerprint.clone()) { + if peer_knowledge.contains(&message_subject, message_kind) { + if !peer_knowledge.received.insert(message_subject.clone(), message_kind) { gum::debug!( target: LOG_TARGET, ?peer_id, - ?fingerprint, + ?message_subject, "Duplicate approval", ); @@ -827,7 +1138,7 @@ impl State { gum::debug!( target: LOG_TARGET, ?peer_id, - ?fingerprint, + ?message_subject, "Approval from a peer is out of view", ); modify_reputation(ctx, peer_id.clone(), COST_UNEXPECTED_MESSAGE).await; @@ -835,11 +1146,11 @@ impl State { } // if the approval is known to be valid, reward the peer - if entry.knowledge.contains(&fingerprint) { - gum::trace!(target: LOG_TARGET, ?peer_id, ?fingerprint, "Known approval"); + if entry.knowledge.contains(&message_subject, message_kind) { + gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known approval"); modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE).await; if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - peer_knowledge.received.insert(fingerprint.clone()); + peer_knowledge.received.insert(message_subject.clone(), message_kind); } return } @@ -859,14 +1170,20 @@ impl State { }; drop(timer); - gum::trace!(target: LOG_TARGET, ?peer_id, ?fingerprint, ?result, "Checked approval",); + gum::trace!( + target: LOG_TARGET, + ?peer_id, + ?message_subject, + ?result, + "Checked approval", + ); match result { ApprovalCheckResult::Accepted => { modify_reputation(ctx, peer_id.clone(), BENEFIT_VALID_MESSAGE_FIRST).await; - entry.knowledge.insert(fingerprint.clone()); + entry.knowledge.insert(message_subject.clone(), message_kind); if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - peer_knowledge.received.insert(fingerprint.clone()); + peer_knowledge.received.insert(message_subject.clone(), message_kind); } }, ApprovalCheckResult::Bad(error) => { @@ -881,38 +1198,55 @@ impl State { }, } } else { - if !entry.knowledge.insert(fingerprint.clone()) { + if !entry.knowledge.insert(message_subject.clone(), message_kind) { // if we already imported an approval, there is no need to distribute it again gum::warn!( target: LOG_TARGET, - ?fingerprint, + ?message_subject, "Importing locally an already known approval", ); return } else { - gum::debug!(target: LOG_TARGET, ?fingerprint, "Importing locally a new approval",); + gum::debug!( + target: LOG_TARGET, + ?message_subject, + "Importing locally a new approval", + ); } } - let local_source = source.as_local_source(); - - // Invariant: none of the peers except for the `source` know about the approval. + // Invariant: to our knowledge, none of the peers except for the `source` know about the approval. 
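+ // An approval reuses the `required_routing` already recorded for its assignment,
+ // so both messages travel along the same grid routes.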
metrics.on_approval_imported(); - match entry.candidates.get_mut(candidate_index as usize) { + let required_routing = match entry.candidates.get_mut(candidate_index as usize) { Some(candidate_entry) => { // set the approval state for validator_index to Approved // it should be in assigned state already - match candidate_entry.approvals.remove(&validator_index) { - Some((ApprovalState::Assigned(cert), _local)) => { - candidate_entry.approvals.insert( + match candidate_entry.messages.remove(&validator_index) { + Some(MessageState { + approval_state: ApprovalState::Assigned(cert), + required_routing, + local, + random_routing, + }) => { + candidate_entry.messages.insert( validator_index, - (ApprovalState::Approved(cert, vote.signature.clone()), local_source), + MessageState { + approval_state: ApprovalState::Approved( + cert, + vote.signature.clone(), + ), + required_routing, + local, + random_routing, + }, ); + + required_routing }, - Some((ApprovalState::Approved(..), _)) => { + Some(_) => { unreachable!( - "we only insert it after the fingerprint, checked the fingerprint above; qed" + "we only insert it after the metadata, checked the metadata above; qed" ); }, None => { @@ -924,6 +1258,8 @@ impl State { ?validator_index, "Importing an approval we don't have an assignment for", ); + + return }, } }, @@ -935,38 +1271,59 @@ impl State { ?validator_index, "Expected a candidate entry on import_and_circulate_approval", ); + + return }, - } + }; // Dispatch a ApprovalDistributionV1Message::Approval(vote) - // to all peers in the BlockEntry's known_by set who know about the block, - // excluding the peer in the source, if source has kind MessageSource::Peer. - let maybe_peer_id = source.peer_id(); - let mut peers = entry + // to all peers required by the topology, with the exception of the source peer. + + let topology = self.topologies.get_topology(entry.session); + let source_peer = source.peer_id(); + + let message_subject = &message_subject; + let peer_filter = move |peer, knowledge: &PeerKnowledge| { + if Some(peer) == source_peer.as_ref() { + return false + } + + // Here we're leaning on a few behaviors of assignment propagation: + // 1. At this point, the only peer we're aware of which has the approval + // message is the source peer. + // 2. We have sent the assignment message to every peer in the required routing + // which is aware of this block _unless_ the peer we originally received the + // assignment from was part of the required routing. In that case, we've sent + // the assignment to all aware peers in the required routing _except_ the original + // source of the assignment. Hence the `in_topology_check`. + // 3. Any randomly selected peers have been sent the assignment already. + let in_topology = topology.map_or(false, |t| t.route_to_peer(required_routing, peer)); + in_topology || knowledge.sent.contains(message_subject, MessageKind::Assignment) + }; + + let peers = entry .known_by - .keys() + .iter() + .filter(|(p, k)| peer_filter(p, k)) + .map(|(p, _)| p) .cloned() - .filter(|key| maybe_peer_id.as_ref().map_or(true, |id| id != key)) .collect::>(); - let gossip_peers = &self.gossip_peers; - util::choose_random_subset(|e| gossip_peers.contains(e), &mut peers, MIN_GOSSIP_PEERS); - - // Add the fingerprint of the assignment to the knowledge of each peer. + // Add the metadata of the assignment to the knowledge of each peer. 
for peer in peers.iter() { // we already filtered peers above, so this should always be Some if let Some(entry) = entry.known_by.get_mut(peer) { - entry.sent.insert(fingerprint.clone()); + entry.sent.insert(message_subject.clone(), message_kind); } } - let approvals = vec![vote]; if !peers.is_empty() { + let approvals = vec![vote]; gum::trace!( target: LOG_TARGET, ?block_hash, ?candidate_index, - ?local_source, + local = source.peer_id().is_none(), num_peers = peers.len(), "Sending an approval to peers", ); @@ -984,228 +1341,357 @@ impl State { async fn unify_with_peer( ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), - gossip_peers: &HashSet, metrics: &Metrics, entries: &mut HashMap, + topologies: &SessionTopologies, + total_peers: usize, peer_id: PeerId, view: View, + rng: &mut (impl CryptoRng + Rng), ) { metrics.on_unify_with_peer(); let _timer = metrics.time_unify_with_peer(); - let mut to_send: Vec<(Hash, MissingKnowledge)> = Vec::new(); + + let mut assignments_to_send = Vec::new(); + let mut approvals_to_send = Vec::new(); let view_finalized_number = view.finalized_number; for head in view.into_iter() { let mut block = head; - let interesting_blocks = std::iter::from_fn(|| { - // step 2. + loop { let entry = match entries.get_mut(&block) { Some(entry) if entry.number > view_finalized_number => entry, - _ => return None, - }; - let missing_knowledge = match entry.known_by.entry(peer_id.clone()) { - hash_map::Entry::Occupied(e) => { - let missing: MissingKnowledge = entry - .knowledge - .known_messages - .iter() - .filter(|m| !e.get().contains(m)) - .cloned() - .collect(); - // step 3. - // We assume if peer's knowledge is complete for block N, - // this is also true for its ancestors. - // This safeguard is needed primarily in case of long finality stalls - // so we don't waste time in a loop for every peer. - if missing.is_empty() { - gum::trace!( - target: LOG_TARGET, - ?block, - ?peer_id, - "Stopping at this block, because peer knows all", - ); - return None - } - missing - }, - // step 4. - hash_map::Entry::Vacant(vacant) => { - let knowledge = PeerKnowledge::default(); - vacant.insert(knowledge); - entry.knowledge.known_messages.clone() - }, + _ => break, }; - // step 5. - let interesting_block = block; - block = entry.parent_hash.clone(); - Some((interesting_block, missing_knowledge)) - }); - to_send.extend(interesting_blocks); - } - let is_gossip_peer = gossip_peers.contains(&peer_id); - let lucky = is_gossip_peer || - util::gen_ratio( - util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()), - util::MIN_GOSSIP_PEERS, - ); - if !lucky { - gum::trace!(target: LOG_TARGET, ?peer_id, "Unlucky peer"); - return - } + // Any peer which is in the `known_by` set has already been + // sent all messages it's meant to get for that block and all + // in-scope prior blocks. + if entry.known_by.contains_key(&peer_id) { + break + } - // step 6. - // send all assignments and approvals for all candidates in those blocks to the peer - Self::send_gossip_messages_to_peer(entries, ctx, peer_id, to_send).await; - } + let peer_knowledge = entry.known_by.entry(peer_id.clone()).or_default(); + + let topology = topologies.get_topology(entry.session); + + // Iterate all messages in all candidates. + for (candidate_index, validator, message_state) in + entry.candidates.iter_mut().enumerate().flat_map(|(c_i, c)| { + c.messages.iter_mut().map(move |(k, v)| (c_i as _, k, v)) + }) { + // Propagate the message to all peers in the required routing set OR + // randomly sample peers. 
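+ // The `RandomRouting` counter lives on the `MessageState`, so random sends here
+ // and in `import_and_circulate_*` count towards the same `RANDOM_CIRCULATION` target.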
+ { + let random_routing = &mut message_state.random_routing; + let required_routing = message_state.required_routing; + let rng = &mut *rng; + let mut peer_filter = move |peer_id| { + let in_topology = topology + .as_ref() + .map_or(false, |t| t.route_to_peer(required_routing, peer_id)); + in_topology || { + let route_random = random_routing.sample(total_peers, rng); + if route_random { + random_routing.inc_sent(); + } - async fn send_gossip_messages_to_peer( - entries: &mut HashMap, - ctx: &mut (impl SubsystemContext - + overseer::SubsystemContext), - peer_id: PeerId, - blocks: Vec<(Hash, MissingKnowledge)>, - ) { - let mut assignments = Vec::new(); - let mut approvals = Vec::new(); - let num_blocks = blocks.len(); - - for (block, missing) in blocks.into_iter() { - let entry = match entries.get_mut(&block) { - Some(entry) => entry, - None => continue, // should be unreachable - }; + route_random + } + }; - gum::trace!( - target: LOG_TARGET, - "Sending all assignments and approvals in block {} to peer {}", - block, - peer_id, - ); + if !peer_filter(&peer_id) { + continue + } + } - for (candidate_index, candidate_entry) in entry.candidates.iter().enumerate() { - let candidate_index = candidate_index as u32; - for (validator_index, (approval_state, _is_local)) in - candidate_entry.approvals.iter() - { - let assignment_fingerprint = MessageFingerprint::Assignment( - block.clone(), + let message_subject = + MessageSubject(block.clone(), candidate_index, validator.clone()); + + let assignment_message = ( + IndirectAssignmentCert { + block_hash: block.clone(), + validator: validator.clone(), + cert: message_state.approval_state.assignment_cert().clone(), + }, candidate_index, - validator_index.clone(), ); - match approval_state { - ApprovalState::Assigned(cert) => { - if !missing.contains(&assignment_fingerprint) { - gum::trace!( - target: LOG_TARGET, - ?block, - ?validator_index, - ?candidate_index, - "Skipping sending known assignment", - ); - continue - } - if let Some(p) = entry.known_by.get_mut(&peer_id) { - p.sent.insert(assignment_fingerprint); - } - assignments.push(( - IndirectAssignmentCert { - block_hash: block.clone(), - validator: validator_index.clone(), - cert: cert.clone(), - }, - candidate_index.clone(), - )); - }, - ApprovalState::Approved(assignment_cert, signature) => { - let fingerprint = MessageFingerprint::Approval( - block.clone(), + let approval_message = + message_state.approval_state.approval_signature().map(|signature| { + IndirectSignedApprovalVote { + block_hash: block.clone(), + validator: validator.clone(), candidate_index, - validator_index.clone(), - ); - if missing.contains(&assignment_fingerprint) { - if let Some(p) = entry.known_by.get_mut(&peer_id) { - p.sent.insert(assignment_fingerprint); - } - assignments.push(( - IndirectAssignmentCert { - block_hash: block.clone(), - validator: validator_index.clone(), - cert: assignment_cert.clone(), - }, - candidate_index.clone(), - )); - } else { - gum::trace!( - target: LOG_TARGET, - ?block, - ?validator_index, - ?candidate_index, - "Skipping sending known assignment", - ); + signature, } - if missing.contains(&fingerprint) { - if let Some(p) = entry.known_by.get_mut(&peer_id) { - p.sent.insert(fingerprint); - } - approvals.push(IndirectSignedApprovalVote { - block_hash: block.clone(), - validator: validator_index.clone(), - candidate_index: candidate_index.clone(), - signature: signature.clone(), - }); - } else { - gum::trace!( - target: LOG_TARGET, - ?block, - ?validator_index, - ?candidate_index, - "Skipping 
sending known approval", - ); - } - }, + }); + + if !peer_knowledge.contains(&message_subject, MessageKind::Assignment) { + peer_knowledge + .sent + .insert(message_subject.clone(), MessageKind::Assignment); + assignments_to_send.push(assignment_message); + } + + if let Some(approval_message) = approval_message { + if !peer_knowledge.contains(&message_subject, MessageKind::Approval) { + peer_knowledge + .sent + .insert(message_subject.clone(), MessageKind::Approval); + approvals_to_send.push(approval_message); + } } } + + block = entry.parent_hash.clone(); } } - if !assignments.is_empty() { + if !assignments_to_send.is_empty() { gum::trace!( target: LOG_TARGET, - num = assignments.len(), - ?num_blocks, ?peer_id, - "Sending assignments to a peer", + num = assignments_to_send.len(), + "Sending assignments to unified peer", ); ctx.send_message(NetworkBridgeMessage::SendValidationMessage( vec![peer_id.clone()], protocol_v1::ValidationProtocol::ApprovalDistribution( - protocol_v1::ApprovalDistributionMessage::Assignments(assignments), + protocol_v1::ApprovalDistributionMessage::Assignments(assignments_to_send), ), )) .await; } - if !approvals.is_empty() { + if !approvals_to_send.is_empty() { gum::trace!( target: LOG_TARGET, - num = approvals.len(), - ?num_blocks, ?peer_id, - "Sending approvals to a peer", + num = approvals_to_send.len(), + "Sending approvals to unified peer", ); ctx.send_message(NetworkBridgeMessage::SendValidationMessage( - vec![peer_id], + vec![peer_id.clone()], protocol_v1::ValidationProtocol::ApprovalDistribution( - protocol_v1::ApprovalDistributionMessage::Approvals(approvals), + protocol_v1::ApprovalDistributionMessage::Approvals(approvals_to_send), ), )) .await; } } + + async fn enable_aggression( + &mut self, + ctx: &mut (impl SubsystemContext + + overseer::SubsystemContext), + resend: Resend, + metrics: &Metrics, + ) { + let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num); + let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num); + let config = self.aggression_config.clone(); + + let (min_age, max_age) = match (min_age, max_age) { + (Some(min), Some(max)) => (min, max), + _ => return, // empty. + }; + + let diff = max_age - min_age; + if !self.aggression_config.is_age_relevant(diff) { + return + } + + adjust_required_routing_and_propagate( + ctx, + &mut self.blocks, + &self.topologies, + |block_entry| { + let block_age = max_age - block_entry.number; + + if resend == Resend::Yes && + config + .resend_unfinalized_period + .as_ref() + .map_or(false, |p| block_age > 0 && block_age % p == 0) + { + // Retry sending to all peers. + for (_, knowledge) in block_entry.known_by.iter_mut() { + knowledge.sent = Knowledge::default(); + } + + true + } else { + false + } + }, + |_, _, _| {}, + ) + .await; + + adjust_required_routing_and_propagate( + ctx, + &mut self.blocks, + &self.topologies, + |block_entry| { + // Ramp up aggression only for the very oldest block(s). + // Approval voting can get stuck on a single block preventing + // its descendants from being finalized. Waste minimal bandwidth + // this way. Also, disputes might prevent finality - again, nothing + // to waste bandwidth on newer blocks for. + &block_entry.number == min_age + }, + |required_routing, local, _| { + // It's a bit surprising not to have a topology at this age. 
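+ // Aggression is cumulative: past `l1_threshold` the originator's own messages widen
+ // to `RequiredRouting::All`; past `l2_threshold` relayed messages widen to `GridXY`.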
+ if *required_routing == RequiredRouting::PendingTopology { + gum::debug!( + target: LOG_TARGET, + age = ?diff, + "Encountered old block pending gossip topology", + ); + return + } + + if config.l1_threshold.as_ref().map_or(false, |t| &diff >= t) { + // Message originator sends to everyone. + if local && *required_routing != RequiredRouting::All { + metrics.on_aggression_l1(); + *required_routing = RequiredRouting::All; + } + } + + if config.l2_threshold.as_ref().map_or(false, |t| &diff >= t) { + // Message originator sends to everyone. Everyone else sends to XY. + if !local && *required_routing != RequiredRouting::GridXY { + metrics.on_aggression_l2(); + *required_routing = RequiredRouting::GridXY; + } + } + }, + ) + .await; + } +} + +// This adjusts the required routing of messages in blocks that pass the block filter +// according to the modifier function given. +// +// The modifier accepts as inputs the current required-routing state, whether +// the message is locally originating, and the validator index of the message issuer. +// +// Then, if the topology is known, this progates messages to all peers in the required +// routing set which are aware of the block. Peers which are unaware of the block +// will have the message sent when it enters their view in `unify_with_peer`. +// +// Note that the required routing of a message can be modified even if the +// topology is unknown yet. +async fn adjust_required_routing_and_propagate( + ctx: &mut (impl SubsystemContext + + overseer::SubsystemContext), + blocks: &mut HashMap, + topologies: &SessionTopologies, + block_filter: impl Fn(&mut BlockEntry) -> bool, + routing_modifier: impl Fn(&mut RequiredRouting, bool, &ValidatorIndex), +) { + let mut peer_assignments = HashMap::new(); + let mut peer_approvals = HashMap::new(); + + // Iterate all blocks in the session, producing payloads + // for each connected peer. + for (block_hash, block_entry) in blocks { + if !block_filter(block_entry) { + continue + } + + // Iterate all messages in all candidates. + for (candidate_index, validator, message_state) in block_entry + .candidates + .iter_mut() + .enumerate() + .flat_map(|(c_i, c)| c.messages.iter_mut().map(move |(k, v)| (c_i as _, k, v))) + { + routing_modifier(&mut message_state.required_routing, message_state.local, validator); + + if message_state.required_routing.is_empty() { + continue + } + + let topology = match topologies.get_topology(block_entry.session) { + Some(t) => t, + None => continue, + }; + + // Propagate the message to all peers in the required routing set. 
+ let message_subject = + MessageSubject(block_hash.clone(), candidate_index, validator.clone()); + + let assignment_message = ( + IndirectAssignmentCert { + block_hash: block_hash.clone(), + validator: validator.clone(), + cert: message_state.approval_state.assignment_cert().clone(), + }, + candidate_index, + ); + let approval_message = + message_state.approval_state.approval_signature().map(|signature| { + IndirectSignedApprovalVote { + block_hash: block_hash.clone(), + validator: validator.clone(), + candidate_index, + signature, + } + }); + + for (peer, peer_knowledge) in &mut block_entry.known_by { + if !topology.route_to_peer(message_state.required_routing, peer) { + continue + } + + if !peer_knowledge.contains(&message_subject, MessageKind::Assignment) { + peer_knowledge.sent.insert(message_subject.clone(), MessageKind::Assignment); + peer_assignments + .entry(peer.clone()) + .or_insert_with(Vec::new) + .push(assignment_message.clone()); + } + + if let Some(approval_message) = approval_message.as_ref() { + if !peer_knowledge.contains(&message_subject, MessageKind::Approval) { + peer_knowledge.sent.insert(message_subject.clone(), MessageKind::Approval); + peer_approvals + .entry(peer.clone()) + .or_insert_with(Vec::new) + .push(approval_message.clone()); + } + } + } + } + } + + // Send messages in accumulated packets, assignments preceding approvals. + + for (peer, assignments_packet) in peer_assignments { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + vec![peer], + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(assignments_packet), + ), + )) + .await; + } + + for (peer, approvals_packet) in peer_approvals { + ctx.send_message(NetworkBridgeMessage::SendValidationMessage( + vec![peer], + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(approvals_packet), + ), + )) + .await; + } } /// Modify the reputation of a peer based on its behavior. @@ -1237,12 +1723,20 @@ impl ApprovalDistribution { Context: overseer::SubsystemContext, { let mut state = State::default(); - self.run_inner(ctx, &mut state).await + + // According to the docs of `rand`, this is a ChaCha12 RNG in practice + // and will always be chosen for strong performance and security properties. + let mut rng = rand::rngs::StdRng::from_entropy(); + self.run_inner(ctx, &mut state, &mut rng).await } /// Used for testing. - async fn run_inner(self, mut ctx: Context, state: &mut State) - where + async fn run_inner( + self, + mut ctx: Context, + state: &mut State, + rng: &mut (impl CryptoRng + Rng), + ) where Context: SubsystemContext, Context: overseer::SubsystemContext, { @@ -1256,7 +1750,7 @@ impl ApprovalDistribution { }; match message { FromOverseer::Communication { msg } => - Self::handle_incoming(&mut ctx, state, msg, &self.metrics).await, + Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await, FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { .. 
})) => { @@ -1267,7 +1761,7 @@ impl ApprovalDistribution { }, FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); - state.handle_block_finalized(number); + state.handle_block_finalized(&mut ctx, &self.metrics, number).await; }, FromOverseer::Signal(OverseerSignal::Conclude) => return, } @@ -1279,16 +1773,17 @@ impl ApprovalDistribution { state: &mut State, msg: ApprovalDistributionMessage, metrics: &Metrics, + rng: &mut (impl CryptoRng + Rng), ) where Context: SubsystemContext, Context: overseer::SubsystemContext, { match msg { ApprovalDistributionMessage::NetworkBridgeUpdateV1(event) => { - state.handle_network_msg(ctx, metrics, event).await; + state.handle_network_msg(ctx, metrics, event, rng).await; }, ApprovalDistributionMessage::NewBlocks(metas) => { - state.handle_new_blocks(ctx, metrics, metas).await; + state.handle_new_blocks(ctx, metrics, metas, rng).await; }, ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) => { gum::debug!( @@ -1305,6 +1800,7 @@ impl ApprovalDistribution { MessageSource::Local, cert, candidate_index, + rng, ) .await; }, diff --git a/node/network/approval-distribution/src/metrics.rs b/node/network/approval-distribution/src/metrics.rs index b96916a7f0e7..c0887b25f7f4 100644 --- a/node/network/approval-distribution/src/metrics.rs +++ b/node/network/approval-distribution/src/metrics.rs @@ -25,6 +25,8 @@ struct MetricsInner { assignments_imported_total: prometheus::Counter, approvals_imported_total: prometheus::Counter, unified_with_peer_total: prometheus::Counter, + aggression_l1_messages_total: prometheus::Counter, + aggression_l2_messages_total: prometheus::Counter, time_unify_with_peer: prometheus::Histogram, time_import_pending_now_known: prometheus::Histogram, @@ -69,6 +71,18 @@ impl Metrics { .as_ref() .map(|metrics| metrics.time_awaiting_approval_voting.start_timer()) } + + pub(crate) fn on_aggression_l1(&self) { + if let Some(metrics) = &self.0 { + metrics.aggression_l1_messages_total.inc(); + } + } + + pub(crate) fn on_aggression_l2(&self) { + if let Some(metrics) = &self.0 { + metrics.aggression_l2_messages_total.inc(); + } + } } impl MetricsTrait for Metrics { @@ -95,6 +109,20 @@ impl MetricsTrait for Metrics { )?, registry, )?, + aggression_l1_messages_total: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_approval_distribution_aggression_l1_messages_total", + "Number of messages in approval distribution for which aggression L1 has been triggered", + )?, + registry, + )?, + aggression_l2_messages_total: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_approval_distribution_aggression_l2_messages_total", + "Number of messages in approval distribution for which aggression L2 has been triggered", + )?, + registry, + )?, time_unify_with_peer: prometheus::register( prometheus::Histogram::with_opts(prometheus::HistogramOpts::new( "polkadot_parachain_time_unify_with_peer", diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index 6f08b2a8523a..39d4b61a6a03 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -24,6 +24,10 @@ use polkadot_node_primitives::approval::{ use polkadot_node_subsystem::messages::{AllMessages, ApprovalCheckError}; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt as _; +use 
polkadot_primitives::v2::{AuthorityDiscoveryId, BlakeTwo256, HashT}; +use rand::SeedableRng; +use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; +use sp_core::crypto::Pair as PairT; use std::time::Duration; type VirtualOverseer = test_helpers::TestSubsystemContextHandle; @@ -46,7 +50,9 @@ fn test_harness>( let subsystem = ApprovalDistribution::new(Default::default()); { - let subsystem = subsystem.run_inner(context, &mut state); + let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(12345); + + let subsystem = subsystem.run_inner(context, &mut state, &mut rng); let test_fut = test_fn(virtual_overseer); @@ -99,6 +105,65 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { msg } +fn make_peers_and_authority_ids(n: usize) -> Vec<(PeerId, AuthorityDiscoveryId)> { + (0..n) + .map(|_| { + let peer_id = PeerId::random(); + let authority_id = AuthorityDiscoveryPair::generate().0.public(); + + (peer_id, authority_id) + }) + .collect() +} + +fn make_gossip_topology( + session: SessionIndex, + all_peers: &[(PeerId, AuthorityDiscoveryId)], + neighbors_x: &[usize], + neighbors_y: &[usize], +) -> network_bridge_event::NewGossipTopology { + let mut t = network_bridge_event::NewGossipTopology { + session, + our_neighbors_x: HashMap::new(), + our_neighbors_y: HashMap::new(), + }; + + for &i in neighbors_x { + t.our_neighbors_x.insert( + all_peers[i].1.clone(), + network_bridge_event::TopologyPeerInfo { + peer_ids: vec![all_peers[i].0.clone()], + validator_index: ValidatorIndex::from(i as u32), + }, + ); + } + + for &i in neighbors_y { + t.our_neighbors_y.insert( + all_peers[i].1.clone(), + network_bridge_event::TopologyPeerInfo { + peer_ids: vec![all_peers[i].0.clone()], + validator_index: ValidatorIndex::from(i as u32), + }, + ); + } + + t +} + +async fn setup_gossip_topology( + virtual_overseer: &mut VirtualOverseer, + gossip_topology: network_bridge_event::NewGossipTopology, +) { + overseer_send( + virtual_overseer, + ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::NewGossipTopology( + gossip_topology, + )), + ) + .await; +} + async fn setup_peer_with_view( virtual_overseer: &mut VirtualOverseer, peer_id: &PeerId, @@ -201,6 +266,7 @@ fn try_import_the_same_assignment() { number: 2, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -283,6 +349,7 @@ fn spam_attack_results_in_negative_reputation_change() { number: 2, candidates: vec![Default::default(); candidates_count], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); @@ -365,6 +432,7 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { number: 1, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -442,6 +510,7 @@ fn import_approval_happy_path() { number: 1, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -528,6 +597,7 @@ fn import_approval_bad() { number: 1, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -605,6 +675,7 @@ fn update_our_view() { number: 1, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let meta_b = 
BlockApprovalMeta { hash: hash_b, @@ -612,6 +683,7 @@ fn update_our_view() { number: 2, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let meta_c = BlockApprovalMeta { hash: hash_c, @@ -619,6 +691,7 @@ fn update_our_view() { number: 3, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); @@ -678,6 +751,7 @@ fn update_peer_view() { number: 1, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let meta_b = BlockApprovalMeta { hash: hash_b, @@ -685,6 +759,7 @@ fn update_peer_view() { number: 2, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let meta_c = BlockApprovalMeta { hash: hash_c, @@ -692,6 +767,7 @@ fn update_peer_view() { number: 3, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta_a, meta_b, meta_c]); @@ -829,6 +905,7 @@ fn import_remotely_then_locally() { number: 1, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -913,6 +990,7 @@ fn sends_assignments_even_when_state_is_approved() { number: 1, candidates: vec![Default::default(); 1], slot: 1.into(), + session: 1, }; let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); overseer_send(overseer, msg).await; @@ -999,6 +1077,7 @@ fn race_condition_in_local_vs_remote_view_update() { number: 2, candidates: vec![Default::default(); candidates_count], slot: 1.into(), + session: 1, }; // This will send a peer view that is ahead of our view @@ -1052,3 +1131,1099 @@ fn race_condition_in_local_vs_remote_view_update() { virtual_overseer }); } + +// Tests that local messages propagate to both dimensions. +#[test] +fn propagates_locally_generated_assignment_to_both_dimensions() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + // Connect all peers. + for (peer, _) in &peers { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + + // Set up a gossip topology. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + let expected_indices = [ + // Both dimensions in the gossip topology + 0, 10, 20, 30, 50, 51, 52, 53, + ]; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. 
+ let cert = fake_assignment_cert(hash, validator_index); + let approval = IndirectSignedApprovalVote { + block_hash: hash, + candidate_index, + validator: validator_index, + signature: dummy_signature(), + }; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert.clone(), candidate_index), + ) + .await; + + overseer_send(overseer, ApprovalDistributionMessage::DistributeApproval(approval.clone())) + .await; + + let assignments = vec![(cert.clone(), candidate_index)]; + let approvals = vec![approval.clone()]; + + let assignment_sent_peers = assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + assert_eq!(sent_peers.len(), expected_indices.len() + 4); + for &i in &expected_indices { + assert!( + sent_peers.contains(&peers[i].0), + "Message not sent to expected peer {}", + i, + ); + } + assert_eq!(sent_assignments, assignments); + sent_peers + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) + ) + )) => { + // Random sampling is reused from the assignment. + assert_eq!(sent_peers, assignment_sent_peers); + assert_eq!(sent_approvals, approvals); + } + ); + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} + +// Tests that messages propagate to the unshared dimension. +#[test] +fn propagates_assignments_along_unshared_dimension() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + // Connect all peers. + for (peer, _) in &peers { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + + // Set up a gossip topology. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + // Test messages from X direction go to Y peers + { + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + let assignments = vec![(cert.clone(), candidate_index)]; + + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + + // Issuer of the message is important, not the peer we receive from. + // 99 deliberately chosen because it's not in X or Y. 
+ send_message_from_peer(overseer, &peers[99].0, msg).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + _, + _, + tx, + )) => { + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + expect_reputation_change(overseer, &peers[99].0, BENEFIT_VALID_MESSAGE_FIRST).await; + + let expected_y = [50, 51, 52, 53]; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + assert_eq!(sent_peers.len(), expected_y.len() + 4); + for &i in &expected_y { + assert!( + sent_peers.contains(&peers[i].0), + "Message not sent to expected peer {}", + i, + ); + } + assert_eq!(sent_assignments, assignments); + } + ); + }; + + // Test messages from X direction go to Y peers + { + let validator_index = ValidatorIndex(50); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + let assignments = vec![(cert.clone(), candidate_index)]; + + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + + // Issuer of the message is important, not the peer we receive from. + // 99 deliberately chosen because it's not in X or Y. + send_message_from_peer(overseer, &peers[99].0, msg).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + _, + _, + tx, + )) => { + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + expect_reputation_change(overseer, &peers[99].0, BENEFIT_VALID_MESSAGE_FIRST).await; + + let expected_x = [0, 10, 20, 30]; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + assert_eq!(sent_peers.len(), expected_x.len() + 4); + for &i in &expected_x { + assert!( + sent_peers.contains(&peers[i].0), + "Message not sent to expected peer {}", + i, + ); + } + assert_eq!(sent_assignments, assignments); + } + ); + }; + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} + +// tests that messages are propagated to necessary peers after they connect +#[test] +fn propagates_to_required_after_connect() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + let omitted = [0, 10, 50, 51]; + + // Connect all peers except omitted. + for (i, (peer, _)) in peers.iter().enumerate() { + if !omitted.contains(&i) { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + } + + // Set up a gossip topology. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + let expected_indices = [ + // Both dimensions in the gossip topology, minus omitted. 
+ 20, 30, 52, 53, + ]; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + let approval = IndirectSignedApprovalVote { + block_hash: hash, + candidate_index, + validator: validator_index, + signature: dummy_signature(), + }; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert.clone(), candidate_index), + ) + .await; + + overseer_send(overseer, ApprovalDistributionMessage::DistributeApproval(approval.clone())) + .await; + + let assignments = vec![(cert.clone(), candidate_index)]; + let approvals = vec![approval.clone()]; + + let assignment_sent_peers = assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + assert_eq!(sent_peers.len(), expected_indices.len() + 4); + for &i in &expected_indices { + assert!( + sent_peers.contains(&peers[i].0), + "Message not sent to expected peer {}", + i, + ); + } + assert_eq!(sent_assignments, assignments); + sent_peers + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) + ) + )) => { + // Random sampling is reused from the assignment. + assert_eq!(sent_peers, assignment_sent_peers); + assert_eq!(sent_approvals, approvals); + } + ); + + for i in omitted.iter().copied() { + setup_peer_with_view(overseer, &peers[i].0, view![hash]).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + assert_eq!(sent_peers.len(), 1); + assert_eq!(&sent_peers[0], &peers[i].0); + assert_eq!(sent_assignments, assignments); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) + ) + )) => { + assert_eq!(sent_peers.len(), 1); + assert_eq!(&sent_peers[0], &peers[i].0); + assert_eq!(sent_approvals, approvals); + } + ); + } + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} + +// test that new gossip topology triggers send of messages. +#[test] +fn sends_to_more_peers_after_getting_topology() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let _ = test_harness(State::default(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + // Connect all peers except omitted. 
+ for (peer, _) in &peers { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + let approval = IndirectSignedApprovalVote { + block_hash: hash, + candidate_index, + validator: validator_index, + signature: dummy_signature(), + }; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert.clone(), candidate_index), + ) + .await; + + overseer_send(overseer, ApprovalDistributionMessage::DistributeApproval(approval.clone())) + .await; + + let assignments = vec![(cert.clone(), candidate_index)]; + let approvals = vec![approval.clone()]; + + let mut expected_indices = vec![0, 10, 20, 30, 50, 51, 52, 53]; + let assignment_sent_peers = assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + // Only sends to random peers. + assert_eq!(sent_peers.len(), 4); + for peer in &sent_peers { + let i = peers.iter().position(|p| peer == &p.0).unwrap(); + // Random gossip before topology can send to topology-targeted peers. + // Remove them from the expected indices so we don't expect + // them to get the messages again after the assignment. + expected_indices.retain(|&i2| i2 != i); + } + assert_eq!(sent_assignments, assignments); + sent_peers + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) + ) + )) => { + // Random sampling is reused from the assignment. + assert_eq!(sent_peers, assignment_sent_peers); + assert_eq!(sent_approvals, approvals); + } + ); + + // Set up a gossip topology. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + let mut expected_indices_assignments = expected_indices.clone(); + let mut expected_indices_approvals = expected_indices.clone(); + + for _ in 0..expected_indices_assignments.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + // Sends to all expected peers. 
+ assert_eq!(sent_peers.len(), 1); + assert_eq!(sent_assignments, assignments); + + let pos = expected_indices_assignments.iter() + .position(|i| &peers[*i].0 == &sent_peers[0]) + .unwrap(); + expected_indices_assignments.remove(pos); + } + ); + } + + for _ in 0..expected_indices_approvals.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) + ) + )) => { + // Sends to all expected peers. + assert_eq!(sent_peers.len(), 1); + assert_eq!(sent_approvals, approvals); + + let pos = expected_indices_approvals.iter() + .position(|i| &peers[*i].0 == &sent_peers[0]) + .unwrap(); + + expected_indices_approvals.remove(pos); + } + ); + } + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} + +// test aggression L1 +#[test] +fn originator_aggression_l1() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let mut state = State::default(); + state.aggression_config.resend_unfinalized_period = None; + let aggression_l1_threshold = state.aggression_config.l1_threshold.clone().unwrap(); + + let _ = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + // Connect all peers except omitted. + for (peer, _) in &peers { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + let approval = IndirectSignedApprovalVote { + block_hash: hash, + candidate_index, + validator: validator_index, + signature: dummy_signature(), + }; + + // Set up a gossip topology. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + overseer_send( + overseer, + ApprovalDistributionMessage::DistributeAssignment(cert.clone(), candidate_index), + ) + .await; + + overseer_send(overseer, ApprovalDistributionMessage::DistributeApproval(approval.clone())) + .await; + + let assignments = vec![(cert.clone(), candidate_index)]; + let approvals = vec![approval.clone()]; + + let prev_sent_indices = assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(_) + ) + )) => { + sent_peers.into_iter() + .filter_map(|sp| peers.iter().position(|p| &p.0 == &sp)) + .collect::>() + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + _, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(_) + ) + )) => { } + ); + + // Add blocks until aggression L1 is triggered. 
+ { + let mut parent_hash = hash; + for level in 0..aggression_l1_threshold { + let number = 1 + level + 1; // first block had number 1 + let hash = BlakeTwo256::hash_of(&(parent_hash, number)); + let meta = BlockApprovalMeta { + hash, + parent_hash, + number, + candidates: vec![], + slot: (level as u64).into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + parent_hash = hash; + } + } + + let unsent_indices = + (0..peers.len()).filter(|i| !prev_sent_indices.contains(&i)).collect::>(); + + for _ in 0..unsent_indices.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + // Sends to all expected peers. + assert_eq!(sent_peers.len(), 1); + assert_eq!(sent_assignments, assignments); + + assert!(unsent_indices.iter() + .find(|i| &peers[**i].0 == &sent_peers[0]) + .is_some()); + } + ); + } + + for _ in 0..unsent_indices.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals) + ) + )) => { + // Sends to all expected peers. + assert_eq!(sent_peers.len(), 1); + assert_eq!(sent_approvals, approvals); + + assert!(unsent_indices.iter() + .find(|i| &peers[**i].0 == &sent_peers[0]) + .is_some()); + } + ); + } + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} + +// test aggression L1 +#[test] +fn non_originator_aggression_l1() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let mut state = State::default(); + state.aggression_config.resend_unfinalized_period = None; + let aggression_l1_threshold = state.aggression_config.l1_threshold.clone().unwrap(); + + let _ = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + // Connect all peers except omitted. + for (peer, _) in &peers { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + + // Set up a gossip topology. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + let assignments = vec![(cert.clone(), candidate_index)]; + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + + // Issuer of the message is important, not the peer we receive from. + // 99 deliberately chosen because it's not in X or Y. 
+ send_message_from_peer(overseer, &peers[99].0, msg).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + _, + _, + tx, + )) => { + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + + expect_reputation_change(overseer, &peers[99].0, BENEFIT_VALID_MESSAGE_FIRST).await; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + _, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(_) + ) + )) => { } + ); + + // Add blocks until aggression L1 is triggered. + { + let mut parent_hash = hash; + for level in 0..aggression_l1_threshold { + let number = 1 + level + 1; // first block had number 1 + let hash = BlakeTwo256::hash_of(&(parent_hash, number)); + let meta = BlockApprovalMeta { + hash, + parent_hash, + number, + candidates: vec![], + slot: (level as u64).into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + parent_hash = hash; + } + } + + // No-op on non-originator + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} + +// test aggression L2 on non-originator +#[test] +fn non_originator_aggression_l2() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let mut state = State::default(); + state.aggression_config.resend_unfinalized_period = None; + + let aggression_l1_threshold = state.aggression_config.l1_threshold.clone().unwrap(); + let aggression_l2_threshold = state.aggression_config.l2_threshold.clone().unwrap(); + let _ = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + // Connect all peers except omitted. + for (peer, _) in &peers { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + + // Set up a gossip topology. + setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + let assignments = vec![(cert.clone(), candidate_index)]; + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + + // Issuer of the message is important, not the peer we receive from. + // 99 deliberately chosen because it's not in X or Y. 
+ send_message_from_peer(overseer, &peers[99].0, msg).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + _, + _, + tx, + )) => { + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + + expect_reputation_change(overseer, &peers[99].0, BENEFIT_VALID_MESSAGE_FIRST).await; + + let prev_sent_indices = assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(_) + ) + )) => { + sent_peers.into_iter() + .filter_map(|sp| peers.iter().position(|p| &p.0 == &sp)) + .collect::>() + } + ); + + // Add blocks until aggression L1 is triggered. + let chain_head = { + let mut parent_hash = hash; + for level in 0..aggression_l1_threshold { + let number = 1 + level + 1; // first block had number 1 + let hash = BlakeTwo256::hash_of(&(parent_hash, number)); + let meta = BlockApprovalMeta { + hash, + parent_hash, + number, + candidates: vec![], + slot: (level as u64).into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + parent_hash = hash; + } + + parent_hash + }; + + // No-op on non-originator + + // Add blocks until aggression L2 is triggered. + { + let mut parent_hash = chain_head; + for level in 0..aggression_l2_threshold - aggression_l1_threshold { + let number = aggression_l1_threshold + level + 1 + 1; // first block had number 1 + let hash = BlakeTwo256::hash_of(&(parent_hash, number)); + let meta = BlockApprovalMeta { + hash, + parent_hash, + number, + candidates: vec![], + slot: (level as u64).into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + parent_hash = hash; + } + } + + // XY dimension - previously sent. + let unsent_indices = [0, 10, 20, 30, 50, 51, 52, 53] + .iter() + .cloned() + .filter(|i| !prev_sent_indices.contains(&i)) + .collect::>(); + + for _ in 0..unsent_indices.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + // Sends to all expected peers. + assert_eq!(sent_peers.len(), 1); + assert_eq!(sent_assignments, assignments); + + assert!(unsent_indices.iter() + .find(|i| &peers[**i].0 == &sent_peers[0]) + .is_some()); + } + ); + } + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} + +// Tests that messages propagate to the unshared dimension. +#[test] +fn resends_messages_periodically() { + let parent_hash = Hash::repeat_byte(0xFF); + let hash = Hash::repeat_byte(0xAA); + + let peers = make_peers_and_authority_ids(100); + + let mut state = State::default(); + state.aggression_config.l1_threshold = None; + state.aggression_config.l2_threshold = None; + state.aggression_config.resend_unfinalized_period = Some(2); + let _ = test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + + // Connect all peers. + for (peer, _) in &peers { + setup_peer_with_view(overseer, peer, view![hash]).await; + } + + // Set up a gossip topology. 
+ setup_gossip_topology( + overseer, + make_gossip_topology(1, &peers, &[0, 10, 20, 30], &[50, 51, 52, 53]), + ) + .await; + + // new block `hash_a` with 1 candidates + let meta = BlockApprovalMeta { + hash, + parent_hash, + number: 1, + candidates: vec![Default::default(); 1], + slot: 1.into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + let validator_index = ValidatorIndex(0); + let candidate_index = 0u32; + + // import an assignment and approval locally. + let cert = fake_assignment_cert(hash, validator_index); + let assignments = vec![(cert.clone(), candidate_index)]; + + { + let msg = protocol_v1::ApprovalDistributionMessage::Assignments(assignments.clone()); + + // Issuer of the message is important, not the peer we receive from. + // 99 deliberately chosen because it's not in X or Y. + send_message_from_peer(overseer, &peers[99].0, msg).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ApprovalVoting(ApprovalVotingMessage::CheckAndImportAssignment( + _, + _, + tx, + )) => { + tx.send(AssignmentCheckResult::Accepted).unwrap(); + } + ); + expect_reputation_change(overseer, &peers[99].0, BENEFIT_VALID_MESSAGE_FIRST).await; + + let expected_y = [50, 51, 52, 53]; + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + assert_eq!(sent_peers.len(), expected_y.len() + 4); + for &i in &expected_y { + assert!( + sent_peers.contains(&peers[i].0), + "Message not sent to expected peer {}", + i, + ); + } + assert_eq!(sent_assignments, assignments); + } + ); + }; + + let mut number = 1; + for _ in 0..10 { + // Add blocks until resend is done. + { + let mut parent_hash = hash; + for level in 0..2 { + number = number + 1; + let hash = BlakeTwo256::hash_of(&(parent_hash, number)); + let meta = BlockApprovalMeta { + hash, + parent_hash, + number, + candidates: vec![], + slot: (level as u64).into(), + session: 1, + }; + + let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]); + overseer_send(overseer, msg).await; + + parent_hash = hash; + } + } + + let mut expected_y = vec![50, 51, 52, 53]; + + // Expect messages sent only to topology peers, one by one. 
+ for _ in 0..expected_y.len() { + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( + sent_peers, + protocol_v1::ValidationProtocol::ApprovalDistribution( + protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments) + ) + )) => { + assert_eq!(sent_peers.len(), 1); + let expected_pos = expected_y.iter() + .position(|&i| &peers[i].0 == &sent_peers[0]) + .unwrap(); + + expected_y.remove(expected_pos); + assert_eq!(sent_assignments, assignments); + } + ); + } + } + + assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent"); + virtual_overseer + }); +} diff --git a/node/network/bitfield-distribution/src/lib.rs b/node/network/bitfield-distribution/src/lib.rs index 6bd952233111..0bedc677f53d 100644 --- a/node/network/bitfield-distribution/src/lib.rs +++ b/node/network/bitfield-distribution/src/lib.rs @@ -523,7 +523,14 @@ async fn handle_network_msg( // get rid of superfluous data state.peer_views.remove(&peer); }, - NetworkBridgeEvent::NewGossipTopology(peers) => { + NetworkBridgeEvent::NewGossipTopology(topology) => { + // Combine all peers in the x & y direction as we don't make any distinction. + let peers: HashSet = topology + .our_neighbors_x + .values() + .chain(topology.our_neighbors_y.values()) + .flat_map(|peer_info| peer_info.peer_ids.iter().cloned()) + .collect(); let newly_added: Vec = peers.difference(&state.gossip_peers).cloned().collect(); state.gossip_peers = peers; for new_peer in newly_added { diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index b2129cdebbdf..0ca57f044c8b 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -31,10 +31,13 @@ use polkadot_node_network_protocol::{ }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::gen::{OverseerError, Subsystem}; -use polkadot_primitives::v2::{BlockNumber, Hash}; +use polkadot_primitives::v2::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorIndex}; use polkadot_subsystem::{ errors::{SubsystemError, SubsystemResult}, - messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeMessage}, + messages::{ + network_bridge_event::{NewGossipTopology, TopologyPeerInfo}, + AllMessages, CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeMessage, + }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemSender, }; @@ -45,7 +48,8 @@ use polkadot_subsystem::{ pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; use std::{ - collections::{hash_map, HashMap, HashSet}, + collections::{hash_map, HashMap}, + iter::ExactSizeIterator, sync::Arc, }; @@ -590,30 +594,36 @@ where ).await; } NetworkBridgeMessage::NewGossipTopology { - our_neighbors, + session, + our_neighbors_x, + our_neighbors_y, } => { gum::debug!( target: LOG_TARGET, action = "NewGossipTopology", - neighbors = our_neighbors.len(), + neighbors_x = our_neighbors_x.len(), + neighbors_y = our_neighbors_y.len(), "Gossip topology has changed", ); - let ads = &mut authority_discovery_service; - let mut gossip_peers = HashSet::with_capacity(our_neighbors.len()); - for authority in our_neighbors { - let addr = get_peer_id_by_authority_id( - ads, - authority.clone(), - ).await; + let gossip_peers_x = update_gossip_peers_1d( + &mut authority_discovery_service, + our_neighbors_x, + ).await; - if let Some(peer_id) = addr { - gossip_peers.insert(peer_id); - } - } 
+ let gossip_peers_y = update_gossip_peers_1d( + &mut authority_discovery_service, + our_neighbors_y, + ).await; dispatch_validation_event_to_all_unbounded( - NetworkBridgeEvent::NewGossipTopology(gossip_peers), + NetworkBridgeEvent::NewGossipTopology( + NewGossipTopology { + session, + our_neighbors_x: gossip_peers_x, + our_neighbors_y: gossip_peers_y, + } + ), ctx.sender(), ); } @@ -624,6 +634,28 @@ where } } +async fn update_gossip_peers_1d( + ads: &mut AD, + neighbors: N, +) -> HashMap +where + AD: validator_discovery::AuthorityDiscovery, + N: IntoIterator, + N::IntoIter: std::iter::ExactSizeIterator, +{ + let neighbors = neighbors.into_iter(); + let mut peers = HashMap::with_capacity(neighbors.len()); + for (authority, validator_index) in neighbors { + let addr = get_peer_id_by_authority_id(ads, authority.clone()).await; + + if let Some(peer_id) = addr { + peers.insert(authority, TopologyPeerInfo { peer_ids: vec![peer_id], validator_index }); + } + } + + peers +} + async fn handle_network_messages( mut sender: impl SubsystemSender, mut network_service: impl Network, diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index bb972598abe5..5a286bdf89a8 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -971,7 +971,7 @@ where PeerMessage(remote, msg) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; }, - NewGossipTopology(..) => { + NewGossipTopology { .. } => { // impossible! }, } diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index e3c188739148..f3e50a630097 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -1090,7 +1090,7 @@ where state.peer_data.remove(&peer_id); state.metrics.note_collator_peer_count(state.peer_data.len()); }, - NewGossipTopology(..) => { + NewGossipTopology { .. } => { // impossible! 
}, PeerViewChange(peer_id, view) => { diff --git a/node/network/gossip-support/Cargo.toml b/node/network/gossip-support/Cargo.toml index c1a60a97efdb..54984e748fac 100644 --- a/node/network/gossip-support/Cargo.toml +++ b/node/network/gossip-support/Cargo.toml @@ -25,6 +25,7 @@ gum = { package = "tracing-gum", path = "../../gum" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/node/network/gossip-support/src/lib.rs b/node/network/gossip-support/src/lib.rs index 9db1e9050df2..d8ba6ce7c89d 100644 --- a/node/network/gossip-support/src/lib.rs +++ b/node/network/gossip-support/src/lib.rs @@ -49,10 +49,12 @@ use polkadot_node_subsystem::{ RuntimeApiRequest, }, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, - SubsystemError, SubsystemSender, + SubsystemError, }; use polkadot_node_subsystem_util as util; -use polkadot_primitives::v2::{AuthorityDiscoveryId, Hash, SessionIndex}; +use polkadot_primitives::v2::{ + AuthorityDiscoveryId, Hash, SessionIndex, SessionInfo, ValidatorIndex, +}; #[cfg(test)] mod tests; @@ -213,6 +215,24 @@ where if force_request { leaf_session } else { maybe_new_session }; if let Some((session_index, relay_parent)) = maybe_issue_connection { + let session_info = + util::request_session_info(leaf, session_index, ctx.sender()).await.await??; + + let session_info = match session_info { + Some(s) => s, + None => { + gum::warn!( + relay_parent = ?leaf, + session_index = self.last_session_index, + "Failed to get session info.", + ); + + continue + }, + }; + + // Note: we only update `last_session_index` once we've + // successfully gotten the `SessionInfo`. let is_new_session = maybe_new_session.is_some(); if is_new_session { gum::debug!( @@ -223,45 +243,52 @@ where self.last_session_index = Some(session_index); } - let all_authorities = determine_relevant_authorities(ctx, relay_parent).await?; - let our_index = ensure_i_am_an_authority(&self.keystore, &all_authorities).await?; - let other_authorities = { - let mut authorities = all_authorities.clone(); - authorities.swap_remove(our_index); - authorities - }; + // Connect to authorities from the past/present/future. + // + // This is maybe not the right place for this logic to live, + // but at the moment we're limited by the network bridge's ability + // to handle connection requests (it only allows one, globally). + // + // Certain network protocols - mostly req/res, but some gossip, + // will require being connected to past/future validators as well + // as current. That is, the old authority sets are not made obsolete + // by virtue of a new session being entered. Therefore we maintain + // connections to a much broader set of validators. + { + let mut connections = authorities_past_present_future(ctx, leaf).await?; + + // Remove all of our locally controlled validator indices so we don't connect to ourself. + // If we control none of them, don't issue connection requests - we're outside + // of the 'clique' of recent validators. 
+ if remove_all_controlled(&self.keystore, &mut connections).await != 0 { + self.issue_connection_request(ctx, connections).await; + } + } - self.issue_connection_request(ctx, other_authorities).await; + // Gossip topology is only relevant for authorities in the current session. + let our_index = + ensure_i_am_an_authority(&self.keystore, &session_info.discovery_keys).await?; if is_new_session { - update_gossip_topology(ctx, our_index, all_authorities, relay_parent).await?; - self.update_authority_status_metrics(leaf, ctx.sender()).await?; + self.update_authority_status_metrics(&session_info).await; + + update_gossip_topology( + ctx, + our_index, + session_info.discovery_keys, + relay_parent, + session_index, + ) + .await?; } } } Ok(()) } - async fn update_authority_status_metrics( - &mut self, - leaf: Hash, - sender: &mut impl SubsystemSender, - ) -> Result<(), util::Error> { - if let Some(session_info) = util::request_session_info( - leaf, - self.last_session_index - .expect("Last session index is always set on every session index change"), - sender, - ) - .await - .await?? - { - let maybe_index = match ensure_i_am_an_authority( - &self.keystore, - &session_info.discovery_keys, - ) - .await - { + async fn update_authority_status_metrics(&mut self, session_info: &SessionInfo) { + let maybe_index = + match ensure_i_am_an_authority(&self.keystore, &session_info.discovery_keys).await { Ok(index) => { self.metrics.on_is_authority(); Some(index) @@ -275,21 +302,19 @@ where Err(_) => None, }; - if let Some(validator_index) = maybe_index { - // The subset of authorities participating in parachain consensus. - let parachain_validators_this_session = session_info.validators; + if let Some(validator_index) = maybe_index { + // The subset of authorities participating in parachain consensus. + let parachain_validators_this_session = session_info.validators.len(); - // First `maxValidators` entries are the parachain validators. We'll check - // if our index is in this set to avoid searching for the keys. - // https://github.com/paritytech/polkadot/blob/a52dca2be7840b23c19c153cf7e110b1e3e475f8/runtime/parachains/src/configuration.rs#L148 - if validator_index < parachain_validators_this_session.len() { - self.metrics.on_is_parachain_validator(); - } else { - self.metrics.on_is_not_parachain_validator(); - } + // First `maxValidators` entries are the parachain validators. We'll check + // if our index is in this set to avoid searching for the keys. + // https://github.com/paritytech/polkadot/blob/a52dca2be7840b23c19c153cf7e110b1e3e475f8/runtime/parachains/src/configuration.rs#L148 + if validator_index < parachain_validators_this_session { + self.metrics.on_is_parachain_validator(); + } else { + self.metrics.on_is_not_parachain_validator(); } } - Ok(()) } async fn issue_connection_request( @@ -378,7 +403,7 @@ where }, NetworkBridgeEvent::OurViewChange(_) => {}, NetworkBridgeEvent::PeerViewChange(_, _) => {}, - NetworkBridgeEvent::NewGossipTopology(_) => {}, + NetworkBridgeEvent::NewGossipTopology { .. } => {}, NetworkBridgeEvent::PeerMessage(_, v) => { match v {}; }, @@ -416,7 +441,8 @@ where } } -async fn determine_relevant_authorities( +// Get the authorities of the past, present, and future. 
+async fn authorities_past_present_future( ctx: &mut Context, relay_parent: Hash, ) -> Result, util::Error> @@ -428,7 +454,7 @@ where gum::debug!( target: LOG_TARGET, authority_count = ?authorities.len(), - "Determined relevant authorities", + "Determined past/present/future authorities", ); Ok(authorities) } @@ -447,6 +473,25 @@ async fn ensure_i_am_an_authority( Err(util::Error::NotAValidator) } +/// Filter out all controlled keys in the given set. Returns the number of keys removed. +async fn remove_all_controlled( + keystore: &SyncCryptoStorePtr, + authorities: &mut Vec, +) -> usize { + let mut to_remove = Vec::new(); + for (i, v) in authorities.iter().enumerate() { + if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]).await { + to_remove.push(i); + } + } + + for i in to_remove.iter().rev().copied() { + authorities.remove(i); + } + + to_remove.len() +} + /// We partition the list of all sorted `authorities` into `sqrt(len)` groups of `sqrt(len)` size /// and form a matrix where each validator is connected to all validators in its row and column. /// This is similar to `[web3]` research proposed topology, except for the groups are not parachain @@ -460,6 +505,7 @@ async fn update_gossip_topology( our_index: usize, authorities: Vec, relay_parent: Hash, + session_index: SessionIndex, ) -> Result<(), util::Error> where Context: SubsystemContext, @@ -469,6 +515,8 @@ where let random_seed = { let (tx, rx) = oneshot::channel(); + // TODO https://github.com/paritytech/polkadot/issues/5316: + // get the random seed from the `SessionInfo` instead. ctx.send_message(RuntimeApiMessage::Request( relay_parent, RuntimeApiRequest::CurrentBabeEpoch(tx), @@ -493,16 +541,38 @@ where .expect("our_index < len; indices contains it; qed"); let neighbors = matrix_neighbors(our_shuffled_position, len); - let our_neighbors = neighbors.map(|i| authorities[indices[i]].clone()).collect(); - - ctx.send_message(NetworkBridgeMessage::NewGossipTopology { our_neighbors }) - .await; + let row_neighbors = neighbors + .row_neighbors + .map(|i| indices[i]) + .map(|i| (authorities[i].clone(), ValidatorIndex::from(i as u32))) + .collect(); + + let column_neighbors = neighbors + .column_neighbors + .map(|i| indices[i]) + .map(|i| (authorities[i].clone(), ValidatorIndex::from(i as u32))) + .collect(); + + ctx.send_message(NetworkBridgeMessage::NewGossipTopology { + session: session_index, + our_neighbors_x: row_neighbors, + our_neighbors_y: column_neighbors, + }) + .await; Ok(()) } +struct MatrixNeighbors { + row_neighbors: R, + column_neighbors: C, +} + /// Compute our row and column neighbors in a matrix -fn matrix_neighbors(our_index: usize, len: usize) -> impl Iterator { +fn matrix_neighbors( + our_index: usize, + len: usize, +) -> MatrixNeighbors, impl Iterator> { assert!(our_index < len, "our_index is computed using `enumerate`; qed"); // e.g. 
for size 11 the matrix would be @@ -520,7 +590,10 @@ fn matrix_neighbors(our_index: usize, len: usize) -> impl Iterator let row_neighbors = our_row * sqrt..std::cmp::min(our_row * sqrt + sqrt, len); let column_neighbors = (our_column..len).step_by(sqrt); - row_neighbors.chain(column_neighbors).filter(move |i| *i != our_index) + MatrixNeighbors { + row_neighbors: row_neighbors.filter(move |i| *i != our_index), + column_neighbors: column_neighbors.filter(move |i| *i != our_index), + } } impl overseer::Subsystem for GossipSupport diff --git a/node/network/gossip-support/src/tests.rs b/node/network/gossip-support/src/tests.rs index 6302772501ff..72c1dff85ad6 100644 --- a/node/network/gossip-support/src/tests.rs +++ b/node/network/gossip-support/src/tests.rs @@ -24,7 +24,9 @@ use futures::{executor, future, Future}; use lazy_static::lazy_static; use sc_network::multiaddr::Protocol; +use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch}; +use sp_core::crypto::Pair as PairT; use sp_keyring::Sr25519Keyring; use polkadot_node_subsystem::{ @@ -38,25 +40,46 @@ use test_helpers::mock::make_ferdie_keystore; use super::*; +const AUTHORITY_KEYRINGS: &[Sr25519Keyring] = &[ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Eve, + Sr25519Keyring::One, + Sr25519Keyring::Two, + Sr25519Keyring::Ferdie, +]; + lazy_static! { static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new(); - static ref AUTHORITIES: Vec = { - let mut authorities = OTHER_AUTHORITIES.clone(); - authorities.push(Sr25519Keyring::Ferdie.public().into()); - authorities + static ref AUTHORITIES: Vec = + AUTHORITY_KEYRINGS.iter().map(|k| k.public().into()).collect(); + + static ref AUTHORITIES_WITHOUT_US: Vec = { + let mut a = AUTHORITIES.clone(); + a.pop(); // remove FERDIE. 
+ a + }; + + static ref PAST_PRESENT_FUTURE_AUTHORITIES: Vec = { + (0..50) + .map(|_| AuthorityDiscoveryPair::generate().0.public()) + .chain(AUTHORITIES.clone()) + .collect() }; - static ref OTHER_AUTHORITIES: Vec = vec![ - Sr25519Keyring::Alice.public().into(), - Sr25519Keyring::Bob.public().into(), - Sr25519Keyring::Charlie.public().into(), - Sr25519Keyring::Eve.public().into(), - Sr25519Keyring::One.public().into(), - Sr25519Keyring::Two.public().into(), + + // [2 6] + // [4 5] + // [1 3] + // [0 ] + + static ref ROW_NEIGHBORS: Vec<(AuthorityDiscoveryId, ValidatorIndex)> = vec![ + (Sr25519Keyring::Charlie.public().into(), ValidatorIndex::from(2)), ]; - static ref NEIGHBORS: Vec = vec![ - Sr25519Keyring::Two.public().into(), - Sr25519Keyring::Charlie.public().into(), - Sr25519Keyring::Eve.public().into(), + + static ref COLUMN_NEIGHBORS: Vec<(AuthorityDiscoveryId, ValidatorIndex)> = vec![ + (Sr25519Keyring::Two.public().into(), ValidatorIndex::from(5)), + (Sr25519Keyring::Eve.public().into(), ValidatorIndex::from(3)), ]; } @@ -70,8 +93,11 @@ struct MockAuthorityDiscovery { impl MockAuthorityDiscovery { fn new() -> Self { - let authorities: HashMap<_, _> = - AUTHORITIES.clone().into_iter().map(|a| (PeerId::random(), a)).collect(); + let authorities: HashMap<_, _> = PAST_PRESENT_FUTURE_AUTHORITIES + .clone() + .into_iter() + .map(|a| (PeerId::random(), a)) + .collect(); let addrs = authorities .clone() .into_iter() @@ -103,10 +129,10 @@ impl AuthorityDiscovery for MockAuthorityDiscovery { } } -async fn get_other_authorities_addrs() -> Vec> { - let mut addrs = Vec::with_capacity(OTHER_AUTHORITIES.len()); +async fn get_multiaddrs(authorities: Vec) -> Vec> { + let mut addrs = Vec::with_capacity(authorities.len()); let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); - for authority in OTHER_AUTHORITIES.iter().cloned() { + for authority in authorities.into_iter() { if let Some(addr) = discovery.get_addresses_by_authority_id(authority).await { addrs.push(addr); } @@ -114,10 +140,12 @@ async fn get_other_authorities_addrs() -> Vec> { addrs } -async fn get_other_authorities_addrs_map() -> HashMap> { - let mut addrs = HashMap::with_capacity(OTHER_AUTHORITIES.len()); +async fn get_address_map( + authorities: Vec, +) -> HashMap> { + let mut addrs = HashMap::with_capacity(authorities.len()); let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); - for authority in OTHER_AUTHORITIES.iter().cloned() { + for authority in authorities.into_iter() { if let Some(addr) = discovery.get_addresses_by_authority_id(authority.clone()).await { addrs.insert(authority, addr); } @@ -179,13 +207,32 @@ async fn overseer_signal_active_leaves(overseer: &mut VirtualOverseer, leaf: Has .expect("signal send timeout"); } +fn make_session_info() -> SessionInfo { + let all_validator_indices: Vec<_> = (0..6).map(ValidatorIndex::from).collect(); + SessionInfo { + active_validator_indices: all_validator_indices.clone(), + random_seed: [0; 32], + dispute_period: 6, + validators: AUTHORITY_KEYRINGS.iter().map(|k| k.public().into()).collect(), + discovery_keys: AUTHORITIES.clone(), + assignment_keys: AUTHORITY_KEYRINGS.iter().map(|k| k.public().into()).collect(), + validator_groups: vec![all_validator_indices], + n_cores: 1, + zeroth_delay_tranche_width: 1, + relay_vrf_modulo_samples: 1, + n_delay_tranches: 1, + no_show_slots: 1, + needed_approvals: 1, + } +} + async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { let msg = overseer.recv().timeout(TIMEOUT).await.expect("msg recv timeout"); msg } -async fn 
test_neighbors(overseer: &mut VirtualOverseer) { +async fn test_neighbors(overseer: &mut VirtualOverseer, expected_session: SessionIndex) { assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -209,11 +256,17 @@ async fn test_neighbors(overseer: &mut VirtualOverseer) { assert_matches!( overseer_recv(overseer).await, AllMessages::NetworkBridge(NetworkBridgeMessage::NewGossipTopology { - our_neighbors, + session: got_session, + our_neighbors_x, + our_neighbors_y, }) => { - let mut got: Vec<_> = our_neighbors.into_iter().collect(); - got.sort(); - assert_eq!(got, NEIGHBORS.clone()); + assert_eq!(expected_session, got_session); + let mut got_row: Vec<_> = our_neighbors_x.into_iter().collect(); + let mut got_column: Vec<_> = our_neighbors_y.into_iter().collect(); + got_row.sort(); + got_column.sort(); + assert_eq!(got_row, ROW_NEIGHBORS.clone()); + assert_eq!(got_column, COLUMN_NEIGHBORS.clone()); } ); } @@ -235,6 +288,18 @@ fn issues_a_connection_request_on_new_session() { } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -252,23 +317,12 @@ fn issues_a_connection_request_on_new_session() { validator_addrs, peer_set, }) => { - assert_eq!(validator_addrs, get_other_authorities_addrs().await); + assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone()).await); assert_eq!(peer_set, PeerSet::Validation); } ); - test_neighbors(overseer).await; - - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(1, sender), - )) => { - assert_eq!(relay_parent, hash); - sender.send(Ok(None)).unwrap(); - } - ); + test_neighbors(overseer, 1).await; virtual_overseer }); @@ -313,6 +367,18 @@ fn issues_a_connection_request_on_new_session() { } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 2); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -330,28 +396,83 @@ fn issues_a_connection_request_on_new_session() { validator_addrs, peer_set, }) => { - assert_eq!(validator_addrs, get_other_authorities_addrs().await); + assert_eq!(validator_addrs, get_multiaddrs(AUTHORITIES_WITHOUT_US.clone()).await); assert_eq!(peer_set, PeerSet::Validation); } ); - test_neighbors(overseer).await; + test_neighbors(overseer, 2).await; + + virtual_overseer + }); + assert_eq!(state.last_session_index, Some(2)); + assert!(state.last_failure.is_none()); +} + +#[test] +fn issues_connection_request_to_past_present_future() { + let hash = Hash::repeat_byte(0xAA); + test_harness(make_subsystem(), |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + 
tx.send(Ok(1)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent, - RuntimeApiRequest::SessionInfo(2, sender), + RuntimeApiRequest::Authorities(tx), )) => { assert_eq!(relay_parent, hash); - sender.send(Ok(None)).unwrap(); + tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap(); } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + }) => { + let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES + .iter() + .cloned() + .filter(|p| p != &Sr25519Keyring::Ferdie.public().into()) + .collect(); + + let addrs = get_multiaddrs(all_without_ferdie).await; + + assert_eq!(validator_addrs, addrs); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + // Ensure neighbors are unaffected + test_neighbors(overseer, 1).await; + virtual_overseer }); - assert_eq!(state.last_session_index, Some(2)); - assert!(state.last_failure.is_none()); } #[test] @@ -407,6 +528,18 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -424,7 +557,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { validator_addrs, peer_set, }) => { - let mut expected = get_other_authorities_addrs_map().await; + let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone()).await; expected.remove(&alice); expected.remove(&bob); let expected: HashSet = expected.into_iter().map(|(_,v)| v.into_iter()).flatten().collect(); @@ -433,18 +566,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { } ); - test_neighbors(overseer).await; - - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionInfo(1, sender), - )) => { - assert_eq!(relay_parent, hash); - sender.send(Ok(None)).unwrap(); - } - ); + test_neighbors(overseer, 1).await; virtual_overseer }) @@ -470,6 +592,19 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { tx.send(Ok(1)).unwrap(); } ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionInfo(s, tx), + )) => { + assert_eq!(relay_parent, hash); + assert_eq!(s, 1); + tx.send(Ok(Some(make_session_info()))).unwrap(); + } + ); + assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -487,7 +622,7 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { validator_addrs, peer_set, }) => { - let mut expected = get_other_authorities_addrs_map().await; + let mut expected = get_address_map(AUTHORITIES_WITHOUT_US.clone()).await; expected.remove(&bob); let expected: HashSet = 
expected.into_iter().map(|(_,v)| v.into_iter()).flatten().collect(); assert_eq!(validator_addrs.into_iter().map(|v| v.into_iter()).flatten().collect::>(), expected); @@ -504,18 +639,23 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { #[test] fn test_matrix_neighbors() { - for (our_index, len, expected) in vec![ - (0usize, 1usize, vec![]), - (1, 2, vec![0usize]), - (0, 9, vec![1, 2, 3, 6]), - (9, 10, vec![0, 3, 6]), - (10, 11, vec![1, 4, 7, 9]), - (7, 11, vec![1, 4, 6, 8, 10]), + for (our_index, len, expected_row, expected_column) in vec![ + (0usize, 1usize, vec![], vec![]), + (1, 2, vec![], vec![0usize]), + (0, 9, vec![1, 2], vec![3, 6]), + (9, 10, vec![], vec![0, 3, 6]), + (10, 11, vec![9], vec![1, 4, 7]), + (7, 11, vec![6, 8], vec![1, 4, 10]), ] .into_iter() { - let mut result: Vec<_> = matrix_neighbors(our_index, len).collect(); - result.sort(); - assert_eq!(result, expected); + let matrix = matrix_neighbors(our_index, len); + let mut row_result: Vec<_> = matrix.row_neighbors.collect(); + let mut column_result: Vec<_> = matrix.column_neighbors.collect(); + row_result.sort(); + column_result.sort(); + + assert_eq!(row_result, expected_row); + assert_eq!(column_result, expected_column); } } diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index d54994797733..156fe2fb5223 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -24,6 +24,7 @@ fatality = "0.0.6" [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } assert_matches = "1.4.0" +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index 0bf43b883cd3..04a163f3883a 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -1632,7 +1632,14 @@ async fn handle_network_update( }); } }, - NetworkBridgeEvent::NewGossipTopology(new_peers) => { + NetworkBridgeEvent::NewGossipTopology(topology) => { + // Combine all peers in the x & y direction as we don't make any distinction. 
+ let new_peers: HashSet = topology + .our_neighbors_x + .values() + .chain(topology.our_neighbors_y.values()) + .flat_map(|peer_info| peer_info.peer_ids.iter().cloned()) + .collect(); let _ = metrics.time_network_bridge_update_v1("new_gossip_topology"); let newly_added: Vec = new_peers.difference(gossip_peers).cloned().collect(); *gossip_peers = new_peers; diff --git a/node/network/statement-distribution/src/tests.rs b/node/network/statement-distribution/src/tests.rs index 9e91ac5ba650..c20aa3dccece 100644 --- a/node/network/statement-distribution/src/tests.rs +++ b/node/network/statement-distribution/src/tests.rs @@ -34,11 +34,12 @@ use polkadot_primitives_test_helpers::{ }; use polkadot_subsystem::{ jaeger, - messages::{RuntimeApiMessage, RuntimeApiRequest}, + messages::{network_bridge_event, RuntimeApiMessage, RuntimeApiRequest}, ActivatedLeaf, LeafStatus, }; use sc_keystore::LocalKeystore; use sp_application_crypto::{sr25519::Pair, AppKey, Pair as TraitPair}; +use sp_authority_discovery::AuthorityPair; use sp_keyring::Sr25519Keyring; use sp_keystore::{CryptoStore, SyncCryptoStore, SyncCryptoStorePtr}; use std::{iter::FromIterator as _, sync::Arc, time::Duration}; @@ -1964,12 +1965,34 @@ fn handle_multiple_seconded_statements() { // Explicitly add all `lucky` peers to the gossip peers to ensure that neither `peerA` not `peerB` // receive statements + let gossip_topology = { + let mut t = network_bridge_event::NewGossipTopology { + session: 1, + our_neighbors_x: HashMap::new(), + our_neighbors_y: HashMap::new(), + }; + + // This is relying on the fact that statement distribution + // just extracts the peer IDs from this struct and does nothing else + // with it. + for (i, peer) in lucky_peers.iter().enumerate() { + let authority_id = AuthorityPair::generate().0.public(); + t.our_neighbors_x.insert( + authority_id, + network_bridge_event::TopologyPeerInfo { + peer_ids: vec![peer.clone()], + validator_index: (i as u32).into(), + }, + ); + } + + t + }; + handle .send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::NewGossipTopology( - lucky_peers.iter().cloned().collect::>(), - ), + NetworkBridgeEvent::NewGossipTopology(gossip_topology), ), }) .await; diff --git a/node/primitives/src/approval.rs b/node/primitives/src/approval.rs index d53a37ed4e7b..ab9a4ce012bd 100644 --- a/node/primitives/src/approval.rs +++ b/node/primitives/src/approval.rs @@ -21,8 +21,8 @@ pub use sp_consensus_vrf::schnorrkel::{Randomness, VRFOutput, VRFProof}; use parity_scale_codec::{Decode, Encode}; use polkadot_primitives::v2::{ - BlockNumber, CandidateHash, CandidateIndex, CoreIndex, Hash, Header, ValidatorIndex, - ValidatorSignature, + BlockNumber, CandidateHash, CandidateIndex, CoreIndex, Hash, Header, SessionIndex, + ValidatorIndex, ValidatorSignature, }; use sp_application_crypto::ByteArray; use sp_consensus_babe as babe_primitives; @@ -128,6 +128,8 @@ pub struct BlockApprovalMeta { pub candidates: Vec, /// The consensus slot of the block. pub slot: Slot, + /// The session of the block. + pub session: SessionIndex, } /// Errors that can occur during the approvals protocol. 
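The statement-distribution change above keeps treating the new two-dimensional topology as a flat set of peers. A minimal, self-contained sketch of that flattening follows; the `PeerId`, `AuthorityDiscoveryId` and index types here are simplified stand-ins chosen only to keep the example runnable, not the real types from `sc-network` and `polkadot-primitives`.

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins so the sketch compiles on its own; the real types are richer.
type PeerId = u64;
type AuthorityDiscoveryId = String;
type ValidatorIndex = u32;
type SessionIndex = u32;

struct TopologyPeerInfo {
    peer_ids: Vec<PeerId>,
    validator_index: ValidatorIndex,
}

struct NewGossipTopology {
    session: SessionIndex,
    our_neighbors_x: HashMap<AuthorityDiscoveryId, TopologyPeerInfo>,
    our_neighbors_y: HashMap<AuthorityDiscoveryId, TopologyPeerInfo>,
}

// Flatten both grid dimensions into a single set of peer IDs, mirroring what
// the statement-distribution change above does when it has no use for the X/Y split.
fn flatten_topology(topology: &NewGossipTopology) -> HashSet<PeerId> {
    topology
        .our_neighbors_x
        .values()
        .chain(topology.our_neighbors_y.values())
        .flat_map(|info| info.peer_ids.iter().copied())
        .collect()
}

fn main() {
    let mut x = HashMap::new();
    x.insert("alice".to_string(), TopologyPeerInfo { peer_ids: vec![1, 2], validator_index: 0 });
    let mut y = HashMap::new();
    y.insert("bob".to_string(), TopologyPeerInfo { peer_ids: vec![2, 3], validator_index: 1 });

    let topology = NewGossipTopology { session: 1, our_neighbors_x: x, our_neighbors_y: y };
    // Peer 2 appears in both dimensions but ends up in the set only once.
    assert_eq!(flatten_topology(&topology).len(), 3);
}
```

Collecting into a `HashSet` also deduplicates peer IDs that show up under more than one authority ID or dimension, so no peer is messaged twice for the same update.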
diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 8657ec16283b..4451f7c4fd61 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -49,7 +49,7 @@ use polkadot_primitives::v2::{ }; use polkadot_statement_table::v2::Misbehavior; use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, time::Duration, }; @@ -378,9 +378,20 @@ pub enum NetworkBridgeMessage { /// Inform the distribution subsystems about the new /// gossip network topology formed. NewGossipTopology { - /// Ids of our neighbors in the new gossip topology. - /// We're not necessarily connected to all of them, but we should. - our_neighbors: HashSet, + /// The session info this gossip topology is concerned with. + session: SessionIndex, + /// Ids of our neighbors in the X dimension of the new gossip topology, + /// along with their validator indices within the session. + /// + /// We're not necessarily connected to all of them, but we should + /// try to be. + our_neighbors_x: HashMap, + /// Ids of our neighbors in the Y dimension of the new gossip topology, + /// along with their validator indices within the session. + /// + /// We're not necessarily connected to all of them, but we should + /// try to be. + our_neighbors_y: HashMap, }, } diff --git a/node/subsystem-types/src/messages/network_bridge_event.rs b/node/subsystem-types/src/messages/network_bridge_event.rs index bcdeee1d557a..91facbc6fe1f 100644 --- a/node/subsystem-types/src/messages/network_bridge_event.rs +++ b/node/subsystem-types/src/messages/network_bridge_event.rs @@ -14,12 +14,36 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::collections::HashSet; +use std::{ + collections::{HashMap, HashSet}, + convert::TryFrom, +}; pub use sc_network::{PeerId, ReputationChange}; use polkadot_node_network_protocol::{ObservedRole, OurView, View, WrongVariant}; -use polkadot_primitives::v2::AuthorityDiscoveryId; +use polkadot_primitives::v2::{AuthorityDiscoveryId, SessionIndex, ValidatorIndex}; + +/// Information about a peer in the gossip topology for a session. +#[derive(Debug, Clone, PartialEq)] +pub struct TopologyPeerInfo { + /// The validator's known peer IDs. + pub peer_ids: Vec, + /// The index of the validator in the discovery keys of the corresponding + /// `SessionInfo`. This can extend _beyond_ the set of active parachain validators. + pub validator_index: ValidatorIndex, +} + +/// A struct indicating new gossip topology. +#[derive(Debug, Clone, PartialEq)] +pub struct NewGossipTopology { + /// The session index this topology corresponds to. + pub session: SessionIndex, + /// Neighbors in the 'X' dimension of the grid. + pub our_neighbors_x: HashMap, + /// Neighbors in the 'Y' dimension of the grid. + pub our_neighbors_y: HashMap, +} /// Events from network. #[derive(Debug, Clone, PartialEq)] @@ -30,14 +54,14 @@ pub enum NetworkBridgeEvent { /// A peer has disconnected. PeerDisconnected(PeerId), - /// Our neighbors in the new gossip topology. + /// Our neighbors in the new gossip topology for the session. /// We're not necessarily connected to all of them. /// /// This message is issued only on the validation peer set. /// /// Note, that the distribution subsystems need to handle the last /// view update of the newly added gossip peers manually. - NewGossipTopology(HashSet), + NewGossipTopology(NewGossipTopology), /// Peer has sent a message.
PeerMessage(PeerId, M), @@ -77,8 +101,8 @@ impl NetworkBridgeEvent { NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone(), authority_id.clone()), NetworkBridgeEvent::PeerDisconnected(ref peer) => NetworkBridgeEvent::PeerDisconnected(peer.clone()), - NetworkBridgeEvent::NewGossipTopology(ref peers) => - NetworkBridgeEvent::NewGossipTopology(peers.clone()), + NetworkBridgeEvent::NewGossipTopology(ref topology) => + NetworkBridgeEvent::NewGossipTopology(topology.clone()), NetworkBridgeEvent::PeerViewChange(ref peer, ref view) => NetworkBridgeEvent::PeerViewChange(peer.clone(), view.clone()), NetworkBridgeEvent::OurViewChange(ref view) => diff --git a/roadmap/implementers-guide/src/node/approval/approval-distribution.md b/roadmap/implementers-guide/src/node/approval/approval-distribution.md index c8d8f60ae699..9afc53c7e777 100644 --- a/roadmap/implementers-guide/src/node/approval/approval-distribution.md +++ b/roadmap/implementers-guide/src/node/approval/approval-distribution.md @@ -22,6 +22,16 @@ For assignments, what we need to be checking is whether we are aware of the (blo However, awareness on its own of a (block, candidate) pair would imply that even ancient candidates all the way back to the genesis are relevant. We are actually not interested in anything before finality. +We gossip assignments along a grid topology produced by the [Gossip Support Subsystem](../utility/gossip-support.md) and also to a few random peers. The first time we accept an assignment or approval that originates from a validator peer in a dimension of the grid we share, regardless of which peer actually sent it to us, we propagate the message to validator peers in the unshared dimension as well as to a few random peers. + +In case these mechanisms are not enough on their own, we trade bandwidth for protocol liveness by introducing aggression. + +Aggression has 3 levels: + Aggression Level 0: The basic behaviors described above. + Aggression Level 1: The originator of a message sends to all peers. Other peers follow the rules above. + Aggression Level 2: All peers send all messages to all their row and column neighbors. This means that each validator will, on average, receive each message approximately 2*sqrt(n) times. + +These aggression levels are chosen based on how long a block has taken to finalize: assignments and approvals related to long-unfinalized blocks will be propagated with more aggression. In particular, it's only the earliest unfinalized blocks that aggression should be applied to, because descendants may be unfinalized only by virtue of being descendants. ## Protocol diff --git a/roadmap/implementers-guide/src/types/network.md b/roadmap/implementers-guide/src/types/network.md index b51015c3848e..34875816ca34 100644 --- a/roadmap/implementers-guide/src/types/network.md +++ b/roadmap/implementers-guide/src/types/network.md @@ -142,6 +142,23 @@ enum CollationProtocolV1 { These updates are posted from the [Network Bridge Subsystem](../node/utility/network-bridge.md) to other subsystems based on registered listeners. ```rust +struct NewGossipTopology { + /// The session index this topology corresponds to. + session: SessionIndex, + /// Neighbors in the 'X' dimension of the grid. + our_neighbors_x: HashMap, + /// Neighbors in the 'Y' dimension of the grid. + our_neighbors_y: HashMap, +} + +struct TopologyPeerInfo { + /// The validator's known peer IDs. + peer_ids: Vec, + /// The index of the validator in the discovery keys of the corresponding + /// `SessionInfo`.
This can extend _beyond_ the set of active parachain validators. + validator_index: ValidatorIndex, +} + enum NetworkBridgeEvent { /// A peer with given ID is now connected. PeerConnected(PeerId, ObservedRole, Option>), @@ -154,7 +171,7 @@ enum NetworkBridgeEvent { /// /// Note, that the distribution subsystems need to handle the last /// view update of the newly added gossip peers manually. - NewGossipTopology(HashSet), + NewGossipTopology(NewGossipTopology), /// We received a message from the given peer. PeerMessage(PeerId, M), /// The given peer has updated its description of its view. diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index 3fabdd59540f..4180f0e20d01 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -175,6 +175,8 @@ struct BlockApprovalMeta { candidates: Vec, /// The consensus slot of the block. slot: Slot, + /// The session of the block. + session: SessionIndex, } enum ApprovalDistributionMessage { @@ -553,9 +555,14 @@ enum NetworkBridgeMessage { /// Inform the distribution subsystems about the new /// gossip network topology formed. NewGossipTopology { - /// Ids of our neighbors in the new gossip topology. - /// We're not necessarily connected to all of them, but we should. - our_neighbors: HashSet, + /// The session this topology corresponds to. + session: SessionIndex, + /// Ids of our neighbors in the X dimension of the new gossip topology. + /// We're not necessarily connected to all of them, but we should try to be. + our_neighbors_x: HashSet, + /// Ids of our neighbors in the Y dimension of the new gossip topology. + /// We're not necessarily connected to all of them, but we should try to be. + our_neighbors_y: HashSet, } } ``` diff --git a/runtime/polkadot/src/lib.rs b/runtime/polkadot/src/lib.rs index ea72ba6db1bd..b703fdd43839 100644 --- a/runtime/polkadot/src/lib.rs +++ b/runtime/polkadot/src/lib.rs @@ -2036,7 +2036,7 @@ sp_api::impl_runtime_apis! { impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { fn authorities() -> Vec { - AuthorityDiscovery::authorities() + parachains_runtime_api_impl::relevant_authority_ids::() } } diff --git a/runtime/test-runtime/src/lib.rs b/runtime/test-runtime/src/lib.rs index 045844c9d4ae..e9ceaf5cc713 100644 --- a/runtime/test-runtime/src/lib.rs +++ b/runtime/test-runtime/src/lib.rs @@ -796,7 +796,7 @@ sp_api::impl_runtime_apis! { impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { fn authorities() -> Vec { - AuthorityDiscovery::authorities() + runtime_impl::relevant_authority_ids::() } }
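As a closing reference point, the updated `test_matrix_neighbors` vectors earlier in this diff are consistent with a simple layout: validators `0..len` arranged row-major in a grid of width `floor(sqrt(len))`, with X neighbors taken from our row and Y neighbors from our column. The sketch below is reconstructed from those test vectors under that assumption; it is not the actual `matrix_neighbors` implementation from the gossip-support subsystem.

```rust
// A self-contained sketch of a row/column neighbor computation that matches
// the updated `test_matrix_neighbors` vectors. Validators 0..len are laid out
// row-major in a grid of width floor(sqrt(len)); X (row) neighbors share our
// row, Y (column) neighbors share our column.
fn sketch_matrix_neighbors(our_index: usize, len: usize) -> (Vec<usize>, Vec<usize>) {
    assert!(our_index < len);
    let width = (len as f64).sqrt() as usize;

    // Other members of our row, in increasing index order.
    let row: Vec<usize> = (0..len)
        .filter(|&i| i / width == our_index / width && i != our_index)
        .collect();
    // Other members of our column, in increasing index order.
    let column: Vec<usize> = (0..len)
        .filter(|&i| i % width == our_index % width && i != our_index)
        .collect();

    (row, column)
}

fn main() {
    // A few of the vectors from `test_matrix_neighbors` above.
    assert_eq!(sketch_matrix_neighbors(0, 9), (vec![1, 2], vec![3, 6]));
    assert_eq!(sketch_matrix_neighbors(10, 11), (vec![9], vec![1, 4, 7]));
    assert_eq!(sketch_matrix_neighbors(7, 11), (vec![6, 8], vec![1, 4, 10]));
    println!("all neighbor vectors match");
}
```

Under such a layout a validator has roughly `2*sqrt(n)` row and column neighbors combined, which lines up with the "approximately 2*sqrt(n)" delivery estimate quoted for aggression level 2 in the implementers' guide section above.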