44
55namespace FREYR_NAMESPACE
66{
7+ template <typename TEvent>
8+ class Publisher ;
9+
10+ struct ListenerHandle
11+ {
12+ size_t id { 0 };
13+ bool IsValid () const { return id != 0 ; }
14+ };
15+
716 class IPublisher
817 {
918 public:
10- virtual ~IPublisher () = default ;
19+ virtual ~IPublisher () = default ;
20+ virtual void ClearInactiveListeners () = 0;
1121 };
1222
1323 template <typename TEvent>
1424 class Publisher final : public IPublisher
1525 {
26+ private:
27+ struct Listener
28+ {
29+ fr::function<void (const TEvent&)> callback;
30+ std::weak_ptr<ListenerHandle> handle;
31+
32+ Listener (fr::function<void (const TEvent&)>&& cb, Ref<ListenerHandle> handle) :
33+ callback (std::move(cb)), handle(handle)
34+ {
35+ }
36+ };
37+
38+ mutable std::shared_mutex mMutex ;
39+ std::vector<Listener> mListeners ;
40+ std::atomic<size_t > mNextId { 1 };
41+ std::atomic<bool > mNeedsCleanup { false };
42+
43+ std::mutex mPendingMutex ;
44+ std::vector<Listener> mPendingListeners ;
45+
1646 public:
1747 ~Publisher () override = default ;
1848
19- void Subscribe (auto && listener)
49+ [[nodiscard]] Ref<ListenerHandle> Subscribe (auto && listener)
2050 {
21- mListeners .emplace_back (std::move (listener));
51+ const size_t id = mNextId .fetch_add (1 , std::memory_order_relaxed);
52+
53+ auto handle = skr::MakeRef<ListenerHandle>(id);
54+ {
55+ std::lock_guard lock (mPendingMutex );
56+ mPendingListeners .emplace_back (
57+ fr::function<void (const TEvent&)>(std::forward<decltype (listener)>(listener)),
58+ handle);
59+ }
60+
61+ return handle;
2262 }
2363
24- void Publish (TEvent event)
64+ void Publish (const TEvent& event)
2565 {
26- for (auto i = 0 ; i < mListeners .size (); i++)
66+ MergePendingListeners ();
67+
68+ std::shared_lock lock (mMutex );
69+
70+ for (Listener& listener : mListeners )
71+ {
72+ if (!listener.handle .expired ())
73+ {
74+ listener.callback (event);
75+ }
76+ }
77+
78+ // Clean up after publishing if needed
79+ if (mNeedsCleanup .load (std::memory_order_acquire))
2780 {
28- mListeners [i](event);
81+ lock.unlock ();
82+ ClearInactiveListeners ();
2983 }
3084 }
3185
32- private:
33- std::vector<fr::function<void (TEvent)>> mListeners ;
86+ // Overload for rvalue events
87+ void Publish (TEvent&& event) { Publish (static_cast <const TEvent&>(event)); }
88+
89+ void ClearInactiveListeners () override
90+ {
91+ std::unique_lock lock (mMutex );
92+
93+ mListeners .erase (std::remove_if (mListeners .begin (),
94+ mListeners .end (),
95+ [](const Listener& l) { return l.handle .expired (); }),
96+ mListeners .end ());
97+
98+ mNeedsCleanup .store (false , std::memory_order_release);
99+ }
100+
101+ size_t ListenerCount () const
102+ {
103+ std::shared_lock lock (mMutex );
104+ return std::count_if (mListeners .begin (), mListeners .end (), [](const Listener& l) {
105+ return !l.handle .expired ();
106+ });
107+ }
108+
109+ private:
110+ void MergePendingListeners ()
111+ {
112+ std::vector<Listener> toMerge;
113+
114+ {
115+ std::lock_guard lock (mPendingMutex );
116+ if (mPendingListeners .empty ())
117+ return ;
118+ toMerge = std::move (mPendingListeners );
119+ mPendingListeners .clear ();
120+ }
121+
122+ if (!toMerge.empty ())
123+ {
124+ std::unique_lock lock (mMutex );
125+ mListeners .reserve (mListeners .size () + toMerge.size ());
126+ mListeners .insert (mListeners .end (),
127+ std::make_move_iterator (toMerge.begin ()),
128+ std::make_move_iterator (toMerge.end ()));
129+ }
130+ }
34131 };
35132
36133 class EventManager
37134 {
135+ private:
136+ mutable std::shared_mutex mMutex ;
137+ std::vector<std::unique_ptr<IPublisher>> mPublishers ;
138+ std::unordered_map<size_t , size_t > mEventIdToIndex ;
139+
38140 public:
39- EventManager () : mPublishers (256 ) {}
40- ~EventManager ()
141+ EventManager () { mPublishers .reserve (256 ); }
142+
143+ ~EventManager () = default ;
144+
145+ template <typename T>
146+ requires IsEvent<T>
147+ [[nodiscard]] Ref<ListenerHandle> Subscribe (auto && listener)
41148 {
42- for (const auto publisher : mPublishers )
43- {
44- delete publisher;
45- }
149+ return GetPublisher<T>()->Subscribe (std::forward<decltype (listener)>(listener));
46150 }
47151
48152 template <typename T>
49153 requires IsEvent<T>
50- void Subscribe ( auto && listener )
154+ void Send ( const T& event )
51155 {
52- GetPublisher<T>()->Subscribe (listener );
156+ GetPublisher<T>()->Publish (event );
53157 }
54158
55159 template <typename T>
56160 requires IsEvent<T>
57- void Send (T event)
161+ void Send (T&& event)
58162 {
59- GetPublisher<T>()->Publish (event);
163+ GetPublisher<T>()->Publish (std::forward<T>(event));
164+ }
165+
166+ void Cleanup ()
167+ {
168+ std::shared_lock lock (mMutex );
169+ for (auto & publisher : mPublishers )
170+ {
171+ if (publisher)
172+ {
173+ publisher->ClearInactiveListeners ();
174+ }
175+ }
60176 }
61177
62178 private:
63179 template <typename T>
64180 requires IsEvent<T>
65181 Publisher<T>* GetPublisher ()
66182 {
67- if (GetEventId<T>() + 1 > mPublishers .size ())
68- mPublishers .resize (GetEventId<T>() + 16 );
183+ const size_t eventId = GetEventId<T>();
69184
70- if (!mPublishers [GetEventId<T>()])
71185 {
72- mPublishers [GetEventId<T>()] = new Publisher<T>();
186+ std::shared_lock lock (mMutex );
187+ auto it = mEventIdToIndex .find (eventId);
188+ if (it != mEventIdToIndex .end ())
189+ {
190+ return static_cast <Publisher<T>*>(mPublishers [it->second ].get ());
191+ }
73192 }
74193
75- return static_cast <Publisher<T>*>(mPublishers [GetEventId<T>()]);
76- }
194+ std::unique_lock lock (mMutex );
77195
78- std::vector<IPublisher*> mPublishers ;
196+ auto it = mEventIdToIndex .find (eventId);
197+ if (it != mEventIdToIndex .end ())
198+ {
199+ return static_cast <Publisher<T>*>(mPublishers [it->second ].get ());
200+ }
201+
202+ size_t index = mPublishers .size ();
203+ mPublishers .push_back (std::make_unique<Publisher<T>>());
204+ mEventIdToIndex [eventId] = index;
205+
206+ return static_cast <Publisher<T>*>(mPublishers [index].get ());
207+ }
79208 };
80209
81- } // namespace FREYR_NAMESPACE
210+ } // namespace FREYR_NAMESPACE
0 commit comments