diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index ccc79dfa9bf..0f1b8b52f5b 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -17,6 +17,7 @@ #include "arrow/dataset/file_base.h" +#include "arrow/acero/accumulation_queue.h" #include "arrow/acero/exec_plan.h" #include @@ -559,13 +560,18 @@ Result MakeWriteNode(acero::ExecPlan* plan, return node; } -class TeeNode : public acero::MapNode { +class TeeNode : public acero::MapNode, + public arrow::acero::util::SerialSequencingQueue::Processor { public: TeeNode(acero::ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, FileSystemDatasetWriteOptions write_options) : MapNode(plan, std::move(inputs), std::move(output_schema)), - write_options_(std::move(write_options)) {} + write_options_(std::move(write_options)) { + if (write_options.preserve_order) { + sequencer_ = acero::util::SerialSequencingQueue::Make(this); + } + } Status StartProducing() override { ARROW_ASSIGN_OR_RAISE( @@ -592,6 +598,18 @@ class TeeNode : public acero::MapNode { const char* kind_name() const override { return "TeeNode"; } + Status InputReceived(ExecNode* input, ExecBatch batch) override { + DCHECK_EQ(input, inputs_[0]); + if (sequencer_) { + return sequencer_->InsertBatch(std::move(batch)); + } + return Process(std::move(batch)); + } + + Status Process(ExecBatch batch) override { + return acero::MapNode::InputReceived(inputs_[0], batch); + } + void Finish() override { dataset_writer_->Finish(); } Result ProcessBatch(compute::ExecBatch batch) override { @@ -625,6 +643,7 @@ class TeeNode : public acero::MapNode { std::unique_ptr dataset_writer_; FileSystemDatasetWriteOptions write_options_; std::atomic backpressure_counter_ = 0; + std::unique_ptr sequencer_{nullptr}; }; } // namespace