Skip to content

Commit b9c8566

Browse files
author
Rafał Hibner
committed
Check for duplicate pipe sinks
1 parent 5a7dca3 commit b9c8566

File tree

3 files changed

+43
-7
lines changed

3 files changed

+43
-7
lines changed

cpp/src/arrow/acero/pipe_node.cc

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ class PipeTeeNode : public PipeSource, public PipeSinkNode {
186186
PipeTeeNode(ExecPlan* plan, std::vector<ExecNode*> inputs, std::string pipe_name)
187187
: PipeSinkNode(plan, inputs, pipe_name) {
188188
output_schema_ = inputs[0]->output_schema();
189-
PipeSinkNode::pipe_->addSource(this);
189+
auto st = PipeSinkNode::pipe_->addSource(this);
190+
if (ARROW_PREDICT_FALSE(!st.ok())) { // this should never happen
191+
internal::DieWithMessage(std::string("PipeTee unexpected error: ") + st.ToString());
192+
}
190193
}
191194

192195
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
@@ -236,7 +239,11 @@ const char PipeTeeNode::kKindName[] = "PipeTeeNode";
236239
} // namespace
237240

238241
PipeSource::PipeSource() {}
239-
void PipeSource::Initialize(Pipe* pipe) { pipe_ = pipe; }
242+
Status PipeSource::Initialize(Pipe* pipe) {
243+
if (pipe_) return Status::Invalid("Pipe:" + pipe->PipeName() + " has multiple sinks");
244+
pipe_ = pipe;
245+
return Status::OK();
246+
}
240247

241248
void PipeSource::Pause(int32_t counter) { pipe_->Pause(this, counter); }
242249
void PipeSource::Resume(int32_t counter) { pipe_->Resume(this, counter); }
@@ -307,15 +314,16 @@ Status Pipe::InputFinished(int total_batches) {
307314
return Status::OK();
308315
}
309316

310-
void Pipe::addSource(PipeSource* source) {
311-
source->Initialize(this);
317+
Status Pipe::addSource(PipeSource* source) {
318+
ARROW_RETURN_NOT_OK(source->Initialize(this));
312319
// First added source is handled in receiving task. All additional sources are delivered
313320
// in their own submit tasks
314321
if (!last_source_node_)
315322
last_source_node_ = source;
316323
else {
317324
source_nodes_.push_back(source);
318325
}
326+
return Status::OK();
319327
}
320328

321329
Status Pipe::Init(const std::shared_ptr<Schema> schema) {
@@ -326,7 +334,7 @@ Status Pipe::Init(const std::shared_ptr<Schema> schema) {
326334
if (!schema->Equals(node->output_schema())) {
327335
return Status::Invalid("Pipe schema does not match for " + pipe_name_);
328336
}
329-
addSource(pipe_source);
337+
ARROW_RETURN_NOT_OK(addSource(pipe_source));
330338
}
331339
}
332340
}

cpp/src/arrow/acero/pipe_node.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class PipeSource {
4040

4141
private:
4242
friend class Pipe;
43-
void Initialize(Pipe* pipe);
43+
Status Initialize(Pipe* pipe);
4444

4545
virtual Status HandleInputReceived(ExecBatch batch) = 0;
4646
virtual Status HandleInputFinished(int total_batches) = 0;
@@ -66,13 +66,15 @@ class ARROW_ACERO_EXPORT Pipe {
6666
// Called from pipe_sink
6767
Status InputFinished(int total_batches);
6868

69-
void addSource(PipeSource* source);
69+
Status addSource(PipeSource* source);
7070

7171
// Called from pipe_sink Init
7272
Status Init(const std::shared_ptr<Schema> schema);
7373

7474
bool HasSources() const;
7575

76+
std::string PipeName() const { return pipe_name_; }
77+
7678
private:
7779
// pipe
7880
ExecPlan* plan_;

cpp/src/arrow/acero/pipe_node_test.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,32 @@ TEST(ExecPlanExecution, PipeErrorSink) {
131131
HasSubstr("Pipe 'named_pipe_missing_sink' error: Pipe does not have sink")));
132132
}
133133

134+
TEST(ExecPlanExecution, PipeErrorDuplicateSink) {
135+
auto basic_data = MakeBasicBatches();
136+
137+
AsyncGenerator<std::optional<ExecBatch>> main_sink_gen;
138+
AsyncGenerator<std::optional<ExecBatch>> dup_sink_gen;
139+
Declaration decl = Declaration::Sequence(
140+
{{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false,
141+
/*slow=*/false)}},
142+
{"pipe_tee", PipeSinkNodeOptions{"named_pipe_1"}},
143+
{"pipe_tee", PipeSinkNodeOptions{"named_pipe_1"}},
144+
{"sink", SinkNodeOptions{&main_sink_gen}}});
145+
146+
Declaration dup = Declaration::Sequence(
147+
{{"pipe_source", PipeSourceNodeOptions{"named_pipe_1", basic_data.schema}},
148+
{"sink", SinkNodeOptions{&dup_sink_gen}}});
149+
150+
// fail on planning
151+
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
152+
ASSERT_OK(decl.AddToPlan(plan.get()));
153+
ASSERT_OK(dup.AddToPlan(plan.get()));
154+
plan->StartProducing();
155+
ASSERT_THAT(
156+
plan->finished().result().status(),
157+
Raises(StatusCode::Invalid, HasSubstr("Pipe:named_pipe_1 has multiple sinks")));
158+
}
159+
134160
TEST(ExecPlanExecution, PipeFilterSink) {
135161
auto basic_data = MakeBasicBatches();
136162
AsyncGenerator<std::optional<ExecBatch>> main_sink_gen;

0 commit comments

Comments
 (0)