Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make zenoh-ext unstable except serialization #1492

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ prost = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "time", "io-std"] }
zenoh = { workspace = true, default-features = true }
zenoh-ext = { workspace = true }
zenoh-ext = { workspace = true, features = ["unstable"] }

[dev-dependencies]
rand = { workspace = true, features = ["default"] }
Expand Down
4 changes: 1 addition & 3 deletions zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ description = "Zenoh: extensions to the client API."
maintenance = { status = "actively-developed" }

[features]
unstable = []
default = []
shared-memory = ["zenoh/shared-memory"]
unstable = ["zenoh/unstable", "zenoh/internal"]

[dependencies]
tokio = { workspace = true, features = [
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ tokio = { workspace = true, features = ["rt", "sync", "time", "macros", "io-std"
futures = { workspace = true }
zenoh = { workspace = true, features = ["unstable"], default-features = false }
clap = { workspace = true, features = ["derive"] }
zenoh-ext = { workspace = true }
zenoh-ext = { workspace = true, features = ["unstable"] }

[dev-dependencies]
zenoh-config = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions zenoh-ext/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,37 @@ const VIEW_REFRESH_LEASE_RATIO: f32 = 0.75f32;
const DEFAULT_LEASE: Duration = Duration::from_secs(18);
const DEFAULT_PRIORITY: Priority = Priority::DataHigh;

#[zenoh_macros::unstable]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct JoinEvent {
pub member: Member,
}

#[zenoh_macros::unstable]
#[derive(Serialize, Deserialize, Debug)]
pub struct LeaseExpiredEvent {
pub mid: OwnedKeyExpr,
}

#[zenoh_macros::unstable]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LeaveEvent {
pub mid: OwnedKeyExpr,
}

#[zenoh_macros::unstable]
#[derive(Serialize, Deserialize, Debug)]
pub struct NewLeaderEvent {
pub mid: OwnedKeyExpr,
}

#[zenoh_macros::unstable]
#[derive(Serialize, Deserialize, Debug)]
struct KeepAliveEvent {
pub mid: OwnedKeyExpr,
}

#[zenoh_macros::unstable]
#[derive(Serialize, Deserialize, Debug)]
enum GroupNetEvent {
Join(JoinEvent),
Expand All @@ -74,6 +80,7 @@ enum GroupNetEvent {

/// Events exposed to the user to be informed for relevant
/// changes in the group.
#[zenoh_macros::unstable]
#[derive(Serialize, Deserialize, Debug)]
pub enum GroupEvent {
Join(JoinEvent),
Expand All @@ -83,12 +90,14 @@ pub enum GroupEvent {
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[zenoh_macros::unstable]
pub enum MemberLiveliness {
Auto,
Manual,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[zenoh_macros::unstable]
pub struct Member {
mid: OwnedKeyExpr,
info: Option<String>,
Expand Down Expand Up @@ -161,6 +170,7 @@ struct GroupState {
cond: Condition,
}

#[zenoh_macros::unstable]
pub struct Group {
state: Arc<GroupState>,
task_controller: TaskController,
Expand Down
62 changes: 16 additions & 46 deletions zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,59 +11,29 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[cfg(feature = "unstable")]
pub mod group;
#[cfg(feature = "unstable")]
mod publication_cache;
#[cfg(feature = "unstable")]
mod querying_subscriber;
mod serialization;
#[cfg(feature = "unstable")]
mod session_ext;
#[cfg(feature = "unstable")]
mod subscriber_ext;

pub use publication_cache::{PublicationCache, PublicationCacheBuilder};
pub use querying_subscriber::{
FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder,
};
pub use serialization::{
pub use crate::serialization::{
z_deserialize, z_serialize, Deserialize, Serialize, VarInt, ZDeserializeError, ZDeserializer,
ZReadIter, ZSerializer,
};
pub use session_ext::SessionExt;
pub use subscriber_ext::{SubscriberBuilderExt, SubscriberForward};
use zenoh::{internal::zerror, query::Reply, sample::Sample, Result as ZResult};

/// The space of keys to use in a [`FetchingSubscriber`].
pub enum KeySpace {
User,
Liveliness,
}

/// The key space for user data.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct UserSpace;

impl From<UserSpace> for KeySpace {
fn from(_: UserSpace) -> Self {
KeySpace::User
}
}

/// The key space for liveliness tokens.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct LivelinessSpace;

impl From<LivelinessSpace> for KeySpace {
fn from(_: LivelinessSpace) -> Self {
KeySpace::Liveliness
}
}

pub trait ExtractSample {
fn extract(self) -> ZResult<Sample>;
}

impl ExtractSample for Reply {
fn extract(self) -> ZResult<Sample> {
self.into_result().map_err(|e| zerror!("{:?}", e).into())
}
}
#[cfg(feature = "unstable")]
pub use crate::{
publication_cache::{PublicationCache, PublicationCacheBuilder},
querying_subscriber::{
ExtractSample, FetchingSubscriber, FetchingSubscriberBuilder, KeySpace, LivelinessSpace,
QueryingSubscriberBuilder, UserSpace,
},
session_ext::SessionExt,
subscriber_ext::{SubscriberBuilderExt, SubscriberForward},
};
2 changes: 2 additions & 0 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use zenoh::{
};

/// The builder of PublicationCache, allowing to configure it.
#[zenoh_macros::unstable]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: &'a Session,
Expand Down Expand Up @@ -112,6 +113,7 @@ impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {
}
}

#[zenoh_macros::unstable]
pub struct PublicationCache {
local_sub: FlumeSubscriber,
_queryable: Queryable<flume::Receiver<Query>>,
Expand Down
72 changes: 58 additions & 14 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,48 @@ use std::{

use zenoh::{
handlers::{locked, Callback, DefaultHandler, IntoHandler},
internal::zlock,
internal::{zerror, zlock},
key_expr::KeyExpr,
pubsub::Subscriber,
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector},
query::{QueryConsolidation, QueryTarget, Reply, ReplyKeyExpr, Selector},
sample::{Locality, Sample, SampleBuilder},
time::Timestamp,
Error, Resolvable, Resolve, Result as ZResult, Session, Wait,
};

use crate::ExtractSample;
/// The space of keys to use in a [`FetchingSubscriber`].
#[zenoh_macros::unstable]
pub enum KeySpace {
User,
Liveliness,
}

/// The key space for user data.
#[zenoh_macros::unstable]
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct UserSpace;

impl From<UserSpace> for KeySpace {
fn from(_: UserSpace) -> Self {
KeySpace::User
}
}

/// The key space for liveliness tokens.
#[zenoh_macros::unstable]
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct LivelinessSpace;

impl From<LivelinessSpace> for KeySpace {
fn from(_: LivelinessSpace) -> Self {
KeySpace::Liveliness
}
}

/// The builder of [`FetchingSubscriber`], allowing to configure it.
#[zenoh_macros::unstable]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
pub(crate) session: &'a Session,
Expand Down Expand Up @@ -125,7 +155,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
}
}

impl<'b, Handler> QueryingSubscriberBuilder<'_, 'b, crate::UserSpace, Handler> {
impl<'b, Handler> QueryingSubscriberBuilder<'_, 'b, UserSpace, Handler> {
///
///
/// Restrict the matching publications that will be receive by this [`Subscriber`]
Expand Down Expand Up @@ -204,7 +234,7 @@ where

impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
KeySpace: Into<self::KeySpace> + Clone,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
Expand All @@ -226,7 +256,7 @@ where
key_space: self.key_space,
origin: self.origin,
fetch: |cb| match key_space {
crate::KeySpace::User => match query_selector {
self::KeySpace::User => match query_selector {
Some(s) => session.get(s),
None => session.get(key_expr),
}
Expand All @@ -236,7 +266,7 @@ where
.accept_replies(query_accept_replies)
.timeout(query_timeout)
.wait(),
crate::KeySpace::Liveliness => session
self::KeySpace::Liveliness => session
.liveliness()
.get(key_expr)
.callback(cb)
Expand All @@ -253,7 +283,7 @@ where

impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
KeySpace: Into<self::KeySpace> + Clone,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
Expand Down Expand Up @@ -326,6 +356,7 @@ struct InnerState {
}

/// The builder of [`FetchingSubscriber`], allowing to configure it.
#[zenoh_macros::unstable]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct FetchingSubscriberBuilder<
'a,
Expand Down Expand Up @@ -464,7 +495,7 @@ impl<
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'_, '_, crate::UserSpace, Handler, Fetch, TryIntoSample>
> FetchingSubscriberBuilder<'_, '_, UserSpace, Handler, Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
{
Expand Down Expand Up @@ -521,7 +552,7 @@ impl<
TryIntoSample,
> Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
KeySpace: Into<self::KeySpace>,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
Expand All @@ -538,7 +569,7 @@ impl<
TryIntoSample,
> IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
KeySpace: Into<self::KeySpace>,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
Expand Down Expand Up @@ -582,6 +613,7 @@ where
/// }
/// # }
/// ```
#[zenoh_macros::unstable]
pub struct FetchingSubscriber<Handler> {
subscriber: Subscriber<()>,
callback: Callback<Sample>,
Expand Down Expand Up @@ -613,7 +645,7 @@ impl<Handler> FetchingSubscriber<Handler> {
conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>,
) -> ZResult<Self>
where
KeySpace: Into<crate::KeySpace>,
KeySpace: Into<self::KeySpace>,
InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand Down Expand Up @@ -657,14 +689,14 @@ impl<Handler> FetchingSubscriber<Handler> {
let handler = register_handler(state.clone(), callback.clone());
// declare subscriber
let subscriber = match conf.key_space.into() {
crate::KeySpace::User => conf
self::KeySpace::User => conf
.session
.declare_subscriber(&key_expr)
.with(sub_callback)
.undeclare_on_drop(conf.undeclare_on_drop)
.allowed_origin(conf.origin)
.wait()?,
crate::KeySpace::Liveliness => conf
self::KeySpace::Liveliness => conf
.session
.liveliness()
.declare_subscriber(&key_expr)
Expand Down Expand Up @@ -811,6 +843,7 @@ impl Drop for RepliesHandler {
/// .unwrap();
/// # }
/// ```
#[zenoh_macros::unstable]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct FetchBuilder<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Expand Down Expand Up @@ -882,3 +915,14 @@ where
Err(e) => tracing::debug!("Received error fetching data: {}", e),
}))
}

#[zenoh_macros::unstable]
pub trait ExtractSample {
fn extract(self) -> ZResult<Sample>;
}

impl ExtractSample for Reply {
fn extract(self) -> ZResult<Sample> {
self.into_result().map_err(|e| zerror!("{:?}", e).into())
}
}
4 changes: 4 additions & 0 deletions zenoh-ext/src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ impl ZSerializer {
}
}

pub fn serialize_n<T: Serialize>(&mut self, ts: &[T]) {
T::serialize_n(ts, self);
}

pub fn finish(self) -> ZBytes {
self.0.finish()
}
Expand Down
1 change: 1 addition & 0 deletions zenoh-ext/src/session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use zenoh::{key_expr::KeyExpr, session::Session, Error};
use super::PublicationCacheBuilder;

/// Some extensions to the [`zenoh::Session`](zenoh::Session)
#[zenoh_macros::unstable]
pub trait SessionExt {
// REVIEW(fuzzypixelz): this doc test is the only one to use the programmatic configuration API..
/// Examples:
Expand Down
Loading
Loading