Skip to content

Commit e9d6ae7

Browse files
author
Rafał Hibner
committed
StopProducing after shutdown
1 parent ed76cbe commit e9d6ae7

File tree

2 files changed

+8
-4
lines changed

2 files changed

+8
-4
lines changed

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,8 +1046,10 @@ class AsofJoinNode : public ExecNode {
10461046
if (st.ok()) {
10471047
st = output_->InputFinished(this, batches_produced_);
10481048
}
1049-
for (const auto& s : state_) {
1049+
for (size_t i = 0; i < state_.size(); ++i) {
1050+
const auto& s = state_[i];
10501051
s->ForceShutdown();
1052+
st &= inputs_[i]->StopProducing();
10511053
}
10521054
}));
10531055
}
@@ -1499,8 +1501,11 @@ class AsofJoinNode : public ExecNode {
14991501
if (st.ok()) {
15001502
st = output_->InputFinished(this, batches_produced_);
15011503
}
1502-
for (const auto& s : state_) {
1503-
st &= s->ForceShutdown();
1504+
1505+
for (size_t i = 0; i < state_.size(); ++i) {
1506+
const auto& s = state_[i];
1507+
s->ForceShutdown();
1508+
st &= inputs_[i]->StopProducing();
15041509
}
15051510
}
15061511

cpp/src/arrow/acero/util_test.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,6 @@ TEST(BackpressureConcurrentQueue, BackpressureTest) {
299299
ASSERT_FALSE(dummy_node.stopped);
300300
queue.ForceShutdown();
301301
ASSERT_FALSE(dummy_node.paused);
302-
ASSERT_TRUE(dummy_node.stopped);
303302
}
304303

305304
} // namespace acero

0 commit comments

Comments
 (0)