Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte/shared/sql_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ def _get_sql_column_definitions(
)

columns[AB_RAW_ID_COLUMN] = self.type_converter_class.get_string_type()
columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP()
columns[AB_EXTRACTED_AT_COLUMN] = sqlalchemy.TIMESTAMP(timezone=True)
columns[AB_META_COLUMN] = self.type_converter_class.get_json_type()

return columns
Expand Down
6 changes: 3 additions & 3 deletions airbyte/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
"number": sqlalchemy.types.DECIMAL(38, 9),
"boolean": sqlalchemy.types.BOOLEAN,
"date": sqlalchemy.types.DATE,
"timestamp_with_timezone": sqlalchemy.types.TIMESTAMP,
"timestamp_without_timezone": sqlalchemy.types.TIMESTAMP,
"timestamp_with_timezone": sqlalchemy.types.TIMESTAMP(timezone=True),
"timestamp_without_timezone": sqlalchemy.types.TIMESTAMP(timezone=False),
"time_with_timezone": sqlalchemy.types.TIME,
"time_without_timezone": sqlalchemy.types.TIME,
# Technically 'object' and 'array' as JSON Schema types, not airbyte types.
Expand Down Expand Up @@ -151,7 +151,7 @@ def to_sql_type( # noqa: PLR0911 # Too many return statements
return sqlalchemy.types.DATE()

if json_schema_type == "string" and json_schema_format == "date-time":
return sqlalchemy.types.TIMESTAMP()
return sqlalchemy.types.TIMESTAMP(timezone=True)

if json_schema_type == "array":
return sqlalchemy.types.JSON()
Expand Down
47 changes: 46 additions & 1 deletion tests/integration_tests/test_all_cache_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
import os
import sys
from pathlib import Path
from unittest.mock import patch

import airbyte as ab
import pytest
from airbyte import get_source
from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor
from airbyte._util.venv_util import get_bin_dir
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from sqlalchemy import text
from viztracer import VizTracer


# Product count is always the same, regardless of faker scale.
NUM_PRODUCTS = 100

Expand Down Expand Up @@ -284,3 +286,46 @@ def test_auto_add_columns(
result = source_faker_seed_a.read(cache=new_generic_cache, write_strategy="auto")

assert "_airbyte_raw_id" in result["users"].to_sql_table().columns


@pytest.mark.slow
def test_cache_columns_for_datetime_types_are_timezone_aware():
"""Ensures sql types are correctly converted to the correct sql timezone aware column types"""
source = get_source(
name="source-faker",
config={},
)

config = DuckDBConfig(
schema_name="airbyte",
db_path=":memory:",
)

processor = DuckDBSqlProcessor(
catalog_provider=CatalogProvider(source.configured_catalog),
temp_dir=Path(),
temp_file_cleanup=True,
sql_config=config,
)

with (
patch.object(processor, "_execute_sql") as _execute_sql_mock,
):
processor._ensure_final_table_exists(
stream_name="products",
)
for call_args in _execute_sql_mock.call_args_list:
sql = call_args[0][0]
if 'CREATE TABLE airbyte."products"' in sql:
assert '"created_at" TIMESTAMP' in sql, (
"created_at should be a TIMESTAMP column"
)
assert '"updated_at" TIMESTAMP' in sql, (
"updated_at should be a TIMESTAMP column"
)
assert '"_airbyte_extracted_at" TIMESTAMP' in sql, (
"_airbyte_extracted_at should be a TIMESTAMP column"
)
break
else:
pytest.fail("Expected CREATE TABLE statement for products not found")
10 changes: 7 additions & 3 deletions tests/unit_tests/test_type_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
"format": "date-time",
"airbyte_type": "timestamp_without_timezone",
},
types.TIMESTAMP,
types.TIMESTAMP(timezone=False),
),
(
{
"type": "string",
"format": "date-time",
"airbyte_type": "timestamp_with_timezone",
},
types.TIMESTAMP,
types.TIMESTAMP(timezone=True),
),
(
{
Expand Down Expand Up @@ -68,7 +68,11 @@
def test_to_sql_type(json_schema_property_def, expected_sql_type):
converter = SQLTypeConverter()
sql_type = converter.to_sql_type(json_schema_property_def)
assert isinstance(sql_type, expected_sql_type)
if isinstance(expected_sql_type, types.TIMESTAMP):
assert isinstance(sql_type, types.TIMESTAMP)
assert sql_type.timezone == expected_sql_type.timezone
else:
assert isinstance(sql_type, expected_sql_type)


@pytest.mark.parametrize(
Expand Down