Skip to content

Commit

Permalink
Merge branch 'rolling' into ahcorde/rolling/windows_warning
Browse files Browse the repository at this point in the history
  • Loading branch information
Yadunund committed Mar 7, 2025
2 parents 391fc2b + bab4c93 commit 508169c
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 33 deletions.
29 changes: 22 additions & 7 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@
},

/// The default timeout to apply to queries in milliseconds.
queries_default_timeout: 10000,
/// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together.
/// Note: the requests to services and actions are hard-coded with an infinite timeout. Hence, this setting
/// only applies to the queries made by the Advanced Subscriber for TRANSIENT_LOCAL implementation.
queries_default_timeout: 60000,

/// The routing strategy to use and it's configuration.
routing: {
Expand Down Expand Up @@ -366,6 +369,8 @@
// [
// /// Each policy associates one or multiple rules to one or multiple subject combinations
// {
// /// Id is optional. If provided, it has to be unique within the policies list
// "id": "policy1",
// /// Rules and Subjects are identified with their unique IDs declared above
// "rules": ["rule1"],
// "subjects": ["subject1", "subject2"],
Expand All @@ -381,13 +386,17 @@
transport: {
unicast: {
/// Timeout in milliseconds when opening a link
open_timeout: 10000,
/// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together
open_timeout: 60000,
/// Timeout in milliseconds when accepting a link
accept_timeout: 10000,
/// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together
accept_timeout: 60000,
/// Maximum number of links in pending state while performing the handshake for accepting it
accept_pending: 100,
/// ROS setting: increase the value to support a large number of Nodes starting all together
accept_pending: 10000,
/// Maximum number of transports that can be simultaneously alive for a single zenoh sessions
max_sessions: 1000,
/// ROS setting: increase the value to support a large number of Nodes starting all together
max_sessions: 10000,
/// Maximum number of incoming links that are admitted per transport
max_links: 1,
/// Enables the LowLatency transport
Expand Down Expand Up @@ -446,7 +455,8 @@
/// Accepted values: 8bit, 16bit, 32bit, 64bit.
sequence_number_resolution: "32bit",
/// Link lease duration in milliseconds to announce to other zenoh nodes
lease: 10000,
/// ROS setting: increase the value to avoid lease expiration at launch time with a large number of Nodes starting all together
lease: 60000,
/// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive
/// messages will be sent at the configured time interval.
/// NOTE: In order to consider eventual packet loss and transmission latency and jitter,
Expand All @@ -456,7 +466,9 @@
/// This is in-line with the ITU-T G.8013/Y.1731 specification on continuous connectivity
/// check which considers a link as failed when no messages are received in 3.5 times the
/// target interval.
keep_alive: 4,
/// ROS setting: decrease the value since Nodes are communicating over the loopback
/// where keep-alive messages have less chances to be lost.
keep_alive: 2,
/// Batch size in bytes is expressed as a 16bit unsigned integer.
/// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
/// The default batch size value is the maximum batch size: 65535.
Expand Down Expand Up @@ -492,6 +504,9 @@
block: {
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
/// ROS setting: unlike DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5, no change here:
/// as the router is routing messages to outside the robot, possibly over WiFi,
/// keeping a lower value ensure the router is not blocked for too long in case of congestioned WiFi.
wait_before_close: 5000000,
},
},
Expand Down
30 changes: 22 additions & 8 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@
},

/// The default timeout to apply to queries in milliseconds.
queries_default_timeout: 10000,
/// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together.
/// Note: the requests to services and actions are hard-coded with an infinite timeout. Hence, this setting
/// only applies to the queries made by the Advanced Subscriber for TRANSIENT_LOCAL implementation.
queries_default_timeout: 60000,

/// The routing strategy to use and it's configuration.
routing: {
Expand Down Expand Up @@ -374,6 +377,8 @@
// [
// /// Each policy associates one or multiple rules to one or multiple subject combinations
// {
// /// Id is optional. If provided, it has to be unique within the policies list
// "id": "policy1",
// /// Rules and Subjects are identified with their unique IDs declared above
// "rules": ["rule1"],
// "subjects": ["subject1", "subject2"],
Expand All @@ -389,13 +394,17 @@
transport: {
unicast: {
/// Timeout in milliseconds when opening a link
open_timeout: 10000,
/// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together
open_timeout: 60000,
/// Timeout in milliseconds when accepting a link
accept_timeout: 10000,
/// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together
accept_timeout: 60000,
/// Maximum number of links in pending state while performing the handshake for accepting it
accept_pending: 100,
/// ROS setting: increase the value to support a large number of Nodes starting all together
accept_pending: 10000,
/// Maximum number of transports that can be simultaneously alive for a single zenoh sessions
max_sessions: 1000,
/// ROS setting: increase the value to support a large number of Nodes starting all together
max_sessions: 10000,
/// Maximum number of incoming links that are admitted per transport
max_links: 1,
/// Enables the LowLatency transport
Expand Down Expand Up @@ -454,7 +463,8 @@
/// Accepted values: 8bit, 16bit, 32bit, 64bit.
sequence_number_resolution: "32bit",
/// Link lease duration in milliseconds to announce to other zenoh nodes
lease: 10000,
/// ROS setting: increase the value to avoid lease expiration at launch time with a large number of Nodes starting all together
lease: 60000,
/// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive
/// messages will be sent at the configured time interval.
/// NOTE: In order to consider eventual packet loss and transmission latency and jitter,
Expand All @@ -464,7 +474,9 @@
/// This is in-line with the ITU-T G.8013/Y.1731 specification on continuous connectivity
/// check which considers a link as failed when no messages are received in 3.5 times the
/// target interval.
keep_alive: 4,
/// ROS setting: decrease the value since Nodes are communicating over the loopback
/// where keep-alive messages have less chances to be lost.
keep_alive: 2,
/// Batch size in bytes is expressed as a 16bit unsigned integer.
/// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
/// The default batch size value is the maximum batch size: 65535.
Expand Down Expand Up @@ -500,7 +512,9 @@
block: {
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: 5000000,
/// ROS setting: increase the value to avoid unecessary link closure at launch time where congestion is likely
/// to occur even over the loopback since all the Nodes are starting at the same time.
wait_before_close: 60000000,
},
},
/// Perform batching of messages if they are smaller of the batch_size
Expand Down
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ bool NodeData::create_sub_data(
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options)
{
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
Expand All @@ -216,7 +217,8 @@ bool NodeData::create_sub_data(
std::move(id),
std::move(topic_name),
type_support,
qos_profile);
qos_profile,
sub_options);
if (sub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class NodeData final
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile);
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options);

// Retrieve the SubscriptionData for a given rmw_subscription_t if present.
SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const subscription);
Expand Down
19 changes: 16 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
std::size_t subscription_id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options)
{
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = QoS::get().best_available_qos(
Expand Down Expand Up @@ -121,7 +122,8 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
std::move(entity),
std::move(session),
type_support->data,
std::move(message_type_support)
std::move(message_type_support),
sub_options
});

if (!sub_data->init()) {
Expand All @@ -139,13 +141,15 @@ SubscriptionData::SubscriptionData(
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
std::unique_ptr<MessageTypeSupport> type_support,
rmw_subscription_options_t sub_options)
: rmw_node_(rmw_node),
graph_cache_(std::move(graph_cache)),
entity_(std::move(entity)),
sess_(std::move(session)),
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
sub_options_(std::move(sub_options)),
last_known_published_msg_({}),
wait_set_data_(nullptr),
is_shutdown_(false),
Expand All @@ -154,6 +158,7 @@ SubscriptionData::SubscriptionData(
events_mgr_ = std::make_shared<EventsManager>();
}

///=============================================================================
// We have to use an "init" function here, rather than do this in the constructor, because we use
// enable_shared_from_this, which is not available in constructors.
bool SubscriptionData::init()
Expand All @@ -172,6 +177,14 @@ bool SubscriptionData::init()
using AdvancedSubscriberOptions = zenoh::ext::SessionExt::AdvancedSubscriberOptions;
auto adv_sub_opts = AdvancedSubscriberOptions::create_default();

// By default, this subscription will receive publications from publishers within and outside of
// the same Zenoh session as this subscription.
// If ignore_local_publications is true, we restrict this subscription to only receive samples
// from publishers in remote sessions.
if (sub_options_.ignore_local_publications) {
adv_sub_opts.subscriber_options.allowed_origin = ZC_LOCALITY_REMOTE;
}

// Instantiate the subscription with suitable options depending on the
// adapted_qos_profile.
if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::size_t Subscription_id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile);
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options);

// Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down Expand Up @@ -124,7 +125,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);
std::unique_ptr<MessageTypeSupport> type_support,
rmw_subscription_options_t sub_options);

bool init();

Expand All @@ -145,6 +147,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
// Type support fields
const void * type_support_impl_;
std::unique_ptr<MessageTypeSupport> type_support_;
// Subscription options.
rmw_subscription_options_t sub_options_;
std::deque<std::unique_ptr<Message>> message_queue_;
// Map GID of a subscription to the sequence number of the message it published.
std::unordered_map<size_t, int64_t> last_known_published_msg_;
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,8 @@ rmw_create_subscription(
context_impl->get_next_entity_id(),
topic_name,
type_support,
qos_profile))
qos_profile,
*subscription_options))
{
// Error already handled.
return nullptr;
Expand Down
27 changes: 18 additions & 9 deletions zenoh_cpp_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@ find_package(ament_cmake_vendor_package REQUIRED)
# when expanded.
set(ZENOHC_CARGO_FLAGS "--no-default-features$<SEMICOLON>--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_udp zenoh/transport_tls")

# Set VCS_VERSION to include latest changes from zenoh/zenoh-c/zenoh-cpp to benefit from :
# - https://github.com/eclipse-zenoh/zenoh/pull/1742, https://github.com/eclipse-zenoh/zenoh/pull/1765
# (Add autoconnect_strategy config allowing to optimize peers interconnections)
# - https://github.com/eclipse-zenoh/zenoh/pull/1753
# (Improve AdvancedSub for faster delivery of first receveived data)
# - https://github.com/eclipse-zenoh/zenoh-cpp/pull/407, https://github.com/eclipse-zenoh/zenoh-c/pull/913
# (Fix potential loss of request/reply messages in case of network congestion)
# Set VCS_VERSION to include latest changes from zenoh/zenoh-c/zenoh-cpp to benefit from:
# - Reword SHM warning log about "setting scheduling priority":
# - https://github.com/eclipse-zenoh/zenoh/pull/1778
# - Performances improvements at launch time:
# - https://github.com/eclipse-zenoh/zenoh/pull/1786
# - https://github.com/eclipse-zenoh/zenoh/pull/1789
# - https://github.com/eclipse-zenoh/zenoh/pull/1793
# - Fixed open timeout
# - https://github.com/eclipse-zenoh/zenoh/pull/1796
# - Improve ACL behaviour, notably for S-ROS
# - https://github.com/eclipse-zenoh/zenoh/pull/1781
# - https://github.com/eclipse-zenoh/zenoh/pull/1785
# - https://github.com/eclipse-zenoh/zenoh/pull/1795
# - https://github.com/eclipse-zenoh/zenoh/pull/1806
# - Reduce the number of threads in case of scouting
# - https://github.com/eclipse-zenoh/zenoh-c/pull/937
ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
VCS_VERSION 261493682c7dc54db3a07079315e009a2e7c1573
VCS_VERSION 3540a3ce8126e071236352446bc1564787d3fb04
CMAKE_ARGS
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
"-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"
Expand All @@ -37,7 +46,7 @@ ament_export_dependencies(zenohc)

ament_vendor(zenoh_cpp_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp
VCS_VERSION 5dfb68c9ac966925e59bcb52f39b9bc26c0ad6d3
VCS_VERSION 2a127bb0f537e0028359caf1084c879330341592
CMAKE_ARGS
-DZENOHCXX_ZENOHC=OFF
)
Expand Down

0 comments on commit 508169c

Please sign in to comment.