Skip to content

Commit a4d50fd

Browse files
author
Rafał Hibner
committed
Move unpause when stop logic to PipeSource
1 parent 0c12744 commit a4d50fd

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

cpp/src/arrow/acero/pipe_node.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,11 @@ Status PipeSource::Initialize(Pipe* pipe) {
242242

243243
void PipeSource::Pause(int32_t counter) {
244244
auto lock = mutex_.Lock();
245-
if (backpressure_counter < counter) {
246-
backpressure_counter = counter;
247-
backpressure_source_.Pause();
245+
if (!stopped) {
246+
if (backpressure_counter < counter) {
247+
backpressure_counter = counter;
248+
backpressure_source_.Pause();
249+
}
248250
}
249251
}
250252
void PipeSource::Resume(int32_t counter) {
@@ -255,8 +257,14 @@ void PipeSource::Resume(int32_t counter) {
255257
}
256258
}
257259
Status PipeSource::StopProducing() {
258-
if (pipe_) return pipe_->StopProducing(this);
259-
// stopped before initialization
260+
if (pipe_) {
261+
{
262+
auto lock = mutex_.Lock();
263+
stopped = true;
264+
backpressure_source_.Resume();
265+
}
266+
return pipe_->StopProducing(this);
267+
}
260268
return Status::OK();
261269
}
262270

@@ -290,7 +298,6 @@ Status Pipe::StopProducing(PipeSource* output) {
290298
auto lock = mutex_.Lock();
291299
auto& state = state_[output];
292300
DCHECK(!state.stopped);
293-
BackpressureCombiner::Stop();
294301
state.stopped = true;
295302
size_t stopped_count = ++stopped_count_;
296303
if (stop_on_any_) {

cpp/src/arrow/acero/pipe_node.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class PipeSource {
5151
BackpressureCombiner::Source backpressure_source_;
5252
Mutex mutex_;
5353
int backpressure_counter{0};
54+
std::atomic<bool> stopped{false};
5455
};
5556

5657
/// @brief Provides pipe like infastructure for Acero. It isa center element for

0 commit comments

Comments
 (0)