File tree Expand file tree Collapse file tree 2 files changed +11
-0
lines changed
Expand file tree Collapse file tree 2 files changed +11
-0
lines changed Original file line number Diff line number Diff line change @@ -57,7 +57,15 @@ void BackpressureCombiner::Resume(Source* output) {
5757 }
5858}
5959
60+ void BackpressureCombiner::Stop () {
61+ std::lock_guard<std::mutex> lg (mutex_);
62+ stopped = true ;
63+ backpressure_control_->Resume ();
64+ paused = false ;
65+ }
66+
6067void BackpressureCombiner::UpdatePauseStateUnlocked () {
68+ if (stopped) return ;
6169 bool should_be_paused = (paused_count_ > 0 );
6270 if (!pause_on_any_) {
6371 should_be_paused = should_be_paused && (paused_count_ == paused_.size ());
Original file line number Diff line number Diff line change @@ -77,6 +77,8 @@ class ARROW_ACERO_EXPORT BackpressureCombiner {
7777 std::vector<BackpressureCombiner*> connections_;
7878 };
7979
80+ void Stop ();
81+
8082 private:
8183 friend class Source ;
8284 void Pause (Source* output);
@@ -89,6 +91,7 @@ class ARROW_ACERO_EXPORT BackpressureCombiner {
8991 std::unordered_map<Source*, bool > paused_;
9092 size_t paused_count_{0 };
9193 bool paused{false };
94+ bool stopped{false };
9295};
9396
9497} // namespace arrow::acero
You can’t perform that action at this time.
0 commit comments