
Conversation

uptickmetachu
Contributor

@uptickmetachu uptickmetachu commented Sep 28, 2025

The PyAirbyte documentation recommends using the cache instead of the Docker-based destinations.

While testing the PostgresCache, I found that timestamps were created without timezone information.

This PR adds timezone information where possible.

Summary by CodeRabbit

  • New Features

    • Datetime columns in created tables are timezone-aware by default.
    • SQL type generation is now dialect-aware, producing CREATE TABLE definitions tailored to each database.
  • Bug Fixes

    • Corrected mapping for timestamp types with/without timezone to ensure consistent schema generation and caching columns.
  • Tests

    • Added and updated unit and integration tests validating timezone-aware timestamps and dialect-specific SQL.


👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@fix/store-timestamps-with-timezone' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@fix/store-timestamps-with-timezone'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


Contributor

coderabbitai bot commented Sep 28, 2025

📝 Walkthrough

Adds dialect-aware SQL type compilation when building CREATE TABLE DDLs, makes timestamp types explicitly timezone-aware or timezone-naive, and updates unit and integration tests to expect TIMESTAMP(timezone=...) and TIMESTAMP WITH TIME ZONE in emitted DDL.

Changes

  • SQL processor: dialect-aware compilation & TZ timestamp (airbyte/shared/sql_processor.py)
    Obtain an engine (engine = self.get_sql_engine()) and compile SQLAlchemy column types with sql_type.compile(dialect=engine.dialect) when constructing CREATE TABLE DDL; change AB_EXTRACTED_AT_COLUMN to TIMESTAMP(timezone=True).
  • Type mapping: explicit timezone flags (airbyte/types.py)
    Update CONVERSION_MAP and to_sql_type so "timestamp_with_timezone" uses TIMESTAMP(timezone=True) and "timestamp_without_timezone" uses TIMESTAMP(timezone=False); treat JSON Schema "date-time" as timezone-aware.
  • Unit tests: expect timezone flags (tests/unit_tests/test_type_translation.py)
    Adjust test expectations to assert TIMESTAMP instances and their timezone flag (True/False) instead of a naive TIMESTAMP.
  • Integration test: DDL emits TIMESTAMP WITH TIME ZONE (tests/integration_tests/test_all_cache_types.py)
    Add test_cache_columns_for_datetime_types_are_timezone_aware, which patches _execute_sql on a DuckDB processor to assert that _ensure_final_table_exists emits CREATE TABLE using TIMESTAMP WITH TIME ZONE for datetime-like cache columns.
  • BigQuery processor: type & SQL formatting tweaks (airbyte/_processors/sql/bigquery.py)
    Return a concrete sqlalchemy.types.String() in BigQueryTypeConverter.get_string_type; consolidate/format SQL string construction for schema/table operations (no behavioral change).
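
To make the first two items above concrete, here is a minimal sketch of the compile pattern (illustrative only; the actual code lives in airbyte/types.py and airbyte/shared/sql_processor.py):

import sqlalchemy
from sqlalchemy.dialects import postgresql

# Explicit timezone flags, as in the updated CONVERSION_MAP:
tz_aware = sqlalchemy.types.TIMESTAMP(timezone=True)
tz_naive = sqlalchemy.types.TIMESTAMP(timezone=False)

# Dialect-aware compilation, as in the updated CREATE TABLE path:
dialect = postgresql.dialect()
print(tz_aware.compile(dialect=dialect))  # TIMESTAMP WITH TIME ZONE
print(tz_naive.compile(dialect=dialect))  # TIMESTAMP WITHOUT TIME ZONE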

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Caller as Caller
  participant Processor as SqlProcessor
  participant Engine as SQL_Engine
  participant DB as Database

  Caller->>Processor: ensure_final_table_exists / _create_table_for_loading
  Processor->>Engine: get_sql_engine()
  Note right of Processor: Resolve SQLAlchemy column types\n(e.g., TIMESTAMP(timezone=True/False))
  Processor->>Processor: compile type SQL with\nsql_type.compile(dialect=engine.dialect)
  Processor->>DB: CREATE TABLE ... (dialect-compiled types, TZ flags)
  DB-->>Processor: OK
  Processor-->>Caller: finished

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested reviewers

  • aaronsteers

Would you like to also add a regression test that verifies TIMESTAMP(timezone=False) is emitted for non-timezone timestamps (to complement the new TZ-aware integration test), wdyt?

Pre-merge checks

✅ Passed checks (3 passed)

  • Description Check — Passed. Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check — Passed. The title succinctly identifies the scope and intent of the changes, indicating a fix in the Postgres cache to store timestamps with timezone, in line with the PR's primary objective of making timestamp columns timezone-aware.
  • Docstring Coverage — Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 305446d and 26abdd9.

📒 Files selected for processing (1)
  • airbyte/_processors/sql/bigquery.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/_processors/sql/bigquery.py (1)
airbyte/shared/sql_processor.py (1)
  • _fully_qualified (572-577)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte/_processors/sql/bigquery.py (4)

106-106: LGTM! Clean removal of redundant cast.

Since sqlalchemy.types.String() already returns a TypeEngine subclass, the cast was unnecessary. This simplification improves readability without changing behavior.


213-213: Formatting consolidation looks good.

The SQL statement is now on a single line with consistent quoting. The behavior remains unchanged, and this improves readability for shorter SQL commands.


297-297: Formatting change approved.

The ALTER TABLE statement is now more compact while maintaining the same behavior. The semicolon placement and SQL logic are preserved.


106-106: Minor observation: Changes in this file don't appear to relate to timestamp timezone handling.

The three changes here (removing a redundant cast on line 106 and consolidating SQL formatting on lines 213 and 297) are all benign code improvements, but they don't seem to contribute to the PR's stated objective of adding timezone information to stored timestamps. Were these included intentionally as cleanup, or perhaps carried over from auto-formatting? Either way, they're fine to keep—just curious if they were meant to be in this PR. wdyt?

Also applies to: 213-213, 297-297



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte/shared/sql_processor.py (1)

678-696: Ensure added/missing columns preserve tz rendering.

_add_column_to_table currently interpolates the TypeEngine directly, which uses the default dialect string and may drop “WITH TIME ZONE” on Postgres. This can silently regress schema consistency when columns are added post‑create. Can we compile with the actual dialect and quote the column name, wdyt?

     def _add_column_to_table(
         self,
         table: Table,
         column_name: str,
         column_type: sqlalchemy.types.TypeEngine,
     ) -> None:
         """Add a column to the given table."""
-        self._execute_sql(
-            text(
-                f"ALTER TABLE {self._fully_qualified(table.name)} "
-                f"ADD COLUMN {column_name} {column_type}"
-            ),
-        )
+        engine = self.get_sql_engine()
+        compiled_type = column_type.compile(dialect=engine.dialect)
+        self._execute_sql(
+            text(
+                f'ALTER TABLE {self._fully_qualified(table.name)} '
+                f'ADD COLUMN {self._quote_identifier(column_name)} {compiled_type}'
+            )
+        )

This keeps timezone flags intact and avoids unquoted identifiers.

🧹 Nitpick comments (6)
airbyte/types.py (2)

153-155: Make fallback TIMESTAMP timezone-aware for consistency.

If we ever hit fallback, we currently emit a naive TIMESTAMP, diverging from the new default. Shall we default to timezone=True to stay consistent, wdyt?

-        if json_schema_type == "string" and json_schema_format == "date-time":
-            return sqlalchemy.types.TIMESTAMP()
+        if json_schema_type == "string" and json_schema_format == "date-time":
+            return sqlalchemy.types.TIMESTAMP(timezone=True)

21-24: Optional: consider TIME tz parity.

Do we also want to distinguish time_with_timezone/time_without_timezone similarly (where supported) to reduce ambiguity, wdyt?

-    "time_with_timezone": sqlalchemy.types.TIME,
-    "time_without_timezone": sqlalchemy.types.TIME,
+    "time_with_timezone": sqlalchemy.types.TIME(timezone=True),
+    "time_without_timezone": sqlalchemy.types.TIME(timezone=False),

Note: some dialects ignore TIME tz; tests may need guards.

tests/unit_tests/test_type_translation.py (1)

68-76: Add a fallback-path test?

Should we add a case that forces the fallback in to_sql_type (e.g., malformed schema for date-time) to ensure it yields a tz-aware TIMESTAMP after the suggested change, wdyt?

airbyte/shared/sql_processor.py (2)

577-581: Nice: dialect-aware type compilation for temp tables.

This ensures “WITH/WITHOUT TIME ZONE” renders correctly per dialect.

Minor: to avoid pulling in a real DBAPI during tests, would you consider compiling via a dialect instance (e.g., sqlalchemy.dialects.postgresql.dialect()) when the target is known, or at least disposing the engine after use to avoid stray pools, wdyt?


627-631: Unify compile call signature.

Here we use compile(engine.dialect) (positional) vs compile(dialect=engine.dialect) above. For readability and consistency, shall we use the keyword form here too, wdyt?

-                f"{self._quote_identifier(column_name)} {sql_type.compile(engine.dialect)}"
+                f"{self._quote_identifier(column_name)} {sql_type.compile(dialect=engine.dialect)}"
tests/unit_tests/test_processors.py (1)

58-104: Stubbing the engine/dialect to avoid DB driver dependency.

Because _ensure_final_table_exists now creates an engine to get a dialect, this test may require a Postgres DBAPI. To keep it hermetic, can we stub get_sql_engine to a simple object exposing a Postgres dialect, wdyt?

-from unittest.mock import patch
+from unittest.mock import patch
+from types import SimpleNamespace
+from sqlalchemy.dialects import postgresql

@@
-    with (
-        patch.object(processor, "_execute_sql") as _execute_sql_mock,
-        patch.object(
-            processor, "_table_exists", return_value=False
-        ) as _table_exists_mock,
-    ):
+    with (
+        patch.object(processor, "_execute_sql") as _execute_sql_mock,
+        patch.object(processor, "_table_exists", return_value=False) as _table_exists_mock,
+        patch.object(processor, "get_sql_engine", return_value=SimpleNamespace(dialect=postgresql.dialect())),
+    ):
         processor._ensure_final_table_exists(
             stream_name="products",
         )
         _execute_sql_mock.assert_called_with(expected_sql)

Optional: to reduce brittleness, we could assert substrings (e.g., presence of TIMESTAMP WITH TIME ZONE for created_at/updated_at) instead of full‑string equality, wdyt?
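
For instance, the substring assertion could look like this (a hypothetical fragment continuing the patched test above):

emitted_sql = str(_execute_sql_mock.call_args[0][0])
for column in ("created_at", "updated_at"):
    # Require only the tz-aware column type, not the exact full DDL string.
    assert f'"{column}" TIMESTAMP WITH TIME ZONE' in emitted_sql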

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 20bb0a8 and b9680d2.

📒 Files selected for processing (4)
  • airbyte/shared/sql_processor.py (3 hunks)
  • airbyte/types.py (1 hunks)
  • tests/unit_tests/test_processors.py (2 hunks)
  • tests/unit_tests/test_type_translation.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/unit_tests/test_processors.py (3)
airbyte/_processors/sql/postgres.py (2)
  • PostgresConfig (16-38)
  • PostgresSqlProcessor (59-75)
airbyte/shared/catalog_providers.py (1)
  • CatalogProvider (31-222)
airbyte/shared/sql_processor.py (3)
  • catalog_provider (215-231)
  • sql_config (374-376)
  • _ensure_final_table_exists (615-637)
airbyte/shared/sql_processor.py (3)
airbyte/cloud/sync_results.py (1)
  • get_sql_engine (419-421)
airbyte/_processors/sql/duckdb.py (1)
  • get_sql_engine (78-87)
airbyte/results.py (1)
  • get_sql_engine (80-82)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
airbyte/types.py (1)

21-22: Good: explicit TIMESTAMP tz flags in the map.

This aligns compile-time rendering with dialects (e.g., Postgres -> WITH/WITHOUT TIME ZONE). Nice.

Could you confirm no callers still assume naive TIMESTAMPs anywhere (e.g., row adapters or copy paths), wdyt?

tests/unit_tests/test_type_translation.py (1)

71-75: LGTM: asserting both type and tz flag.

The TIMESTAMP checks correctly validate timezone semantics.

@uptickmetachu
Contributor Author

@aaronsteers do you have thoughts on this PR before I put in some time to fix the tests?

Contributor

@aaronsteers aaronsteers left a comment


@uptickmetachu - This looks great. Thanks for finding the issue and proposing this fix.

My only question/concern is to understand the impact on backwards compatibility. Can you test and let me know what happens if the final table already has a timestamp column without timezone? Presumably the staging table will still get the new timezone-aware columns, and Postgres will attempt to insert those values into the existing columns.

To be clear: I think it's totally reasonable to ask users to recreate the tables to get the fixed behavior - but I want to make sure the scenario doesn't lead to hard failure.

What do you think?

@uptickmetachu
Contributor Author

I had a play around, and PyAirbyte has no issues writing to an existing cache with different settings.

If a column was already defined as timestamp without timezone, the new version correctly inserts timestamps in the same format (timezone-naive).

I'll go ahead and try to fix up the above tests / modify the unit test so that it doesn't download source-faker.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
tests/integration_tests/test_all_cache_types.py (3)

291-292: Should this test be marked as @pytest.mark.slow?

The test uses an in-memory database and mocks the SQL execution, so it doesn't perform any actual I/O. The slow marker is typically reserved for tests that do network calls, file operations, or run connectors. This test should execute very quickly—wdyt about removing the marker?


294-307: Consider making the SQL assertion more robust.

The expected SQL string is hardcoded with specific whitespace and formatting. If the SQL generation logic changes formatting (e.g., indentation, spacing) without changing semantics, this test will fail.

A few options to make this more maintainable:

  1. Assert on the presence of key patterns (e.g., using regex to check for TIMESTAMP WITH TIME ZONE for datetime columns)
  2. Parse and normalize both the expected and actual SQL before comparison
  3. Use a SQL parser library to compare the AST rather than string matching

The current approach works but might be brittle—wdyt?
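
For example, option 1 might look like this (a hypothetical helper, not code from the PR):

import re

def assert_tz_aware(sql: str, column: str) -> None:
    # Tolerate quoting and whitespace differences in the emitted DDL.
    pattern = rf'"?{column}"?\s+TIMESTAMP\s+WITH\s+TIME\s+ZONE'
    assert re.search(pattern, sql, re.IGNORECASE), (
        f"{column} is not timezone-aware in:\n{sql}"
    )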


325-331: Consider testing actual behavior instead of just SQL generation.

While mocking _execute_sql verifies the correct SQL is generated, it doesn't confirm that the timestamp columns actually work correctly when writing data. The PR description mentions testing with PostgresCache and verifying backward compatibility with existing timezone-unaware columns.

Would it be valuable to add a complementary test that:

  1. Actually creates the table (no mocking)
  2. Writes some data with datetime values
  3. Reads back the data and verifies timezone information is preserved

This would catch integration issues that pure SQL generation tests might miss—wdyt?
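
A rough sketch of that round-trip using plain SQLAlchemy (assumes an in-memory DuckDB via the duckdb-engine package; timezone round-trip behavior can vary by backend and driver):

import datetime
import sqlalchemy as sa

engine = sa.create_engine("duckdb:///:memory:")
metadata = sa.MetaData()
events = sa.Table(
    "events",
    metadata,
    sa.Column("created_at", sa.TIMESTAMP(timezone=True)),
)
metadata.create_all(engine)

aware = datetime.datetime(2025, 9, 28, 12, 0, tzinfo=datetime.timezone.utc)
with engine.begin() as conn:
    conn.execute(events.insert().values(created_at=aware))
    stored = conn.execute(sa.select(events.c.created_at)).scalar_one()

# The value may come back normalized to another zone, so compare
# instants rather than tzinfo identity.
assert stored.tzinfo is not None
assert stored == aware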

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b9680d2 and 9526a9d.

📒 Files selected for processing (3)
  • airbyte/shared/sql_processor.py (3 hunks)
  • airbyte/types.py (2 hunks)
  • tests/integration_tests/test_all_cache_types.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte/types.py
  • airbyte/shared/sql_processor.py
🧰 Additional context used
🧬 Code graph analysis (1)
tests/integration_tests/test_all_cache_types.py (4)
airbyte/sources/util.py (1)
  • get_source (47-132)
airbyte/_processors/sql/duckdb.py (2)
  • DuckDBConfig (27-87)
  • DuckDBSqlProcessor (90-184)
airbyte/shared/catalog_providers.py (1)
  • CatalogProvider (31-222)
airbyte/shared/sql_processor.py (3)
  • catalog_provider (215-231)
  • sql_config (374-376)
  • _ensure_final_table_exists (615-637)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
tests/integration_tests/test_all_cache_types.py (2)

15-15: LGTM on the new imports!

The added imports (patch, DuckDBConfig, DuckDBSqlProcessor, CatalogProvider) are all necessary for the new test and properly imported from their respective modules.

Also applies to: 20-20, 23-23


308-311: Could you confirm that the products stream schema defines created_at and updated_at with the JSON Schema “date-time” format? The previous script failed due to a missing pandas dependency—please verify this in a fully configured environment. Wdyt?

@aaronsteers
Contributor

aaronsteers commented Oct 1, 2025

/test-pr

PR test job started... Check job output.

❌ Tests failed.

@aaronsteers aaronsteers changed the title from "fix(cache): Store timestamps with timezone" to "fix(postgres-cache): Store timestamps with timezone" Oct 1, 2025
@aaronsteers aaronsteers enabled auto-merge (squash) October 1, 2025 01:27
@aaronsteers
Contributor

@uptickmetachu - Looks great. Thanks again for the contribution. I'm happy to merge as soon as tests pass.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte/shared/sql_processor.py (1)

638-642: Consider moving engine fetch inside the conditional block?

The engine is currently fetched before checking if table creation is needed (line 639). Since get_sql_engine() returns a cached instance, the performance impact is negligible. However, for clarity, you might move line 638 inside the if not did_exist and create_if_missing: block, so it's only obtained when actually needed. Wdyt?

 table_name = self.get_sql_table_name(stream_name)
 did_exist = self._table_exists(table_name)
-engine = self.get_sql_engine()
 if not did_exist and create_if_missing:
+    engine = self.get_sql_engine()
     column_definition_str = ",\n  ".join(
         f"{self._quote_identifier(column_name)} {sql_type.compile(engine.dialect)}"
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9526a9d and 305446d.

📒 Files selected for processing (1)
  • airbyte/shared/sql_processor.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/shared/sql_processor.py (4)
airbyte/cloud/sync_results.py (1)
  • get_sql_engine (419-421)
airbyte/_processors/sql/duckdb.py (1)
  • get_sql_engine (78-87)
airbyte/results.py (1)
  • get_sql_engine (80-82)
airbyte/_processors/sql/bigquery.py (1)
  • _quote_identifier (150-156)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
airbyte/shared/sql_processor.py (2)

588-591: Nice improvement for cross-database compatibility!

Adding dialect-aware SQL type compilation ensures the generated DDL is correct for each database backend. The engine is cached, so there's minimal performance overhead. This aligns well with the PR objective to store timestamps with timezone information in a database-agnostic way.


703-703: Core change looks good - have you verified compatibility across all supported databases?

Setting timezone=True for AB_EXTRACTED_AT_COLUMN directly addresses the PR objective. The dialect-aware compilation (from the earlier changes) will ensure each database gets the appropriate DDL (e.g., TIMESTAMP WITH TIME ZONE for Postgres).

Have you verified that all currently supported SQL databases (DuckDB, BigQuery, Postgres, etc.) handle timezone-aware timestamps correctly with this approach? Based on the test changes mentioned in the AI summary, it seems like at least Postgres is covered.
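
One quick way to spot-check the rendering across backends (a sketch; DuckDB and BigQuery would need their own SQLAlchemy dialect packages installed):

import sqlalchemy
from sqlalchemy.dialects import postgresql, sqlite

tz = sqlalchemy.types.TIMESTAMP(timezone=True)
for dialect in (postgresql.dialect(), sqlite.dialect()):
    # Postgres renders TIMESTAMP WITH TIME ZONE; SQLite has no
    # timezone-aware type and renders a plain TIMESTAMP.
    print(type(dialect).__name__, "->", tz.compile(dialect=dialect))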

auto-merge was automatically disabled October 1, 2025 06:17

Head branch was pushed to by a user without write access

@uptickmetachu uptickmetachu force-pushed the fix/store-timestamps-with-timezone branch from da96d76 to f984af1 on October 1, 2025 06:17
@uptickmetachu
Contributor Author

/test-pr

This subsequently means type hints are also correct
@uptickmetachu uptickmetachu force-pushed the fix/store-timestamps-with-timezone branch from f984af1 to 26abdd9 on October 1, 2025 06:20
@aaronsteers
Contributor

aaronsteers commented Oct 1, 2025

/test-pr

PR test job started... Check job output.

❌ Tests failed.

@aaronsteers
Contributor

@uptickmetachu - I see you applied a fix, and I've kicked off the /test-pr slash command. Because the /test-pr slash command runs under our creds, it has to be run by a maintainer. Tag me in a comment if you need me to rerun it for you.

@uptickmetachu
Contributor Author

uptickmetachu commented Oct 2, 2025

@aaronsteers

I might leave this one with you, as it is a bit hairy and involves more refactoring.

googleapis/python-bigquery-sqlalchemy#546

sql_type.compile(engine.dialect) does not work with the BigQuery dialect, as it does not yet support JSON types.

There will need to be some dispatch mechanism or helper method to handle this (similar to the BigQuery String() override that previously existed)...
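
A possible shape for that dispatch (purely illustrative; the helper name is hypothetical):

import sqlalchemy
from sqlalchemy.engine.interfaces import Dialect

def compile_sql_type(sql_type: sqlalchemy.types.TypeEngine, dialect: Dialect) -> str:
    """Compile a column type for the target dialect, with a safe fallback."""
    try:
        return sql_type.compile(dialect=dialect)
    except Exception:
        # Some dialects (e.g., BigQuery's) cannot yet compile every type,
        # such as JSON; fall back to the generic rendering.
        return str(sql_type)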

@aaronsteers
Contributor

@uptickmetachu - Thanks for your hard work on this. Continuing debug effort here:

@aaronsteers
Contributor

Merged and shipped 🎉

Thank you again, @uptickmetachu, for contributing to PyAirbyte! 🙌

@aaronsteers aaronsteers closed this Oct 2, 2025