Skip to content
8 changes: 5 additions & 3 deletions airbyte/shared/sql_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,9 @@ def _create_table_for_loading(
) -> str:
"""Create a new table for loading data."""
temp_table_name = self._get_temp_table_name(stream_name, batch_id)
engine = self.get_sql_engine()
column_definition_str = ",\n ".join(
f"{self._quote_identifier(column_name)} {sql_type}"
f"{self._quote_identifier(column_name)} {sql_type.compile(engine.dialect)}"
for column_name, sql_type in self._get_sql_column_definitions(stream_name).items()
)
self._create_table(temp_table_name, column_definition_str)
Expand Down Expand Up @@ -623,9 +624,10 @@ def _ensure_final_table_exists(
"""
table_name = self.get_sql_table_name(stream_name)
did_exist = self._table_exists(table_name)
engine = self.get_sql_engine()
if not did_exist and create_if_missing:
column_definition_str = ",\n ".join(
f"{self._quote_identifier(column_name)} {sql_type}"
f"{self._quote_identifier(column_name)} {sql_type.compile(engine.dialect)}"
for column_name, sql_type in self._get_sql_column_definitions(
stream_name,
).items()
Expand Down Expand Up @@ -687,7 +689,7 @@ def _get_sql_column_definitions(
)

columns[AB_RAW_ID_COLUMN] = self.type_converter_class.get_string_type()
columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP()
columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP(timezone=True)
columns[AB_META_COLUMN] = self.type_converter_class.get_json_type()

return columns
Expand Down
6 changes: 3 additions & 3 deletions airbyte/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
"number": sqlalchemy.types.DECIMAL(38, 9),
"boolean": sqlalchemy.types.BOOLEAN,
"date": sqlalchemy.types.DATE,
"timestamp_with_timezone": sqlalchemy.types.TIMESTAMP,
"timestamp_without_timezone": sqlalchemy.types.TIMESTAMP,
"timestamp_with_timezone": sqlalchemy.types.TIMESTAMP(timezone=True),
"timestamp_without_timezone": sqlalchemy.types.TIMESTAMP(timezone=False),
"time_with_timezone": sqlalchemy.types.TIME,
"time_without_timezone": sqlalchemy.types.TIME,
# Technically 'object' and 'array' as JSON Schema types, not airbyte types.
Expand Down Expand Up @@ -151,7 +151,7 @@ def to_sql_type( # noqa: PLR0911 # Too many return statements
return sqlalchemy.types.DATE()

if json_schema_type == "string" and json_schema_format == "date-time":
return sqlalchemy.types.TIMESTAMP()
return sqlalchemy.types.TIMESTAMP(timezone=True)

if json_schema_type == "array":
return sqlalchemy.types.JSON()
Expand Down
47 changes: 46 additions & 1 deletion tests/integration_tests/test_all_cache_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
import os
import sys
from pathlib import Path
from unittest.mock import patch

import airbyte as ab
import pytest
from airbyte import get_source
from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor
from airbyte._util.venv_util import get_bin_dir
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from sqlalchemy import text
from viztracer import VizTracer


# Product count is always the same, regardless of faker scale.
NUM_PRODUCTS = 100

Expand Down Expand Up @@ -284,3 +286,46 @@ def test_auto_add_columns(
result = source_faker_seed_a.read(cache=new_generic_cache, write_strategy="auto")

assert "_airbyte_raw_id" in result["users"].to_sql_table().columns


@pytest.mark.slow
def test_cache_columns_for_datetime_types_are_timezone_aware():
    """Verify that datetime-typed stream columns compile to timezone-aware SQL types.

    Builds a DuckDB SQL processor for the faker source's ``products`` stream,
    intercepts the SQL it would execute when creating the final table, and
    asserts the emitted DDL byte-for-byte. The key expectation is that
    ``created_at``/``updated_at`` and the internal ``_airbyte_extracted_at``
    column are rendered as ``TIMESTAMP WITH TIME ZONE`` rather than a naive
    ``TIMESTAMP``.
    """
    # NOTE(review): the assertion is an exact string match, so this expected
    # DDL is whitespace-sensitive (including the trailing literal "\n" line
    # inside the triple-quoted string) — do not reformat it.
    expected_sql = """
    CREATE TABLE airbyte."products" (
    "id" BIGINT,
    "make" VARCHAR,
    "model" VARCHAR,
    "year" BIGINT,
    "price" DECIMAL(38, 9),
    "created_at" TIMESTAMP WITH TIME ZONE,
    "updated_at" TIMESTAMP WITH TIME ZONE,
    "_airbyte_raw_id" VARCHAR,
    "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE,
    "_airbyte_meta" JSON
    )
    \n    """
    # The real source is only used for its configured catalog (stream schema);
    # no records are read. Presumably this requires the source-faker connector
    # to be installable in the test environment — hence the "slow" marker.
    source = get_source(
        name="source-faker",
        config={},
    )

    # In-memory DuckDB: no on-disk artifacts are created by this test.
    config = DuckDBConfig(
        schema_name="airbyte",
        db_path=":memory:",
    )

    processor = DuckDBSqlProcessor(
        catalog_provider=CatalogProvider(source.configured_catalog),
        temp_dir=Path(),
        temp_file_cleanup=True,
        sql_config=config,
    )

    # Patch out SQL execution so we can capture the generated DDL instead of
    # running it against the database.
    with (
        patch.object(processor, "_execute_sql") as _execute_sql_mock,
    ):
        processor._ensure_final_table_exists(
            stream_name="products",
        )
    _execute_sql_mock.assert_called_with(expected_sql)
10 changes: 7 additions & 3 deletions tests/unit_tests/test_type_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
"format": "date-time",
"airbyte_type": "timestamp_without_timezone",
},
types.TIMESTAMP,
types.TIMESTAMP(timezone=False),
),
(
{
"type": "string",
"format": "date-time",
"airbyte_type": "timestamp_with_timezone",
},
types.TIMESTAMP,
types.TIMESTAMP(timezone=True),
),
(
{
Expand Down Expand Up @@ -68,7 +68,11 @@
def test_to_sql_type(json_schema_property_def, expected_sql_type):
    """Check that a JSON-schema property definition maps to the expected SQL type.

    ``expected_sql_type`` is either a SQLAlchemy type *class* (compared via
    ``isinstance``) or a ``TIMESTAMP`` *instance*, in which case the
    ``timezone`` flag must match as well — instances cannot be passed to
    ``isinstance`` directly, hence the branch.
    """
    converter = SQLTypeConverter()
    sql_type = converter.to_sql_type(json_schema_property_def)
    # Stale pre-refactor line removed here: an unconditional
    # `assert isinstance(sql_type, expected_sql_type)` raised TypeError when
    # expected_sql_type was a TIMESTAMP instance, shadowing the checks below.
    if isinstance(expected_sql_type, types.TIMESTAMP):
        assert isinstance(sql_type, types.TIMESTAMP)
        assert sql_type.timezone == expected_sql_type.timezone
    else:
        assert isinstance(sql_type, expected_sql_type)


@pytest.mark.parametrize(
Expand Down