Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: avoid duplicate autoconnection attempt #1742

Merged
Merged
22 changes: 22 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@
/// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Each value is a list of: "peer", "router" and/or "client".
autoconnect: { router: [], peer: ["router", "peer"], client: ["router"] },
/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
/// Possible options are:
/// - "always": always attempt to autoconnect, may result in redundant connection which will then be closed.
/// - "greater-zid": attempt to connect to another node only if its own zid is greater than the other's.
/// If both nodes use this strategy, only one will attempt the connection.
/// This strategy may not be suited if one of the nodes is not reachable by the other one, for example
/// because of a private IP.
/// Accepts a single value (e.g. autoconnect: "always") which applies whatever node would be auto-connected to,
/// or different values for router, peer, or client depending on the type of node detected
/// (e.g. autoconnect_strategy : { router: "always", peer: "greater-zid", client: "always" })
autoconnect_strategy: { router: "always", peer: "always", client: "always" },
/// Whether or not to listen for scout messages on UDP multicast and reply to them.
listen: true,
},
Expand All @@ -161,6 +172,17 @@
/// or different values for router or peer mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Each value is a list of: "peer" and/or "router".
autoconnect: { router: [], peer: ["router", "peer"] },
/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
/// Possible options are:
/// - "always": always attempt to autoconnect, may result in redundant connection which will then be closed.
/// - "greater-zid": attempt to connect to another node only if its own zid is greater than the other's.
/// If both nodes use this strategy, only one will attempt the connection.
/// This strategy may not be suited if one of the nodes is not reachable by the other one, for example
/// because of a private IP.
/// Accepts a single value (e.g. autoconnect: "always") which applies whatever node would be auto-connected to,
/// or different values for router, peer, or client depending on the type of node detected
/// (e.g. autoconnect_strategy : { router: "always", peer: "greater-zid", client: "always" })
autoconnect_strategy: { router: "always", peer: "always", client: "always" },
},
},

Expand Down
22 changes: 22 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ pub mod scouting {
&crate::WhatAmIMatcher::empty().router();
mode_accessor!(crate::WhatAmIMatcher);
}
pub const autoconnect_strategy: crate::ModeDependentValue<crate::AutoConnectStrategy> =
crate::ModeDependentValue::Unique(crate::AutoConnectStrategy::Always);
// pub mod autoconnect_strategy {
// pub const router: &crate::AutoConnectStrategy::Always =
// &crate::AutoConnectStrategy::Always;
// pub const peer: &crate::AutoConnectStrategy::Always =
// &crate::AutoConnectStrategy::Always;
// pub const client: &crate::AutoConnectStrategy::Always =
// &crate::AutoConnectStrategy::Always;
// mode_accessor!(crate::AutoConnectStrategy);
// }
pub mod listen {
pub const router: &bool = &true;
pub const peer: &bool = &true;
Expand Down Expand Up @@ -113,6 +124,17 @@ pub mod scouting {
&crate::WhatAmIMatcher::empty();
mode_accessor!(crate::WhatAmIMatcher);
}
pub const autoconnect_strategy: crate::ModeDependentValue<crate::AutoConnectStrategy> =
crate::ModeDependentValue::Unique(crate::AutoConnectStrategy::Always);
// pub mod autoconnect_strategy {
// pub const router: &crate::AutoConnectStrategy::Always =
// &crate::AutoConnectStrategy::Always;
// pub const peer: &crate::AutoConnectStrategy::Always =
// &crate::AutoConnectStrategy::Always;
// pub const client: &crate::AutoConnectStrategy::Always =
// &crate::AutoConnectStrategy::Always;
// mode_accessor!(crate::AutoConnectStrategy);
// }
}
}

Expand Down
38 changes: 29 additions & 9 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ pub mod qos;
pub mod wrappers;

#[allow(unused_imports)]
use std::convert::TryFrom; // This is a false positive from the rust analyser
use std::convert::TryFrom;
// This is a false positive from the rust analyser
use std::{
any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak,
};
Expand Down Expand Up @@ -184,6 +185,21 @@ pub enum Permission {
Deny,
}

/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum AutoConnectStrategy {
/// Always attempt to connect to another node, may result in redundant connection which
/// will be then be closed.
#[default]
Always,
/// A node will attempt to connect to another one only if its own zid is greater than the
/// other one. If both nodes use this strategy, only one will attempt the connection.
/// This strategy may not be suited if one of the node is not reachable by the other one,
/// for example because of a private IP.
GreaterZid,
}

pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
Expand Down Expand Up @@ -301,6 +317,8 @@ validated_struct::validator! {
pub ttl: Option<u32>,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery through UDP multicast.
autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
autoconnect_strategy: Option<ModeDependentValue<AutoConnectStrategy>>,
/// Whether or not to listen for scout messages on UDP multicast and reply to them.
listen: Option<ModeDependentValue<bool>>,
},
Expand All @@ -319,6 +337,8 @@ validated_struct::validator! {
target: Option<ModeDependentValue<WhatAmIMatcher>>,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery through gossip.
autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
autoconnect_strategy: Option<ModeDependentValue<AutoConnectStrategy>>,
},
},

Expand Down Expand Up @@ -692,9 +712,9 @@ fn config_deser() {
&mut json5::Deserializer::from_str(
r#"{transport: { auth: { usrpwd: { user: null, password: null, dictionary_file: "file" }}}}"#,
)
.unwrap(),
.unwrap(),
)
.unwrap();
.unwrap();
assert_eq!(
config
.transport()
Expand All @@ -709,9 +729,9 @@ fn config_deser() {
&mut json5::Deserializer::from_str(
r#"{transport: { auth: { usrpwd: { user: null, password: null, user_password_dictionary: "file" }}}}"#,
)
.unwrap(),
.unwrap(),
)
.unwrap_err());
.unwrap_err());
dbg!(Config::from_file("../../DEFAULT_CONFIG.json5").unwrap());
}

Expand Down Expand Up @@ -988,15 +1008,15 @@ impl PluginsConfig {
_ => id,
};

if let Some(paths) = value.get("__path__"){
if let Some(paths) = value.get("__path__") {
let paths = match paths {
Value::String(s) => vec![s.clone()],
Value::Array(a) => a.iter().map(|s| if let Value::String(s) = s {s.clone()} else {panic!("Plugin '{}' has an invalid '__path__' configuration property (must be either string or array of strings)", id)}).collect(),
Value::Array(a) => a.iter().map(|s| if let Value::String(s) = s { s.clone() } else { panic!("Plugin '{}' has an invalid '__path__' configuration property (must be either string or array of strings)", id) }).collect(),
_ => panic!("Plugin '{}' has an invalid '__path__' configuration property (must be either string or array of strings)", id)
};
PluginLoad {id: id.clone(), name: name.clone(), paths: Some(paths), required}
PluginLoad { id: id.clone(), name: name.clone(), paths: Some(paths), required }
} else {
PluginLoad {id: id.clone(), name: name.clone(), paths: None, required}
PluginLoad { id: id.clone(), name: name.clone(), paths: None, required }
}
})
}
Expand Down
42 changes: 39 additions & 3 deletions commons/zenoh-config/src/mode_dependent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
use std::{fmt, marker::PhantomData};

use serde::{
de::{self, MapAccess, Visitor},
de::{self, IntoDeserializer, MapAccess, Visitor},
Deserialize, Serialize,
};
use zenoh_protocol::core::{EndPoint, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor};

use crate::AutoConnectStrategy;

pub trait ModeDependent<T> {
fn router(&self) -> Option<&T>;
fn peer(&self) -> Option<&T>;
Expand All @@ -35,7 +37,7 @@ pub trait ModeDependent<T> {
fn get_mut(&mut self, whatami: WhatAmI) -> Option<&mut T>;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct ModeValues<T> {
#[serde(skip_serializing_if = "Option::is_none")]
pub router: Option<T>,
Expand Down Expand Up @@ -71,7 +73,7 @@ impl<T> ModeDependent<T> for ModeValues<T> {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub enum ModeDependentValue<T> {
Unique(T),
Dependent(ModeValues<T>),
Expand Down Expand Up @@ -290,6 +292,40 @@ impl<'a> serde::Deserialize<'a> for ModeDependentValue<WhatAmIMatcher> {
}
}

impl<'a> serde::Deserialize<'a> for ModeDependentValue<AutoConnectStrategy> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
struct UniqueOrDependent<U>(PhantomData<fn() -> U>);

impl<'de> Visitor<'de> for UniqueOrDependent<ModeDependentValue<AutoConnectStrategy>> {
type Value = ModeDependentValue<AutoConnectStrategy>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("AutoConnectStrategy or mode dependent AutoConnectStrategy")
}

fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
let strategy = AutoConnectStrategy::deserialize(value.into_deserializer())?;
Ok(ModeDependentValue::Unique(strategy))
}

fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
ModeValues::deserialize(de::value::MapAccessDeserializer::new(map))
.map(ModeDependentValue::Dependent)
}
}
deserializer.deserialize_any(UniqueOrDependent(PhantomData))
}
}

impl<'a> serde::Deserialize<'a> for ModeDependentValue<Vec<EndPoint>> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
Expand Down
69 changes: 69 additions & 0 deletions zenoh/src/net/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use zenoh_config::{
unwrap_or_default, AutoConnectStrategy, Config, ModeDependent, ModeDependentValue,
};
use zenoh_protocol::core::{WhatAmI, WhatAmIMatcher, ZenohIdProto};

#[derive(Clone, Copy)]
pub(crate) struct AutoConnect {
zid: ZenohIdProto,
matcher: WhatAmIMatcher,
strategy: ModeDependentValue<AutoConnectStrategy>,
}

impl AutoConnect {
pub(crate) fn multicast(config: &Config, what: WhatAmI) -> Self {
Self {
zid: (*config.id()).into(),
matcher: *unwrap_or_default!(config.scouting().multicast().autoconnect().get(what)),
strategy: unwrap_or_default!(config.scouting().multicast().autoconnect_strategy()),
}
}

pub(crate) fn gossip(config: &Config, what: WhatAmI) -> Self {
Self {
zid: (*config.id()).into(),
matcher: *unwrap_or_default!(config.scouting().multicast().autoconnect().get(what)),
strategy: unwrap_or_default!(config.scouting().multicast().autoconnect_strategy()),
}
}

pub(crate) fn disabled() -> Self {
Self {
zid: ZenohIdProto::default(),
matcher: WhatAmIMatcher::empty(),
strategy: ModeDependentValue::Unique(AutoConnectStrategy::default()),
}
}

// pub(crate) fn new(
// zid: impl Into<ZenohIdProto>,
// matcher: WhatAmIMatcher,
// strategy: ModeDependentValue<AutoConnectStrategy>,
// ) -> Self {
// Self {
// zid,
// matcher,
// strategy,
// }
// }

pub(crate) fn matcher(&self) -> WhatAmIMatcher {
self.matcher
}

pub(crate) fn is_enabled(&self) -> bool {
!self.matcher.is_empty()
}

/// Returns if the node should autoconnect to the other one according to the chosen strategy.
///
/// The goal is to avoid both node to attempt connecting to each other, as it would result into
/// a waste of resource.
pub(crate) fn should_autoconnect(&self, to: ZenohIdProto, what: WhatAmI) -> bool {
let strategy = || match self.strategy.get(what).copied().unwrap_or_default() {
AutoConnectStrategy::Always => true,
AutoConnectStrategy::GreaterZid => self.zid > to,
};
self.matcher.matches(what) && strategy()
}
}
5 changes: 1 addition & 4 deletions zenoh/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
//! This module is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
#[doc(hidden)]
pub(crate) mod codec;
#[doc(hidden)]
mod common;
pub(crate) mod primitives;
#[doc(hidden)]
pub(crate) mod protocol;
#[doc(hidden)]
pub(crate) mod routing;
#[doc(hidden)]
pub mod runtime;
Expand Down
8 changes: 5 additions & 3 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
};

use token::{token_remove_node, undeclare_simple_token};
use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher};
use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI};
use zenoh_protocol::{
common::ZExtBody,
core::ZenohIdProto,
Expand Down Expand Up @@ -114,6 +114,8 @@ macro_rules! face_hat_mut {
}
use face_hat_mut;

use crate::net::common::AutoConnect;

struct TreesComputationWorker {
_task: TerminatableTask,
tx: flume::Sender<Arc<TablesLock>>,
Expand Down Expand Up @@ -189,9 +191,9 @@ impl HatBaseTrait for HatCode {
bail!("\"client\" is not allowed as gossip target")
}
let autoconnect = if gossip {
*unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami))
AutoConnect::gossip(config, whatami)
} else {
WhatAmIMatcher::empty()
AutoConnect::disabled()
};

let peer_full_linkstate =
Expand Down
Loading