Skip to content

Commit 02fc532

Browse files
committed
refactor(agnocastlib): extract event-specific logic from agnocast_epoll
Previously, all event-specific logic was hardcoded in `agnocast_epoll.cpp` and `.hpp`. This caused dependency issues and made it difficult to add new event types. This commit refactors the code by categorizing events and moving their implementations into separate source files. Key changes include: - Change epoll_data format from u32 to u64 to hold both event kind and local identifier. - Introduce `EpollManager` to manage and dispatch events from epoll. - Introduce `EpollEventHandler` as an abstract base class for specific event handlers. Relates to: #969 Signed-off-by: Takumi Jin <primenumber_2_3_5@yahoo.co.jp>
1 parent 32cf5c2 commit 02fc532

14 files changed

Lines changed: 611 additions & 316 deletions

src/agnocastlib/include/agnocast/agnocast_callback_info.hpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
#pragma once
22

3+
#include "agnocast/agnocast_epoll.hpp"
34
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
45
#include "agnocast/agnocast_smart_pointer.hpp"
56

7+
#include <cstdint>
68
#include <mutex>
79
#include <type_traits>
810

911
namespace agnocast
1012
{
1113

14+
// Capped slightly below UINT32_MAX to provide a safe margin against
15+
// atomic wrap-around (overflow back to 0) during concurrent fetch_add calls.
16+
constexpr uint32_t MAX_CALLBACK_INFO_ID_SAFETY_MARGIN = 1000;
17+
constexpr uint32_t MAX_CALLBACK_INFO_ID = UINT32_MAX - MAX_CALLBACK_INFO_ID_SAFETY_MARGIN;
18+
1219
struct AgnocastExecutable;
1320

1421
// Base class for a type-erased object
@@ -124,4 +131,27 @@ void enqueue_receive_and_execute(
124131
std::mutex & ready_agnocast_executables_mutex,
125132
std::vector<AgnocastExecutable> & ready_agnocast_executables);
126133

134+
class SubscriptionEventHandler : public EpollEventHandler
135+
{
136+
pid_t my_pid_;
137+
std::mutex * ready_agnocast_executables_mutex_;
138+
std::vector<AgnocastExecutable> * ready_agnocast_executables_;
139+
140+
public:
141+
SubscriptionEventHandler(
142+
const pid_t my_pid, std::mutex * ready_agnocast_executables_mutex,
143+
std::vector<AgnocastExecutable> * ready_agnocast_executables)
144+
: my_pid_(my_pid),
145+
ready_agnocast_executables_mutex_(ready_agnocast_executables_mutex),
146+
ready_agnocast_executables_(ready_agnocast_executables)
147+
{
148+
}
149+
150+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Subscription; }
151+
152+
void prepare_epoll(int epoll_fd, const CallbackGroupValidator & validate_callback_group) override;
153+
154+
bool handle(EpollEventLocalID event_local_id) override;
155+
};
156+
127157
} // namespace agnocast
Lines changed: 77 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,119 +1,95 @@
11
#pragma once
22

3-
#include "agnocast/agnocast_callback_info.hpp"
4-
#include "agnocast/agnocast_timer_info.hpp"
5-
#include "sys/epoll.h"
3+
#include "agnocast/agnocast_epoll_event.hpp"
64

7-
#include <atomic>
8-
#include <mutex>
9-
#include <shared_mutex>
10-
#include <vector>
5+
#include <rclcpp/callback_group.hpp>
6+
7+
#include <array>
8+
#include <functional>
9+
#include <memory>
1110

1211
namespace agnocast
1312
{
1413

15-
struct AgnocastExecutable;
14+
using CallbackGroupValidator = std::function<bool(const rclcpp::CallbackGroup::SharedPtr &)>;
15+
16+
class EpollEventHandler
17+
{
18+
public:
19+
EpollEventHandler() = default;
20+
21+
virtual ~EpollEventHandler() = default;
22+
23+
EpollEventHandler(const EpollEventHandler &) = delete;
24+
EpollEventHandler & operator=(const EpollEventHandler &) = delete;
25+
26+
EpollEventHandler(EpollEventHandler &&) = delete;
27+
EpollEventHandler & operator=(EpollEventHandler &&) = delete;
1628

17-
constexpr uint32_t TIMER_EVENT_FLAG = 0x80000000;
18-
constexpr uint32_t CLOCK_EVENT_FLAG = 0x40000000; // For clock_eventfd events (ROS_TIME timers)
19-
constexpr uint32_t SHUTDOWN_EVENT_FLAG = 0x20000000; // For shutdown events (AgnocastOnlyExecutor)
20-
constexpr uint32_t EPOLL_EVENT_ID_RESERVED_MASK =
21-
TIMER_EVENT_FLAG | CLOCK_EVENT_FLAG | SHUTDOWN_EVENT_FLAG;
29+
[[nodiscard]] virtual EpollEventType get_type() const = 0;
2230

23-
// @return true if shutdown event detected, false otherwise
24-
bool wait_and_handle_epoll_event(
25-
int epoll_fd, pid_t my_pid, int timeout_ms, std::mutex & ready_agnocast_executables_mutex,
26-
std::vector<AgnocastExecutable> & ready_agnocast_executables);
31+
virtual void prepare_epoll(
32+
int epoll_fd, const CallbackGroupValidator & validate_callback_group) = 0;
2733

28-
template <class ValidateFn>
29-
void prepare_epoll_impl(
30-
const int epoll_fd, const pid_t my_pid, std::mutex & ready_agnocast_executables_mutex,
31-
std::vector<AgnocastExecutable> & ready_agnocast_executables,
32-
ValidateFn && validate_callback_group)
34+
virtual bool handle(EpollEventLocalID event_local_id) = 0;
35+
};
36+
37+
// Shutdown event - only used by AgnocastOnlyExecutor
38+
class ShutdownEventHandler : public EpollEventHandler
3339
{
34-
// Register subscription callbacks to epoll
40+
public:
41+
ShutdownEventHandler() = default;
42+
43+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Shutdown; }
44+
45+
void prepare_epoll(int epoll_fd, const CallbackGroupValidator & validate_callback_group) override
3546
{
36-
std::lock_guard<std::mutex> lock(id2_callback_info_mtx);
37-
38-
for (auto & it : id2_callback_info) {
39-
const uint32_t callback_info_id = it.first;
40-
CallbackInfo & callback_info = it.second;
41-
42-
if (!callback_info.need_epoll_update) {
43-
continue;
44-
}
45-
46-
if (!validate_callback_group(callback_info.callback_group)) {
47-
continue;
48-
}
49-
50-
struct epoll_event ev = {};
51-
ev.events = EPOLLIN;
52-
ev.data.u32 = callback_info_id;
53-
54-
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, callback_info.mqdes, &ev) == -1) {
55-
RCLCPP_ERROR(logger, "epoll_ctl failed: %s", strerror(errno));
56-
close(agnocast_fd);
57-
exit(EXIT_FAILURE);
58-
}
59-
60-
if (callback_info.is_transient_local) {
61-
agnocast::enqueue_receive_and_execute(
62-
callback_info_id, my_pid, callback_info, ready_agnocast_executables_mutex,
63-
ready_agnocast_executables);
64-
}
65-
66-
callback_info.need_epoll_update = false;
67-
}
47+
(void)epoll_fd;
48+
(void)validate_callback_group;
6849
}
6950

70-
// Register timers to epoll
51+
bool handle(EpollEventLocalID /*event_local_id*/) override { return true; }
52+
};
53+
54+
class DummyEventHandler : public EpollEventHandler
55+
{
56+
public:
57+
DummyEventHandler() = default;
58+
59+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Dummy; }
60+
61+
void prepare_epoll(int epoll_fd, const CallbackGroupValidator & validate_callback_group) override
7162
{
72-
std::lock_guard<std::mutex> lock(id2_timer_info_mtx);
73-
74-
for (auto & it : id2_timer_info) {
75-
const uint32_t timer_id = it.first;
76-
TimerInfo & timer_info = *it.second;
77-
78-
if (!timer_info.need_epoll_update) {
79-
continue;
80-
}
81-
82-
if (!timer_info.timer.lock() || !validate_callback_group(timer_info.callback_group)) {
83-
continue;
84-
}
85-
86-
std::shared_lock fd_lock(timer_info.fd_mutex);
87-
88-
// Register clock_eventfd for ROS_TIME timers (simulation time support)
89-
if (timer_info.clock_eventfd >= 0) {
90-
struct epoll_event clock_ev = {};
91-
clock_ev.events = EPOLLIN;
92-
clock_ev.data.u32 = timer_id | CLOCK_EVENT_FLAG;
93-
94-
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.clock_eventfd, &clock_ev) == -1) {
95-
RCLCPP_ERROR(logger, "epoll_ctl failed for clock_eventfd: %s", strerror(errno));
96-
close(agnocast_fd);
97-
exit(EXIT_FAILURE);
98-
}
99-
}
100-
101-
// Register timerfd (wall clock based firing)
102-
if (timer_info.timer_fd >= 0) {
103-
struct epoll_event ev = {};
104-
ev.events = EPOLLIN;
105-
ev.data.u32 = timer_id | TIMER_EVENT_FLAG;
106-
107-
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.timer_fd, &ev) == -1) {
108-
RCLCPP_ERROR(logger, "epoll_ctl failed for timer: %s", strerror(errno));
109-
close(agnocast_fd);
110-
exit(EXIT_FAILURE);
111-
}
112-
}
113-
114-
timer_info.need_epoll_update = false;
115-
}
63+
(void)epoll_fd;
64+
(void)validate_callback_group;
11665
}
117-
}
66+
67+
bool handle(EpollEventLocalID event_local_id) override;
68+
};
69+
70+
using EventHandlerArray =
71+
std::array<std::unique_ptr<EpollEventHandler>, static_cast<size_t>(EpollEventType::NrEventType)>;
72+
73+
class EpollManager final
74+
{
75+
public:
76+
explicit EpollManager(EventHandlerArray sources);
77+
~EpollManager();
78+
EpollManager(const EpollManager &) = delete;
79+
EpollManager & operator=(const EpollManager &) = delete;
80+
EpollManager(EpollManager &&) = delete;
81+
EpollManager & operator=(EpollManager &&) = delete;
82+
83+
[[nodiscard]] int add_event(int fd, EpollEventType type, EpollEventLocalID local_id) const;
84+
85+
void prepare_epoll(const CallbackGroupValidator & validate_callback_group);
86+
87+
/// @return true if shutdown event detected, false otherwise
88+
bool wait_and_handle_epoll_event(int timeout_ms);
89+
90+
private:
91+
int epoll_fd_{-1};
92+
EventHandlerArray sources_{nullptr};
93+
};
11894

11995
} // namespace agnocast
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <utility>
5+
6+
namespace agnocast
7+
{
8+
9+
enum class EpollEventType : uint32_t {
10+
Subscription = 0,
11+
Timer,
12+
Clock,
13+
Shutdown,
14+
NrEventType,
15+
16+
Dummy = 0xFFFFFFFF,
17+
};
18+
19+
using EpollEventLocalID = uint32_t;
20+
21+
constexpr uint32_t EPOLL_DATA_TYPE_SHIFT = 32;
22+
constexpr uint32_t EPOLL_DATA_ID_BITMASK = 0xFFFFFFFF;
23+
24+
inline uint64_t pack_epoll_data(EpollEventType type, EpollEventLocalID id)
25+
{
26+
return (static_cast<uint64_t>(type) << EPOLL_DATA_TYPE_SHIFT) | id;
27+
}
28+
29+
inline std::pair<EpollEventType, EpollEventLocalID> unpack_epoll_data(uint64_t data)
30+
{
31+
auto type = static_cast<EpollEventType>(data >> EPOLL_DATA_TYPE_SHIFT);
32+
auto id = static_cast<uint32_t>(data & EPOLL_DATA_ID_BITMASK);
33+
return {type, id};
34+
}
35+
36+
} // namespace agnocast

src/agnocastlib/include/agnocast/agnocast_executor.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include "agnocast/agnocast_public_api.hpp"
66
#include "rclcpp/rclcpp.hpp"
77

8+
#include <memory>
9+
810
namespace agnocast
911
{
1012

@@ -34,7 +36,7 @@ class AgnocastExecutor : public rclcpp::Executor
3436
virtual bool validate_callback_group(const rclcpp::CallbackGroup::SharedPtr & group) const = 0;
3537

3638
protected:
37-
int epoll_fd_;
39+
std::unique_ptr<EpollManager> epoll_manager_;
3840
pid_t my_pid_;
3941
std::mutex wait_mutex_;
4042

src/agnocastlib/include/agnocast/agnocast_timer_info.hpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
#pragma once
22

3+
#include "agnocast/agnocast_epoll.hpp"
34
#include "agnocast/agnocast_timer.hpp"
45
#include "rclcpp/rclcpp.hpp"
56

67
#include <atomic>
78
#include <chrono>
9+
#include <cstdint>
810
#include <functional>
911
#include <memory>
1012
#include <mutex>
@@ -15,6 +17,12 @@ namespace agnocast
1517
{
1618

1719
constexpr int64_t NANOSECONDS_PER_SECOND = 1000000000;
20+
// Capped slightly below UINT32_MAX to provide a safe margin against
21+
// atomic wrap-around (overflow back to 0) during concurrent fetch_add calls.
22+
constexpr uint32_t MAX_TIMER_ID_SAFETY_MARGIN = 1000;
23+
constexpr uint32_t MAX_TIMER_ID = UINT32_MAX - MAX_TIMER_ID_SAFETY_MARGIN;
24+
25+
struct AgnocastExecutable;
1826

1927
struct TimerInfo
2028
{
@@ -27,7 +35,9 @@ struct TimerInfo
2735

2836
uint32_t timer_id = 0;
2937
int timer_fd = -1;
38+
bool timer_fd_need_update = false;
3039
int clock_eventfd = -1; // eventfd to wake epoll on clock updates (ROS_TIME only)
40+
bool clock_eventfd_need_update = false;
3141
std::weak_ptr<TimerBase> timer;
3242
rclcpp::CallbackGroup::SharedPtr callback_group;
3343
std::atomic<int64_t> last_call_time_ns;
@@ -59,4 +69,50 @@ void register_timer_info(
5969

6070
void unregister_timer_info(uint32_t timer_id);
6171

72+
class TimerEventHandler : public EpollEventHandler
73+
{
74+
pid_t my_pid_;
75+
std::mutex * ready_agnocast_executables_mutex_;
76+
std::vector<AgnocastExecutable> * ready_agnocast_executables_;
77+
78+
public:
79+
TimerEventHandler(
80+
const pid_t my_pid, std::mutex * ready_agnocast_executables_mutex,
81+
std::vector<AgnocastExecutable> * ready_agnocast_executables)
82+
: my_pid_(my_pid),
83+
ready_agnocast_executables_mutex_(ready_agnocast_executables_mutex),
84+
ready_agnocast_executables_(ready_agnocast_executables)
85+
{
86+
}
87+
88+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Timer; }
89+
90+
void prepare_epoll(int epoll_fd, const CallbackGroupValidator & validate_callback_group) override;
91+
92+
bool handle(EpollEventLocalID event_local_id) override;
93+
};
94+
95+
class ClockEventHandler : public EpollEventHandler
96+
{
97+
pid_t my_pid_;
98+
std::mutex * ready_agnocast_executables_mutex_;
99+
std::vector<AgnocastExecutable> * ready_agnocast_executables_;
100+
101+
public:
102+
ClockEventHandler(
103+
const pid_t my_pid, std::mutex * ready_agnocast_executables_mutex,
104+
std::vector<AgnocastExecutable> * ready_agnocast_executables)
105+
: my_pid_(my_pid),
106+
ready_agnocast_executables_mutex_(ready_agnocast_executables_mutex),
107+
ready_agnocast_executables_(ready_agnocast_executables)
108+
{
109+
}
110+
111+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Clock; }
112+
113+
void prepare_epoll(int epoll_fd, const CallbackGroupValidator & validate_callback_group) override;
114+
115+
bool handle(EpollEventLocalID event_local_id) override;
116+
};
117+
62118
} // namespace agnocast

0 commit comments

Comments
 (0)