diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 index 1c394ff8..36cd85b7 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5 @@ -205,7 +205,10 @@ }, /// The default timeout to apply to queries in milliseconds. - queries_default_timeout: 10000, + /// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together. + /// Note: the requests to services and actions are hard-coded with an infinite timeout. Hence, this setting + /// only applies to the queries made by the Advanced Subscriber for TRANSIENT_LOCAL implementation. + queries_default_timeout: 60000, /// The routing strategy to use and it's configuration. routing: { @@ -366,6 +369,8 @@ // [ // /// Each policy associates one or multiple rules to one or multiple subject combinations // { + // /// Id is optional. If provided, it has to be unique within the policies list + // "id": "policy1", // /// Rules and Subjects are identified with their unique IDs declared above // "rules": ["rule1"], // "subjects": ["subject1", "subject2"], @@ -381,13 +386,17 @@ transport: { unicast: { /// Timeout in milliseconds when opening a link - open_timeout: 10000, + /// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together + open_timeout: 60000, /// Timeout in milliseconds when accepting a link - accept_timeout: 10000, + /// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together + accept_timeout: 60000, /// Maximum number of links in pending state while performing the handshake for accepting it - accept_pending: 100, + /// ROS setting: increase the value to support a large number of Nodes starting all together + accept_pending: 10000, /// Maximum number of transports that can be simultaneously alive for a single zenoh sessions - max_sessions: 1000, + /// ROS setting: increase the value to support a large number of Nodes starting all together + max_sessions: 10000, /// Maximum number of incoming links that are admitted per transport max_links: 1, /// Enables the LowLatency transport @@ -446,7 +455,8 @@ /// Accepted values: 8bit, 16bit, 32bit, 64bit. sequence_number_resolution: "32bit", /// Link lease duration in milliseconds to announce to other zenoh nodes - lease: 10000, + /// ROS setting: increase the value to avoid lease expiration at launch time with a large number of Nodes starting all together + lease: 60000, /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive /// messages will be sent at the configured time interval. /// NOTE: In order to consider eventual packet loss and transmission latency and jitter, @@ -456,7 +466,9 @@ /// This is in-line with the ITU-T G.8013/Y.1731 specification on continuous connectivity /// check which considers a link as failed when no messages are received in 3.5 times the /// target interval. - keep_alive: 4, + /// ROS setting: decrease the value since Nodes are communicating over the loopback + /// where keep-alive messages have less chances to be lost. + keep_alive: 2, /// Batch size in bytes is expressed as a 16bit unsigned integer. /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535). /// The default batch size value is the maximum batch size: 65535. @@ -492,6 +504,9 @@ block: { /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message /// if still no batch is available. + /// ROS setting: unlike DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5, no change here: + /// as the router is routing messages to outside the robot, possibly over WiFi, + /// keeping a lower value ensure the router is not blocked for too long in case of congestioned WiFi. wait_before_close: 5000000, }, }, diff --git a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 index d795f941..46fd77bb 100644 --- a/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 +++ b/rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5 @@ -215,7 +215,10 @@ }, /// The default timeout to apply to queries in milliseconds. - queries_default_timeout: 10000, + /// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together. + /// Note: the requests to services and actions are hard-coded with an infinite timeout. Hence, this setting + /// only applies to the queries made by the Advanced Subscriber for TRANSIENT_LOCAL implementation. + queries_default_timeout: 60000, /// The routing strategy to use and it's configuration. routing: { @@ -374,6 +377,8 @@ // [ // /// Each policy associates one or multiple rules to one or multiple subject combinations // { + // /// Id is optional. If provided, it has to be unique within the policies list + // "id": "policy1", // /// Rules and Subjects are identified with their unique IDs declared above // "rules": ["rule1"], // "subjects": ["subject1", "subject2"], @@ -389,13 +394,17 @@ transport: { unicast: { /// Timeout in milliseconds when opening a link - open_timeout: 10000, + /// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together + open_timeout: 60000, /// Timeout in milliseconds when accepting a link - accept_timeout: 10000, + /// ROS setting: increase the value to avoid timeout at launch time with a large number of Nodes starting all together + accept_timeout: 60000, /// Maximum number of links in pending state while performing the handshake for accepting it - accept_pending: 100, + /// ROS setting: increase the value to support a large number of Nodes starting all together + accept_pending: 10000, /// Maximum number of transports that can be simultaneously alive for a single zenoh sessions - max_sessions: 1000, + /// ROS setting: increase the value to support a large number of Nodes starting all together + max_sessions: 10000, /// Maximum number of incoming links that are admitted per transport max_links: 1, /// Enables the LowLatency transport @@ -454,7 +463,8 @@ /// Accepted values: 8bit, 16bit, 32bit, 64bit. sequence_number_resolution: "32bit", /// Link lease duration in milliseconds to announce to other zenoh nodes - lease: 10000, + /// ROS setting: increase the value to avoid lease expiration at launch time with a large number of Nodes starting all together + lease: 60000, /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive /// messages will be sent at the configured time interval. /// NOTE: In order to consider eventual packet loss and transmission latency and jitter, @@ -464,7 +474,9 @@ /// This is in-line with the ITU-T G.8013/Y.1731 specification on continuous connectivity /// check which considers a link as failed when no messages are received in 3.5 times the /// target interval. - keep_alive: 4, + /// ROS setting: decrease the value since Nodes are communicating over the loopback + /// where keep-alive messages have less chances to be lost. + keep_alive: 2, /// Batch size in bytes is expressed as a 16bit unsigned integer. /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535). /// The default batch size value is the maximum batch size: 65535. @@ -500,7 +512,9 @@ block: { /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message /// if still no batch is available. - wait_before_close: 5000000, + /// ROS setting: increase the value to avoid unecessary link closure at launch time where congestion is likely + /// to occur even over the loopback since all the Nodes are starting at the same time. + wait_before_close: 60000000, }, }, /// Perform batching of messages if they are smaller of the batch_size diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 23ca5e78..534b2341 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -190,7 +190,8 @@ bool NodeData::create_sub_data( std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) + const rmw_qos_profile_t * qos_profile, + const rmw_subscription_options_t & sub_options) { std::lock_guard lock_guard(mutex_); if (is_shutdown_) { @@ -216,7 +217,8 @@ bool NodeData::create_sub_data( std::move(id), std::move(topic_name), type_support, - qos_profile); + qos_profile, + sub_options); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index e6fbede5..a76ce8b7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -75,7 +75,8 @@ class NodeData final std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile); + const rmw_qos_profile_t * qos_profile, + const rmw_subscription_options_t & sub_options); // Retrieve the SubscriptionData for a given rmw_subscription_t if present. SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const subscription); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index cea729e5..697e71dc 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -60,7 +60,8 @@ std::shared_ptr SubscriptionData::make( std::size_t subscription_id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) + const rmw_qos_profile_t * qos_profile, + const rmw_subscription_options_t & sub_options) { rmw_qos_profile_t adapted_qos_profile = *qos_profile; rmw_ret_t ret = QoS::get().best_available_qos( @@ -121,7 +122,8 @@ std::shared_ptr SubscriptionData::make( std::move(entity), std::move(session), type_support->data, - std::move(message_type_support) + std::move(message_type_support), + sub_options }); if (!sub_data->init()) { @@ -139,13 +141,15 @@ SubscriptionData::SubscriptionData( std::shared_ptr entity, std::shared_ptr session, const void * type_support_impl, - std::unique_ptr type_support) + std::unique_ptr type_support, + rmw_subscription_options_t sub_options) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), sess_(std::move(session)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), + sub_options_(std::move(sub_options)), last_known_published_msg_({}), wait_set_data_(nullptr), is_shutdown_(false), @@ -154,6 +158,7 @@ SubscriptionData::SubscriptionData( events_mgr_ = std::make_shared(); } +///============================================================================= // We have to use an "init" function here, rather than do this in the constructor, because we use // enable_shared_from_this, which is not available in constructors. bool SubscriptionData::init() @@ -172,6 +177,14 @@ bool SubscriptionData::init() using AdvancedSubscriberOptions = zenoh::ext::SessionExt::AdvancedSubscriberOptions; auto adv_sub_opts = AdvancedSubscriberOptions::create_default(); + // By default, this subscription will receive publications from publishers within and outside of + // the same Zenoh session as this subscription. + // If ignore_local_publications is true, we restrict this subscription to only receive samples + // from publishers in remote sessions. + if (sub_options_.ignore_local_publications) { + adv_sub_opts.subscriber_options.allowed_origin = ZC_LOCALITY_REMOTE; + } + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 8f37dc25..88c8f2b7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -71,7 +71,8 @@ class SubscriptionData final : public std::enable_shared_from_this entity, std::shared_ptr session, const void * type_support_impl, - std::unique_ptr type_support); + std::unique_ptr type_support, + rmw_subscription_options_t sub_options); bool init(); @@ -145,6 +147,8 @@ class SubscriptionData final : public std::enable_shared_from_this type_support_; + // Subscription options. + rmw_subscription_options_t sub_options_; std::deque> message_queue_; // Map GID of a subscription to the sequence number of the message it published. std::unordered_map last_known_published_msg_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index d7310fab..0536dda1 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -985,7 +985,8 @@ rmw_create_subscription( context_impl->get_next_entity_id(), topic_name, type_support, - qos_profile)) + qos_profile, + *subscription_options)) { // Error already handled. return nullptr; diff --git a/zenoh_cpp_vendor/CMakeLists.txt b/zenoh_cpp_vendor/CMakeLists.txt index c7369920..eca36224 100644 --- a/zenoh_cpp_vendor/CMakeLists.txt +++ b/zenoh_cpp_vendor/CMakeLists.txt @@ -17,16 +17,25 @@ find_package(ament_cmake_vendor_package REQUIRED) # when expanded. set(ZENOHC_CARGO_FLAGS "--no-default-features$--features=shared-memory zenoh/transport_compression zenoh/transport_tcp zenoh/transport_udp zenoh/transport_tls") -# Set VCS_VERSION to include latest changes from zenoh/zenoh-c/zenoh-cpp to benefit from : -# - https://github.com/eclipse-zenoh/zenoh/pull/1742, https://github.com/eclipse-zenoh/zenoh/pull/1765 -# (Add autoconnect_strategy config allowing to optimize peers interconnections) -# - https://github.com/eclipse-zenoh/zenoh/pull/1753 -# (Improve AdvancedSub for faster delivery of first receveived data) -# - https://github.com/eclipse-zenoh/zenoh-cpp/pull/407, https://github.com/eclipse-zenoh/zenoh-c/pull/913 -# (Fix potential loss of request/reply messages in case of network congestion) +# Set VCS_VERSION to include latest changes from zenoh/zenoh-c/zenoh-cpp to benefit from: +# - Reword SHM warning log about "setting scheduling priority": +# - https://github.com/eclipse-zenoh/zenoh/pull/1778 +# - Performances improvements at launch time: +# - https://github.com/eclipse-zenoh/zenoh/pull/1786 +# - https://github.com/eclipse-zenoh/zenoh/pull/1789 +# - https://github.com/eclipse-zenoh/zenoh/pull/1793 +# - Fixed open timeout +# - https://github.com/eclipse-zenoh/zenoh/pull/1796 +# - Improve ACL behaviour, notably for S-ROS +# - https://github.com/eclipse-zenoh/zenoh/pull/1781 +# - https://github.com/eclipse-zenoh/zenoh/pull/1785 +# - https://github.com/eclipse-zenoh/zenoh/pull/1795 +# - https://github.com/eclipse-zenoh/zenoh/pull/1806 +# - Reduce the number of threads in case of scouting +# - https://github.com/eclipse-zenoh/zenoh-c/pull/937 ament_vendor(zenoh_c_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git - VCS_VERSION 261493682c7dc54db3a07079315e009a2e7c1573 + VCS_VERSION 3540a3ce8126e071236352446bc1564787d3fb04 CMAKE_ARGS "-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}" "-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE" @@ -37,7 +46,7 @@ ament_export_dependencies(zenohc) ament_vendor(zenoh_cpp_vendor VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp - VCS_VERSION 5dfb68c9ac966925e59bcb52f39b9bc26c0ad6d3 + VCS_VERSION 2a127bb0f537e0028359caf1084c879330341592 CMAKE_ARGS -DZENOHCXX_ZENOHC=OFF )