9 changes: 9 additions & 0 deletions airbyte/_processors/sql/duckdb.py
@@ -118,6 +118,15 @@ def _write_files_to_new_table(
        We use DuckDB native SQL functions to efficiently read the files and insert
        them into the table in a single operation.
        """
        # Fall back to pandas-based approach for single JSON column mode
        # since the DuckDB native approach doesn't support dynamically combining columns
        # into JSON.
        if self.sql_config.use_single_json_column:
Comment on lines +122 to +124
Contributor
⚠️ Potential issue | 🟡 Minor

Fix trailing whitespace to satisfy Ruff.

The new comment block has trailing spaces, which is tripping Ruff’s W291 check in CI. Could we trim those so the lint job passes, wdyt?

-        # since the DuckDB native approach doesn't support dynamically combining columns 
+        # since the DuckDB native approach doesn't support dynamically combining columns
📝 Committable suggestion


Suggested change
-        # since the DuckDB native approach doesn't support dynamically combining columns 
-        # into JSON.
-        if self.sql_config.use_single_json_column:
+        # since the DuckDB native approach doesn't support dynamically combining columns
+        # into JSON.
+        if self.sql_config.use_single_json_column:
🧰 Tools
🪛 GitHub Actions: Run Linters

[error] 122-122: Trailing whitespace detected (W291). This can be fixed automatically with 'ruff --fix' or by removing trailing spaces.

🤖 Prompt for AI Agents
In airbyte/_processors/sql/duckdb.py around lines 122 to 124, the added comment
lines contain trailing whitespace which triggers Ruff W291; remove the trailing
spaces at the ends of those comment lines (and any other trailing whitespace
nearby), save the file, and re-run the linter/CI to verify the warning is
resolved.

            return super()._write_files_to_new_table(
                files=files,
                stream_name=stream_name,
                batch_id=batch_id,
            )
        temp_table_name = self._create_table_for_loading(
            stream_name=stream_name,
            batch_id=batch_id,
14 changes: 14 additions & 0 deletions airbyte/_processors/sql/postgres.py
@@ -6,11 +6,13 @@
import functools

from overrides import overrides
from sqlalchemy.dialects.postgresql import JSONB

from airbyte._util.name_normalizers import LowerCaseNormalizer
from airbyte._writers.jsonl import JsonlWriter
from airbyte.secrets.base import SecretString
from airbyte.shared.sql_processor import SqlConfig, SqlProcessorBase
from airbyte.types import SQLTypeConverter


class PostgresConfig(SqlConfig):
@@ -38,6 +40,15 @@ def get_database_name(self) -> str:
        return self.database


class PostgresTypeConverter(SQLTypeConverter):
    """A Postgres-specific type converter that uses JSONB for JSON data."""

    @classmethod
    def get_json_type(cls) -> JSONB:
        """Return JSONB type for Postgres instead of generic JSON."""
        return JSONB()


class PostgresNormalizer(LowerCaseNormalizer):
    """A name normalizer for Postgres.

@@ -73,3 +84,6 @@ class PostgresSqlProcessor(SqlProcessorBase):

    normalizer = PostgresNormalizer
    """A Postgres-specific name normalizer for table and column name normalization."""

    type_converter_class = PostgresTypeConverter
    """A Postgres-specific type converter that uses JSONB for JSON data."""
9 changes: 9 additions & 0 deletions airbyte/constants.py
@@ -32,6 +32,15 @@
`emitted_at` (`_airbyte_extracted_at`) timestamps.
"""

AB_DATA_COLUMN = "_airbyte_data"
"""A column that stores all data properties as a single JSON/JSONB object.

This column is used when `use_single_json_column=True` is set on the cache configuration.
Instead of creating individual columns for each property in the source data, all non-primary-key
data properties are stored together in this single JSON column. Primary keys and internal Airbyte
columns remain as separate columns for efficient querying and merge operations.
"""

AB_INTERNAL_COLUMNS = {
    AB_RAW_ID_COLUMN,
    AB_EXTRACTED_AT_COLUMN,
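
To make the new constant concrete, a hedged end-to-end sketch (the flag comes from this PR; the cache and source choices are illustrative and assume the field is inherited by the cache configs):

import airbyte as ab
from airbyte.caches import DuckDBCache

# Assumes DuckDBCache picks up the new `use_single_json_column` field from SqlConfig.
cache = DuckDBCache(db_path="./poke.duckdb", use_single_json_column=True)

source = ab.get_source(
    "source-pokeapi",
    config={"pokemon_name": "bulbasaur"},
    install_if_missing=True,
)
source.select_all_streams()

# Only append (or auto resolving to append) is valid in this mode; see sql_processor.py below.
source.read(cache=cache, write_strategy="append")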
11 changes: 11 additions & 0 deletions airbyte/datasets/_sql.py
@@ -12,6 +12,7 @@
from airbyte_protocol.models import ConfiguredAirbyteStream

from airbyte.constants import (
    AB_DATA_COLUMN,
    AB_EXTRACTED_AT_COLUMN,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
@@ -143,6 +144,16 @@ def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
    @property
    def column_names(self) -> list[str]:
        """Return the list of top-level column names, including internal Airbyte columns."""
        if self._cache.processor.sql_config.use_single_json_column:
            # In single JSON column mode, return only _airbyte_data + internal columns
            # (PKs are inside _airbyte_data, not separate columns)
            return [
                AB_DATA_COLUMN,
                AB_RAW_ID_COLUMN,
                AB_EXTRACTED_AT_COLUMN,
                AB_META_COLUMN,
            ]
        # Standard mode: all data columns + internal columns
        return [*super().column_names, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN]


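
A hedged read-side sketch of what the property change means for consumers; the internal column names are the existing PyAirbyte constants, while the cache and stream names are illustrative:

from airbyte.caches import DuckDBCache

cache = DuckDBCache(db_path="./poke.duckdb", use_single_json_column=True)
dataset = cache["pokemon"]  # assumes this stream was already synced into the cache

print(dataset.column_names)
# Expected: ['_airbyte_data', '_airbyte_raw_id', '_airbyte_extracted_at', '_airbyte_meta']

for record in dataset:
    payload = record["_airbyte_data"]  # all source properties, primary keys included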
89 changes: 76 additions & 13 deletions airbyte/shared/sql_processor.py
@@ -11,6 +11,7 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any, cast, final

import numpy as np
import pandas as pd
import sqlalchemy
import ulid
@@ -42,6 +43,7 @@
from airbyte._util.hashing import one_way_hash
from airbyte._util.name_normalizers import LowerCaseNormalizer
from airbyte.constants import (
    AB_DATA_COLUMN,
    AB_EXTRACTED_AT_COLUMN,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
@@ -92,6 +94,13 @@ class SqlConfig(BaseModel, abc.ABC):
    table_prefix: str | None = ""
    """A prefix to add to created table names."""

    use_single_json_column: bool = Field(default=False)
    """Store all data properties in a single JSON/JSONB column instead of individual columns.

    When enabled, all data properties are stored in a single column named
    `_airbyte_data`.
    """

    _engine: Engine | None = None
    """Cached SQL engine instance."""

@@ -278,6 +287,24 @@ def process_airbyte_messages(
                context={"write_strategy": write_strategy},
            )

        # Validate that single JSON column mode is only used with append strategy
        if (
            self.sql_config.use_single_json_column
            and write_strategy not in {WriteStrategy.APPEND, WriteStrategy.AUTO}
        ):
            raise exc.PyAirbyteInputError(
                message=(
                    "Single JSON column mode (`use_single_json_column=True`) is only "
                    "compatible with `APPEND` write strategy. "
                    "Other write strategies require primary keys as separate columns "
                    "for merge/replace operations."
                ),
                context={
                    "write_strategy": write_strategy.value,
                    "use_single_json_column": True,
                },
            )

        stream_record_handlers: dict[str, StreamRecordHandler] = {}

        # Process messages, writing to batches as we go
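
A small sketch of what the new guard accepts versus rejects, using the existing WriteStrategy enum:

from airbyte.strategies import WriteStrategy

allowed = {WriteStrategy.APPEND, WriteStrategy.AUTO}
for strategy in WriteStrategy:
    verdict = "ok" if strategy in allowed else "raises PyAirbyteInputError"
    print(f"use_single_json_column=True + {strategy.value}: {verdict}")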
@@ -690,13 +717,24 @@ def _get_sql_column_definitions(
    ) -> dict[str, sqlalchemy.types.TypeEngine]:
        """Return the column definitions for the given stream."""
        columns: dict[str, sqlalchemy.types.TypeEngine] = {}
        properties = self.catalog_provider.get_stream_properties(stream_name)
        for property_name, json_schema_property_def in properties.items():
            clean_prop_name = self.normalizer.normalize(property_name)
            columns[clean_prop_name] = self.type_converter.to_sql_type(
                json_schema_property_def,
            )

        if self.sql_config.use_single_json_column:
            # In single JSON column mode, only create columns for:
            # 1. Internal Airbyte columns
            # 2. The _airbyte_data column (to store ALL data including PKs as JSON)

            # Add the single JSON data column
            columns[AB_DATA_COLUMN] = self.type_converter_class.get_json_type()
        else:
            # Standard mode: create a column for each property
            properties = self.catalog_provider.get_stream_properties(stream_name)
            for property_name, json_schema_property_def in properties.items():
                clean_prop_name = self.normalizer.normalize(property_name)
                columns[clean_prop_name] = self.type_converter.to_sql_type(
                    json_schema_property_def,
                )

        # Always add internal Airbyte columns
        columns[AB_RAW_ID_COLUMN] = self.type_converter_class.get_string_type()
        columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP()
        columns[AB_META_COLUMN] = self.type_converter_class.get_json_type()
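
Illustrative expectation for the single-JSON branch above, assuming the Postgres converter (the base converter would return generic JSON and its default string type):

import sqlalchemy
from sqlalchemy.dialects.postgresql import JSONB

# Roughly what _get_sql_column_definitions() yields per stream in this mode.
expected = {
    "_airbyte_data": JSONB(),                        # consolidated payload column
    "_airbyte_raw_id": sqlalchemy.types.VARCHAR(),   # assumed string type from get_string_type()
    "_airbyte_extracted_at": sqlalchemy.TIMESTAMP(),
    "_airbyte_meta": JSONB(),
}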
@@ -861,10 +899,38 @@ def _write_files_to_new_table(
                stream_name
            )

            # Remove fields that are not in the schema
            for col_name in dataframe.columns:
                if col_name not in sql_column_definitions:
                    dataframe = dataframe.drop(columns=col_name)
            if self.sql_config.use_single_json_column:
                # In single JSON column mode, reorganize the dataframe:
                # 1. Keep internal Airbyte columns separate
                # 2. Combine ALL data columns (including PKs) into _airbyte_data JSON column

                dataframe.columns = Index(
                    [self.normalizer.normalize(col) for col in dataframe.columns]
                )

                internal_cols = {AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN}
                data_columns = [
                    col for col in dataframe.columns if col not in internal_cols
                ]

                # Create the _airbyte_data column by combining all data columns (including PKs)
                if data_columns:
                    dataframe[AB_DATA_COLUMN] = (
                        dataframe[data_columns]
                        .replace([np.nan], [None], regex=False)
                        .apply(lambda row: row.to_dict(), axis=1)
                    )
                    # Drop the original data columns
                    dataframe = dataframe.drop(columns=data_columns)
            else:
Comment on lines +918 to +925
Contributor
⚠️ Potential issue | 🟠 Major

Address Mypy failure in single JSON consolidation.

Mypy is failing on this block because .replace([np.nan], [None], regex=False) feeds [None] to pandas, which violates the stubbed type signature (see the CI error at Line 920). Could we switch to a where-based approach to normalize NaNs before building _airbyte_data, wdyt?

-                if data_columns:
-                    dataframe[AB_DATA_COLUMN] = (
-                        dataframe[data_columns]
-                        .replace([np.nan], [None], regex=False)
-                        .apply(lambda row: row.to_dict(), axis=1)
-                    )
+                if data_columns:
+                    sanitized = dataframe[data_columns].where(
+                        pd.notna(dataframe[data_columns]), None
+                    )
+                    dataframe[AB_DATA_COLUMN] = sanitized.apply(lambda row: row.to_dict(), axis=1)
📝 Committable suggestion


Suggested change
-                    dataframe[AB_DATA_COLUMN] = (
-                        dataframe[data_columns]
-                        .replace([np.nan], [None], regex=False)
-                        .apply(lambda row: row.to_dict(), axis=1)
-                    )
-                    # Drop the original data columns
-                    dataframe = dataframe.drop(columns=data_columns)
-            else:
+                if data_columns:
+                    sanitized = dataframe[data_columns].where(
+                        pd.notna(dataframe[data_columns]), None
+                    )
+                    dataframe[AB_DATA_COLUMN] = sanitized.apply(lambda row: row.to_dict(), axis=1)
+                    # Drop the original data columns
+                    dataframe = dataframe.drop(columns=data_columns)
+            else:
🧰 Tools
🪛 GitHub Actions: Run Linters

[error] 920-920: Mypy type check failed. airbyte/shared/sql_processor.py:920: List item 0 has incompatible type "None"; expected "str | bytes | date | timedelta | datetime64 | <8 more items>" [list-item]. Command: 'poetry run mypy .'

🤖 Prompt for AI Agents
In airbyte/shared/sql_processor.py around lines 918 to 925, the current use of
.replace([np.nan], [None], regex=False) causes a mypy type error because passing
[None] to replace conflicts with pandas stubs; change the normalization to a
where-based approach: compute a cleaned frame for data_columns that replaces NaN
values with None using a boolean mask (e.g., using isna()/~isna() with
where/mask) and then apply row.to_dict() on that cleaned frame to build
AB_DATA_COLUMN, then drop the original data_columns as before.

Author
I've noticed in another PR that MyPy is being replaced, wondering if I should fix this anyway
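
For reference, a standalone pandas sketch (outside the PR) of the consolidation step with the where-based NaN handling proposed above; the column names are illustrative:

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 2],
        "name": ["bulbasaur", np.nan],
        "_airbyte_raw_id": ["r1", "r2"],
    }
)

internal_cols = {"_airbyte_raw_id"}
data_columns = [col for col in df.columns if col not in internal_cols]

# NaN -> None via `where` (avoids the stub complaint about replace), then one dict per row.
sanitized = df[data_columns].where(pd.notna(df[data_columns]), None)
df["_airbyte_data"] = sanitized.apply(lambda row: row.to_dict(), axis=1)
df = df.drop(columns=data_columns)

print(df.to_dict(orient="records"))
# e.g. [{'_airbyte_raw_id': 'r1', '_airbyte_data': {'id': 1, 'name': 'bulbasaur'}},
#       {'_airbyte_raw_id': 'r2', '_airbyte_data': {'id': 2, 'name': None}}]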

                # Standard mode: just remove fields not in schema and normalize
                for col_name in dataframe.columns:
                    if col_name not in sql_column_definitions:
                        dataframe = dataframe.drop(columns=col_name)

                dataframe.columns = Index(
                    [self.normalizer.normalize(col) for col in dataframe.columns]
                )

            # Pandas will auto-create the table if it doesn't exist, which we don't want.
            if not self._table_exists(temp_table_name):
@@ -875,9 +941,6 @@
                    },
                )

            # Normalize all column names to lower case.
            dataframe.columns = Index([self.normalizer.normalize(col) for col in dataframe.columns])

            # Write the data to the table.
            dataframe.to_sql(
                temp_table_name,
1 change: 1 addition & 0 deletions tests/integration_tests/test_all_cache_types.py
@@ -109,6 +109,7 @@ def source_pokeapi() -> ab.Source:
    return source


@pytest.mark.requires_creds
@pytest.mark.slow
@pytest.mark.skipif(
"CI" in os.environ,