Skip to content

Commit

Permalink
Merge pull request #891 from DenisBiryukov91/advanced-pub-sub-default…
Browse files Browse the repository at this point in the history
…-recovery

support for advanced pubs default recovery option without last sample miss detection
  • Loading branch information
Mallets authored Jan 16, 2025
2 parents 4f68a89 + 13a21fd commit 328736f
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 29 deletions.
3 changes: 3 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,8 @@ Types
:members:
.. doxygenstruct:: ze_advanced_subscriber_recovery_options_t
:members:
.. doxygenstruct:: ze_advanced_subscriber_last_sample_miss_detection_options_t
:members:
.. doxygenstruct:: ze_advanced_subscriber_options_t
:members:

Expand All @@ -1007,6 +1009,7 @@ Functions

.. doxygenfunction:: ze_advanced_subscriber_history_options_default
.. doxygenfunction:: ze_advanced_subscriber_recovery_options_default
.. doxygenfunction:: ze_advanced_subscriber_last_sample_miss_detection_options_default
.. doxygenfunction:: ze_advanced_subscriber_options_default

Publication Cache (deprecated)
Expand Down
4 changes: 3 additions & 1 deletion examples/z_advanced_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ int main(int argc, char** argv) {
ze_advanced_subscriber_history_options_default(&sub_opts.history); // or sub_opts.history.is_enabled = true;
sub_opts.history.detect_late_publishers = true;
ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery); // or sub_opts.recovery.is_enabled = true;
ze_advanced_subscriber_last_sample_miss_detection_options_default(&sub_opts.recovery.last_sample_miss_detection);
// or sub_opts.recovery.last_sample_miss_detection.is_enabled = true;
// use publisher heartbeats by default, otherwise enable periodic queries as follows:
// sub_opts.recovery.periodic_queries_period_ms = 1000;
// sub_opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = 1000;
sub_opts.subscriber_detection = true;

z_owned_closure_sample_t callback;
Expand Down
44 changes: 35 additions & 9 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -1058,13 +1058,13 @@ typedef struct ze_moved_advanced_publisher_t {
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_publisher_sample_miss_detection_options_t {
/**
* Must be set to ``true``, to enable sample miss_detection.
* Must be set to ``true``, to enable sample miss detection.
*/
bool is_enabled;
/**
* If different from zero, the publisher will send heartbeats with the specified period, which
* can be used by Advanced Subscribers for missed sample detection (if recovery with zero query period is enabled).
* Otherwise, missed samples will be retransmitted based on Advanced Subscribers periodic queries.
* can be used by Advanced Subscribers for last sample(s) miss detection (if last sample miss detection with zero query period is enabled).
* Otherwise, missed samples will be retransmitted based on Advanced Subscriber queries.
*/
uint64_t heartbeat_period_ms;
} ze_advanced_publisher_sample_miss_detection_options_t;
Expand All @@ -1084,7 +1084,7 @@ typedef struct ze_advanced_publisher_options_t {
*/
struct ze_advanced_publisher_cache_options_t cache;
/**
* Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.
* Settings to allow matching Subscribers to detect lost samples and optionally ask for retransimission.
*
* Retransmission can only be done if cache is enabled.
*/
Expand Down Expand Up @@ -1191,23 +1191,41 @@ typedef struct ze_advanced_subscriber_history_options_t {
#endif
/**
* @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
* @brief Settings for recovering lost messages for Advanced Subscriber.
* @brief Settings for detection of the last sample(s) miss by Advanced Subscriber.
*/
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_subscriber_recovery_options_t {
typedef struct ze_advanced_subscriber_last_sample_miss_detection_options_t {
/**
* Must be set to ``true``, to enable the lost sample recovery.
* Must be set to ``true``, to enable the last sample(s) miss detection.
*/
bool is_enabled;
/**
* Period for queries for not yet received Samples.
*
* These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
* So it is useful for sporadic publications but useless for periodic publications
* with a period smaller or equal to this period. If set to 0, the missed samples will be retrieved
* based on publisher's heartbeat.
* with a period smaller or equal to this period. If set to 0, the last sample(s) miss detection will be performed
* based on publisher's heartbeat if the latter is enabled.
*/
uint64_t periodic_queries_period_ms;
} ze_advanced_subscriber_last_sample_miss_detection_options_t;
#endif
/**
* @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
* @brief Settings for recovering lost messages for Advanced Subscriber.
*/
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_subscriber_recovery_options_t {
/**
* Must be set to ``true``, to enable the lost sample recovery.
*/
bool is_enabled;
/**
* Setting for detecting last sample(s) miss.
* Note that it does not affect intermediate sample miss detection/retrieval (which is performed automatically as long as recovery is enabled).
* If this option is disabled, subscriber will be unable to detect/request retransmission of missed sample until it receives a more recent one from the same publisher.
*/
struct ze_advanced_subscriber_last_sample_miss_detection_options_t last_sample_miss_detection;
} ze_advanced_subscriber_recovery_options_t;
#endif
/**
Expand Down Expand Up @@ -5764,6 +5782,14 @@ struct z_entity_global_id_t ze_advanced_subscriber_id(const struct ze_loaned_adv
ZENOHC_API
const struct z_loaned_keyexpr_t *ze_advanced_subscriber_keyexpr(const struct ze_loaned_advanced_subscriber_t *subscriber);
#endif
/**
* @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
* @brief Constructs the default value for `ze_advanced_subscriber_last_sample_miss_detection_options_t`.
*/
#if defined(Z_FEATURE_UNSTABLE_API)
ZENOHC_API
void ze_advanced_subscriber_last_sample_miss_detection_options_default(struct ze_advanced_subscriber_last_sample_miss_detection_options_t *this_);
#endif
/**
* Borrows subscriber.
*/
Expand Down
8 changes: 4 additions & 4 deletions src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ impl From<&ze_advanced_publisher_cache_options_t> for CacheConfig {
/// @brief Settings for sample miss detection on Advanced Publisher.
#[repr(C)]
pub struct ze_advanced_publisher_sample_miss_detection_options_t {
/// Must be set to ``true``, to enable sample miss_detection.
/// Must be set to ``true``, to enable sample miss detection.
pub is_enabled: bool,
/// If different from zero, the publisher will send heartbeats with the specified period, which
/// can be used by Advanced Subscribers for missed sample detection (if recovery with zero query period is enabled).
/// Otherwise, missed samples will be retransmitted based on Advanced Subscribers periodic queries.
/// can be used by Advanced Subscribers for last sample(s) miss detection (if last sample miss detection with zero query period is enabled).
/// Otherwise, missed samples will be retransmitted based on Advanced Subscriber queries.
pub heartbeat_period_ms: u64,
}

Expand Down Expand Up @@ -131,7 +131,7 @@ pub struct ze_advanced_publisher_options_t {
pub publisher_options: z_publisher_options_t,
/// Publisher cache settings.
pub cache: ze_advanced_publisher_cache_options_t,
/// Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.
/// Settings to allow matching Subscribers to detect lost samples and optionally ask for retransimission.
///
/// Retransmission can only be done if cache is enabled.
pub sample_miss_detection: ze_advanced_publisher_sample_miss_detection_options_t,
Expand Down
68 changes: 55 additions & 13 deletions src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,22 @@ impl From<&ze_advanced_subscriber_history_options_t> for HistoryConfig {
}
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Settings for recovering lost messages for Advanced Subscriber.
#[repr(C)]
pub struct ze_advanced_subscriber_recovery_options_t {
/// Must be set to ``true``, to enable the lost sample recovery.
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Settings for detection of the last sample(s) miss by Advanced Subscriber.
pub struct ze_advanced_subscriber_last_sample_miss_detection_options_t {
/// Must be set to ``true``, to enable the last sample(s) miss detection.
pub is_enabled: bool,
/// Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period. If set to 0, the missed samples will be retrieved
/// based on publisher's heartbeat.
/// with a period smaller or equal to this period. If set to 0, the last sample(s) miss detection will be performed
/// based on publisher's heartbeat if the latter is enabled.
pub periodic_queries_period_ms: u64,
}

impl Default for ze_advanced_subscriber_recovery_options_t {
impl Default for ze_advanced_subscriber_last_sample_miss_detection_options_t {
fn default() -> Self {
Self {
is_enabled: true,
Expand All @@ -104,6 +104,40 @@ impl Default for ze_advanced_subscriber_recovery_options_t {
}
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Constructs the default value for `ze_advanced_subscriber_last_sample_miss_detection_options_t`.
#[no_mangle]
pub extern "C" fn ze_advanced_subscriber_last_sample_miss_detection_options_default(
this: &mut MaybeUninit<ze_advanced_subscriber_last_sample_miss_detection_options_t>,
) {
this.write(ze_advanced_subscriber_last_sample_miss_detection_options_t::default());
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Settings for recovering lost messages for Advanced Subscriber.
#[repr(C)]
pub struct ze_advanced_subscriber_recovery_options_t {
/// Must be set to ``true``, to enable the lost sample recovery.
pub is_enabled: bool,
/// Setting for detecting last sample(s) miss.
/// Note that it does not affect intermediate sample miss detection/retrieval (which is performed automatically as long as recovery is enabled).
/// If this option is disabled, subscriber will be unable to detect/request retransmission of missed sample until it receives a more recent one from the same publisher.
pub last_sample_miss_detection: ze_advanced_subscriber_last_sample_miss_detection_options_t,
}

impl Default for ze_advanced_subscriber_recovery_options_t {
fn default() -> Self {
Self {
is_enabled: true,
last_sample_miss_detection:
ze_advanced_subscriber_last_sample_miss_detection_options_t {
is_enabled: false,
..Default::default()
},
}
}
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Constructs the default value for `ze_advanced_subscriber_recovery_options_t`.
#[no_mangle]
Expand Down Expand Up @@ -185,12 +219,20 @@ fn _declare_advanced_subscriber_inner(
sub = sub.history((&options.history).into());
}
if options.recovery.is_enabled {
sub = match options.recovery.periodic_queries_period_ms {
0 => sub.recovery(RecoveryConfig::default().heartbeat()),
_ => sub.recovery(RecoveryConfig::default().periodic_queries(
Duration::from_millis(options.recovery.periodic_queries_period_ms),
)),
};
if options.recovery.last_sample_miss_detection.is_enabled {
sub = match options
.recovery
.last_sample_miss_detection
.periodic_queries_period_ms
{
0 => sub.recovery(RecoveryConfig::default().heartbeat()),
p => sub.recovery(
RecoveryConfig::default().periodic_queries(Duration::from_millis(p)),
),
};
} else {
sub = sub.recovery(RecoveryConfig::default())
}
}
}
sub
Expand Down
5 changes: 3 additions & 2 deletions tests/z_int_advanced_pub_sub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ int run_publisher() {
ze_advanced_publisher_cache_options_default(&pub_opts.cache);
pub_opts.cache.max_samples = values_count;
pub_opts.publisher_detection = true;
pub_opts.sample_miss_detection.is_enabled = true; // periodic queries are expected by default
pub_opts.sample_miss_detection.is_enabled = true; // heartbeats are disabled by default

if (ze_declare_advanced_publisher(z_loan(s), &pub, z_loan(ke), &pub_opts) < 0) {
printf("Unable to declare AdvancedPublisher for key expression!\n");
Expand Down Expand Up @@ -143,7 +143,8 @@ int run_subscriber() {
sub_opts.history.detect_late_publishers = true;

ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery);
sub_opts.recovery.periodic_queries_period_ms = 1000;
ze_advanced_subscriber_last_sample_miss_detection_options_default(&sub_opts.recovery.last_sample_miss_detection);
sub_opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = 1000;
sub_opts.subscriber_detection = true;

z_owned_closure_sample_t callback;
Expand Down

0 comments on commit 328736f

Please sign in to comment.