Commit 32cf5c2

ruth561 and Koichi98 authored
refactor(agnocastlib): remove the global variable need_epoll_updates (#1271)
* refactor(agnocastlib): remove the global variable need_epoll_updates

  Previously, when the events managed by epoll changed, we notified each Executor to call `prepare_epoll_impl()` by setting the global atomic variable `need_epoll_updates` to true. However, this implementation would become problematic for future refactoring efforts aimed at extracting event-specific processing from `agnocast_epoll.hpp` and `.cpp`.

  This commit removes the global variable and introduces an alternative notification mechanism. The current implementation only supports broadcasting notifications to all Executors, which leaves some performance challenges, but it establishes a 1-to-1 tracking structure for each Executor. This lays the groundwork for implementing targeted 1-to-1 notifications in the future.

  Relates to: #969

  Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>

* test(agnocastlib): Add integration tests for epoll updates

  Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>

* test(agnocastlib): Add unit tests for epoll updates

  Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>

* refactor(agnocastlib): reorganize header includes

  Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>

* refactor(agnocastlib): remove epoll_update_tracker_id_ from AgnocastOnlyExecutor

  Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>

* fix(agnocastlib): fix segfault by moving stop_spinning to test body

  This commit fixes a segmentation fault in the tests.

  [Problem] Previously, the Executor was stopped in `TearDown()`. Since `TearDown()` runs after the test body has returned, test variables such as `callback_started` that the timer callbacks reference had already been destroyed. When the Executor tried to run a callback using these destroyed variables, it crashed.

  [Solution] We moved the Executor clean-up (`stop_spinning`) from `TearDown()` into the test body. This ensures that the Executor stops completely before the test variables are destroyed.

  Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>

* fix(executor): explicitly add callback groups when building for Humble version

  When building `MultiThreadedAgnocastExecutor` for the ROS 2 Humble version, callback groups created with `automatically_add_to_executor_with_node==true` after a node has been associated with an executor are not automatically picked up. This commit explicitly calls `add_callback_groups_from_nodes_associated_to_executor()` to fix this behavior, matching the automatic handling within `get_all_callback_groups()` in the Jazzy version.

  Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>

---------

Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>
Co-authored-by: Koichi Imai <45482193+Koichi98@users.noreply.github.com>
1 parent eff2639 commit 32cf5c2
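The segfault fix in the commit message comes down to ordering: the thread that runs callbacks has to be stopped while the variables those callbacks reference are still alive. The following is a minimal sketch of that ordering in plain C++. It uses neither GoogleTest nor the Agnocast executors; `Worker`, `Fixture`, `run_test_body`, and `run_fixture` are hypothetical names introduced only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical stand-in for an executor thread that keeps invoking a callback.
class Worker
{
public:
  explicit Worker(std::function<void()> callback)
  : callback_(std::move(callback)), thread_([this] {
      while (spinning_.load()) {
        callback_();
        std::this_thread::yield();
      }
    })
  {
  }

  ~Worker() { stop(); }

  // Stops the loop and waits for the thread; safe to call more than once.
  void stop()
  {
    spinning_.store(false);
    if (thread_.joinable()) {
      thread_.join();
    }
  }

private:
  std::function<void()> callback_;
  std::atomic<bool> spinning_{true};
  std::thread thread_;  // declared last so the other members exist before it starts
};

// Hypothetical fixture: it owns the worker, so the worker outlives the test body.
struct Fixture
{
  std::unique_ptr<Worker> worker;
  void tear_down()
  {
    if (worker) {
      worker->stop();
    }
  }
};

// Plays the role of a test body. Returns true if the callback ran at least once.
bool run_test_body(Fixture & fixture)
{
  std::atomic<bool> callback_started{false};  // local referenced by the callback
  fixture.worker =
    std::make_unique<Worker>([&callback_started] { callback_started.store(true); });

  while (!callback_started.load()) {
    std::this_thread::yield();
  }

  // Stop here, while callback_started is still alive. If this were left to
  // tear_down(), the callback could run after the local has been destroyed.
  fixture.worker->stop();
  return callback_started.load();
}

bool run_fixture()
{
  Fixture fixture;
  const bool started = run_test_body(fixture);
  assert(fixture.worker != nullptr);
  fixture.tear_down();  // now only a harmless second stop
  return started;
}
```

Calling `run_fixture()` exercises the safe ordering; removing the `stop()` call from `run_test_body` would reproduce the kind of dangling reference the commit message describes.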

17 files changed

Lines changed: 644 additions & 36 deletions

src/agnocastlib/CMakeLists.txt

Lines changed: 14 additions & 1 deletion
@@ -32,7 +32,7 @@ find_package(glog REQUIRED)
 
 add_library(agnocast SHARED
   src/agnocast.cpp src/agnocast_utils.cpp src/agnocast_publisher.cpp src/agnocast_subscription.cpp
-  src/agnocast_smart_pointer.cpp src/agnocast_callback_info.cpp src/agnocast_epoll.cpp src/agnocast_timer_info.cpp src/agnocast_timer.cpp src/agnocast_executor.cpp
+  src/agnocast_smart_pointer.cpp src/agnocast_callback_info.cpp src/agnocast_epoll.cpp src/agnocast_epoll_update_dispatcher.cpp src/agnocast_timer_info.cpp src/agnocast_timer.cpp src/agnocast_executor.cpp
   src/agnocast_single_threaded_executor.cpp src/agnocast_multi_threaded_executor.cpp
   src/agnocast_callback_isolated_executor.cpp
   src/agnocast_tracepoint_wrapper.c src/agnocast_client.cpp
@@ -130,6 +130,7 @@ if(BUILD_TESTING)
     test/unit/test_agnocast_subscription.cpp
     test/unit/test_mocked_agnocast.cpp
     test/unit/test_agnocast_executors.cpp
+    test/unit/test_agnocast_epoll_update.cpp
     test/unit/test_node_topics.cpp
     test/unit/test_cie_client_utils.cpp)
   target_include_directories(test_unit_${PROJECT_NAME} PRIVATE include)
@@ -194,6 +195,18 @@ if(BUILD_TESTING)
     LABELS "requires_kernel_module;requires_heaphook"
   )
 
+  # Epoll update integration test (requires kernel module, no mock)
+  ament_add_gmock(test_integration_agnocast_epoll_update_${PROJECT_NAME}
+    test/integration/test_agnocast_epoll_update.cpp)
+  target_link_libraries(test_integration_agnocast_epoll_update_${PROJECT_NAME} agnocast)
+  ament_target_dependencies(test_integration_agnocast_epoll_update_${PROJECT_NAME}
+    agnocast_cie_config_msgs)
+  set_tests_properties(test_integration_agnocast_epoll_update_${PROJECT_NAME} PROPERTIES
+    ENVIRONMENT "GTEST_DEATH_TEST_STYLE=threadsafe;LD_PRELOAD=${CMAKE_INSTALL_PREFIX}/lib/libagnocast_heaphook.so:$ENV{LD_PRELOAD}"
+    TIMEOUT 120
+    LABELS "requires_kernel_module;requires_heaphook"
+  )
+
   # Integration tests for agnocast_heaphook
   add_library(initialize_shutdown_mock SHARED test/integration/src/initialize_shutdown_mock.cpp)
   set_target_properties(initialize_shutdown_mock PROPERTIES

src/agnocastlib/include/agnocast/agnocast_callback_info.hpp

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,6 @@
 #pragma once
 
+#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
 #include "agnocast/agnocast_smart_pointer.hpp"
 
 #include <mutex>
@@ -57,7 +58,6 @@ std::vector<std::string> get_agnocast_topics_by_group(
 extern std::mutex id2_callback_info_mtx;
 extern std::unordered_map<uint32_t, CallbackInfo> id2_callback_info;
 extern std::atomic<uint32_t> next_callback_info_id;
-extern std::atomic<bool> need_epoll_updates;
 
 uint32_t allocate_callback_info_id();
 
@@ -109,7 +109,7 @@ uint32_t register_callback(
       callback_group, erased_callback, message_creator};
   }
 
-  need_epoll_updates.store(true);
+  EpollUpdateDispatcher::get_instance().request_update_all();
 
   return callback_info_id;
 }

src/agnocastlib/include/agnocast/agnocast_epoll.hpp

Lines changed: 0 additions & 24 deletions
@@ -14,8 +14,6 @@ namespace agnocast
 
 struct AgnocastExecutable;
 
-extern std::atomic<bool> need_epoll_updates;
-
 constexpr uint32_t TIMER_EVENT_FLAG = 0x80000000;
 constexpr uint32_t CLOCK_EVENT_FLAG = 0x40000000; // For clock_eventfd events (ROS_TIME timers)
 constexpr uint32_t SHUTDOWN_EVENT_FLAG = 0x20000000; // For shutdown events (AgnocastOnlyExecutor)
@@ -116,28 +114,6 @@ void prepare_epoll_impl(
       timer_info.need_epoll_update = false;
     }
   }
-
-  // Check if all updates are done. Both locks must be held simultaneously to prevent
-  // a TOCTOU race: without this, a new subscription could set need_epoll_updates=true
-  // between the two checks (or between the last check and the store), and that update
-  // would be lost when need_epoll_updates is overwritten to false.
-  // Lock ordering: id2_callback_info_mtx before id2_timer_info_mtx (see declarations).
-  {
-    std::lock_guard<std::mutex> cb_lock(id2_callback_info_mtx);
-    std::lock_guard<std::mutex> timer_lock(id2_timer_info_mtx);
-
-    const bool all_callbacks_updated = std::none_of(
-      id2_callback_info.begin(), id2_callback_info.end(),
-      [](const auto & it) { return it.second.need_epoll_update; });
-
-    const bool all_timers_updated = std::none_of(
-      id2_timer_info.begin(), id2_timer_info.end(),
-      [](const auto & it) { return it.second->need_epoll_update; });
-
-    if (all_callbacks_updated && all_timers_updated) {
-      need_epoll_updates.store(false);
-    }
-  }
 }
 
 } // namespace agnocast
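The comment removed above explains why the old code held both mutexes: clearing a shared flag after a separate check is a check-then-act sequence, and a request that arrives in between can be overwritten. The sketch below replays that interleaving in a single thread so the outcome is deterministic, and contrasts it with consuming the flag first via `exchange`. It is an illustration of the general hazard, not a claim that the removed code was incorrect (it used locks to avoid exactly this); the function names are hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Replays "check, then clear later" with a request arriving in between.
// Returns true if the late request is still pending afterwards.
bool late_request_survives_check_then_clear()
{
  std::atomic<bool> flag{true};
  const bool saw_request = flag.load();  // consumer: check
  flag.store(true);                      // producer: a new request arrives here
  if (saw_request) {
    flag.store(false);  // consumer: clear, which overwrites the new request
  }
  return flag.load();
}

// Replays "consume first with exchange" with the same late request.
// Returns true if the late request is still pending afterwards.
bool late_request_survives_exchange()
{
  std::atomic<bool> flag{true};
  const bool saw_request = flag.exchange(false);  // consumer: check and clear in one step
  flag.store(true);                               // producer: a new request arrives here
  (void)saw_request;  // the consumer would rebuild its epoll set at this point
  return flag.load();
}
```

With the first ordering the late request is lost unless extra locking prevents the interleaving; with the second it stays pending for the next poll.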
Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+namespace agnocast
+{
+
+class EpollUpdateTracker;
+
+struct TrackerContext
+{
+  std::atomic<bool> need_update{true};
+};
+
+class EpollUpdateDispatcher
+{
+public:
+  static EpollUpdateDispatcher & get_instance()
+  {
+    static EpollUpdateDispatcher instance;
+    return instance;
+  }
+
+  void request_update_all();
+
+  void request_update(uint64_t tracker_id);
+
+  EpollUpdateTracker register_tracker();
+
+private:
+  EpollUpdateDispatcher() = default;
+  friend class EpollUpdateTracker;
+
+  void unregister_tracker(uint64_t tracker_id);
+
+  std::atomic<uint64_t> next_tracker_id_{1};
+
+  std::mutex mutex_;
+  std::unordered_map<uint64_t, std::shared_ptr<TrackerContext>> trackers_;
+};
+
+class EpollUpdateTracker
+{
+public:
+  EpollUpdateTracker(const EpollUpdateTracker &) = delete;
+  EpollUpdateTracker & operator=(const EpollUpdateTracker &) = delete;
+  EpollUpdateTracker(EpollUpdateTracker && other) = delete;
+  EpollUpdateTracker & operator=(EpollUpdateTracker && other) = delete;
+  ~EpollUpdateTracker();
+
+  [[nodiscard]] bool take_update_request();
+
+  [[nodiscard]] uint64_t id() const { return id_; }
+
+private:
+  friend class EpollUpdateDispatcher;
+
+  EpollUpdateTracker(uint64_t tracker_id, std::shared_ptr<TrackerContext> context)
+  : id_(tracker_id), context_(std::move(context))
+  {
+  }
+
+  uint64_t id_;
+  std::shared_ptr<TrackerContext> context_;
+};
+
+} // namespace agnocast
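The tracker above deletes both copy and move operations, yet `register_tracker()` returns one by value and the executors initialize a member from that call. This works because, from C++17 on, a returned prvalue initializes its destination directly without any copy or move. The sketch below shows the same shape with hypothetical names (`Token`, `Factory`, `Owner`, `demo_token_id`); it assumes a C++17 or later compiler and is not taken from the Agnocast sources.

```cpp
#include <cassert>
#include <cstdint>

class Token;

// Hypothetical factory, playing the role of the dispatcher.
class Factory
{
public:
  Token make(std::uint64_t id);
};

// Hypothetical handle that can be neither copied nor moved.
class Token
{
public:
  Token(const Token &) = delete;
  Token & operator=(const Token &) = delete;
  Token(Token &&) = delete;
  Token & operator=(Token &&) = delete;

  std::uint64_t id() const { return id_; }

private:
  friend class Factory;  // only the factory may construct tokens
  explicit Token(std::uint64_t id) : id_(id) {}

  std::uint64_t id_;
};

// Returning a prvalue: in C++17 the object is built directly in the caller's storage.
inline Token Factory::make(std::uint64_t id)
{
  return Token{id};
}

// Owner that initializes a non-movable member straight from the factory call.
class Owner
{
public:
  explicit Owner(Factory & factory) : token_(factory.make(42)) {}
  std::uint64_t token_id() const { return token_.id(); }

private:
  Token token_;
};

std::uint64_t demo_token_id()
{
  Factory factory;
  Owner owner(factory);
  return owner.token_id();
}
```

A consequence of this design is that a class holding such a member also becomes non-copyable and non-movable, which suits an object whose destructor unregisters a unique id.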

src/agnocastlib/include/agnocast/agnocast_executor.hpp

Lines changed: 3 additions & 0 deletions
@@ -1,6 +1,7 @@
 #pragma once
 
 #include "agnocast/agnocast_epoll.hpp"
+#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
 #include "agnocast/agnocast_public_api.hpp"
 #include "rclcpp/rclcpp.hpp"
 
@@ -41,6 +42,8 @@ class AgnocastExecutor : public rclcpp::Executor
   bool get_next_agnocast_executable(AgnocastExecutable & agnocast_executable, const int timeout_ms);
   static void execute_agnocast_executable(AgnocastExecutable & agnocast_executable);
 
+  EpollUpdateTracker epoll_update_tracker_;
+
 public:
   /// Construct the executor.
   /// @param options Executor options.

src/agnocastlib/include/agnocast/node/agnocast_only_executor.hpp

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,6 @@
 #pragma once
 
+#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
 #include "agnocast/agnocast_public_api.hpp"
 #include "rclcpp/callback_group.hpp"
 #include "rclcpp/node_interfaces/node_base_interface.hpp"
@@ -33,6 +34,8 @@ class AgnocastOnlyExecutor
   int shutdown_event_fd_;
   pid_t my_pid_;
 
+  EpollUpdateTracker epoll_update_tracker_;
+
   // Lock ordering: When both mutexes are needed, always acquire
   // ready_agnocast_executables_mutex_ before mutex_ to prevent deadlocks.
   std::mutex ready_agnocast_executables_mutex_;

src/agnocastlib/src/agnocast_epoll.cpp

Lines changed: 0 additions & 2 deletions
@@ -7,8 +7,6 @@
 namespace agnocast
 {
 
-std::atomic<bool> need_epoll_updates{false};
-
 bool wait_and_handle_epoll_event(
   const int epoll_fd, const pid_t my_pid, const int timeout_ms,
   std::mutex & ready_agnocast_executables_mutex,
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
+
+#include <atomic>
+#include <cstdint>
+
+namespace agnocast
+{
+
+EpollUpdateTracker EpollUpdateDispatcher::register_tracker()
+{
+  uint64_t new_id = next_tracker_id_.fetch_add(1, std::memory_order_relaxed);
+
+  auto context = std::make_shared<TrackerContext>();
+
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    trackers_.emplace(new_id, context);
+  }
+
+  return {new_id, context};
+}
+
+void EpollUpdateDispatcher::request_update_all()
+{
+  std::lock_guard<std::mutex> lock(mutex_);
+  for (auto & [id, context] : trackers_) {
+    context->need_update.store(true, std::memory_order_release);
+  }
+}
+
+void EpollUpdateDispatcher::request_update(uint64_t tracker_id)
+{
+  std::lock_guard<std::mutex> lock(mutex_);
+  auto it = trackers_.find(tracker_id);
+  if (it != trackers_.end()) {
+    it->second->need_update.store(true, std::memory_order_release);
+  }
+}
+
+void EpollUpdateDispatcher::unregister_tracker(uint64_t tracker_id)
+{
+  std::lock_guard<std::mutex> lock(mutex_);
+  trackers_.erase(tracker_id);
+}
+
+EpollUpdateTracker::~EpollUpdateTracker()
+{
+  if (id_ != 0) {
+    EpollUpdateDispatcher::get_instance().unregister_tracker(id_);
+  }
+}
+
+bool EpollUpdateTracker::take_update_request()
+{
+  if (!context_) {
+    return false;
+  }
+  return context_->need_update.exchange(false, std::memory_order_acquire);
+}
+
+} // namespace agnocast
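To make the broadcast semantics of the code above easier to follow, here is a small self-contained sketch of the same pattern: one flag per registered consumer, a broadcast that sets every registered flag, and a consumer-side `exchange` that reads and clears only its own flag. `MiniDispatcher`, `take`, and `demo_broadcast` are hypothetical, simplified stand-ins and not the Agnocast classes; the sketch assumes C++17 for structured bindings.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Simplified, hypothetical stand-in for the dispatcher in the diff.
struct MiniDispatcher
{
  using Flag = std::shared_ptr<std::atomic<bool>>;

  // Registers a consumer. Its flag starts as true so the first poll triggers a setup.
  std::pair<std::uint64_t, Flag> register_consumer()
  {
    std::lock_guard<std::mutex> lock(mutex);
    const std::uint64_t id = next_id++;
    auto flag = std::make_shared<std::atomic<bool>>(true);
    flags.emplace(id, flag);
    return {id, flag};
  }

  void unregister_consumer(std::uint64_t id)
  {
    std::lock_guard<std::mutex> lock(mutex);
    flags.erase(id);
  }

  // Broadcast: mark every registered consumer as needing an update.
  void request_update_all()
  {
    std::lock_guard<std::mutex> lock(mutex);
    for (auto & [id, flag] : flags) {
      flag->store(true, std::memory_order_release);
    }
  }

  std::mutex mutex;
  std::uint64_t next_id{1};
  std::unordered_map<std::uint64_t, Flag> flags;
};

// Consumer side: read and clear the consumer's own flag in one atomic step.
bool take(std::atomic<bool> & flag)
{
  return flag.exchange(false, std::memory_order_acquire);
}

// Counts how many polls report a pending update across a short scenario.
int demo_broadcast()
{
  MiniDispatcher dispatcher;
  auto [id_a, flag_a] = dispatcher.register_consumer();
  auto [id_b, flag_b] = dispatcher.register_consumer();

  int updates = 0;
  if (take(*flag_a)) { ++updates; }  // initial request for a
  if (take(*flag_a)) { ++updates; }  // nothing pending for a
  dispatcher.request_update_all();
  if (take(*flag_a)) { ++updates; }  // broadcast reaches a
  if (take(*flag_b)) { ++updates; }  // b still has its own pending flag
  dispatcher.unregister_consumer(id_b);
  dispatcher.request_update_all();
  if (take(*flag_b)) { ++updates; }  // b is unregistered, so nothing arrives
  if (take(*flag_a)) { ++updates; }  // a receives the second broadcast
  dispatcher.unregister_consumer(id_a);
  return updates;
}
```

Because each consumer clears only its own flag, one consumer polling early cannot hide a pending request from another, which is what a single shared flag would need extra bookkeeping to guarantee.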

src/agnocastlib/src/agnocast_executor.cpp

Lines changed: 14 additions & 1 deletion
@@ -1,6 +1,7 @@
 #include "agnocast/agnocast_executor.hpp"
 
 #include "agnocast/agnocast.hpp"
+#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
 #include "agnocast/agnocast_tracepoint_wrapper.h"
 #include "rclcpp/rclcpp.hpp"
 #include "rclcpp/version.h"
@@ -12,7 +13,10 @@ namespace agnocast
 {
 
 AgnocastExecutor::AgnocastExecutor(const rclcpp::ExecutorOptions & options)
-: rclcpp::Executor(options), epoll_fd_(epoll_create1(0)), my_pid_(getpid())
+: rclcpp::Executor(options),
+  epoll_fd_(epoll_create1(0)),
+  my_pid_(getpid()),
+  epoll_update_tracker_(EpollUpdateDispatcher::get_instance().register_tracker())
 {
   if (epoll_fd_ == -1) {
     RCLCPP_ERROR(logger, "epoll_create1 failed: %s", strerror(errno));
@@ -79,6 +83,15 @@ bool AgnocastExecutor::get_next_ready_agnocast_executable(AgnocastExecutable & a
       continue;
     }
 #else
+    // We need to call add_callback_groups_from_nodes_associated_to_executor() to handle the
+    // case where a CallbackGroup is created with automatically_add_to_executor_with_node==true
+    // after the Node has been associated with the Executor.
+    // In Jazzy, this process is handled internally within get_all_callback_groups(), so it
+    // works fine, but in Humble, it must be called explicitly.
+    {
+      std::lock_guard<std::mutex> guard(mutex_);
+      add_callback_groups_from_nodes_associated_to_executor();
+    }
     if (
       rclcpp::Executor::get_node_by_group(
         rclcpp::Executor::weak_groups_to_nodes_, it->callback_group) == nullptr) {

src/agnocastlib/src/agnocast_multi_threaded_executor.cpp

Lines changed: 1 addition & 1 deletion
@@ -156,7 +156,7 @@ void MultiThreadedAgnocastExecutor::ros2_spin()
 void MultiThreadedAgnocastExecutor::agnocast_spin()
 {
   while (rclcpp::ok(this->context_) && spinning.load()) {
-    if (need_epoll_updates.load()) {
+    if (epoll_update_tracker_.take_update_request()) {
       prepare_epoll();
     }
 
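The changed line above turns the spin loop into a consumer that polls its own flag once per iteration and rebuilds only when a request is pending. A rough sketch of that loop shape with a worker thread follows; `count_rebuilds_for_two_requests` and its local variables are hypothetical, and the counter merely stands in for `prepare_epoll()`.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Runs a polling loop on a worker thread and returns how many rebuilds it
// performed for two requests that are issued one after the other.
int count_rebuilds_for_two_requests()
{
  std::atomic<bool> spinning{true};
  std::atomic<bool> need_update{false};  // starts false here to keep the count simple
  std::atomic<int> rebuilds{0};

  std::thread worker([&] {
    while (spinning.load()) {
      if (need_update.exchange(false, std::memory_order_acquire)) {
        rebuilds.fetch_add(1);  // stands in for prepare_epoll()
      }
      std::this_thread::yield();  // a real loop would do its event wait here
    }
  });

  for (int expected = 1; expected <= 2; ++expected) {
    need_update.store(true, std::memory_order_release);
    // Wait until the worker has consumed this request before issuing the next.
    while (rebuilds.load() < expected) {
      std::this_thread::yield();
    }
  }

  spinning.store(false);
  worker.join();
  return rebuilds.load();
}
```

Each request is consumed exactly once because the worker clears the flag in the same atomic step that reads it; requests issued back to back without waiting could coalesce into a single rebuild, which is acceptable for a "something changed" notification.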
