Skip to content

Commit 629185d

Browse files
author
Rafał Hibner
committed
Merge remote-tracking branch 'gitmodimo/dataset_max_rows_queued' into combined3
2 parents de16004 + 12f97f7 commit 629185d

File tree

3 files changed

+31
-21
lines changed

3 files changed

+31
-21
lines changed

cpp/src/arrow/dataset/dataset_writer.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ namespace arrow {
2929
namespace dataset {
3030
namespace internal {
3131

32-
// This lines up with our other defaults in the scanner and execution plan
33-
constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 8 * 1024 * 1024;
34-
3532
/// \brief Utility class that manages a set of writers to different paths
3633
///
3734
/// Writers may be closed and reopened (and a new file created) based on the dataset

cpp/src/arrow/dataset/file_base.cc

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -452,9 +452,11 @@ Status ValidateAndPrepareSchema(const WriteNodeOptions& write_node_options,
452452
class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
453453
public:
454454
DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
455-
FileSystemDatasetWriteOptions write_options)
455+
FileSystemDatasetWriteOptions write_options,
456+
uint64_t max_rows_queued)
456457
: custom_schema_(std::move(custom_schema)),
457-
write_options_(std::move(write_options)) {}
458+
write_options_(std::move(write_options)),
459+
max_rows_queued_(max_rows_queued) {}
458460

459461
Status Init(const std::shared_ptr<Schema>& schema,
460462
acero::BackpressureControl* backpressure_control,
@@ -464,12 +466,12 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
464466
} else {
465467
schema_ = schema;
466468
}
467-
ARROW_ASSIGN_OR_RAISE(
468-
dataset_writer_,
469-
internal::DatasetWriter::Make(
470-
write_options_, plan->query_context()->async_scheduler(),
471-
[backpressure_control] { backpressure_control->Pause(); },
472-
[backpressure_control] { backpressure_control->Resume(); }, [] {}));
469+
ARROW_ASSIGN_OR_RAISE(dataset_writer_,
470+
internal::DatasetWriter::Make(
471+
write_options_, plan->query_context()->async_scheduler(),
472+
[backpressure_control] { backpressure_control->Pause(); },
473+
[backpressure_control] { backpressure_control->Resume(); },
474+
[] {}, max_rows_queued_));
473475
return Status::OK();
474476
}
475477

@@ -501,6 +503,7 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
501503
std::shared_ptr<Schema> custom_schema_;
502504
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
503505
FileSystemDatasetWriteOptions write_options_;
506+
uint64_t max_rows_queued_;
504507
Future<> finished_ = Future<>::Make();
505508
std::shared_ptr<Schema> schema_ = nullptr;
506509
};
@@ -556,8 +559,9 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
556559
ValidateAndPrepareSchema(write_node_options, input_schema, custom_schema));
557560

558561
std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
559-
std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema,
560-
write_node_options.write_options);
562+
std::make_shared<DatasetWritingSinkNodeConsumer>(
563+
custom_schema, write_node_options.write_options,
564+
write_node_options.max_rows_queued);
561565
ARROW_ASSIGN_OR_RAISE(
562566
auto node,
563567
// to preserve order explicitly, sequence the exec batches
@@ -577,20 +581,21 @@ class TeeNode : public acero::MapNode,
577581
public:
578582
TeeNode(acero::ExecPlan* plan, std::vector<acero::ExecNode*> inputs,
579583
std::shared_ptr<Schema> output_schema,
580-
FileSystemDatasetWriteOptions write_options)
584+
FileSystemDatasetWriteOptions write_options, uint64_t max_rows_queued)
581585
: MapNode(plan, std::move(inputs), std::move(output_schema)),
582-
write_options_(std::move(write_options)) {
586+
write_options_(std::move(write_options)),
587+
max_rows_queued_(max_rows_queued) {
583588
if (write_options.preserve_order) {
584589
sequencer_ = acero::util::SerialSequencingQueue::Make(this);
585590
}
586591
}
587592

588593
Status StartProducing() override {
589-
ARROW_ASSIGN_OR_RAISE(
590-
dataset_writer_,
591-
internal::DatasetWriter::Make(
592-
write_options_, plan_->query_context()->async_scheduler(),
593-
[this] { Pause(); }, [this] { Resume(); }, [this] { MapNode::Finish(); }));
594+
ARROW_ASSIGN_OR_RAISE(dataset_writer_,
595+
internal::DatasetWriter::Make(
596+
write_options_, plan_->query_context()->async_scheduler(),
597+
[this] { Pause(); }, [this] { Resume(); },
598+
[this] { MapNode::Finish(); }, max_rows_queued_));
594599
return MapNode::StartProducing();
595600
}
596601

@@ -608,7 +613,8 @@ class TeeNode : public acero::MapNode,
608613
ValidateAndPrepareSchema(write_node_options, input_schema, custom_schema));
609614

610615
return plan->EmplaceNode<TeeNode>(plan, std::move(inputs), std::move(custom_schema),
611-
std::move(write_node_options.write_options));
616+
std::move(write_node_options.write_options),
617+
write_node_options.max_rows_queued);
612618
}
613619

614620
const char* kind_name() const override { return "TeeNode"; }
@@ -657,6 +663,7 @@ class TeeNode : public acero::MapNode,
657663
private:
658664
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
659665
FileSystemDatasetWriteOptions write_options_;
666+
uint64_t max_rows_queued_;
660667
std::atomic<int32_t> backpressure_counter_ = 0;
661668
std::unique_ptr<acero::util::SerialSequencingQueue> sequencer_{nullptr};
662669
};

cpp/src/arrow/dataset/file_base.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,9 @@ class ARROW_DS_EXPORT FileWriter {
385385
std::optional<int64_t> bytes_written_;
386386
};
387387

388+
// This lines up with our other defaults in the scanner and execution plan
389+
constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 8 * 1024 * 1024;
390+
388391
/// \brief Options for writing a dataset.
389392
struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
390393
/// Options for individual fragment writing.
@@ -487,6 +490,9 @@ class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {
487490
std::shared_ptr<Schema> custom_schema;
488491
/// \brief Optional metadata to attach to written batches
489492
std::shared_ptr<const KeyValueMetadata> custom_metadata;
493+
494+
/// Maximum rows queued before issuing pause to the upstream node
495+
uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued;
490496
};
491497

492498
/// @}

0 commit comments

Comments
 (0)