Skip to content

Commit e1fce09

Browse files
committed
refactor(agnocastlib): remove the global variable need_epoll_updates
Previously, when events managed by epoll changed, we notified each Executor to call `prepare_epoll_impl()` by setting the global atomic variable `need_epoll_updates` to true. However, this implementation will become problematic for future refactoring efforts aimed at extracting event-specific processing from `agnocast_epoll.hpp` and `.cpp`. This commit removes this global variable and introduces an alternative notification mechanism. While the current implementation only supports broadcasting notifications to all Executors—which leaves some performance challenges—it establishes a 1-to-1 tracking structure for each Executor. This lays the groundwork for implementing targeted 1-to-1 notifications in the future. Relates to: #969 Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>
1 parent 9b1ca62 commit e1fce09

15 files changed

Lines changed: 161 additions & 36 deletions

src/agnocastlib/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ find_package(glog REQUIRED)
3232

3333
add_library(agnocast SHARED
3434
src/agnocast.cpp src/agnocast_utils.cpp src/agnocast_publisher.cpp src/agnocast_subscription.cpp
35-
src/agnocast_smart_pointer.cpp src/agnocast_callback_info.cpp src/agnocast_epoll.cpp src/agnocast_timer_info.cpp src/agnocast_timer.cpp src/agnocast_executor.cpp
35+
src/agnocast_smart_pointer.cpp src/agnocast_callback_info.cpp src/agnocast_epoll.cpp src/agnocast_epoll_update_dispatcher.cpp src/agnocast_timer_info.cpp src/agnocast_timer.cpp src/agnocast_executor.cpp
3636
src/agnocast_single_threaded_executor.cpp src/agnocast_multi_threaded_executor.cpp
3737
src/agnocast_callback_isolated_executor.cpp
3838
src/agnocast_tracepoint_wrapper.c src/agnocast_client.cpp

src/agnocastlib/include/agnocast/agnocast_callback_info.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
34
#include "agnocast/agnocast_smart_pointer.hpp"
45

56
#include <mutex>
@@ -57,7 +58,6 @@ std::vector<std::string> get_agnocast_topics_by_group(
5758
extern std::mutex id2_callback_info_mtx;
5859
extern std::unordered_map<uint32_t, CallbackInfo> id2_callback_info;
5960
extern std::atomic<uint32_t> next_callback_info_id;
60-
extern std::atomic<bool> need_epoll_updates;
6161

6262
uint32_t allocate_callback_info_id();
6363

@@ -109,7 +109,7 @@ uint32_t register_callback(
109109
callback_group, erased_callback, message_creator};
110110
}
111111

112-
need_epoll_updates.store(true);
112+
EpollUpdateDispatcher::get_instance().notify_all();
113113

114114
return callback_info_id;
115115
}

src/agnocastlib/include/agnocast/agnocast_epoll.hpp

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ namespace agnocast
1414

1515
struct AgnocastExecutable;
1616

17-
extern std::atomic<bool> need_epoll_updates;
18-
1917
constexpr uint32_t TIMER_EVENT_FLAG = 0x80000000;
2018
constexpr uint32_t CLOCK_EVENT_FLAG = 0x40000000; // For clock_eventfd events (ROS_TIME timers)
2119
constexpr uint32_t SHUTDOWN_EVENT_FLAG = 0x20000000; // For shutdown events (AgnocastOnlyExecutor)
@@ -116,28 +114,6 @@ void prepare_epoll_impl(
116114
timer_info.need_epoll_update = false;
117115
}
118116
}
119-
120-
// Check if all updates are done. Both locks must be held simultaneously to prevent
121-
// a TOCTOU race: without this, a new subscription could set need_epoll_updates=true
122-
// between the two checks (or between the last check and the store), and that update
123-
// would be lost when need_epoll_updates is overwritten to false.
124-
// Lock ordering: id2_callback_info_mtx before id2_timer_info_mtx (see declarations).
125-
{
126-
std::lock_guard<std::mutex> cb_lock(id2_callback_info_mtx);
127-
std::lock_guard<std::mutex> timer_lock(id2_timer_info_mtx);
128-
129-
const bool all_callbacks_updated = std::none_of(
130-
id2_callback_info.begin(), id2_callback_info.end(),
131-
[](const auto & it) { return it.second.need_epoll_update; });
132-
133-
const bool all_timers_updated = std::none_of(
134-
id2_timer_info.begin(), id2_timer_info.end(),
135-
[](const auto & it) { return it.second->need_epoll_update; });
136-
137-
if (all_callbacks_updated && all_timers_updated) {
138-
need_epoll_updates.store(false);
139-
}
140-
}
141117
}
142118

143119
} // namespace agnocast
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#pragma once
2+
3+
#include "sys/epoll.h"
4+
5+
#include <atomic>
6+
#include <memory>
7+
#include <mutex>
8+
#include <unordered_map>
9+
10+
namespace agnocast
11+
{
12+
13+
class EpollUpdateTracker;
14+
15+
struct TrackerContext
16+
{
17+
std::atomic<bool> need_update{true};
18+
};
19+
20+
class EpollUpdateDispatcher
21+
{
22+
public:
23+
static EpollUpdateDispatcher & get_instance()
24+
{
25+
static EpollUpdateDispatcher instance;
26+
return instance;
27+
}
28+
29+
void notify_all();
30+
31+
EpollUpdateTracker create_tracker();
32+
33+
private:
34+
EpollUpdateDispatcher() = default;
35+
friend class EpollUpdateTracker;
36+
37+
void unregister(int tracker_id);
38+
39+
std::atomic<int> next_tracker_id_{1};
40+
41+
std::mutex mutex_;
42+
std::unordered_map<int, std::shared_ptr<TrackerContext>> trackers_;
43+
};
44+
45+
class EpollUpdateTracker
46+
{
47+
public:
48+
EpollUpdateTracker(const EpollUpdateTracker &) = delete;
49+
EpollUpdateTracker & operator=(const EpollUpdateTracker &) = delete;
50+
51+
EpollUpdateTracker(EpollUpdateTracker && other) noexcept;
52+
EpollUpdateTracker & operator=(EpollUpdateTracker && other) noexcept;
53+
54+
~EpollUpdateTracker();
55+
56+
[[nodiscard]] bool need_update() const;
57+
58+
private:
59+
friend class EpollUpdateDispatcher;
60+
61+
EpollUpdateTracker(int tracker_id, std::shared_ptr<TrackerContext> context)
62+
: id_(tracker_id), context_(std::move(context))
63+
{
64+
}
65+
66+
int id_;
67+
std::shared_ptr<TrackerContext> context_;
68+
};
69+
70+
} // namespace agnocast

src/agnocastlib/include/agnocast/agnocast_executor.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class AgnocastExecutor : public rclcpp::Executor
4141
bool get_next_agnocast_executable(AgnocastExecutable & agnocast_executable, const int timeout_ms);
4242
static void execute_agnocast_executable(AgnocastExecutable & agnocast_executable);
4343

44+
EpollUpdateTracker epoll_update_tracker_;
45+
4446
public:
4547
/// Construct the executor.
4648
/// @param options Executor options.

src/agnocastlib/include/agnocast/node/agnocast_only_executor.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
34
#include "agnocast/agnocast_public_api.hpp"
45
#include "rclcpp/callback_group.hpp"
56
#include "rclcpp/node_interfaces/node_base_interface.hpp"
@@ -33,6 +34,8 @@ class AgnocastOnlyExecutor
3334
int shutdown_event_fd_;
3435
pid_t my_pid_;
3536

37+
EpollUpdateTracker epoll_update_tracker_;
38+
3639
// Lock ordering: When both mutexes are needed, always acquire
3740
// ready_agnocast_executables_mutex_ before mutex_ to prevent deadlocks.
3841
std::mutex ready_agnocast_executables_mutex_;

src/agnocastlib/src/agnocast_epoll.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
namespace agnocast
88
{
99

10-
std::atomic<bool> need_epoll_updates{false};
11-
1210
bool wait_and_handle_epoll_event(
1311
const int epoll_fd, const pid_t my_pid, const int timeout_ms,
1412
std::mutex & ready_agnocast_executables_mutex,
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
2+
3+
#include <atomic>
4+
5+
namespace agnocast
6+
{
7+
8+
EpollUpdateTracker EpollUpdateDispatcher::create_tracker()
9+
{
10+
int new_id = next_tracker_id_.fetch_add(1, std::memory_order_relaxed);
11+
12+
auto context = std::make_shared<TrackerContext>();
13+
14+
{
15+
std::lock_guard<std::mutex> lock(mutex_);
16+
trackers_.emplace(new_id, context);
17+
}
18+
19+
return {new_id, context};
20+
}
21+
22+
void EpollUpdateDispatcher::notify_all()
23+
{
24+
std::lock_guard<std::mutex> lock(mutex_);
25+
for (auto & [id, context] : trackers_) {
26+
context->need_update.store(true, std::memory_order_release);
27+
}
28+
}
29+
30+
void EpollUpdateDispatcher::unregister(int tracker_id)
31+
{
32+
std::lock_guard<std::mutex> lock(mutex_);
33+
trackers_.erase(tracker_id);
34+
}
35+
36+
EpollUpdateTracker::EpollUpdateTracker(EpollUpdateTracker && other) noexcept
37+
: id_(other.id_), context_(std::move(other.context_))
38+
{
39+
other.id_ = 0;
40+
}
41+
42+
EpollUpdateTracker & EpollUpdateTracker::operator=(EpollUpdateTracker && other) noexcept
43+
{
44+
if (this != &other) {
45+
if (id_ != 0) {
46+
EpollUpdateDispatcher::get_instance().unregister(id_);
47+
}
48+
id_ = other.id_;
49+
context_ = std::move(other.context_);
50+
51+
other.id_ = 0;
52+
}
53+
return *this;
54+
}
55+
56+
EpollUpdateTracker::~EpollUpdateTracker()
57+
{
58+
if (id_ != 0) {
59+
EpollUpdateDispatcher::get_instance().unregister(id_);
60+
}
61+
}
62+
63+
bool EpollUpdateTracker::need_update() const
64+
{
65+
if (!context_) {
66+
return false;
67+
}
68+
return context_->need_update.exchange(false, std::memory_order_acquire);
69+
}
70+
71+
} // namespace agnocast

src/agnocastlib/src/agnocast_executor.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "agnocast/agnocast_executor.hpp"
22

33
#include "agnocast/agnocast.hpp"
4+
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
45
#include "agnocast/agnocast_tracepoint_wrapper.h"
56
#include "rclcpp/rclcpp.hpp"
67
#include "rclcpp/version.h"
@@ -12,7 +13,10 @@ namespace agnocast
1213
{
1314

1415
AgnocastExecutor::AgnocastExecutor(const rclcpp::ExecutorOptions & options)
15-
: rclcpp::Executor(options), epoll_fd_(epoll_create1(0)), my_pid_(getpid())
16+
: rclcpp::Executor(options),
17+
epoll_fd_(epoll_create1(0)),
18+
my_pid_(getpid()),
19+
epoll_update_tracker_(EpollUpdateDispatcher::get_instance().create_tracker())
1620
{
1721
if (epoll_fd_ == -1) {
1822
RCLCPP_ERROR(logger, "epoll_create1 failed: %s", strerror(errno));

src/agnocastlib/src/agnocast_multi_threaded_executor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ void MultiThreadedAgnocastExecutor::ros2_spin()
156156
void MultiThreadedAgnocastExecutor::agnocast_spin()
157157
{
158158
while (rclcpp::ok(this->context_) && spinning.load()) {
159-
if (need_epoll_updates.load()) {
159+
if (epoll_update_tracker_.need_update()) {
160160
prepare_epoll();
161161
}
162162

0 commit comments

Comments
 (0)