Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs querying liveliness tokens #1374

Merged
merged 8 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions zenoh/src/net/routing/hat/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,9 @@ fn declare_simple_token(
interest_id: Option<InterestId>,
send_declare: &mut SendDeclare,
) {
register_simple_token(tables, face, id, res);

propagate_simple_token(tables, res, face, send_declare);

let wire_expr = Resource::decl_key(res, face, true);
if let Some(interest_id) = interest_id {
if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Expand All @@ -137,6 +133,9 @@ fn declare_simple_token(
),
)
}
} else {
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}
}

Expand Down
15 changes: 8 additions & 7 deletions zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use zenoh_protocol::{
use zenoh_sync::get_mut_unchecked;

use super::{
face_hat, face_hat_mut, pubsub::declare_sub_interest, queries::declare_qabl_interest,
token::declare_token_interest, HatCode, HatFace,
face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest,
queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace,
};
use crate::net::routing::{
dispatcher::{
Expand Down Expand Up @@ -132,11 +132,12 @@ impl HatInterestTrait for HatCode {
src_interest_id: id,
});

for dst_face in tables
.faces
.values_mut()
.filter(|f| f.whatami == WhatAmI::Router)
{
for dst_face in tables.faces.values_mut().filter(|f| {
f.whatami == WhatAmI::Router
|| (options.tokens()
&& f.whatami == WhatAmI::Peer
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true))
}) {
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(dst_face).local_interests.insert(
id,
Expand Down
17 changes: 15 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl HatBaseTrait for HatCode {
}
if face.state.whatami == WhatAmI::Peer {
get_mut_unchecked(&mut face.state).local_interests.insert(
0,
INITIAL_INTEREST_ID,
InterestState {
options: InterestOptions::ALL,
res: None,
Expand Down Expand Up @@ -418,7 +418,7 @@ struct HatFace {
impl HatFace {
fn new() -> Self {
Self {
next_id: AtomicU32::new(0),
next_id: AtomicU32::new(1), // In p2p, id 0 is erserved for initial interest
remote_interests: HashMap::new(),
local_subs: HashMap::new(),
remote_subs: HashMap::new(),
Expand All @@ -440,3 +440,16 @@ fn get_routes_entries() -> RoutesIndexes {
clients: vec![0],
}
}

// In p2p, at connection, while no interest is sent on the network,
// peers act as if they received an interest CurrentFuture with id 0
// and send back a DeclareFinal with interest_id 0.
// This 'ghost' interest is registered locally to allow tracking if
// the DeclareFinal has been received or not (finalized).

const INITIAL_INTEREST_ID: u32 = 0;

#[inline]
fn initial_interest(face: &FaceState) -> Option<&InterestState> {
face.local_interests.get(&INITIAL_INTEREST_ID)
}
10 changes: 4 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ use crate::{
resource::{NodeId, Resource, SessionContext},
tables::{Route, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources},
hat::{
p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources,
},
router::{update_data_routes_from, RoutesIndexes},
RoutingContext,
},
Expand Down Expand Up @@ -654,11 +656,7 @@ impl HatPubSubTrait for HatCode {

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true)
}) {
route.entry(face.id).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::net::routing::{
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
router::{update_query_routes_from, RoutesIndexes},
RoutingContext,
};
Expand Down Expand Up @@ -604,11 +604,7 @@ impl HatQueriesTrait for HatCode {

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
Expand Down
31 changes: 25 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,30 @@ fn declare_simple_token(
face: &mut Arc<FaceState>,
id: TokenId,
res: &mut Arc<Resource>,
interest_id: Option<InterestId>,
send_declare: &mut SendDeclare,
) {
register_simple_token(tables, face, id, res);

propagate_simple_token(tables, res, face, send_declare);
if let Some(interest_id) = interest_id {
if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Declare {
interest_id: Some(interest.src_interest_id),
ext_qos: ext::QoSType::default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
},
res.expr(),
),
)
}
} else {
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}
}

#[inline]
Expand Down Expand Up @@ -411,7 +430,7 @@ pub(crate) fn declare_token_interest(
aggregate: bool,
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
if mode.current() {
let interest_id = (!mode.future()).then_some(id);
if let Some(res) = res.as_ref() {
if aggregate {
Expand Down Expand Up @@ -525,10 +544,10 @@ impl HatTokenTrait for HatCode {
id: TokenId,
res: &mut Arc<Resource>,
_node_id: NodeId,
_interest_id: Option<InterestId>,
interest_id: Option<InterestId>,
send_declare: &mut SendDeclare,
) {
declare_simple_token(tables, face, id, res, send_declare)
declare_simple_token(tables, face, id, res, interest_id, send_declare)
}

fn undeclare_token(
Expand Down
Loading