8 changes: 5 additions & 3 deletions airbyte/shared/sql_processor.py
@@ -574,8 +574,9 @@ def _create_table_for_loading(
) -> str:
"""Create a new table for loading data."""
temp_table_name = self._get_temp_table_name(stream_name, batch_id)
engine = self.get_sql_engine()
column_definition_str = ",\n ".join(
f"{self._quote_identifier(column_name)} {sql_type}"
f"{self._quote_identifier(column_name)} {sql_type.compile(dialect=engine.dialect)}"
for column_name, sql_type in self._get_sql_column_definitions(stream_name).items()
)
self._create_table(temp_table_name, column_definition_str)
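
A minimal sketch (not part of the diff) of why the type is now compiled against the engine's dialect here: the generic string form of a SQLAlchemy type drops the timezone flag, while a dialect-aware compile keeps it. The PostgreSQL dialect is used purely for illustration.

import sqlalchemy
from sqlalchemy.dialects import postgresql

ts = sqlalchemy.types.TIMESTAMP(timezone=True)
print(str(ts))                                   # "TIMESTAMP" -- generic compile loses the flag
print(ts.compile(dialect=postgresql.dialect()))  # "TIMESTAMP WITH TIME ZONE"
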
@@ -623,9 +624,10 @@ def _ensure_final_table_exists(
"""
table_name = self.get_sql_table_name(stream_name)
did_exist = self._table_exists(table_name)
engine = self.get_sql_engine()
if not did_exist and create_if_missing:
column_definition_str = ",\n ".join(
f"{self._quote_identifier(column_name)} {sql_type}"
f"{self._quote_identifier(column_name)} {sql_type.compile(engine.dialect)}"
for column_name, sql_type in self._get_sql_column_definitions(
stream_name,
).items()
@@ -687,7 +689,7 @@ def _get_sql_column_definitions(
)

columns[AB_RAW_ID_COLUMN] = self.type_converter_class.get_string_type()
columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP()
columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP(timezone=True)
columns[AB_META_COLUMN] = self.type_converter_class.get_json_type()

return columns
4 changes: 2 additions & 2 deletions airbyte/types.py
@@ -18,8 +18,8 @@
"number": sqlalchemy.types.DECIMAL(38, 9),
"boolean": sqlalchemy.types.BOOLEAN,
"date": sqlalchemy.types.DATE,
"timestamp_with_timezone": sqlalchemy.types.TIMESTAMP,
"timestamp_without_timezone": sqlalchemy.types.TIMESTAMP,
"timestamp_with_timezone": sqlalchemy.types.TIMESTAMP(timezone=True),
"timestamp_without_timezone": sqlalchemy.types.TIMESTAMP(timezone=False),
"time_with_timezone": sqlalchemy.types.TIME,
"time_without_timezone": sqlalchemy.types.TIME,
# Technically 'object' and 'array' as JSON Schema types, not airbyte types.
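
A small sketch (not part of the diff) of the consequence of this mapping now holding configured type instances such as TIMESTAMP(timezone=True) instead of bare classes: an instance cannot be passed to isinstance(), so comparisons check the class plus the timezone flag, as the updated test further down does.

import sqlalchemy.types as types

expected = types.TIMESTAMP(timezone=True)
resolved = types.TIMESTAMP(timezone=True)  # e.g. what a type converter might return

# isinstance(resolved, expected) would raise TypeError because expected is an
# instance, so compare the class and the timezone flag instead.
assert isinstance(resolved, type(expected))
assert resolved.timezone == expected.timezone
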
56 changes: 54 additions & 2 deletions tests/unit_tests/test_processors.py
@@ -3,11 +3,15 @@

from pathlib import Path
from typing import Optional
from unittest.mock import patch

import pytest_mock
from airbyte.caches.snowflake import SnowflakeSqlProcessor, SnowflakeConfig
from airbyte_protocol.models import ConfiguredAirbyteCatalog
from airbyte._processors.sql.postgres import PostgresConfig, PostgresSqlProcessor
from airbyte.caches.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
from airbyte.secrets.base import SecretString
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.sources.util import get_source
from airbyte_protocol.models import ConfiguredAirbyteCatalog


def test_snowflake_cache_config_data_retention_time_in_days(
@@ -51,6 +55,54 @@ def _execute_sql(cmd):
assert actual_cmd == expected_cmd


def test_postgres_cache_config_generates_timestamps_with_timezone_settings():
expected_sql = """
CREATE TABLE airbyte_raw."products" (
"id" BIGINT,
"make" VARCHAR,
"model" VARCHAR,
"year" BIGINT,
"price" DECIMAL(38, 9),
"created_at" TIMESTAMP WITH TIME ZONE,
"updated_at" TIMESTAMP WITH TIME ZONE,
"_airbyte_raw_id" VARCHAR,
"_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE,
"_airbyte_meta" JSON
)
\n """
source = get_source(
name="source-faker",
config={},
)

config = PostgresConfig(
host="localhost",
port=5432,
username="postgres",
password="postgres",
database="postgres",
schema_name="airbyte_raw",
)

processor = PostgresSqlProcessor(
catalog_provider=CatalogProvider(source.configured_catalog),
temp_dir=Path(),
temp_file_cleanup=True,
sql_config=config,
)

with (
patch.object(processor, "_execute_sql") as _execute_sql_mock,
patch.object(
processor, "_table_exists", return_value=False
) as _table_exists_mock,
):
processor._ensure_final_table_exists(
stream_name="products",
)
_execute_sql_mock.assert_called_with(expected_sql)


def _build_mocked_snowflake_processor(
mocker: pytest_mock.MockFixture, data_retention_time_in_days: Optional[int] = None
):
10 changes: 7 additions & 3 deletions tests/unit_tests/test_type_translation.py
@@ -25,15 +25,15 @@
"format": "date-time",
"airbyte_type": "timestamp_without_timezone",
},
types.TIMESTAMP,
types.TIMESTAMP(timezone=False),
),
(
{
"type": "string",
"format": "date-time",
"airbyte_type": "timestamp_with_timezone",
},
types.TIMESTAMP,
types.TIMESTAMP(timezone=True),
),
(
{
@@ -68,7 +68,11 @@
def test_to_sql_type(json_schema_property_def, expected_sql_type):
converter = SQLTypeConverter()
sql_type = converter.to_sql_type(json_schema_property_def)
assert isinstance(sql_type, expected_sql_type)
if isinstance(expected_sql_type, types.TIMESTAMP):
assert isinstance(sql_type, types.TIMESTAMP)
assert sql_type.timezone == expected_sql_type.timezone
else:
assert isinstance(sql_type, expected_sql_type)


@pytest.mark.parametrize(