From d83c4ed6e8787c7e8b51f47b85cce2e761b8f695 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 3 Dec 2024 03:02:37 -0500 Subject: [PATCH 1/6] Fix warnings on Windows. (#2692) For reasons I admit I do not understand, the deprecation warnings for StaticSingleThreadedExecutor on Windows happen when we construct a shared_ptr for it in the tests. If we construct a regular object, then it is fine. Luckily this test does not require a shared_ptr, so just make it a regular object here, which rixes the warning. While we are in here, make all of the tests camel case to be consistent. Signed-off-by: Chris Lalancette Signed-off-by: HarunTeper --- .../test/rclcpp/executors/test_executors.cpp | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 90508c732c..88ae1a16ba 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -388,7 +388,7 @@ to_nanoseconds_helper(DurationT duration) // - works nominally (it can execute entities) // - it can execute multiple items at once // - it does not wait for work to be available before returning -TYPED_TEST(TestExecutors, spin_some) +TYPED_TEST(TestExecutors, spinSome) { using ExecutorType = TypeParam; @@ -484,7 +484,7 @@ TYPED_TEST(TestExecutors, spin_some) // do not properly implement max_duration (it seems), so disable this test // for them in the meantime. // see: https://github.com/ros2/rclcpp/issues/2462 -TYPED_TEST(TestExecutorsStable, spin_some_max_duration) +TYPED_TEST(TestExecutorsStable, spinSomeMaxDuration) { using ExecutorType = TypeParam; @@ -834,7 +834,7 @@ TEST(TestExecutors, testSpinWithNonDefaultContext) rclcpp::shutdown(non_default_context); } -TYPED_TEST(TestExecutors, release_ownership_entity_after_spinning_cancel) +TYPED_TEST(TestExecutors, releaseOwnershipEntityAfterSpinningCancel) { using ExecutorType = TypeParam; ExecutorType executor; @@ -870,19 +870,20 @@ TYPED_TEST(TestExecutors, testRaceDropCallbackGroupFromSecondThread) } // Create an executor - auto executor = std::make_shared(); - executor->add_node(this->node); + ExecutorType executor; + executor.add_node(this->node); // Start spinning auto executor_thread = std::thread( - [executor]() { - executor->spin(); + [&executor]() { + executor.spin(); }); // As the problem is a race, we do this multiple times, // to raise our chances of hitting the problem - for(size_t i = 0; i < 10; i++) { - auto cg = this->node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + for (size_t i = 0; i < 10; i++) { + auto cg = this->node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); auto timer = this->node->create_timer(1s, [] {}, cg); // sleep a bit, so that the spin thread can pick up the callback group // and add it to the executor @@ -893,6 +894,6 @@ TYPED_TEST(TestExecutors, testRaceDropCallbackGroupFromSecondThread) // If the executor has a race, we will experience a segfault at this point. } - executor->cancel(); + executor.cancel(); executor_thread.join(); } From 72bf25aee46b13a5f53c400ab2fc3606a2b9b8d1 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 3 Dec 2024 03:03:03 -0500 Subject: [PATCH 2/6] Re-enable executor test on rmw_connextdds. (#2693) It supports the events executor now, so re-enable the test. Signed-off-by: Chris Lalancette Signed-off-by: HarunTeper --- rclcpp/test/rclcpp/executors/test_executors.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 88ae1a16ba..901806f3d2 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -861,13 +861,6 @@ TYPED_TEST(TestExecutors, releaseOwnershipEntityAfterSpinningCancel) TYPED_TEST(TestExecutors, testRaceDropCallbackGroupFromSecondThread) { using ExecutorType = TypeParam; - // rmw_connextdds doesn't support events-executor - if ( - std::is_same() && - std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) - { - GTEST_SKIP(); - } // Create an executor ExecutorType executor; From 75643b89485b4c145bc5c6408218da2c007054ec Mon Sep 17 00:00:00 2001 From: Patrick Roncagliolo Date: Wed, 4 Dec 2024 18:24:50 +0100 Subject: [PATCH 3/6] Update docstring for `rclcpp::Node::now()` (#2696) Signed-off-by: Patrick Roncagliolo Signed-off-by: HarunTeper --- rclcpp/include/rclcpp/node.hpp | 2 +- rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 930bf419f1..37f78e8351 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -1430,7 +1430,7 @@ class Node : public std::enable_shared_from_this rclcpp::Clock::ConstSharedPtr get_clock() const; - /// Returns current time from the time source specified by clock_type. + /// Returns current time from the node clock. /** * \sa rclcpp::Clock::now */ diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp index d63cc726f9..6e196c887b 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp @@ -731,7 +731,7 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, rclcpp::Clock::ConstSharedPtr get_clock() const; - /// Returns current time from the time source specified by clock_type. + /// Returns current time from the node clock. /** * \sa rclcpp::Clock::now */ From 3e91d82c7f507a9d3a0d9cb68b44b4b794d77f39 Mon Sep 17 00:00:00 2001 From: HarunTeper Date: Mon, 9 Dec 2024 14:35:06 +0100 Subject: [PATCH 4/6] Added test for starvation in the multi-threaded executor Signed-off-by: HarunTeper --- .../test_multi_threaded_executor.cpp | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index 09dfa03f90..32872482c4 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -102,3 +102,66 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) { executor.add_node(node); executor.spin(); } + +/* + Test that no tasks are starved + */ +TEST_F(TestMultiThreadedExecutor, starvation) { + rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), + 2u); + + std::shared_ptr node = + std::make_shared("test_multi_threaded_executor_starvation"); + + std::atomic_int timer_one_count{0}; + std::atomic_int timer_two_count{0}; + + rclcpp::TimerBase::SharedPtr timer_one; + rclcpp::TimerBase::SharedPtr timer_two; + + auto timer_one_callback = [&timer_one_count, &timer_two_count]() { + std::cout << "Timer one callback executed. Count: " + << timer_one_count.load() << std::endl; + + auto start_time = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start_time < 100ms) { + } + + timer_one_count++; + + auto diff = std::abs(timer_one_count - timer_two_count); + + std::cout << "Difference in counts: " << diff << std::endl; + + if (diff > 1) { + rclcpp::shutdown(); + ASSERT_LE(diff, 1); + } + }; + + auto timer_two_callback = [&timer_one_count, &timer_two_count]() { + std::cout << "Timer two callback executed. Count: " + << timer_two_count.load() << std::endl; + + auto start_time = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start_time < 100ms) { + } + + timer_two_count++; + + auto diff = std::abs(timer_one_count - timer_two_count); + + std::cout << "Difference in counts: " << diff << std::endl; + + if (diff > 1) { + rclcpp::shutdown(); + ASSERT_LE(diff, 1); + } + }; + + timer_one = node->create_wall_timer(0ms, timer_one_callback); + timer_two = node->create_wall_timer(0ms, timer_two_callback); + + executor.add_node(node); + executor.spin(); +} From c5b5f2ed687ac6610e2c12f7fa93741f0469079b Mon Sep 17 00:00:00 2001 From: HarunTeper Date: Mon, 6 Jan 2025 12:24:02 +0000 Subject: [PATCH 5/6] Refactor starvation test in multi-threaded executor to use sleep instead of busy wait Signed-off-by: HarunTeper --- .../test_multi_threaded_executor.cpp | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index 32872482c4..59c9735458 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -120,12 +120,7 @@ TEST_F(TestMultiThreadedExecutor, starvation) { rclcpp::TimerBase::SharedPtr timer_two; auto timer_one_callback = [&timer_one_count, &timer_two_count]() { - std::cout << "Timer one callback executed. Count: " - << timer_one_count.load() << std::endl; - - auto start_time = std::chrono::steady_clock::now(); - while (std::chrono::steady_clock::now() - start_time < 100ms) { - } + std::this_thread::sleep_for(100ms); timer_one_count++; @@ -133,34 +128,27 @@ TEST_F(TestMultiThreadedExecutor, starvation) { std::cout << "Difference in counts: " << diff << std::endl; - if (diff > 1) { + if (timer_one_count > 10 || timer_two_count > 10) { rclcpp::shutdown(); ASSERT_LE(diff, 1); } }; auto timer_two_callback = [&timer_one_count, &timer_two_count]() { - std::cout << "Timer two callback executed. Count: " - << timer_two_count.load() << std::endl; - - auto start_time = std::chrono::steady_clock::now(); - while (std::chrono::steady_clock::now() - start_time < 100ms) { - } + std::this_thread::sleep_for(100ms); timer_two_count++; auto diff = std::abs(timer_one_count - timer_two_count); - std::cout << "Difference in counts: " << diff << std::endl; - - if (diff > 1) { + if (timer_one_count > 10 || timer_two_count > 10) { rclcpp::shutdown(); ASSERT_LE(diff, 1); } }; - timer_one = node->create_wall_timer(0ms, timer_one_callback); - timer_two = node->create_wall_timer(0ms, timer_two_callback); + timer_one = node->create_wall_timer(10ms, timer_one_callback); + timer_two = node->create_wall_timer(10ms, timer_two_callback); executor.add_node(node); executor.spin(); From 1d28b3ac20c1c13a488f7a3e9d19d5ed02eb6098 Mon Sep 17 00:00:00 2001 From: HarunTeper Date: Mon, 3 Mar 2025 13:07:04 +0000 Subject: [PATCH 6/6] Add thread-safe methods to trigger notifications and prepare work in executor Signed-off-by: HarunTeper --- rclcpp/include/rclcpp/executor.hpp | 37 ++++++++++ rclcpp/src/rclcpp/executor.cpp | 67 ++++++++++++++++++- .../executors/multi_threaded_executor.cpp | 16 ++--- 3 files changed, 110 insertions(+), 10 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index ae2087bbc5..186d8affe9 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -455,6 +455,14 @@ class Executor void execute_any_executable(AnyExecutable & any_exec); + /// Trigger the notify guard condition to wake up the executor. + /** + * This function is thread safe. + */ + RCLCPP_PUBLIC + void + trigger_executor_notify(); + /// Run subscription executable. /** * Do necessary setup and tear-down as well as executing the subscription. @@ -508,6 +516,27 @@ class Executor void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// Prepare the wait set to bee waited on. + /** + * Builds a set of waitable entities, which are passed to the middleware. + * After building wait set, waits on middleware to notify. + * \throws std::runtime_error if the wait set can be cleared + */ + RCLCPP_PUBLIC + void + prepare_work(); + + /// Block until more work becomes avilable or timeout is reached. + /** + * Builds a set of waitable entities, which are passed to the middleware. + * After building wait set, waits on middleware to notify. + * \param[in] timeout duration to wait for new work to become available. + * \throws std::runtime_error if the wait set can be cleared + */ + RCLCPP_PUBLIC + void + wait_for_work_simple(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// Check for executable in ready state and populate union structure. /** * \param[out] any_executable populated union structure of ready executable @@ -556,6 +585,10 @@ class Executor mutable std::mutex mutex_; + mutable std::mutex notify_mutex_; + + std::vector wait_for_work_cbgs_; + /// The context associated with this executor. std::shared_ptr context_; @@ -583,6 +616,10 @@ class Executor rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_); std::optional> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Previous WaitSet to be waited on. + rclcpp::WaitSet previous_wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::optional> previous_wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// Hold the current state of the collection being waited on by the waitset rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY( mutex_); diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index fd2a46368e..934510719b 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -26,6 +26,7 @@ #include "rcl/allocator.h" #include "rcl/error_handling.h" #include "rclcpp/executors/executor_notify_waitable.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/subscription_wait_set_mask.hpp" #include "rcpputils/scope_exit.hpp" @@ -480,9 +481,18 @@ Executor::execute_any_executable(AnyExecutable & any_exec) const std::shared_ptr & const_data = any_exec.data; any_exec.waitable->execute(const_data); } +} - // Reset the callback_group, regardless of type - any_exec.callback_group->can_be_taken_from().store(true); +void +Executor::trigger_executor_notify() +{ + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group change: ") + ex.what()); + } } template @@ -770,6 +780,53 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) } } +void Executor::prepare_work() +{ + { + std::lock_guard guard(mutex_); + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Rebuilding executor entities"); + this->collect_entities(); + } + + auto callback_groups = this->collector_.get_all_callback_groups(); + wait_for_work_cbgs_.resize(callback_groups.size()); + for(const auto & w_ptr : callback_groups) { + auto shr_ptr = w_ptr.lock(); + if(shr_ptr) { + wait_for_work_cbgs_.push_back(std::move(shr_ptr)); + } + } + } +} + +void +Executor::wait_for_work_simple(std::chrono::nanoseconds timeout) +{ + TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); + + this->wait_result_.reset(); + this->wait_result_.emplace(wait_set_.wait(timeout)); + + // drop references to the callback groups, before trying to execute anything + wait_for_work_cbgs_.clear(); + + if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in wait(). This should never happen."); + } else { + if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Getting wait set"); + auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); + if (current_notify_waitable_->is_ready(rcl_wait_set)) { + RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Executing notify waitable"); + current_notify_waitable_->execute(current_notify_waitable_->take_data()); + } + } + } +} + bool Executor::get_next_ready_executable(AnyExecutable & any_executable) { @@ -896,6 +953,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) bool Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout) { + notify_mutex_.lock(); bool success = false; // Check to see if there are any subscriptions or timers needing service // TODO(wjwwood): improve run to run efficiency of this function @@ -903,6 +961,8 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos // If there are none if (!success) { // Wait for subscriptions or timers to work on + prepare_work(); + notify_mutex_.unlock(); wait_for_work(timeout); if (!spinning.load()) { return false; @@ -910,6 +970,9 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos // Try again success = get_next_ready_executable(any_executable); } + else { + notify_mutex_.unlock(); + } return success; } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 130b4d953f..e244319860 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -99,15 +99,15 @@ MultiThreadedExecutor::run(size_t this_thread_number) execute_any_executable(any_exec); - if (any_exec.callback_group && - any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group change: ") + ex.what()); + std::lock_guard wait_lock{notify_mutex_}; + + any_exec.callback_group->can_be_taken_from().store(true); + + if (any_exec.callback_group && + any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive) + { + trigger_executor_notify(); } }