-
Notifications
You must be signed in to change notification settings - Fork 67
feat: Single data column in cache #818
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -11,6 +11,7 @@ | |||||||||||||||||||||||||||||||||
from functools import cached_property | ||||||||||||||||||||||||||||||||||
from typing import TYPE_CHECKING, Any, cast, final | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
import numpy as np | ||||||||||||||||||||||||||||||||||
import pandas as pd | ||||||||||||||||||||||||||||||||||
import sqlalchemy | ||||||||||||||||||||||||||||||||||
import ulid | ||||||||||||||||||||||||||||||||||
|
@@ -42,6 +43,7 @@ | |||||||||||||||||||||||||||||||||
from airbyte._util.hashing import one_way_hash | ||||||||||||||||||||||||||||||||||
from airbyte._util.name_normalizers import LowerCaseNormalizer | ||||||||||||||||||||||||||||||||||
from airbyte.constants import ( | ||||||||||||||||||||||||||||||||||
AB_DATA_COLUMN, | ||||||||||||||||||||||||||||||||||
AB_EXTRACTED_AT_COLUMN, | ||||||||||||||||||||||||||||||||||
AB_META_COLUMN, | ||||||||||||||||||||||||||||||||||
AB_RAW_ID_COLUMN, | ||||||||||||||||||||||||||||||||||
|
@@ -92,6 +94,13 @@ class SqlConfig(BaseModel, abc.ABC): | |||||||||||||||||||||||||||||||||
table_prefix: str | None = "" | ||||||||||||||||||||||||||||||||||
"""A prefix to add to created table names.""" | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
use_single_json_column: bool = Field(default=False) | ||||||||||||||||||||||||||||||||||
"""Store all data properties in a single JSON/JSONB column instead of individual columns. | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
When enabled, all data properties are stored in a single column named | ||||||||||||||||||||||||||||||||||
`_airbyte_data`. | ||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
_engine: Engine | None = None | ||||||||||||||||||||||||||||||||||
"""Cached SQL engine instance.""" | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
@@ -278,6 +287,24 @@ def process_airbyte_messages( | |||||||||||||||||||||||||||||||||
context={"write_strategy": write_strategy}, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Validate that single JSON column mode is only used with append strategy | ||||||||||||||||||||||||||||||||||
if ( | ||||||||||||||||||||||||||||||||||
self.sql_config.use_single_json_column | ||||||||||||||||||||||||||||||||||
and write_strategy not in {WriteStrategy.APPEND, WriteStrategy.AUTO} | ||||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||||
raise exc.PyAirbyteInputError( | ||||||||||||||||||||||||||||||||||
message=( | ||||||||||||||||||||||||||||||||||
"Single JSON column mode (`use_single_json_column=True`) is only " | ||||||||||||||||||||||||||||||||||
"compatible with `APPEND` write strategy. " | ||||||||||||||||||||||||||||||||||
"Other write strategies require primary keys as separate columns " | ||||||||||||||||||||||||||||||||||
"for merge/replace operations." | ||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||
context={ | ||||||||||||||||||||||||||||||||||
"write_strategy": write_strategy.value, | ||||||||||||||||||||||||||||||||||
"use_single_json_column": True, | ||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
stream_record_handlers: dict[str, StreamRecordHandler] = {} | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Process messages, writing to batches as we go | ||||||||||||||||||||||||||||||||||
|
@@ -690,13 +717,24 @@ def _get_sql_column_definitions( | |||||||||||||||||||||||||||||||||
) -> dict[str, sqlalchemy.types.TypeEngine]: | ||||||||||||||||||||||||||||||||||
"""Return the column definitions for the given stream.""" | ||||||||||||||||||||||||||||||||||
columns: dict[str, sqlalchemy.types.TypeEngine] = {} | ||||||||||||||||||||||||||||||||||
properties = self.catalog_provider.get_stream_properties(stream_name) | ||||||||||||||||||||||||||||||||||
for property_name, json_schema_property_def in properties.items(): | ||||||||||||||||||||||||||||||||||
clean_prop_name = self.normalizer.normalize(property_name) | ||||||||||||||||||||||||||||||||||
columns[clean_prop_name] = self.type_converter.to_sql_type( | ||||||||||||||||||||||||||||||||||
json_schema_property_def, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if self.sql_config.use_single_json_column: | ||||||||||||||||||||||||||||||||||
# In single JSON column mode, only create columns for: | ||||||||||||||||||||||||||||||||||
# 1. Internal Airbyte columns | ||||||||||||||||||||||||||||||||||
# 2. The _airbyte_data column (to store ALL data including PKs as JSON) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Add the single JSON data column | ||||||||||||||||||||||||||||||||||
columns[AB_DATA_COLUMN] = self.type_converter_class.get_json_type() | ||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||
# Standard mode: create a column for each property | ||||||||||||||||||||||||||||||||||
properties = self.catalog_provider.get_stream_properties(stream_name) | ||||||||||||||||||||||||||||||||||
for property_name, json_schema_property_def in properties.items(): | ||||||||||||||||||||||||||||||||||
clean_prop_name = self.normalizer.normalize(property_name) | ||||||||||||||||||||||||||||||||||
columns[clean_prop_name] = self.type_converter.to_sql_type( | ||||||||||||||||||||||||||||||||||
json_schema_property_def, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Always add internal Airbyte columns | ||||||||||||||||||||||||||||||||||
columns[AB_RAW_ID_COLUMN] = self.type_converter_class.get_string_type() | ||||||||||||||||||||||||||||||||||
columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP() | ||||||||||||||||||||||||||||||||||
columns[AB_META_COLUMN] = self.type_converter_class.get_json_type() | ||||||||||||||||||||||||||||||||||
|
@@ -861,10 +899,38 @@ def _write_files_to_new_table( | |||||||||||||||||||||||||||||||||
stream_name | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Remove fields that are not in the schema | ||||||||||||||||||||||||||||||||||
for col_name in dataframe.columns: | ||||||||||||||||||||||||||||||||||
if col_name not in sql_column_definitions: | ||||||||||||||||||||||||||||||||||
dataframe = dataframe.drop(columns=col_name) | ||||||||||||||||||||||||||||||||||
if self.sql_config.use_single_json_column: | ||||||||||||||||||||||||||||||||||
# In single JSON column mode, reorganize the dataframe: | ||||||||||||||||||||||||||||||||||
# 1. Keep internal Airbyte columns separate | ||||||||||||||||||||||||||||||||||
# 2. Combine ALL data columns (including PKs) into _airbyte_data JSON column | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
dataframe.columns = Index( | ||||||||||||||||||||||||||||||||||
[self.normalizer.normalize(col) for col in dataframe.columns] | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
internal_cols = {AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN} | ||||||||||||||||||||||||||||||||||
data_columns = [ | ||||||||||||||||||||||||||||||||||
col for col in dataframe.columns if col not in internal_cols | ||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Create the _airbyte_data column by combining all data columns (including PKs) | ||||||||||||||||||||||||||||||||||
if data_columns: | ||||||||||||||||||||||||||||||||||
dataframe[AB_DATA_COLUMN] = ( | ||||||||||||||||||||||||||||||||||
dataframe[data_columns] | ||||||||||||||||||||||||||||||||||
.replace([np.nan], [None], regex=False) | ||||||||||||||||||||||||||||||||||
.apply(lambda row: row.to_dict(), axis=1) | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
# Drop the original data columns | ||||||||||||||||||||||||||||||||||
dataframe = dataframe.drop(columns=data_columns) | ||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||
Comment on lines
+918
to
+925
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Address Mypy failure in single JSON consolidation. Mypy is failing on this block because - if data_columns:
- dataframe[AB_DATA_COLUMN] = (
- dataframe[data_columns]
- .replace([np.nan], [None], regex=False)
- .apply(lambda row: row.to_dict(), axis=1)
- )
+ if data_columns:
+ sanitized = dataframe[data_columns].where(
+ pd.notna(dataframe[data_columns]), None
+ )
+ dataframe[AB_DATA_COLUMN] = sanitized.apply(lambda row: row.to_dict(), axis=1) 📝 Committable suggestion
Suggested change
🧰 Tools🪛 GitHub Actions: Run Linters[error] 920-920: Mypy type check failed. airbyte/shared/sql_processor.py:920: List item 0 has incompatible type "None"; expected "str | bytes | date | timedelta | datetime64 | <8 more items>" [list-item]. Command: 'poetry run mypy .' 🤖 Prompt for AI Agents
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've noticed in another PR that MyPy is being replaced, wondering if I should fix this anyway |
||||||||||||||||||||||||||||||||||
# Standard mode: just remove fields not in schema and normalize | ||||||||||||||||||||||||||||||||||
for col_name in dataframe.columns: | ||||||||||||||||||||||||||||||||||
if col_name not in sql_column_definitions: | ||||||||||||||||||||||||||||||||||
dataframe = dataframe.drop(columns=col_name) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
dataframe.columns = Index( | ||||||||||||||||||||||||||||||||||
[self.normalizer.normalize(col) for col in dataframe.columns] | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Pandas will auto-create the table if it doesn't exist, which we don't want. | ||||||||||||||||||||||||||||||||||
if not self._table_exists(temp_table_name): | ||||||||||||||||||||||||||||||||||
|
@@ -875,9 +941,6 @@ def _write_files_to_new_table( | |||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Normalize all column names to lower case. | ||||||||||||||||||||||||||||||||||
dataframe.columns = Index([self.normalizer.normalize(col) for col in dataframe.columns]) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
# Write the data to the table. | ||||||||||||||||||||||||||||||||||
dataframe.to_sql( | ||||||||||||||||||||||||||||||||||
temp_table_name, | ||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix trailing whitespace to satisfy Ruff.
The new comment block has trailing spaces, which is tripping Ruff’s W291 check in CI. Could we trim those so the lint job passes, wdyt?
📝 Committable suggestion
🧰 Tools
🪛 GitHub Actions: Run Linters
[error] 122-122: Trailing whitespace detected (W291). This can be fixed automatically with 'ruff --fix' or by removing trailing spaces.
🤖 Prompt for AI Agents