File tree Expand file tree Collapse file tree 1 file changed +17
-1
lines changed
Expand file tree Collapse file tree 1 file changed +17
-1
lines changed Original file line number Diff line number Diff line change @@ -925,6 +925,9 @@ class PushGenerator {
925925 };
926926
927927 struct StateWithBackpressure : public State {
928+ using State::consumer_fut;
929+ using State::finished;
930+ using State::result_q;
928931 explicit StateWithBackpressure (acero::BackpressureHandler handler)
929932 : handler_(std::move(handler)) {}
930933
@@ -945,8 +948,21 @@ class PushGenerator {
945948
946949 bool Push (Result<T> result) override {
947950 auto lock = State::mutex.Lock ();
951+ if (finished) {
952+ // Closed early
953+ return false ;
954+ }
955+ if (consumer_fut.has_value ()) {
956+ auto fut = std::move (consumer_fut.value ());
957+ consumer_fut.reset ();
958+ lock.Unlock (); // unlock before potentially invoking a callback
959+ fut.MarkFinished (std::move (result));
960+ return true ;
961+ }
962+ // do_handle must be in always locked state but only needed when queue size changes
948963 DoHandle do_handle (*this );
949- return State::PushUnlocked (std::move (result), std::move (lock));
964+ result_q.push_back (std::move (result));
965+ return true ;
950966 }
951967
952968 Future<T> Pop () override {
You can’t perform that action at this time.
0 commit comments