diff --git a/airbyte/_processors/sql/duckdb.py b/airbyte/_processors/sql/duckdb.py index cdb649e4..6caff767 100644 --- a/airbyte/_processors/sql/duckdb.py +++ b/airbyte/_processors/sql/duckdb.py @@ -118,6 +118,15 @@ def _write_files_to_new_table( We use DuckDB native SQL functions to efficiently read the files and insert them into the table in a single operation. """ + # Fall back to pandas-based approach for single JSON column mode + # since the DuckDB native approach doesn't support dynamically combining columns + # into JSON. + if self.sql_config.use_single_json_column: + return super()._write_files_to_new_table( + files=files, + stream_name=stream_name, + batch_id=batch_id, + ) temp_table_name = self._create_table_for_loading( stream_name=stream_name, batch_id=batch_id, diff --git a/airbyte/_processors/sql/postgres.py b/airbyte/_processors/sql/postgres.py index 0e9bdb60..d94ee9b0 100644 --- a/airbyte/_processors/sql/postgres.py +++ b/airbyte/_processors/sql/postgres.py @@ -6,11 +6,13 @@ import functools from overrides import overrides +from sqlalchemy.dialects.postgresql import JSONB from airbyte._util.name_normalizers import LowerCaseNormalizer from airbyte._writers.jsonl import JsonlWriter from airbyte.secrets.base import SecretString from airbyte.shared.sql_processor import SqlConfig, SqlProcessorBase +from airbyte.types import SQLTypeConverter class PostgresConfig(SqlConfig): @@ -38,6 +40,15 @@ def get_database_name(self) -> str: return self.database +class PostgresTypeConverter(SQLTypeConverter): + """A Postgres-specific type converter that uses JSONB for JSON data.""" + + @classmethod + def get_json_type(cls) -> JSONB: + """Return JSONB type for Postgres instead of generic JSON.""" + return JSONB() + + class PostgresNormalizer(LowerCaseNormalizer): """A name normalizer for Postgres. @@ -73,3 +84,6 @@ class PostgresSqlProcessor(SqlProcessorBase): normalizer = PostgresNormalizer """A Postgres-specific name normalizer for table and column name normalization.""" + + type_converter_class = PostgresTypeConverter + """A Postgres-specific type converter that uses JSONB for JSON data.""" diff --git a/airbyte/constants.py b/airbyte/constants.py index 02a8fe7e..5d199532 100644 --- a/airbyte/constants.py +++ b/airbyte/constants.py @@ -32,6 +32,15 @@ `emitted_at` (`_airbyte_extracted_at`) timestamps. """ +AB_DATA_COLUMN = "_airbyte_data" +"""A column that stores all data properties as a single JSON/JSONB object. + +This column is used when `use_single_json_column=True` is set on the cache configuration. +Instead of creating individual columns for each property in the source data, all non-primary-key +data properties are stored together in this single JSON column. Primary keys and internal Airbyte +columns remain as separate columns for efficient querying and merge operations. +""" + AB_INTERNAL_COLUMNS = { AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, diff --git a/airbyte/datasets/_sql.py b/airbyte/datasets/_sql.py index e7a14475..aa6d10a7 100644 --- a/airbyte/datasets/_sql.py +++ b/airbyte/datasets/_sql.py @@ -12,6 +12,7 @@ from airbyte_protocol.models import ConfiguredAirbyteStream from airbyte.constants import ( + AB_DATA_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN, AB_RAW_ID_COLUMN, @@ -143,6 +144,16 @@ def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset: @property def column_names(self) -> list[str]: """Return the list of top-level column names, including internal Airbyte columns.""" + if self._cache.processor.sql_config.use_single_json_column: + # In single JSON column mode, return only _airbyte_data + internal columns + # (PKs are inside _airbyte_data, not separate columns) + return [ + AB_DATA_COLUMN, + AB_RAW_ID_COLUMN, + AB_EXTRACTED_AT_COLUMN, + AB_META_COLUMN, + ] + # Standard mode: all data columns + internal columns return [*super().column_names, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN] diff --git a/airbyte/shared/sql_processor.py b/airbyte/shared/sql_processor.py index a602078b..ba3fc252 100644 --- a/airbyte/shared/sql_processor.py +++ b/airbyte/shared/sql_processor.py @@ -11,6 +11,7 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, cast, final +import numpy as np import pandas as pd import sqlalchemy import ulid @@ -42,6 +43,7 @@ from airbyte._util.hashing import one_way_hash from airbyte._util.name_normalizers import LowerCaseNormalizer from airbyte.constants import ( + AB_DATA_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN, AB_RAW_ID_COLUMN, @@ -92,6 +94,13 @@ class SqlConfig(BaseModel, abc.ABC): table_prefix: str | None = "" """A prefix to add to created table names.""" + use_single_json_column: bool = Field(default=False) + """Store all data properties in a single JSON/JSONB column instead of individual columns. + + When enabled, all data properties are stored in a single column named + `_airbyte_data`. + """ + _engine: Engine | None = None """Cached SQL engine instance.""" @@ -278,6 +287,24 @@ def process_airbyte_messages( context={"write_strategy": write_strategy}, ) + # Validate that single JSON column mode is only used with append strategy + if ( + self.sql_config.use_single_json_column + and write_strategy not in {WriteStrategy.APPEND, WriteStrategy.AUTO} + ): + raise exc.PyAirbyteInputError( + message=( + "Single JSON column mode (`use_single_json_column=True`) is only " + "compatible with `APPEND` write strategy. " + "Other write strategies require primary keys as separate columns " + "for merge/replace operations." + ), + context={ + "write_strategy": write_strategy.value, + "use_single_json_column": True, + }, + ) + stream_record_handlers: dict[str, StreamRecordHandler] = {} # Process messages, writing to batches as we go @@ -690,13 +717,24 @@ def _get_sql_column_definitions( ) -> dict[str, sqlalchemy.types.TypeEngine]: """Return the column definitions for the given stream.""" columns: dict[str, sqlalchemy.types.TypeEngine] = {} - properties = self.catalog_provider.get_stream_properties(stream_name) - for property_name, json_schema_property_def in properties.items(): - clean_prop_name = self.normalizer.normalize(property_name) - columns[clean_prop_name] = self.type_converter.to_sql_type( - json_schema_property_def, - ) + if self.sql_config.use_single_json_column: + # In single JSON column mode, only create columns for: + # 1. Internal Airbyte columns + # 2. The _airbyte_data column (to store ALL data including PKs as JSON) + + # Add the single JSON data column + columns[AB_DATA_COLUMN] = self.type_converter_class.get_json_type() + else: + # Standard mode: create a column for each property + properties = self.catalog_provider.get_stream_properties(stream_name) + for property_name, json_schema_property_def in properties.items(): + clean_prop_name = self.normalizer.normalize(property_name) + columns[clean_prop_name] = self.type_converter.to_sql_type( + json_schema_property_def, + ) + + # Always add internal Airbyte columns columns[AB_RAW_ID_COLUMN] = self.type_converter_class.get_string_type() columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP() columns[AB_META_COLUMN] = self.type_converter_class.get_json_type() @@ -861,10 +899,38 @@ def _write_files_to_new_table( stream_name ) - # Remove fields that are not in the schema - for col_name in dataframe.columns: - if col_name not in sql_column_definitions: - dataframe = dataframe.drop(columns=col_name) + if self.sql_config.use_single_json_column: + # In single JSON column mode, reorganize the dataframe: + # 1. Keep internal Airbyte columns separate + # 2. Combine ALL data columns (including PKs) into _airbyte_data JSON column + + dataframe.columns = Index( + [self.normalizer.normalize(col) for col in dataframe.columns] + ) + + internal_cols = {AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN} + data_columns = [ + col for col in dataframe.columns if col not in internal_cols + ] + + # Create the _airbyte_data column by combining all data columns (including PKs) + if data_columns: + dataframe[AB_DATA_COLUMN] = ( + dataframe[data_columns] + .replace([np.nan], [None], regex=False) + .apply(lambda row: row.to_dict(), axis=1) + ) + # Drop the original data columns + dataframe = dataframe.drop(columns=data_columns) + else: + # Standard mode: just remove fields not in schema and normalize + for col_name in dataframe.columns: + if col_name not in sql_column_definitions: + dataframe = dataframe.drop(columns=col_name) + + dataframe.columns = Index( + [self.normalizer.normalize(col) for col in dataframe.columns] + ) # Pandas will auto-create the table if it doesn't exist, which we don't want. if not self._table_exists(temp_table_name): @@ -875,9 +941,6 @@ def _write_files_to_new_table( }, ) - # Normalize all column names to lower case. - dataframe.columns = Index([self.normalizer.normalize(col) for col in dataframe.columns]) - # Write the data to the table. dataframe.to_sql( temp_table_name, diff --git a/tests/integration_tests/test_all_cache_types.py b/tests/integration_tests/test_all_cache_types.py index 944db3bf..af628718 100644 --- a/tests/integration_tests/test_all_cache_types.py +++ b/tests/integration_tests/test_all_cache_types.py @@ -109,6 +109,7 @@ def source_pokeapi() -> ab.Source: return source +@pytest.mark.requires_creds @pytest.mark.slow @pytest.mark.skipif( "CI" in os.environ, diff --git a/tests/integration_tests/test_single_json_column.py b/tests/integration_tests/test_single_json_column.py new file mode 100644 index 00000000..71e09ad8 --- /dev/null +++ b/tests/integration_tests/test_single_json_column.py @@ -0,0 +1,261 @@ +"""Integration tests for the single JSON column feature with PostgreSQL and DuckDB.""" + +from __future__ import annotations + +import json +import airbyte as ab +import psycopg +import pytest +from airbyte import exceptions as exc +from airbyte.caches.duckdb import DuckDBCache +from airbyte.caches.postgres import PostgresCache +from airbyte.constants import AB_DATA_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN, AB_RAW_ID_COLUMN +from sqlalchemy import text + + +@pytest.fixture(scope="module", autouse=True) +def autouse_source_test_installation(source_test_installation) -> None: + """Install source-test connector for testing.""" + return + + +@pytest.fixture(scope="function", autouse=True) +def autouse_source_test_registry(source_test_registry) -> None: + """Enable the test registry for source-test connector.""" + return + + +def test_postgres_single_json_column_structure( + new_postgres_cache: PostgresCache, +) -> None: + """Test that single JSON column mode creates the correct table structure. + + This test verifies: + 1. Table has only 4 columns: _airbyte_data + 3 internal columns + 2. Primary keys are inside _airbyte_data (not separate columns) + 3. Data can be written and read back correctly + 4. _airbyte_data uses JSONB type (Postgres-specific) + """ + # Enable single JSON column mode + new_postgres_cache.use_single_json_column = True + + # Get source-test connector + source = ab.get_source("source-test", config={"apiKey": "test"}) + source.select_all_streams() + + # Write data to cache using append strategy + # Note: Single JSON column mode only supports APPEND strategy because + # primary keys are inside the JSON column, not as separate columns + result = source.read(new_postgres_cache, write_strategy="append") + + # Verify data was written + assert result.processed_records > 0 + + # Connect directly to Postgres to inspect table structure + pg_url = ( + f"postgresql://{new_postgres_cache.username}:{new_postgres_cache.password}" + f"@{new_postgres_cache.host}:{new_postgres_cache.port}/{new_postgres_cache.database}" + ) + + with psycopg.connect(pg_url) as conn: + with conn.cursor() as cur: + # Get all stream names from the result + for stream_name in result.streams.keys(): + # Use processor's normalizer to get the correct table name + normalized_stream_name = new_postgres_cache.processor.normalizer.normalize(stream_name) + table_name = f"{new_postgres_cache.table_prefix}{normalized_stream_name}" + + # Get column names and types + cur.execute( + """ + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = %s AND table_name = %s + ORDER BY ordinal_position + """, + (new_postgres_cache.schema_name, table_name), + ) + columns = cur.fetchall() + column_names = [col[0] for col in columns] + column_types = {col[0]: col[1] for col in columns} + + # Verify we have exactly 4 columns + assert len(column_names) == 4, ( + f"Expected 4 columns, got {len(column_names)}: {column_names}" + ) + + # Verify the expected columns exist + expected_columns = { + AB_DATA_COLUMN, + AB_RAW_ID_COLUMN, + AB_EXTRACTED_AT_COLUMN, + AB_META_COLUMN, + } + assert set(column_names) == expected_columns, ( + f"Column mismatch. Expected: {expected_columns}, Got: {set(column_names)}" + ) + + # Verify _airbyte_data is JSONB type + assert column_types[AB_DATA_COLUMN] == "jsonb", ( + f"Expected _airbyte_data to be JSONB, got {column_types[AB_DATA_COLUMN]}" + ) + + # Read some data and verify it's in JSON format + cur.execute( + f""" + SELECT {AB_DATA_COLUMN} + FROM {new_postgres_cache.schema_name}.{table_name} + LIMIT 1 + """ + ) + row = cur.fetchone() + if row: + json_data = row[0] + # Verify it's a dict (psycopg2 automatically parses JSONB) + assert isinstance(json_data, dict), ( + f"Expected _airbyte_data to contain dict, got {type(json_data)}" + ) + # Verify it contains some data + assert len(json_data) > 0, ( + "_airbyte_data should contain data properties" + ) + + # Verify we can read data back through PyAirbyte's API + for stream_name in result.streams.keys(): + dataset = new_postgres_cache[stream_name] + df = dataset.to_pandas() + + # Skip empty streams (e.g., always-empty-stream) + if len(df) == 0: + continue + + # Verify DataFrame has the _airbyte_data column + assert AB_DATA_COLUMN in df.columns, ( + f"Expected {AB_DATA_COLUMN} in DataFrame columns" + ) + + # Verify it contains data + assert len(df) > 0, f"Expected data in stream {stream_name}" + + +def test_duckdb_single_json_column_structure( + new_duckdb_cache: DuckDBCache, +) -> None: + """Test that single JSON column mode creates the correct table structure in DuckDB. + + This test verifies: + 1. Table has only 4 columns: _airbyte_data + 3 internal columns + 2. Primary keys are inside _airbyte_data (not separate columns) + 3. Data can be written and read back correctly + 4. _airbyte_data uses JSON type + """ + new_duckdb_cache.use_single_json_column = True + + source = ab.get_source("source-test", config={"apiKey": "test"}) + source.select_all_streams() + + result = source.read(new_duckdb_cache, write_strategy="append") + + assert result.processed_records > 0 + + # Use the cache's existing SQL connection to inspect table structure + with new_duckdb_cache.processor.get_sql_connection() as conn: + for stream_name in result.streams.keys(): + normalized_stream_name = new_duckdb_cache.processor.normalizer.normalize(stream_name) + table_name = f"{new_duckdb_cache.table_prefix}{normalized_stream_name}" + + columns_query = text( + f""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = '{new_duckdb_cache.schema_name}' + AND table_name = '{table_name}' + ORDER BY ordinal_position + """ + ) + result_proxy = conn.execute(columns_query) + columns = result_proxy.fetchall() + column_names = [col[0] for col in columns] + column_types = {col[0]: col[1] for col in columns} + + assert len(column_names) == 4, ( + f"Expected 4 columns, got {len(column_names)}: {column_names}" + ) + + expected_columns = { + AB_DATA_COLUMN, + AB_RAW_ID_COLUMN, + AB_EXTRACTED_AT_COLUMN, + AB_META_COLUMN, + } + assert set(column_names) == expected_columns, ( + f"Column mismatch. Expected: {expected_columns}, Got: {set(column_names)}" + ) + + assert column_types[AB_DATA_COLUMN] == "JSON", ( + f"Expected _airbyte_data to be JSON, got {column_types[AB_DATA_COLUMN]}" + ) + + data_query = text( + f""" + SELECT {AB_DATA_COLUMN} + FROM {new_duckdb_cache.schema_name}.{table_name} + LIMIT 1 + """ + ) + result_proxy = conn.execute(data_query) + rows = result_proxy.fetchall() + + if rows: + json_data = rows[0][0] + if isinstance(json_data, str): + json_data = json.loads(json_data) + + assert isinstance(json_data, dict), ( + f"Expected _airbyte_data to contain dict, got {type(json_data)}" + ) + assert len(json_data) > 0, "_airbyte_data should contain data properties" + + for stream_name in result.streams.keys(): + dataset = new_duckdb_cache[stream_name] + df = dataset.to_pandas() + + # Skip empty streams + if len(df) == 0: + continue + + assert AB_DATA_COLUMN in df.columns, ( + f"Expected {AB_DATA_COLUMN} in DataFrame columns" + ) + + assert len(df) > 0, f"Expected data in stream {stream_name}" + + +def test_duckdb_single_json_column_rejects_non_append_strategies( + new_duckdb_cache: DuckDBCache, +) -> None: + """Test that single JSON column mode rejects non-append write strategies in DuckDB. + + Single JSON column mode is only compatible with APPEND strategy because + merge/replace strategies require primary keys as separate columns. + """ + # Enable single JSON column mode + new_duckdb_cache.use_single_json_column = True + + # Get source-test connector + source = ab.get_source("source-test", config={"apiKey": "test"}) + source.select_all_streams() + + # Verify that MERGE strategy is rejected + with pytest.raises(exc.PyAirbyteInputError) as excinfo: + source.read(new_duckdb_cache, write_strategy="merge") + + assert "use_single_json_column=True" in str(excinfo.value) + assert "APPEND" in str(excinfo.value) + + # Verify that REPLACE strategy is rejected + with pytest.raises(exc.PyAirbyteInputError) as excinfo: + source.read(new_duckdb_cache, write_strategy="replace", force_full_refresh=True) + + assert "use_single_json_column=True" in str(excinfo.value) + assert "APPEND" in str(excinfo.value)