A Deep Deep Dive into df.write_lance() #4600
everettVT started this conversation in Show and tell
Replies: 2 comments
I think this would make great source material for a deep-research prompt that generates a blog post.
Good job, @everettVT. I noticed you've done some work on the Daft Lance connector, and I'd like to hear your thoughts. Let's see if we can collaborate to break down this work.
So, as part of another discussion on adding LanceTable support to the Daft catalog, I began reviewing the daft.read_lance and df.write_lance methods and went down a huge rabbit hole, which helped me understand both how Daft performs distributed writes and how Lance fragments work.
I figured I would share the notes I made along the way as an exercise.
How Does Daft Write Lance Datasets?
```mermaid
sequenceDiagram
    participant User
    participant DataFrame
    participant WriteSink as DataFrame.write_sink
    participant LanceDataSink
    participant LogicalPlanBuilder
    participant RustLogicalBuilder as _LogicalPlanBuilder (Rust)
    participant PhysicalPlan
    participant DataSink as DataSink.write()
    participant Lance

    User->>DataFrame: write_lance(uri, mode, io_config, **kwargs)
    DataFrame->>LanceDataSink: new LanceDataSink(uri, schema, mode, io_config, **kwargs)
    Note over LanceDataSink: Constructor:<br/>- Set _table_uri, _mode, _io_config, _kwargs<br/>- Convert io_config to storage_options<br/>- Convert schema to pyarrow_schema<br/>- Try lance.dataset() to read existing<br/>- Set _version (0 or latest_version)<br/>- Validate schema if table exists
    LanceDataSink-->>DataFrame: sink instance
    DataFrame->>WriteSink: write_sink(sink)
    WriteSink->>LanceDataSink: sink.start()
    Note over LanceDataSink: start() is pass-through (no-op)
    WriteSink->>LogicalPlanBuilder: _builder.write_datasink(sink.name(), sink)
    LogicalPlanBuilder->>RustLogicalBuilder: _builder.datasink_write(name, sink)
    RustLogicalBuilder-->>LogicalPlanBuilder: new _LogicalPlanBuilder
    LogicalPlanBuilder-->>WriteSink: new LogicalPlanBuilder
    WriteSink->>WriteSink: write_df = DataFrame(builder)
    WriteSink->>WriteSink: write_df.collect()

    Note over WriteSink,PhysicalPlan: Physical execution begins during collect()
    WriteSink->>PhysicalPlan: Execute physical plan

    loop For each partition in the DataFrame
        PhysicalPlan->>DataSink: sink.write(micropartitions_iterator)
        loop For each micropartition in partition
            DataSink->>DataSink: micropartition.to_arrow()
            DataSink->>Lance: lance.fragment.write_fragments(arrow_table, dataset_uri, mode, storage_options, **kwargs)
            Lance-->>DataSink: list[FragmentMetadata]
            DataSink->>DataSink: yield WriteResult(fragments, bytes_written, rows_written)
        end
        DataSink-->>PhysicalPlan: Iterator[WriteResult[FragmentMetadata]]
    end

    PhysicalPlan-->>WriteSink: All write results collected
    WriteSink->>WriteSink: results = write_df.to_pydict()
    WriteSink->>LanceDataSink: sink.finalize(results["write_results"])
    Note over LanceDataSink: finalize():<br/>- Extract all fragments from write_results<br/>- Create LanceOperation (Overwrite/Append)<br/>- Commit operation to create final dataset<br/>- Return stats as MicroPartition

    loop For each write_result in write_results
        LanceDataSink->>LanceDataSink: chain fragments from write_result.result
    end

    alt mode == "create" or "overwrite"
        LanceDataSink->>Lance: LanceOperation.Overwrite(schema, all_fragments)
    else mode == "append"
        LanceDataSink->>Lance: LanceOperation.Append(all_fragments)
    end

    LanceDataSink->>Lance: LanceDataset.commit(uri, operation, read_version, storage_options)
    Lance-->>LanceDataSink: committed_dataset
    LanceDataSink->>Lance: dataset.stats.dataset_stats()
    Lance-->>LanceDataSink: stats_dict
    LanceDataSink->>LanceDataSink: MicroPartition.from_pydict({num_fragments, num_deleted_rows, num_small_files, version})
    LanceDataSink-->>WriteSink: final_micropartition
    WriteSink->>WriteSink: to_logical_plan_builder(micropartition)
    WriteSink-->>DataFrame: DataFrame with write results
    DataFrame-->>User: final result DataFrame
```
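To make the split between the many per-partition write() calls and the single finalize() commit in the diagram above more concrete, here is a stdlib-only Python model of that flow. It is a simplified sketch, not Daft's LanceDataSink or Lance's API: FakeFragment, FakeDataset, write_fragments, and ModelLanceSink are hypothetical stand-ins, fragments only carry a row count, and "committing" just bumps the version and either replaces (Overwrite) or extends (Append) the fragment list.

```python
# Hypothetical, stdlib-only model of the write()/finalize() flow in the diagram.
# None of these classes or functions are the real Daft or Lance APIs.
from dataclasses import dataclass, field
from itertools import chain
from typing import Iterator, List, Optional


@dataclass(frozen=True)
class FakeFragment:
    # Stand-in for lance FragmentMetadata; only the row count is modeled.
    num_rows: int


@dataclass
class WriteResult:
    # Same shape as the diagram's WriteResult(fragments, bytes_written, rows_written).
    result: List[FakeFragment]
    bytes_written: int
    rows_written: int


@dataclass
class FakeDataset:
    # Stand-in for a committed dataset version and its fragment list.
    version: int = 0
    fragments: List[FakeFragment] = field(default_factory=list)


def write_fragments(rows: list) -> List[FakeFragment]:
    # Stand-in for lance.fragment.write_fragments: produces fragment metadata
    # but does not create a new dataset version.
    return [FakeFragment(num_rows=len(rows))]


class ModelLanceSink:
    def __init__(self, mode: str, existing: Optional[FakeDataset] = None):
        if mode not in ("create", "overwrite", "append"):
            raise ValueError(f"unsupported mode: {mode}")
        self.mode = mode
        self.existing = existing
        # Mirrors "Set _version (0 or latest_version)" in the constructor note.
        self.read_version = existing.version if existing else 0

    def start(self) -> None:
        pass  # no-op, as in the diagram

    def write(self, micropartitions: Iterator[list]) -> Iterator[WriteResult]:
        # Called once per partition; yields one result per micropartition.
        for mp in micropartitions:
            frags = write_fragments(mp)
            # Byte counts are not modeled, so bytes_written is always 0 here.
            yield WriteResult(frags, bytes_written=0, rows_written=len(mp))

    def finalize(self, write_results: List[WriteResult]) -> dict:
        # The single commit point: chain fragments from every write result.
        new_frags = list(chain.from_iterable(r.result for r in write_results))
        if self.mode in ("create", "overwrite"):
            fragments = new_frags  # Overwrite: new version holds only new fragments
        else:
            old = self.existing.fragments if self.existing else []
            fragments = old + new_frags  # Append: keep old fragments, add new ones
        committed = FakeDataset(self.read_version + 1, fragments)
        return {"num_fragments": len(committed.fragments), "version": committed.version}


# Usage: two partitions, each an iterator of micropartitions (lists of rows).
sink = ModelLanceSink("create")
sink.start()
results = list(sink.write(iter([[1, 2], [3]]))) + list(sink.write(iter([[4, 5, 6]])))
stats = sink.finalize(results)  # {'num_fragments': 3, 'version': 1}
```

The design choice this models: workers can write fragment files in parallel without touching the dataset's version history, and only the one finalize() call after collect() creates a new version, so presumably readers of the previous version never see a half-written table.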
1. At daft/dataframe/dataframe.py, with def write_lance(),
2. using class LanceDataSink from daft/dataframe/lance_data_sink.py,
3. whose methods are executed with DataFrame.write_sink() (also in dataframe.py),
4. which routes the sink operation through LogicalPlanBuilder.write_lance() from daft/logical/builder.py,
5. which runs def lance_write() from daft/execution/physical_plan.py,
6. which executes WriteLance in daft/execution/execution_step.py,
7. which FINALLY executes write_lance() in daft/execution/recordbatch_io.py.
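The file-by-file chain above is a series of delegations in which nothing is written until the plan actually executes. Below is a rough, stdlib-only model of that layering, assuming one physical write step per partition. LogicalWriteNode, plan_physical, collect, and leaf_write are hypothetical names standing in for the logical plan node, lance_write(), the execution of WriteLance steps, and the final write_lance(), respectively; none of them are Daft's real classes.

```python
# Hypothetical model of the layering in the list above; these are not Daft's classes.
from dataclasses import dataclass
from typing import Callable, Iterator, List


@dataclass
class LogicalWriteNode:
    # Stand-in for the write node LogicalPlanBuilder adds to the plan.
    partitions: List[list]
    leaf_write: Callable[[list], int]


def plan_physical(node: LogicalWriteNode) -> Iterator[Callable[[], int]]:
    # Stand-in for lance_write(): lazily yields one step per partition
    # (the role WriteLance plays); planning alone writes nothing.
    for part in node.partitions:
        yield lambda part=part: node.leaf_write(part)


def collect(steps: Iterator[Callable[[], int]]) -> List[int]:
    # Stand-in for execution during collect(): run every step in order.
    return [step() for step in steps]


calls: List[int] = []


def leaf_write(part: list) -> int:
    # Stand-in for the final write_lance(): records the call, returns rows written.
    calls.append(len(part))
    return len(part)


node = LogicalWriteNode(partitions=[[1, 2], [3]], leaf_write=leaf_write)
steps = plan_physical(node)     # building the generator performs no writes
# calls is still [] at this point
rows_per_step = collect(steps)  # [2, 1]
```

Because plan_physical is a generator, the leaf writer only runs when collect() iterates over the steps, which mirrors the diagram's note that physical execution begins during collect().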