Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close builder #1576

Merged
merged 20 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,10 +477,12 @@ impl TransportManager {
}

pub async fn close(&self) {
self.close_with_timeout(Duration::from_secs(10)).await;
}

pub async fn close_with_timeout(&self, timeout: Duration) {
self.close_unicast().await;
self.task_controller
.terminate_all_async(Duration::from_secs(10))
.await;
self.task_controller.terminate_all_async(timeout).await;
}

/*************************************/
Expand Down
182 changes: 182 additions & 0 deletions zenoh/src/api/builders/close.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{
future::{Future, IntoFuture},
pin::Pin,
time::{Duration, Instant},
};

use async_trait::async_trait;
use tokio::{select, time::sleep};
use tracing::warn;
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
use zenoh_runtime::ZRuntime;
use zenoh_task::TaskController;

/// A behavior selector for close operations used if initial timeout was reached
pub enum CloseBackoff {
/// fail immediately
Fail,
/// wait for an additional timeout, then fail if timeout reached
Wait(Duration),
}

/// A builder for close operations.
// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
pub struct CloseBuilder<TCloseable: Closeable> {
closee: TCloseable::TClosee,
timeout: Duration,
backoff: CloseBackoff,
}

// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
impl<TCloseable: Closeable> CloseBuilder<TCloseable> {
pub(crate) fn new(closeable: &TCloseable) -> Self {
Self {
closee: closeable.get_closee(),
timeout: Duration::from_secs(3600),
backoff: CloseBackoff::Fail,
}
}

/// Set the timeout for close operation
///
/// # Arguments
///
/// * `timeout` - The timeout value for close operation
///
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

/// Set the backoff for close operation
///
/// # Arguments
///
/// * `backoff` - The backoff behavior for close operation
///
pub fn backoff(mut self, backoff: CloseBackoff) -> Self {
self.backoff = backoff;
self
}
}

impl<TCloseable: Closeable> Resolvable for CloseBuilder<TCloseable> {
type To = ZResult<()>;
}

impl<TCloseable: Closeable> Wait for CloseBuilder<TCloseable> {
fn wait(self) -> Self::To {
ZRuntime::Application.block_in_place(self.into_future())
}
}

impl<TCloseable: Closeable> IntoFuture for CloseBuilder<TCloseable> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(
async move {
//TODO: this is a workaround because Instant lacks saturating_add operation
let time_limit = Instant::now()
.checked_add(self.timeout)
.expect("unsupported timeout value");
let mut state = CloseState::new(time_limit, self.backoff);
state.close(&self.closee).await
}
.into_future(),
)
}
}

pub(crate) struct CloseState {
time_limit: Instant,
backoff: CloseBackoff,
}

impl CloseState {
pub(crate) fn new(time_limit: Instant, backoff: CloseBackoff) -> Self {
Self {
time_limit,
backoff,
}
}

pub(crate) async fn close(&mut self, closee: &impl Closee) -> ZResult<()> {
closee.close_inner(self).await
}

pub(crate) async fn close_future<T: Future<Output = ()>>(&mut self, future: T) -> ZResult<()> {
select! {
val = future => Ok(val),
val = self.timeout_async() => val,
}
}

async fn timeout_async(&mut self) -> ZResult<()> {
loop {
sleep(self.timeout()?).await;
}
}

fn timeout(&mut self) -> ZResult<Duration> {
let now = Instant::now();
if now > self.time_limit {
self.do_backoff(now)?;
debug_assert!(now >= self.time_limit);
}
Ok(self.time_limit - now)
}

fn do_backoff(&mut self, now: Instant) -> ZResult<()> {
match self.backoff {
CloseBackoff::Fail => bail!("Backoff failure reached"),
CloseBackoff::Wait(duration) => {
self.time_limit = now + duration;
self.backoff = CloseBackoff::Fail;
warn!(
"Backoff reached, switching to {:?} duration. Next timeout will be {:?}",
duration, self.time_limit
);
Ok(())
}
}
}
}

#[async_trait]
pub(crate) trait Closee: Send + Sync + 'static {
async fn close_inner(&self, state: &mut CloseState) -> ZResult<()>;
}

#[async_trait]
impl Closee for TaskController {
async fn close_inner(&self, state: &mut CloseState) -> ZResult<()> {
while self.terminate_all_async(state.timeout()?).await != 0 {}
Ok(())
}
}

pub(crate) trait Closeable {
type TClosee: Closee;
fn get_closee(&self) -> Self::TClosee;
}
1 change: 1 addition & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

pub(crate) mod close;
pub(crate) mod info;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
Expand Down
75 changes: 44 additions & 31 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use tracing::{error, info, trace, warn};
use uhlc::Timestamp;
#[cfg(feature = "internal")]
Expand Down Expand Up @@ -67,6 +68,7 @@ use zenoh_result::ZResult;
use zenoh_shm::api::client_storage::ShmClientStorage;
use zenoh_task::TaskController;

use super::builders::close::{CloseBuilder, CloseState, Closeable, Closee};
#[cfg(feature = "unstable")]
use crate::api::selector::ZenohParameters;
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -615,8 +617,8 @@ impl Session {
/// subscriber_task.await.unwrap();
/// # }
/// ```
pub fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
self.0.close()
pub fn close(&self) -> CloseBuilder<Self> {
CloseBuilder::new(self)
}

/// Check if the session has been closed.
Expand Down Expand Up @@ -1073,40 +1075,12 @@ impl Session {
})
}
}

impl SessionInner {
pub fn zid(&self) -> ZenohId {
self.runtime.zid()
}

fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
ResolveFuture::new(async move {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
if self.owns_runtime {
info!(zid = %self.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
} else {
primitives.send_close();
}
let mut state = zwrite!(self.state);
state.queryables.clear();
state.subscribers.clear();
state.liveliness_subscribers.clear();
state.local_resources.clear();
state.remote_resources.clear();
#[cfg(feature = "unstable")]
{
state.tokens.clear();
state.matching_listeners.clear();
}
Ok(())
})
}

pub(crate) fn declare_prefix<'a>(
&'a self,
prefix: &'a str,
Expand Down Expand Up @@ -2880,3 +2854,42 @@ where
{
OpenBuilder::new(config)
}

#[async_trait]
impl Closee for Arc<SessionInner> {
async fn close_inner(&self, state: &mut CloseState) -> ZResult<()> {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};

if self.owns_runtime {
info!(zid = %self.zid(), "close session");
state.close(&self.task_controller).await?;
state.close(&self.runtime.get_closee()).await?;
} else {
state.close(&self.task_controller).await?;
primitives.send_close();
}

let mut state = zwrite!(self.state);
state.queryables.clear();
state.subscribers.clear();
state.liveliness_subscribers.clear();
state.local_resources.clear();
state.remote_resources.clear();
#[cfg(feature = "unstable")]
{
state.tokens.clear();
state.matching_listeners.clear();
}
Ok(())
}
}

impl Closeable for Session {
type TClosee = Arc<SessionInner>;

fn get_closee(&self) -> Self::TClosee {
self.0.clone()
}
}
1 change: 1 addition & 0 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub mod session {
pub use crate::api::builders::session::{init, InitBuilder};
pub use crate::api::{
builders::{
close::{CloseBackoff, CloseBuilder},
info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder},
publisher::{SessionDeleteBuilder, SessionPutBuilder},
query::SessionGetBuilder,
Expand Down
Loading