feat(io): implement write_sql with SQLDataSink and explicit dtype support #5979
base: main
Conversation
Greptile Overview
Greptile Summary
Introduces DataFrame.write_sql() method and SQLDataSink to enable distributed SQL writes through the DataSink pattern, aligning with other connectors like ClickHouse and Bigtable.
Key Changes:
- Implemented `SQLDataSink` with driver-side table initialization (`start()`) that handles write mode semantics (append/overwrite/fail) before distributed workers begin writing
- Worker processes create isolated SQLAlchemy connections to avoid socket serialization issues across distributed workers
- Added optional `dtype` parameter for explicit SQLAlchemy type mapping, passed through to `pandas.DataFrame.to_sql()`
- Returns aggregate write metrics (`total_written_rows`, `total_written_bytes`) as a single-row DataFrame
- Comprehensive test coverage validates multiple data sources (pydict, CSV, JSON), write modes, dtype scenarios, and chunking options
- Tests properly verify correctness by reading back written data using both `daft.read_sql()` and SQLAlchemy's `inspect()` API
Minor Issue:
- Manual `write_mode` validation in `write_sql()` is redundant since the `@DataframePublicAPI` decorator already validates `Literal` type hints
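For orientation, here is a minimal sketch of how the call summarized above might look from user code. The parameter order follows the sequence diagram further down (`table_name`, `conn`, `write_mode`, `dtype`); the connection URI, table name, and argument values are assumptions for illustration, not taken from the PR.

```python
import daft
import sqlalchemy

# Hypothetical SQLAlchemy connection URI, for illustration only.
conn_uri = "sqlite:///example.db"

df = daft.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Sketch of the call this PR describes: write-mode semantics are resolved on
# the driver, then workers append partitions via pandas.DataFrame.to_sql().
metrics = df.write_sql(
    "users",
    conn_uri,
    write_mode="append",
    dtype={"name": sqlalchemy.types.Text()},  # optional explicit column types
)

# Per the summary, a single-row DataFrame with total_written_rows and
# total_written_bytes is returned.
print(metrics.to_pydict())
```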
Confidence Score: 4/5
- This PR is safe to merge with minimal risk - it follows established patterns and includes comprehensive testing
- Score reflects well-structured implementation following the DataSink pattern used by other connectors, thorough test coverage across multiple scenarios, and proper handling of distributed execution. The only issue found is a redundant validation check that doesn't affect functionality.
- No files require special attention - all changes follow established patterns and have appropriate test coverage
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| daft/dataframe/dataframe.py | 5/5 | Added write_sql method that creates SQLDataSink and calls write_sink; well-documented with clear examples showing dtype usage |
| daft/io/_sql.py | 4/5 | Implemented SQLDataSink following DataSink pattern; handles distributed writes with driver-side table setup and worker-side appends |
| tests/integration/sql/test_write_sql.py | 5/5 | Comprehensive test coverage for write_sql with multiple sources, modes, dtypes, and chunking; validates both write success and schema enforcement |
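The test file summarized in the last row of the table might, in spirit, contain round-trip checks along these lines. This is a hypothetical sketch rather than the PR's actual test code; the fixture usage, table name, and connection handling are assumptions.

```python
import daft
import sqlalchemy


def test_write_sql_roundtrip(tmp_path):
    # Hypothetical round-trip test: write with write_sql, read back with
    # daft.read_sql, and check the created schema via SQLAlchemy's inspector.
    conn_uri = f"sqlite:///{tmp_path}/roundtrip.db"
    df = daft.from_pydict({"id": [1, 2, 3], "value": ["x", "y", "z"]})

    df.write_sql("items", conn_uri, write_mode="overwrite")

    read_back = daft.read_sql("SELECT * FROM items", conn_uri)
    assert sorted(read_back.to_pydict()["id"]) == [1, 2, 3]

    engine = sqlalchemy.create_engine(conn_uri)
    cols = {c["name"] for c in sqlalchemy.inspect(engine).get_columns("items")}
    assert {"id", "value"} <= cols
```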
Sequence Diagram
sequenceDiagram
participant User
participant DataFrame
participant SQLDataSink
participant Driver
participant Workers
participant Database
User->>DataFrame: write_sql(table_name, conn, write_mode, dtype)
DataFrame->>DataFrame: limit(0).to_pandas() to get empty_pdf
DataFrame->>SQLDataSink: Create SQLDataSink(table_name, conn, write_mode, dtype, empty_pdf)
DataFrame->>SQLDataSink: write_sink(sink)
Note over SQLDataSink,Driver: Driver-side initialization
SQLDataSink->>Driver: start()
Driver->>Database: Connect to database
Driver->>Database: Check if table exists
alt write_mode == "fail"
alt Table exists
Database-->>Driver: Table exists
Driver-->>User: Raise ValueError
else Table does not exist
Driver->>Database: Create empty table with schema (empty_pdf.to_sql)
end
else write_mode == "overwrite"
Driver->>Database: Replace table with empty schema (empty_pdf.to_sql)
else write_mode == "append"
alt Table does not exist
Driver->>Database: Create empty table with schema (empty_pdf.to_sql)
end
end
Driver->>Database: Close connection
Note over SQLDataSink,Workers: Distributed write phase
loop For each micropartition
SQLDataSink->>Workers: write(micropartition)
Workers->>Database: Connect to database
Workers->>Workers: Convert micropartition.to_pandas()
Workers->>Database: pdf.to_sql(table_name, if_exists="append", dtype=dtype)
Database-->>Workers: Write complete
Workers-->>SQLDataSink: WriteResult(bytes_written, rows_written)
Workers->>Database: Close connection
end
Note over SQLDataSink,Driver: Finalization
SQLDataSink->>Driver: finalize(write_results)
Driver->>Driver: Aggregate total_written_rows and total_written_bytes
Driver-->>DataFrame: Return MicroPartition with metrics
DataFrame-->>User: Return DataFrame with write metrics
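The diagram can be read alongside a rough Python sketch of the same lifecycle. This is not Daft's actual DataSink interface; the class name, constructor arguments, and return types are assumptions, and only the start/write/finalize split and the pandas `to_sql` calls mirror the flow above.

```python
import pandas as pd
import sqlalchemy


class SQLDataSinkSketch:
    """Illustrative sketch of the diagram's flow, not Daft's actual SQLDataSink."""

    def __init__(self, table_name, conn_uri, write_mode, dtype, empty_pdf):
        self.table_name = table_name
        self.conn_uri = conn_uri
        self.write_mode = write_mode  # "append" | "overwrite" | "fail"
        self.dtype = dtype
        self.empty_pdf = empty_pdf  # zero-row pandas DataFrame carrying the schema

    def start(self):
        # Driver-side initialization: resolve write-mode semantics once,
        # before any distributed worker starts writing.
        engine = sqlalchemy.create_engine(self.conn_uri)
        try:
            exists = sqlalchemy.inspect(engine).has_table(self.table_name)
            if self.write_mode == "fail" and exists:
                raise ValueError(f"Table {self.table_name} already exists")
            if self.write_mode == "overwrite":
                self.empty_pdf.to_sql(self.table_name, engine, if_exists="replace",
                                      index=False, dtype=self.dtype)
            elif not exists:
                self.empty_pdf.to_sql(self.table_name, engine, if_exists="fail",
                                      index=False, dtype=self.dtype)
        finally:
            engine.dispose()

    def write(self, pdf: pd.DataFrame):
        # Worker-side: each worker builds its own engine (sockets are not
        # serializable across processes) and appends its partition. In Daft
        # the input would be a micropartition converted via to_pandas().
        engine = sqlalchemy.create_engine(self.conn_uri)
        try:
            pdf.to_sql(self.table_name, engine, if_exists="append",
                       index=False, dtype=self.dtype)
            return {"rows_written": len(pdf)}
        finally:
            engine.dispose()

    def finalize(self, write_results):
        # Driver-side: aggregate per-partition metrics into a single summary.
        return {"total_written_rows": sum(r["rows_written"] for r in write_results)}
```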
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #5979 +/- ##
==========================================
- Coverage 72.63% 72.59% -0.05%
==========================================
Files 970 970
Lines 126562 126636 +74
==========================================
+ Hits 91924 91927 +3
- Misses 34638 34709 +71
Thanks for working on this @huleilei! For reference, one of the reasons the original PR wasn't merged was that more verification was needed on handling Daft types. This will probably need some discretion based on how smoothly things go, but here's another PR that implemented a postgres catalog that could be useful: 670eecc
Thanks for the context and the reference to the Postgres catalog PR! I completely understand the concern regarding type verification, as ensuring correct schema mapping is critical for production use.
To address this, I've designed the implementation with a "safe by default, controllable when needed" approach:
- Explicit Type Control via `dtype`: I've exposed the `dtype` parameter in `write_sql`, which is passed directly to the underlying `to_sql` call. This serves as a robust "escape hatch," allowing users to explicitly define SQLAlchemy types for columns where default inference might be insufficient (e.g., specific precision for Decimals or JSON types).
- Leveraging the Pandas Ecosystem: By utilizing `micropartition.to_pandas()`, we benefit from Pandas' mature and battle-tested type inference logic for standard Daft/Arrow types before they hit the database.
- Verification Tests: I've added comprehensive integration tests (specifically `test_write_sql_dtype_basic_types` and `test_write_sql_dtype_empty_df_creates_table`) that not only write data but also use `sqlalchemy.inspect` to verify that the actual created table schema matches our expectations.
While a full Catalog integration (like 670eecc) is definitely a great direction for the future, I believe this DataSink implementation provides a solid, verified foundation for generic SQL writing capabilities.
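As a concrete, hypothetical illustration of the `dtype` escape hatch and the `inspect`-based verification described above: the table name, columns, and database URI below are made up, and the `write_sql` signature is assumed from this PR's description.

```python
import daft
import sqlalchemy
from sqlalchemy import types

conn_uri = "sqlite:///example.db"  # hypothetical target database

df = daft.from_pydict({"id": [1, 2], "price": [9.99, 19.99]})

# Pin an explicit SQL type where default inference might be insufficient,
# e.g. a fixed-precision decimal column.
df.write_sql(
    "orders",
    conn_uri,
    write_mode="overwrite",
    dtype={"price": types.Numeric(10, 2)},
)

# Verify the created schema, mirroring what the integration tests do with
# sqlalchemy.inspect.
engine = sqlalchemy.create_engine(conn_uri)
for col in sqlalchemy.inspect(engine).get_columns("orders"):
    print(col["name"], col["type"])
```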
@desmondcheongzx @kevinzwang @colin-ho please help review when it's convenient for you. Thanks!
Summary
This PR introduces `DataFrame.write_sql()`, enabling users to write Daft DataFrames to SQL databases (e.g., PostgreSQL, SQLite) via SQLAlchemy.
It implements a robust, distributed `SQLDataSink`, summarized in the key changes below.
Key Changes
1. New Public API: DataFrame.write_sql
2. Internal Implementation: SQLDataSink
3. Tests
Addressing Previous Concerns (Type Verification)
This implementation addresses concerns about type safety (raised in previous discussions) by exposing an explicit `dtype` parameter and verifying the created table schemas with `sqlalchemy.inspect` in the integration tests.
Checklist
Thank you very much for the idea provided in #5471.
Related Issues