Skip to content

Commit f8c0de0

Browse files
committed
refactor(agnocastlib): extract event-specific logic from agnocast_epoll
Previously, all event-specific logic was hardcoded in `agnocast_epoll.cpp` and `.hpp`. This caused dependency issues and made it difficult to add new event types. This commit refactors the code by categorizing events and moving their implementations into separate source files. Key changes include: - Change epoll_data format from u32 to u64 to hold both event kind and local identifier. - Introduce `EpollManager` to manage and dispatch events from the Epoll class. - Introduce `EpollEventSource` as an abstract base class for specific event handlers. - Replace raw file descriptor `epoll_fd_` usage with the encapsulated `Epoll` class. - Move implementation details from headers to `.cpp` files. Relates to: #969
1 parent 6df1c6a commit f8c0de0

16 files changed

Lines changed: 473 additions & 313 deletions

src/agnocastlib/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ find_package(glog REQUIRED)
3232

3333
add_library(agnocast SHARED
3434
src/agnocast.cpp src/agnocast_utils.cpp src/agnocast_publisher.cpp src/agnocast_subscription.cpp
35-
src/agnocast_smart_pointer.cpp src/agnocast_callback_info.cpp src/agnocast_epoll.cpp src/agnocast_epoll_update_dispatcher.cpp src/agnocast_timer_info.cpp src/agnocast_timer.cpp src/agnocast_executor.cpp
35+
src/agnocast_smart_pointer.cpp src/agnocast_callback_info.cpp src/agnocast_epoll.cpp src/agnocast_epoll_event.cpp src/agnocast_epoll_update_dispatcher.cpp src/agnocast_timer_info.cpp src/agnocast_timer.cpp src/agnocast_executor.cpp
3636
src/agnocast_single_threaded_executor.cpp src/agnocast_multi_threaded_executor.cpp
3737
src/agnocast_callback_isolated_executor.cpp
3838
src/agnocast_tracepoint_wrapper.c src/agnocast_client.cpp

src/agnocastlib/include/agnocast/agnocast_callback_info.hpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "agnocast/agnocast_epoll.hpp"
34
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
45
#include "agnocast/agnocast_smart_pointer.hpp"
56

@@ -9,6 +10,8 @@
910
namespace agnocast
1011
{
1112

13+
constexpr uint32_t MAX_CALLBACK_INFO_ID = 0x10000000;
14+
1215
struct AgnocastExecutable;
1316

1417
// Base class for a type-erased object
@@ -124,4 +127,28 @@ void enqueue_receive_and_execute(
124127
std::mutex & ready_agnocast_executables_mutex,
125128
std::vector<AgnocastExecutable> & ready_agnocast_executables);
126129

130+
class SubscriptionEventSource : public EpollEventSource
131+
{
132+
pid_t my_pid_;
133+
std::mutex * ready_agnocast_executables_mutex_;
134+
std::vector<AgnocastExecutable> * ready_agnocast_executables_;
135+
136+
public:
137+
SubscriptionEventSource(
138+
const pid_t my_pid, std::mutex * ready_agnocast_executables_mutex,
139+
std::vector<AgnocastExecutable> * ready_agnocast_executables)
140+
: my_pid_(my_pid),
141+
ready_agnocast_executables_mutex_(ready_agnocast_executables_mutex),
142+
ready_agnocast_executables_(ready_agnocast_executables)
143+
{
144+
}
145+
146+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Subscription; }
147+
148+
void prepare_epoll(
149+
Epoll & epoll, const CallbackGroupValidator & validate_callback_group) override;
150+
151+
bool handle(EpollEventLocalID event_local_id) override;
152+
};
153+
127154
} // namespace agnocast
Lines changed: 56 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#pragma once
22

3-
#include "agnocast/agnocast_callback_info.hpp"
4-
#include "agnocast/agnocast_timer_info.hpp"
5-
#include "sys/epoll.h"
3+
#include "agnocast/agnocast_epoll_event.hpp"
4+
5+
#include <rclcpp/callback_group.hpp>
66

77
#include <atomic>
88
#include <mutex>
@@ -12,108 +12,67 @@
1212
namespace agnocast
1313
{
1414

15-
struct AgnocastExecutable;
15+
using CallbackGroupValidator = std::function<bool(const rclcpp::CallbackGroup::SharedPtr &)>;
16+
17+
class EpollEventSource
18+
{
19+
public:
20+
EpollEventSource() = default;
21+
22+
virtual ~EpollEventSource() = default;
1623

17-
constexpr uint32_t TIMER_EVENT_FLAG = 0x80000000;
18-
constexpr uint32_t CLOCK_EVENT_FLAG = 0x40000000; // For clock_eventfd events (ROS_TIME timers)
19-
constexpr uint32_t SHUTDOWN_EVENT_FLAG = 0x20000000; // For shutdown events (AgnocastOnlyExecutor)
20-
constexpr uint32_t EPOLL_EVENT_ID_RESERVED_MASK =
21-
TIMER_EVENT_FLAG | CLOCK_EVENT_FLAG | SHUTDOWN_EVENT_FLAG;
24+
EpollEventSource(const EpollEventSource &) = delete;
25+
EpollEventSource & operator=(const EpollEventSource &) = delete;
2226

23-
// @return true if shutdown event detected, false otherwise
24-
bool wait_and_handle_epoll_event(
25-
int epoll_fd, pid_t my_pid, int timeout_ms, std::mutex & ready_agnocast_executables_mutex,
26-
std::vector<AgnocastExecutable> & ready_agnocast_executables);
27+
EpollEventSource(EpollEventSource &&) = delete;
28+
EpollEventSource & operator=(EpollEventSource &&) = delete;
2729

28-
template <class ValidateFn>
29-
void prepare_epoll_impl(
30-
const int epoll_fd, const pid_t my_pid, std::mutex & ready_agnocast_executables_mutex,
31-
std::vector<AgnocastExecutable> & ready_agnocast_executables,
32-
ValidateFn && validate_callback_group)
30+
[[nodiscard]] virtual EpollEventType get_type() const = 0;
31+
32+
virtual void prepare_epoll(
33+
Epoll & epoll, const CallbackGroupValidator & validate_callback_group) = 0;
34+
35+
virtual bool handle(EpollEventLocalID event_local_id) = 0;
36+
};
37+
38+
// Shutdown event - only used by AgnocastOnlyExecutor
39+
class ShutdownEventSource : public EpollEventSource
3340
{
34-
// Register subscription callbacks to epoll
41+
public:
42+
ShutdownEventSource() = default;
43+
44+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Shutdown; }
45+
46+
void prepare_epoll(Epoll & epoll, const CallbackGroupValidator & validate_callback_group) override
3547
{
36-
std::lock_guard<std::mutex> lock(id2_callback_info_mtx);
37-
38-
for (auto & it : id2_callback_info) {
39-
const uint32_t callback_info_id = it.first;
40-
CallbackInfo & callback_info = it.second;
41-
42-
if (!callback_info.need_epoll_update) {
43-
continue;
44-
}
45-
46-
if (!validate_callback_group(callback_info.callback_group)) {
47-
continue;
48-
}
49-
50-
struct epoll_event ev = {};
51-
ev.events = EPOLLIN;
52-
ev.data.u32 = callback_info_id;
53-
54-
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, callback_info.mqdes, &ev) == -1) {
55-
RCLCPP_ERROR(logger, "epoll_ctl failed: %s", strerror(errno));
56-
close(agnocast_fd);
57-
exit(EXIT_FAILURE);
58-
}
59-
60-
if (callback_info.is_transient_local) {
61-
agnocast::enqueue_receive_and_execute(
62-
callback_info_id, my_pid, callback_info, ready_agnocast_executables_mutex,
63-
ready_agnocast_executables);
64-
}
65-
66-
callback_info.need_epoll_update = false;
67-
}
48+
(void)epoll;
49+
(void)validate_callback_group;
6850
}
6951

70-
// Register timers to epoll
52+
bool handle(EpollEventLocalID /*event_local_id*/) override { return true; }
53+
};
54+
55+
using EventSourceArray =
56+
std::array<std::unique_ptr<EpollEventSource>, static_cast<size_t>(EpollEventType::NrEventType)>;
57+
58+
class EpollManager
59+
{
60+
public:
61+
explicit EpollManager(EventSourceArray sources);
62+
63+
int add_event(int fd, EpollEventType type, EpollEventLocalID local_id)
7164
{
72-
std::lock_guard<std::mutex> lock(id2_timer_info_mtx);
73-
74-
for (auto & it : id2_timer_info) {
75-
const uint32_t timer_id = it.first;
76-
TimerInfo & timer_info = *it.second;
77-
78-
if (!timer_info.need_epoll_update) {
79-
continue;
80-
}
81-
82-
if (!timer_info.timer.lock() || !validate_callback_group(timer_info.callback_group)) {
83-
continue;
84-
}
85-
86-
std::shared_lock fd_lock(timer_info.fd_mutex);
87-
88-
// Register clock_eventfd for ROS_TIME timers (simulation time support)
89-
if (timer_info.clock_eventfd >= 0) {
90-
struct epoll_event clock_ev = {};
91-
clock_ev.events = EPOLLIN;
92-
clock_ev.data.u32 = timer_id | CLOCK_EVENT_FLAG;
93-
94-
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.clock_eventfd, &clock_ev) == -1) {
95-
RCLCPP_ERROR(logger, "epoll_ctl failed for clock_eventfd: %s", strerror(errno));
96-
close(agnocast_fd);
97-
exit(EXIT_FAILURE);
98-
}
99-
}
100-
101-
// Register timerfd (wall clock based firing)
102-
if (timer_info.timer_fd >= 0) {
103-
struct epoll_event ev = {};
104-
ev.events = EPOLLIN;
105-
ev.data.u32 = timer_id | TIMER_EVENT_FLAG;
106-
107-
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.timer_fd, &ev) == -1) {
108-
RCLCPP_ERROR(logger, "epoll_ctl failed for timer: %s", strerror(errno));
109-
close(agnocast_fd);
110-
exit(EXIT_FAILURE);
111-
}
112-
}
113-
114-
timer_info.need_epoll_update = false;
115-
}
65+
return epoll_.add_source(fd, type, local_id);
11666
}
117-
}
67+
68+
void prepare_epoll(const CallbackGroupValidator & validate_callback_group);
69+
70+
/// @return true if shutdown event detected, false otherwise
71+
bool wait_and_handle_epoll_event(int timeout_ms);
72+
73+
private:
74+
Epoll epoll_;
75+
EventSourceArray sources_{nullptr};
76+
};
11877

11978
} // namespace agnocast
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <vector>
5+
6+
namespace agnocast
7+
{
8+
9+
enum class EpollEventType : uint32_t {
10+
Subscription = 0,
11+
Timer = 1,
12+
Shutdown = 2,
13+
NrEventType,
14+
};
15+
16+
using EpollEventLocalID = uint32_t;
17+
18+
constexpr uint32_t EPOLL_DATA_TYPE_SHIFT = 32;
19+
constexpr uint32_t EPOLL_DATA_ID_BITMASK = 0xFFFFFFFF;
20+
21+
inline uint64_t pack_epoll_data(EpollEventType type, EpollEventLocalID id)
22+
{
23+
return (static_cast<uint64_t>(type) << EPOLL_DATA_TYPE_SHIFT) | id;
24+
}
25+
26+
inline std::pair<EpollEventType, EpollEventLocalID> unpack_epoll_data(uint64_t data)
27+
{
28+
auto type = static_cast<EpollEventType>(data >> EPOLL_DATA_TYPE_SHIFT);
29+
auto id = static_cast<uint32_t>(data & EPOLL_DATA_ID_BITMASK);
30+
return {type, id};
31+
}
32+
33+
using EpollResult = std::pair<EpollEventType, EpollEventLocalID>;
34+
35+
class Epoll
36+
{
37+
static constexpr int EPOLL_WAIT_MAX_EVENTS = 1;
38+
int epoll_fd_;
39+
40+
public:
41+
Epoll();
42+
43+
Epoll(const Epoll &) = delete;
44+
Epoll & operator=(const Epoll &) = delete;
45+
46+
Epoll(Epoll && other) noexcept : epoll_fd_(other.epoll_fd_) { other.epoll_fd_ = -1; }
47+
Epoll & operator=(Epoll && other) noexcept;
48+
49+
~Epoll();
50+
51+
[[nodiscard]] int add_source(int fd, EpollEventType type, EpollEventLocalID local_id) const;
52+
53+
void remove(int fd) const;
54+
55+
int wait(std::vector<EpollResult> & results, int timeout_ms) const;
56+
57+
[[nodiscard]] int fd() const { return epoll_fd_; }
58+
59+
[[nodiscard]] bool is_valid() const { return epoll_fd_ >= 0; }
60+
};
61+
62+
} // namespace agnocast

src/agnocastlib/include/agnocast/agnocast_executor.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
#pragma once
22

33
#include "agnocast/agnocast_epoll.hpp"
4+
#include "agnocast/agnocast_epoll_event.hpp"
5+
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
46
#include "agnocast/agnocast_public_api.hpp"
57
#include "rclcpp/rclcpp.hpp"
68

9+
#include <memory>
10+
711
namespace agnocast
812
{
913

@@ -33,7 +37,7 @@ class AgnocastExecutor : public rclcpp::Executor
3337
virtual bool validate_callback_group(const rclcpp::CallbackGroup::SharedPtr & group) const = 0;
3438

3539
protected:
36-
int epoll_fd_;
40+
std::unique_ptr<EpollManager> epoll_manager_;
3741
pid_t my_pid_;
3842
std::mutex wait_mutex_;
3943

src/agnocastlib/include/agnocast/agnocast_timer_info.hpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "agnocast/agnocast_epoll.hpp"
34
#include "agnocast/agnocast_timer.hpp"
45
#include "rclcpp/rclcpp.hpp"
56

@@ -15,6 +16,9 @@ namespace agnocast
1516
{
1617

1718
constexpr int64_t NANOSECONDS_PER_SECOND = 1000000000;
19+
constexpr uint32_t MAX_TIMER_ID = 0x10000000;
20+
21+
struct AgnocastExecutable;
1822

1923
struct TimerInfo
2024
{
@@ -59,4 +63,28 @@ void register_timer_info(
5963

6064
void unregister_timer_info(uint32_t timer_id);
6165

66+
class TimerEventSource : public EpollEventSource
67+
{
68+
pid_t my_pid_;
69+
std::mutex * ready_agnocast_executables_mutex_;
70+
std::vector<AgnocastExecutable> * ready_agnocast_executables_;
71+
72+
public:
73+
TimerEventSource(
74+
const pid_t my_pid, std::mutex * ready_agnocast_executables_mutex,
75+
std::vector<AgnocastExecutable> * ready_agnocast_executables)
76+
: my_pid_(my_pid),
77+
ready_agnocast_executables_mutex_(ready_agnocast_executables_mutex),
78+
ready_agnocast_executables_(ready_agnocast_executables)
79+
{
80+
}
81+
82+
[[nodiscard]] EpollEventType get_type() const override { return EpollEventType::Timer; }
83+
84+
void prepare_epoll(
85+
Epoll & epoll, const CallbackGroupValidator & validate_callback_group) override;
86+
87+
bool handle(EpollEventLocalID event_local_id) override;
88+
};
89+
6290
} // namespace agnocast

src/agnocastlib/include/agnocast/node/agnocast_only_executor.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include "agnocast/agnocast_epoll.hpp"
4+
#include "agnocast/agnocast_epoll_event.hpp"
35
#include "agnocast/agnocast_epoll_update_dispatcher.hpp"
46
#include "agnocast/agnocast_public_api.hpp"
57
#include "rclcpp/callback_group.hpp"
@@ -30,7 +32,7 @@ class AgnocastOnlyExecutor
3032
{
3133
protected:
3234
std::atomic_bool spinning_;
33-
int epoll_fd_;
35+
std::unique_ptr<EpollManager> epoll_manager_;
3436
int shutdown_event_fd_;
3537
pid_t my_pid_;
3638

0 commit comments

Comments
 (0)