Skip to content

Commit 33b1278

Browse files
author
Rafał Hibner
committed
Merge remote-tracking branch 'gitmodimo/SerialSequencingBackpressure' into combined3
2 parents 54cc3eb + b588656 commit 33b1278

File tree

2 files changed

+98
-2
lines changed

2 files changed

+98
-2
lines changed

cpp/src/arrow/acero/accumulation_queue.cc

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <queue>
2323
#include <vector>
2424

25+
#include "arrow/acero/query_context.h"
2526
#include "arrow/compute/exec.h"
2627
#include "arrow/util/logging_internal.h"
2728

@@ -160,6 +161,82 @@ class SerialSequencingQueueImpl : public SerialSequencingQueue {
160161
bool is_processing_ = false;
161162
};
162163

164+
class BackpressureProcessor : public SerialSequencingQueue::Processor {
165+
private:
166+
struct DoHandle {
167+
explicit DoHandle(BackpressureProcessor& queue)
168+
: queue_(queue), start_size_(queue_.SizeUnlocked()) {}
169+
170+
~DoHandle() {
171+
// unsynced access is safe since DoHandle is internally only used when the
172+
// lock is held
173+
size_t end_size = queue_.SizeUnlocked();
174+
queue_.handler_.Handle(start_size_, end_size);
175+
}
176+
177+
BackpressureProcessor& queue_;
178+
size_t start_size_;
179+
};
180+
181+
public:
182+
explicit BackpressureProcessor(SerialSequencingQueue::Processor* processor,
183+
BackpressureHandler handler, ExecPlan* plan,
184+
bool requires_io = true)
185+
: processor_(processor),
186+
handler_(std::move(handler)),
187+
plan_(plan),
188+
requires_io_(requires_io) {}
189+
190+
void Schedule() {
191+
if (requires_io_) {
192+
plan_->query_context()->ScheduleIOTask([this]() { return DoProcess(); },
193+
"BackpressureProcessor::DoProcessIO");
194+
} else {
195+
plan_->query_context()->ScheduleTask([this]() { return DoProcess(); },
196+
"BackpressureProcessor::DoProcess");
197+
}
198+
}
199+
200+
Status Process(ExecBatch batch) override {
201+
std::unique_lock lk(mutex_);
202+
{
203+
DoHandle do_handle(*this);
204+
sequenced_queue_.push(batch);
205+
}
206+
if (!is_processing_) {
207+
is_processing_ = true;
208+
Schedule();
209+
}
210+
return Status::OK();
211+
}
212+
213+
private:
214+
Status DoProcess() {
215+
std::unique_lock lk(mutex_);
216+
while (!sequenced_queue_.empty()) {
217+
ExecBatch next(sequenced_queue_.front());
218+
{
219+
DoHandle do_handle(*this);
220+
sequenced_queue_.pop();
221+
}
222+
lk.unlock();
223+
ARROW_RETURN_NOT_OK(processor_->Process(std::move(next)));
224+
lk.lock();
225+
}
226+
is_processing_ = false;
227+
return Status::OK();
228+
}
229+
size_t SizeUnlocked() const { return sequenced_queue_.size(); }
230+
231+
Processor* processor_;
232+
BackpressureHandler handler_;
233+
ExecPlan* plan_;
234+
bool requires_io_;
235+
std::mutex mutex_;
236+
std::queue<ExecBatch> sequenced_queue_;
237+
bool is_processing_ = false;
238+
};
239+
163240
} // namespace
164241

165242
std::unique_ptr<SequencingQueue> SequencingQueue::Make(Processor* processor) {
@@ -170,6 +247,15 @@ std::unique_ptr<SerialSequencingQueue> SerialSequencingQueue::Make(Processor* pr
170247
return std::make_unique<SerialSequencingQueueImpl>(processor);
171248
}
172249

250+
std::unique_ptr<SerialSequencingQueue::Processor>
251+
SerialSequencingQueue::Processor::MakeBackpressureWrapper(Processor* processor,
252+
BackpressureHandler handler,
253+
ExecPlan* plan,
254+
bool requires_io) {
255+
return std::make_unique<util::BackpressureProcessor>(processor, std::move(handler),
256+
plan);
257+
}
258+
173259
} // namespace util
174260
} // namespace acero
175261
} // namespace arrow

cpp/src/arrow/acero/accumulation_queue.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include <functional>
2222
#include <optional>
2323
#include <vector>
24-
24+
#include "arrow/acero/backpressure_handler.h"
2525
#include "arrow/acero/visibility.h"
2626
#include "arrow/compute/exec.h"
2727
#include "arrow/result.h"
@@ -127,7 +127,7 @@ class ARROW_ACERO_EXPORT SequencingQueue {
127127
class ARROW_ACERO_EXPORT SerialSequencingQueue {
128128
public:
129129
/// Strategy that describes how to handle items
130-
class Processor {
130+
class ARROW_ACERO_EXPORT Processor {
131131
public:
132132
virtual ~Processor() = default;
133133
/// Process the batch
@@ -141,6 +141,16 @@ class ARROW_ACERO_EXPORT SerialSequencingQueue {
141141
/// TODO: Could add backpressure if needed but right now all uses of this should
142142
/// be pretty fast and so are unlikely to block.
143143
virtual Status Process(ExecBatch batch) = 0;
144+
145+
/// Wrapper for processor with backpressure
146+
///
147+
/// This wrapper adds backpressure logic acting on number of sequenced batches.
148+
/// Batches are also processed on newly scheduled tasks. The tasks will be scheduled on
149+
/// IO executor when requires_io==true.
150+
static std::unique_ptr<Processor> MakeBackpressureWrapper(Processor* processor,
151+
BackpressureHandler handler,
152+
ExecPlan* plan,
153+
bool requires_io = false);
144154
};
145155

146156
virtual ~SerialSequencingQueue() = default;

0 commit comments

Comments
 (0)