1919
2020#include " rpc/WorkQueue.hpp"
2121
22+ #include " util/Assert.hpp"
23+ #include " util/Spawn.hpp"
2224#include " util/log/Logger.hpp"
2325#include " util/prometheus/Label.hpp"
2426#include " util/prometheus/Prometheus.hpp"
2527
28+ #include < boost/asio/dispatch.hpp>
29+ #include < boost/asio/post.hpp>
30+ #include < boost/asio/spawn.hpp>
31+ #include < boost/asio/strand.hpp>
2632#include < boost/json/object.hpp>
2733
34+ #include < chrono>
2835#include < cstddef>
2936#include < cstdint>
3037#include < functional>
3138#include < utility>
39+ #include < vector>
3240
3341namespace rpc {
3442
35- void
36- WorkQueue::OneTimeCallable::setCallable (std::function<void ()> func)
37- {
38- func_ = func;
39- }
40-
41- void
42- WorkQueue::OneTimeCallable::operator ()()
43- {
44- if (not called_) {
45- func_ ();
46- called_ = true ;
47- }
48- }
49- WorkQueue::OneTimeCallable::
50- operator bool () const
51- {
52- return func_.operator bool ();
53- }
54-
55- WorkQueue::WorkQueue (std::uint32_t numWorkers, uint32_t maxSize)
43+ WorkQueue::WorkQueue (DontStartProcessingTag, std::uint32_t numWorkers, uint32_t maxSize)
5644 : queued_{PrometheusService::counterInt (
5745 " work_queue_queued_total_number" ,
5846 util::prometheus::Labels (),
@@ -69,25 +57,156 @@ WorkQueue::WorkQueue(std::uint32_t numWorkers, uint32_t maxSize)
6957 " The current number of tasks in the queue"
7058 )}
7159 , ioc_{numWorkers}
60+ , strand_{ioc_.get_executor ()}
61+ , waitTimer_(ioc_)
7262{
7363 if (maxSize != 0 )
7464 maxSize_ = maxSize;
7565}
7666
67+ WorkQueue::WorkQueue (std::uint32_t numWorkers, uint32_t maxSize)
68+ : WorkQueue(kDONT_START_PROCESSING_TAG , numWorkers, maxSize)
69+ {
70+ startProcessing ();
71+ }
72+
7773WorkQueue::~WorkQueue ()
7874{
79- join ();
75+ stop ();
8076}
8177
8278void
83- WorkQueue::stop (std::function<void ()> onQueueEmpty)
79+ WorkQueue::startProcessing ()
80+ {
81+ util::spawn (strand_, [this ](auto yield) {
82+ ASSERT (not hasDispatcher_, " Dispatcher already running" );
83+
84+ hasDispatcher_ = true ;
85+ dispatcherLoop (yield);
86+ });
87+ }
88+
89+ bool
90+ WorkQueue::postCoro (TaskType func, bool isWhiteListed, Priority priority)
91+ {
92+ if (stopping_) {
93+ LOG (log_.warn ()) << " Queue is stopping, rejecting incoming task." ;
94+ return false ;
95+ }
96+
97+ if (size () >= maxSize_ && !isWhiteListed) {
98+ LOG (log_.warn ()) << " Queue is full. rejecting job. current size = " << size () << " ; max size = " << maxSize_;
99+ return false ;
100+ }
101+
102+ ++curSize_.get ();
103+ auto needsWakeup = false ;
104+
105+ {
106+ auto state = dispatcherState_.lock ();
107+
108+ needsWakeup = std::exchange (state->isIdle , false );
109+
110+ state->push (priority, std::move (func));
111+ }
112+
113+ if (needsWakeup)
114+ boost::asio::post (strand_, [this ] { waitTimer_.cancel (); });
115+
116+ return true ;
117+ }
118+
119+ void
120+ WorkQueue::dispatcherLoop (boost::asio::yield_context yield)
121+ {
122+ LOG (log_.info ()) << " WorkQueue dispatcher starting" ;
123+
124+ // all ongoing tasks must be completed before stopping fully
125+ while (not stopping_ or size () > 0 ) {
126+ std::vector<TaskType> batch;
127+
128+ {
129+ auto state = dispatcherState_.lock ();
130+
131+ if (state->empty ()) {
132+ state->isIdle = true ;
133+ } else {
134+ for (auto count = 0uz; count < kTAKE_HIGH_PRIO and not state->high .empty (); ++count) {
135+ batch.push_back (std::move (state->high .front ()));
136+ state->high .pop ();
137+ }
138+
139+ if (not state->normal .empty ()) {
140+ batch.push_back (std::move (state->normal .front ()));
141+ state->normal .pop ();
142+ }
143+ }
144+ }
145+
146+ if (not stopping_ and batch.empty ()) {
147+ waitTimer_.expires_at (std::chrono::steady_clock::time_point::max ());
148+ boost::system::error_code ec;
149+ waitTimer_.async_wait (yield[ec]);
150+ } else {
151+ for (auto task : std::move (batch)) {
152+ util::spawn (
153+ ioc_,
154+ [this , spawnedAt = std::chrono::system_clock::now (), task = std::move (task)](auto yield) mutable {
155+ auto const takenAt = std::chrono::system_clock::now ();
156+ auto const waited =
157+ std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count ();
158+
159+ ++queued_.get ();
160+ durationUs_.get () += waited;
161+ LOG (log_.info ()) << " WorkQueue wait time: " << waited << " , queue size: " << size ();
162+
163+ task (yield);
164+
165+ --curSize_.get ();
166+ }
167+ );
168+ }
169+
170+ boost::asio::post (ioc_.get_executor (), yield); // yield back to avoid hijacking the thread
171+ }
172+ }
173+
174+ LOG (log_.info ()) << " WorkQueue dispatcher shutdown requested - time to execute onTasksComplete" ;
175+
176+ {
177+ auto onTasksComplete = onQueueEmpty_.lock ();
178+ ASSERT (onTasksComplete->operator bool (), " onTasksComplete must be set when stopping is true." );
179+ onTasksComplete->operator ()();
180+ }
181+
182+ LOG (log_.info ()) << " WorkQueue dispatcher finished" ;
183+ }
184+
185+ void
186+ WorkQueue::requestStop (std::function<void ()> onQueueEmpty)
84187{
85188 auto handler = onQueueEmpty_.lock ();
86- handler->setCallable (std::move (onQueueEmpty));
189+ *handler = std::move (onQueueEmpty);
190+
87191 stopping_ = true ;
88- if (size () == 0 ) {
89- handler->operator ()();
192+ auto needsWakeup = false ;
193+
194+ {
195+ auto state = dispatcherState_.lock ();
196+ needsWakeup = std::exchange (state->isIdle , false );
90197 }
198+
199+ if (needsWakeup)
200+ boost::asio::post (strand_, [this ] { waitTimer_.cancel (); });
201+ }
202+
203+ void
204+ WorkQueue::stop ()
205+ {
206+ if (not stopping_.exchange (true ))
207+ requestStop ();
208+
209+ ioc_.join ();
91210}
92211
93212WorkQueue
@@ -115,12 +234,6 @@ WorkQueue::report() const
115234 return obj;
116235}
117236
118- void
119- WorkQueue::join ()
120- {
121- ioc_.join ();
122- }
123-
124237size_t
125238WorkQueue::size () const
126239{
0 commit comments