diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index e5303bbc77..f9e1674c3e 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -21,7 +21,6 @@ use std::{ any::Any, collections::{HashMap, HashSet}, sync::{atomic::AtomicU32, Arc}, - time::Duration, }; use token::{token_remove_node, undeclare_simple_token}; @@ -116,42 +115,21 @@ macro_rules! face_hat_mut { } use face_hat_mut; -struct HatTables { - linkstatepeer_subs: HashSet>, - linkstatepeer_tokens: HashSet>, - linkstatepeer_qabls: HashSet>, - linkstatepeers_net: Option, - linkstatepeers_trees_task: Option, +struct TreesComputationWorker { + _task: TerminatableTask, + tx: flume::Sender>, } -impl Drop for HatTables { - fn drop(&mut self) { - if let Some(mut task) = self.linkstatepeers_trees_task.take() { - task.terminate(Duration::from_secs(10)); - } - } -} - -impl HatTables { +impl TreesComputationWorker { fn new() -> Self { - Self { - linkstatepeer_subs: HashSet::new(), - linkstatepeer_tokens: HashSet::new(), - linkstatepeer_qabls: HashSet::new(), - linkstatepeers_net: None, - linkstatepeers_trees_task: None, - } - } - - fn schedule_compute_trees(&mut self, tables_ref: Arc) { - if self.linkstatepeers_trees_task.is_none() { - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; + let (tx, rx) = flume::bounded::>(1); + let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = rx.recv_async().await { let mut tables = zwrite!(tables_ref.tables); tracing::trace!("Compute trees"); @@ -165,15 +143,37 @@ impl HatTables { pubsub::pubsub_tree_change(&mut tables, &new_children); queries::queries_tree_change(&mut tables, &new_children); token::token_tree_change(&mut tables, &new_children); + drop(tables); + } + } + }); + Self { _task: task, tx } + } +} - tracing::trace!("Computations completed"); - hat_mut!(tables).linkstatepeers_trees_task = None; - }, - TerminatableTask::create_cancellation_token(), - ); - self.linkstatepeers_trees_task = Some(task); +struct HatTables { + linkstatepeer_subs: HashSet>, + linkstatepeer_tokens: HashSet>, + linkstatepeer_qabls: HashSet>, + linkstatepeers_net: Option, + linkstatepeers_trees_worker: TreesComputationWorker, +} + +impl HatTables { + fn new() -> Self { + Self { + linkstatepeer_subs: HashSet::new(), + linkstatepeer_tokens: HashSet::new(), + linkstatepeer_qabls: HashSet::new(), + linkstatepeers_net: None, + linkstatepeers_trees_worker: TreesComputationWorker::new(), } } + + fn schedule_compute_trees(&mut self, tables_ref: Arc) { + tracing::trace!("Schedule trees computation"); + let _ = self.linkstatepeers_trees_worker.tx.try_send(tables_ref); + } } pub(crate) struct HatCode {} diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index df1729c087..a2d3c66aa3 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -22,7 +22,6 @@ use std::{ collections::{hash_map::DefaultHasher, HashMap, HashSet}, hash::Hasher, sync::{atomic::AtomicU32, Arc}, - time::Duration, }; use token::{token_linkstate_change, token_remove_node, undeclare_simple_token}; @@ -117,6 +116,49 @@ macro_rules! face_hat_mut { } use face_hat_mut; +struct TreesComputationWorker { + _task: TerminatableTask, + tx: flume::Sender>, +} + +impl TreesComputationWorker { + fn new(net_type: WhatAmI) -> Self { + let (tx, rx) = flume::bounded::>(1); + let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + *TREES_COMPUTATION_DELAY_MS, + )) + .await; + if let Ok(tables_ref) = rx.recv_async().await { + let mut tables = zwrite!(tables_ref.tables); + + tracing::trace!("Compute trees"); + let new_children = match net_type { + WhatAmI::Router => hat_mut!(tables) + .routers_net + .as_mut() + .unwrap() + .compute_trees(), + _ => hat_mut!(tables) + .linkstatepeers_net + .as_mut() + .unwrap() + .compute_trees(), + }; + + tracing::trace!("Compute routes"); + pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); + queries::queries_tree_change(&mut tables, &new_children, net_type); + token::token_tree_change(&mut tables, &new_children, net_type); + drop(tables); + } + } + }); + Self { _task: task, tx } + } +} + struct HatTables { router_subs: HashSet>, linkstatepeer_subs: HashSet>, @@ -127,22 +169,11 @@ struct HatTables { routers_net: Option, linkstatepeers_net: Option, shared_nodes: Vec, - routers_trees_task: Option, - linkstatepeers_trees_task: Option, + routers_trees_worker: TreesComputationWorker, + linkstatepeers_trees_worker: TreesComputationWorker, router_peers_failover_brokering: bool, } -impl Drop for HatTables { - fn drop(&mut self) { - if let Some(mut task) = self.linkstatepeers_trees_task.take() { - task.terminate(Duration::from_secs(10)); - } - if let Some(mut task) = self.routers_trees_task.take() { - task.terminate(Duration::from_secs(10)); - } - } -} - impl HatTables { fn new(router_peers_failover_brokering: bool) -> Self { Self { @@ -155,8 +186,8 @@ impl HatTables { routers_net: None, linkstatepeers_net: None, shared_nodes: vec![], - routers_trees_task: None, - linkstatepeers_trees_task: None, + routers_trees_worker: TreesComputationWorker::new(WhatAmI::Router), + linkstatepeers_trees_worker: TreesComputationWorker::new(WhatAmI::Peer), router_peers_failover_brokering, } } @@ -259,49 +290,15 @@ impl HatTables { } fn schedule_compute_trees(&mut self, tables_ref: Arc, net_type: WhatAmI) { - if (net_type == WhatAmI::Router && self.routers_trees_task.is_none()) - || (net_type == WhatAmI::Peer && self.linkstatepeers_trees_task.is_none()) - { - let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, - async move { - tokio::time::sleep(std::time::Duration::from_millis( - *TREES_COMPUTATION_DELAY_MS, - )) - .await; - let mut tables = zwrite!(tables_ref.tables); - - tracing::trace!("Compute trees"); - let new_children = match net_type { - WhatAmI::Router => hat_mut!(tables) - .routers_net - .as_mut() - .unwrap() - .compute_trees(), - _ => hat_mut!(tables) - .linkstatepeers_net - .as_mut() - .unwrap() - .compute_trees(), - }; - - tracing::trace!("Compute routes"); - pubsub::pubsub_tree_change(&mut tables, &new_children, net_type); - queries::queries_tree_change(&mut tables, &new_children, net_type); - token::token_tree_change(&mut tables, &new_children, net_type); - - tracing::trace!("Computations completed"); - match net_type { - WhatAmI::Router => hat_mut!(tables).routers_trees_task = None, - _ => hat_mut!(tables).linkstatepeers_trees_task = None, - }; - }, - TerminatableTask::create_cancellation_token(), - ); - match net_type { - WhatAmI::Router => self.routers_trees_task = Some(task), - _ => self.linkstatepeers_trees_task = Some(task), - }; + tracing::trace!("Schedule trees computation"); + match net_type { + WhatAmI::Router => { + let _ = self.routers_trees_worker.tx.try_send(tables_ref); + } + WhatAmI::Peer => { + let _ = self.linkstatepeers_trees_worker.tx.try_send(tables_ref); + } + _ => (), } } }