feat: Single data column in cache #818
base: main
Conversation
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@mb/data-json' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@mb/data-json'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:
Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.
📝 Walkthrough

Adds a single JSON column mode controlled by SqlConfig.use_single_json_column, consolidating non-internal data into _airbyte_data. Updates column generation, dataframe preparation, and file-to-table writes accordingly. Adds Postgres JSONB typing via a new type converter, a DuckDB runtime fallback for this mode, constants/imports, and integration tests validating structure and behavior.

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
actor User
participant PyAirbyte as PyAirbyte API
participant SQLProc as SQL Processor
participant DB as SQL Backend
rect rgba(220,235,255,0.5)
note right of PyAirbyte: Write with use_single_json_column = true
User->>PyAirbyte: write(dataset, config{use_single_json_column=true, strategy=APPEND})
PyAirbyte->>SQLProc: prepare load
SQLProc->>SQLProc: validate strategy (APPEND/AUTO only)
SQLProc->>SQLProc: build columns: [_airbyte_data, _airbyte_id, _airbyte_extracted_at, _airbyte_meta]
SQLProc->>SQLProc: pack non-internal fields into _airbyte_data (dict/JSON)
SQLProc->>DB: create table if needed
SQLProc->>DB: insert rows with JSON column
DB-->>SQLProc: ack
SQLProc-->>PyAirbyte: result
PyAirbyte-->>User: processed_records
end
sequenceDiagram
autonumber
participant SQLProc as DuckDB Processor
participant Super as Base SQL Processor
participant DuckDB as DuckDB
alt use_single_json_column = true
SQLProc->>Super: _write_files_to_new_table(files, ...)
note right of Super: Use base file-to-table path
else native path
SQLProc->>DuckDB: read files natively
SQLProc->>DuckDB: insert into new table
end
sequenceDiagram
autonumber
participant Proc as PostgresSqlProcessor
participant Conv as PostgresTypeConverter
participant PG as PostgreSQL
Proc->>Conv: get_json_type()
Conv-->>Proc: JSONB()
Proc->>PG: create table with _airbyte_data JSONB
Proc->>PG: insert rows
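
To make the new mode concrete, here is a minimal usage sketch. It assumes the new use_single_json_column flag is exposed directly on the cache config (caches extend SqlConfig in PyAirbyte); the source name, its config, and the database path are illustrative only.

import airbyte as ab

# Any source works; source-faker is just a convenient example.
source = ab.get_source(
    "source-faker",
    config={"count": 100},
    install_if_missing=True,
)
source.select_all_streams()

# Assumed wiring: the new flag is passed like any other SqlConfig field.
cache = ab.DuckDBCache(
    db_path="./single_json_example.duckdb",
    use_single_json_column=True,
)

# Single-JSON-column mode is append-only, so use "append" (or "auto") explicitly.
result = source.read(cache, write_strategy="append")
print(result.processed_records)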
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
Suggested labels
Suggested reviewers
Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Actionable comments posted: 2
🧹 Nitpick comments (3)
tests/integration_tests/test_single_json_column.py (3)
114-117: Minor comment inconsistency: psycopg vs psycopg2.

The comment mentions "psycopg2" but the code uses psycopg (version 3). Would it make sense to update the comment to match the actual library being used, wdyt?

Apply this diff to correct the comment:

- # Verify it's a dict (psycopg2 automatically parses JSONB)
+ # Verify it's a dict (psycopg automatically parses JSONB)
167-175: Consider parameterized queries to avoid SQL injection patterns.

The queries use f-string interpolation to embed schema and table names directly into SQL strings. While this is acceptable in a test context where inputs are controlled, it demonstrates a pattern that could be risky in production code. Would it make sense to use SQLAlchemy's parameterization or identifier quoting for better practice demonstration, wdyt?

For example:

from sqlalchemy import column, table, select

# Instead of f-strings, use SQLAlchemy constructs
columns_query = select(
    column('column_name'),
    column('data_type'),
).select_from(
    table('columns', schema='information_schema')
).where(
    column('table_schema') == new_duckdb_cache.schema_name
).where(
    column('table_name') == table_name
).order_by(column('ordinal_position'))

This is more of a best-practice suggestion rather than a critical issue, since the test context is safe.
Also applies to: 199-205
234-261: Consider testing the "auto" write strategy as well.

The test effectively validates that merge and replace strategies are rejected. However, the PR mentions that the "auto" strategy should work (presumably by defaulting to append). Would it make sense to add a positive test case for the "auto" strategy to ensure it's handled correctly in single JSON column mode, wdyt?

Something like:

# Verify that AUTO strategy works (should default to APPEND)
result = source.read(new_duckdb_cache, write_strategy="auto")
assert result.processed_records > 0
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- airbyte/_processors/sql/duckdb.py (1 hunks)
- airbyte/_processors/sql/postgres.py (3 hunks)
- airbyte/constants.py (1 hunks)
- airbyte/datasets/_sql.py (2 hunks)
- airbyte/shared/sql_processor.py (6 hunks)
- tests/integration_tests/test_all_cache_types.py (1 hunks)
- tests/integration_tests/test_single_json_column.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
airbyte/shared/sql_processor.py (7)
- airbyte/strategies.py (1): WriteStrategy (18-51)
- airbyte/exceptions.py (1): PyAirbyteInputError (201-210)
- airbyte/_processors/sql/postgres.py (2): get_json_type (47-49), normalize (65-67)
- airbyte/types.py (2): get_json_type (120-122), to_sql_type (124-162)
- airbyte/_processors/sql/snowflake.py (2): get_json_type (198-200), to_sql_type (182-195)
- airbyte/shared/catalog_providers.py (1): get_stream_properties (122-127)
- airbyte/_util/name_normalizers.py (2): normalize (23-25), normalize (53-87)

airbyte/_processors/sql/postgres.py (2)
- airbyte/shared/sql_processor.py (1): SqlProcessorBase (179-1240)
- airbyte/types.py (2): SQLTypeConverter (99-162), get_json_type (120-122)

airbyte/datasets/_sql.py (2)
- airbyte/caches/base.py (1): processor (191-193)
- airbyte/shared/sql_processor.py (1): sql_config (412-414)

airbyte/_processors/sql/duckdb.py (1)
- airbyte/shared/sql_processor.py (2): sql_config (412-414), _write_files_to_new_table (883-953)

tests/integration_tests/test_single_json_column.py (6)
- airbyte/caches/duckdb.py (1): DuckDBCache (43-54)
- airbyte/caches/postgres.py (1): PostgresCache (37-61)
- tests/conftest.py (3): source_test_installation (289-320), source_test_registry (250-272), new_postgres_cache (231-246)
- airbyte/datasets/_sql.py (4): stream_name (87-88), column_names (145-157), to_pandas (109-110), to_pandas (194-196)
- airbyte/shared/sql_processor.py (1): get_sql_connection (432-442)
- airbyte/exceptions.py (1): PyAirbyteInputError (201-210)
🪛 GitHub Actions: Run Linters
airbyte/shared/sql_processor.py
[error] 920-920: Mypy type check failed. airbyte/shared/sql_processor.py:920: List item 0 has incompatible type "None"; expected "str | bytes | date | timedelta | datetime64 | <8 more items>" [list-item]. Command: 'poetry run mypy .'
airbyte/_processors/sql/duckdb.py
[error] 122-122: Trailing whitespace detected (W291). This can be fixed automatically with 'ruff --fix' or by removing trailing spaces.
[error] ruff check failed with exit code 1 due to the above issues. Command: poetry run ruff check .
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (4)
tests/integration_tests/test_single_json_column.py (4)
1-26: LGTM!

The imports and fixture setup are clean and appropriate for the integration test. The autouse fixtures effectively enable the source-test connector for all tests in this module.
28-139: Well-structured PostgreSQL test.

The test thoroughly validates the single JSON column feature for PostgreSQL:

- Correctly enables the feature and uses the append-only strategy
- Validates table structure at the SQL level (4 columns, JSONB type)
- Verifies data integrity both directly via SQL and through the PyAirbyte API

The combination of low-level SQL inspection and high-level API validation provides good coverage.
141-231: Solid DuckDB structure test.

The test effectively validates the single JSON column feature for DuckDB:

- Uses the cache's existing SQL connection (cleaner than a manual connection)
- Handles DuckDB-specific behavior (JSON potentially returned as a string; see the sketch after this comment)
- Validates structure and data integrity at both the SQL and API levels

The parallel structure to the PostgreSQL test makes it easy to compare implementations.
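
A small illustration of the DuckDB quirk noted above (purely a sketch; the sample value is made up): a JSON column value may come back either already parsed or as a string, so a helper can normalize it before asserting.

import json

def as_dict(value):
    """Return a dict whether the driver returned parsed JSON or its string form."""
    return json.loads(value) if isinstance(value, str) else value

record = as_dict('{"id": 1, "name": "sample"}')
assert record["id"] == 1
assert as_dict({"id": 2})["id"] == 2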
234-261: Good negative path testing.

The test appropriately validates that incompatible write strategies (merge and replace) are rejected when single JSON column mode is enabled. The error message assertions ensure users receive helpful feedback about the restriction.
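
For reference, a hedged sketch of what such rejected-strategy assertions might look like; the fixture names and the PyAirbyteInputError exception are taken from the review context above, everything else is illustrative.

import pytest
from airbyte.exceptions import PyAirbyteInputError

def test_incompatible_strategies_are_rejected(source, new_duckdb_cache):
    # Merge and replace both need per-column access, so they should raise
    # when single-JSON-column mode is enabled on the cache.
    for strategy in ("merge", "replace"):
        with pytest.raises(PyAirbyteInputError):
            source.read(new_duckdb_cache, write_strategy=strategy)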
# since the DuckDB native approach doesn't support dynamically combining columns
# into JSON.
if self.sql_config.use_single_json_column:
Fix trailing whitespace to satisfy Ruff.
The new comment block has trailing spaces, which is tripping Ruff’s W291 check in CI. Could we trim those so the lint job passes, wdyt?
- # since the DuckDB native approach doesn't support dynamically combining columns
+ # since the DuckDB native approach doesn't support dynamically combining columns
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# since the DuckDB native approach doesn't support dynamically combining columns
# into JSON.
if self.sql_config.use_single_json_column:
🧰 Tools
🪛 GitHub Actions: Run Linters
[error] 122-122: Trailing whitespace detected (W291). This can be fixed automatically with 'ruff --fix' or by removing trailing spaces.
🤖 Prompt for AI Agents
In airbyte/_processors/sql/duckdb.py around lines 122 to 124, the added comment
lines contain trailing whitespace which triggers Ruff W291; remove the trailing
spaces at the ends of those comment lines (and any other trailing whitespace
nearby), save the file, and re-run the linter/CI to verify the warning is
resolved.
dataframe[AB_DATA_COLUMN] = (
    dataframe[data_columns]
    .replace([np.nan], [None], regex=False)
    .apply(lambda row: row.to_dict(), axis=1)
)
# Drop the original data columns
dataframe = dataframe.drop(columns=data_columns)
else:
Address Mypy failure in single JSON consolidation.
Mypy is failing on this block because .replace([np.nan], [None], regex=False) feeds [None] to pandas, which violates the stubbed type signature (see the CI error at line 920). Could we switch to a where-based approach to normalize NaNs before building _airbyte_data, wdyt?
- if data_columns:
- dataframe[AB_DATA_COLUMN] = (
- dataframe[data_columns]
- .replace([np.nan], [None], regex=False)
- .apply(lambda row: row.to_dict(), axis=1)
- )
+ if data_columns:
+ sanitized = dataframe[data_columns].where(
+ pd.notna(dataframe[data_columns]), None
+ )
+ dataframe[AB_DATA_COLUMN] = sanitized.apply(lambda row: row.to_dict(), axis=1)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if data_columns:
    sanitized = dataframe[data_columns].where(
        pd.notna(dataframe[data_columns]), None
    )
    dataframe[AB_DATA_COLUMN] = sanitized.apply(lambda row: row.to_dict(), axis=1)
# Drop the original data columns
dataframe = dataframe.drop(columns=data_columns)
else:
🧰 Tools
🪛 GitHub Actions: Run Linters
[error] 920-920: Mypy type check failed. airbyte/shared/sql_processor.py:920: List item 0 has incompatible type "None"; expected "str | bytes | date | timedelta | datetime64 | <8 more items>" [list-item]. Command: 'poetry run mypy .'
🤖 Prompt for AI Agents
In airbyte/shared/sql_processor.py around lines 918 to 925, the current use of
.replace([np.nan], [None], regex=False) causes a mypy type error because passing
[None] to replace conflicts with pandas stubs; change the normalization to a
where-based approach: compute a cleaned frame for data_columns that replaces NaN
values with None using a boolean mask (e.g., using isna()/~isna() with
where/mask) and then apply row.to_dict() on that cleaned frame to build
AB_DATA_COLUMN, then drop the original data_columns as before.
I've noticed in another PR that MyPy is being replaced; wondering if I should fix this anyway.
We have a data pipeline that uses Airbyte to read data from different sources, scheduled and orchestrated by Dagster (via the Airbyte API). It therefore makes sense for us to give PyAirbyte a try and simply run our connectors within Dagster.

However, we only use Airbyte to read data from sources and load it into raw tables, always appending, without any deduplication or normalization, which is done later by Dagster. In this setup the Postgres connector writes all the data it receives to a jsonb column named _airbyte_data, whereas PyAirbyte uses one column per attribute. The main issue with the one-column-per-attribute approach is that it relies heavily on the quality of the connector schema: any small discrepancy between the schema and the incoming data means it cannot write the data into the destination table. Many connectors leave field types undefined, so Airbyte assumes string, which often does not match the actual data being read.

This PR introduces a flag to add that single-column behavior. It is only intended for appending data, since everything, including primary keys, lives inside that jsonb column, and it should not break any existing feature (hopefully!).

I have tested it with PostgreSQL and added tests for it and for DuckDB. Happy to extend this to other engines if needed.
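
To illustrate the resulting layout, here is a hedged sketch of reading the single JSON column back from Postgres. The connection URL, schema, and table names are placeholders, and the internal column names follow PyAirbyte's usual defaults (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta); adjust to whatever this PR actually produces.

import sqlalchemy as sa

# Placeholder connection string; requires SQLAlchemy 2.x with psycopg (v3).
engine = sa.create_engine("postgresql+psycopg://user:password@localhost:5432/mydb")

with engine.connect() as conn:
    rows = conn.execute(
        sa.text(
            "SELECT _airbyte_raw_id, _airbyte_extracted_at, _airbyte_data "
            "FROM airbyte_raw.users LIMIT 5"
        )
    )
    for row in rows:
        # psycopg parses the JSONB payload into a Python dict automatically.
        record = row._mapping["_airbyte_data"]
        print(row._mapping["_airbyte_raw_id"], record.get("id"), record.get("name"))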
Summary by CodeRabbit
New Features
Tests