Skip to content

Commit 8b52207

Browse files
author
Rafał Hibner
committed
Handler in locked state
1 parent 075e9f8 commit 8b52207

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

cpp/src/arrow/util/async_generator.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,9 @@ class PushGenerator {
925925
};
926926

927927
struct StateWithBackpressure : public State {
928+
using State::consumer_fut;
929+
using State::finished;
930+
using State::result_q;
928931
explicit StateWithBackpressure(acero::BackpressureHandler handler)
929932
: handler_(std::move(handler)) {}
930933

@@ -945,8 +948,21 @@ class PushGenerator {
945948

946949
bool Push(Result<T> result) override {
947950
auto lock = State::mutex.Lock();
951+
if (finished) {
952+
// Closed early
953+
return false;
954+
}
955+
if (consumer_fut.has_value()) {
956+
auto fut = std::move(consumer_fut.value());
957+
consumer_fut.reset();
958+
lock.Unlock(); // unlock before potentially invoking a callback
959+
fut.MarkFinished(std::move(result));
960+
return true;
961+
}
962+
// do_handle must be in always locked state but only needed when queue size changes
948963
DoHandle do_handle(*this);
949-
return State::PushUnlocked(std::move(result), std::move(lock));
964+
result_q.push_back(std::move(result));
965+
return true;
950966
}
951967

952968
Future<T> Pop() override {

0 commit comments

Comments
 (0)