fix(postgres-cache): Store timestamps with timezone #805
Conversation
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

```bash
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@fix/store-timestamps-with-timezone' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@fix/store-timestamps-with-timezone'
```

PR Slash Commands

Airbyte Maintainers can execute slash commands on your PR.

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.
📝 Walkthrough

Adds dialect-aware SQL type compilation when building CREATE TABLE DDLs, makes timestamp types explicitly timezone-aware or not, and updates unit and integration tests to expect TIMESTAMP(timezone=...) and TIMESTAMP WITH TIME ZONE in emitted DDL.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Caller as Caller
    participant Processor as SqlProcessor
    participant Engine as SQL_Engine
    participant DB as Database
    Caller->>Processor: ensure_final_table_exists / _create_table_for_loading
    Processor->>Engine: get_sql_engine()
    Note right of Processor: Resolve SQLAlchemy column types<br/>(e.g., TIMESTAMP(timezone=True/False))
    Processor->>Processor: compile type SQL with<br/>sql_type.compile(dialect=engine.dialect)
    Processor->>DB: CREATE TABLE ... (dialect-compiled types, TZ flags)
    DB-->>Processor: OK
    Processor-->>Caller: finished
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Would you like to also add a regression test that verifies …

Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/shared/sql_processor.py (1)
678-696: Ensure added/missing columns preserve tz rendering.

_add_column_to_table currently interpolates the TypeEngine directly, which uses the default dialect string and may drop "WITH TIME ZONE" on Postgres. This can silently regress schema consistency when columns are added post-create. Can we compile with the actual dialect and quote the column name, wdyt?

```diff
 def _add_column_to_table(
     self,
     table: Table,
     column_name: str,
     column_type: sqlalchemy.types.TypeEngine,
 ) -> None:
     """Add a column to the given table."""
-    self._execute_sql(
-        text(
-            f"ALTER TABLE {self._fully_qualified(table.name)} "
-            f"ADD COLUMN {column_name} {column_type}"
-        ),
-    )
+    engine = self.get_sql_engine()
+    compiled_type = column_type.compile(dialect=engine.dialect)
+    self._execute_sql(
+        text(
+            f'ALTER TABLE {self._fully_qualified(table.name)} '
+            f'ADD COLUMN {self._quote_identifier(column_name)} {compiled_type}'
+        )
+    )
```

This keeps timezone flags intact and avoids unquoted identifiers.
🧹 Nitpick comments (6)
airbyte/types.py (2)
153-155: Make fallback TIMESTAMP timezone-aware for consistency.

If we ever hit fallback, we currently emit a naive TIMESTAMP, diverging from the new default. Shall we default to timezone=True to stay consistent, wdyt?

```diff
-    if json_schema_type == "string" and json_schema_format == "date-time":
-        return sqlalchemy.types.TIMESTAMP()
+    if json_schema_type == "string" and json_schema_format == "date-time":
+        return sqlalchemy.types.TIMESTAMP(timezone=True)
```
21-24: Optional: consider TIME tz parity.

Do we also want to distinguish time_with_timezone/time_without_timezone similarly (where supported) to reduce ambiguity, wdyt?

```diff
-    "time_with_timezone": sqlalchemy.types.TIME,
-    "time_without_timezone": sqlalchemy.types.TIME,
+    "time_with_timezone": sqlalchemy.types.TIME(timezone=True),
+    "time_without_timezone": sqlalchemy.types.TIME(timezone=False),
```

Note: some dialects ignore TIME tz; tests may need guards.
tests/unit_tests/test_type_translation.py (1)
68-76: Add a fallback-path test?

Should we add a case that forces the fallback in to_sql_type (e.g., malformed schema for date-time) to ensure it yields a tz-aware TIMESTAMP after the suggested change, wdyt?
airbyte/shared/sql_processor.py (2)
577-581: Nice: dialect-aware type compilation for temp tables.

This ensures "WITH/WITHOUT TIME ZONE" renders correctly per dialect.

Minor: to avoid pulling in a real DBAPI during tests, would you consider compiling via a dialect instance (e.g., sqlalchemy.dialects.postgresql.dialect()) when the target is known, or at least disposing the engine after use to avoid stray pools, wdyt?
627-631: Unify compile call signature.

Here we use compile(engine.dialect) (positional) vs compile(dialect=engine.dialect) above. For readability and consistency, shall we use the keyword form here too, wdyt?

```diff
-    f"{self._quote_identifier(column_name)} {sql_type.compile(engine.dialect)}"
+    f"{self._quote_identifier(column_name)} {sql_type.compile(dialect=engine.dialect)}"
```

tests/unit_tests/test_processors.py (1)
58-104: Stubbing the engine/dialect to avoid DB driver dependency.

Because _ensure_final_table_exists now creates an engine to get a dialect, this test may require a Postgres DBAPI. To keep it hermetic, can we stub get_sql_engine to a simple object exposing a Postgres dialect, wdyt?

```diff
-from unittest.mock import patch
+from unittest.mock import patch
+from types import SimpleNamespace
+from sqlalchemy.dialects import postgresql
@@
-    with (
-        patch.object(processor, "_execute_sql") as _execute_sql_mock,
-        patch.object(
-            processor, "_table_exists", return_value=False
-        ) as _table_exists_mock,
-    ):
+    with (
+        patch.object(processor, "_execute_sql") as _execute_sql_mock,
+        patch.object(processor, "_table_exists", return_value=False) as _table_exists_mock,
+        patch.object(processor, "get_sql_engine", return_value=SimpleNamespace(dialect=postgresql.dialect())),
+    ):
         processor._ensure_final_table_exists(
             stream_name="products",
         )
         _execute_sql_mock.assert_called_with(expected_sql)
```

Optional: to reduce brittleness, we could assert substrings (e.g., presence of TIMESTAMP WITH TIME ZONE for created_at/updated_at) instead of full-string equality, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)

- airbyte/shared/sql_processor.py (3 hunks)
- airbyte/types.py (1 hunks)
- tests/unit_tests/test_processors.py (2 hunks)
- tests/unit_tests/test_type_translation.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)

tests/unit_tests/test_processors.py (3)
- airbyte/_processors/sql/postgres.py (2): PostgresConfig (16-38), PostgresSqlProcessor (59-75)
- airbyte/shared/catalog_providers.py (1): CatalogProvider (31-222)
- airbyte/shared/sql_processor.py (3): catalog_provider (215-231), sql_config (374-376), _ensure_final_table_exists (615-637)

airbyte/shared/sql_processor.py (3)
- airbyte/cloud/sync_results.py (1): get_sql_engine (419-421)
- airbyte/_processors/sql/duckdb.py (1): get_sql_engine (78-87)
- airbyte/results.py (1): get_sql_engine (80-82)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
airbyte/types.py (1)

21-22: Good: explicit TIMESTAMP tz flags in the map.

This aligns compile-time rendering with dialects (e.g., Postgres -> WITH/WITHOUT TIME ZONE). Nice.

Could you confirm no callers still assume naive TIMESTAMPs anywhere (e.g., row adapters or copy paths), wdyt?
tests/unit_tests/test_type_translation.py (1)

71-75: LGTM: asserting both type and tz flag.

The TIMESTAMP checks correctly validate timezone semantics.
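For readers outside the diff, an assertion of that shape might look like the following sketch. The SQLTypeConverter import and the property definition are assumptions based on the files under review, not verbatim from the PR:

```python
import sqlalchemy

from airbyte.types import SQLTypeConverter  # assumed import path


def test_timestamp_with_timezone_flag():
    converter = SQLTypeConverter()
    sql_type = converter.to_sql_type(
        {
            "type": "string",
            "format": "date-time",
            "airbyte_type": "timestamp_with_timezone",  # illustrative property def
        }
    )
    # Check both the SQLAlchemy type and its timezone flag.
    assert isinstance(sql_type, sqlalchemy.types.TIMESTAMP)
    assert sql_type.timezone is True
```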
@aaronsteers do you have thoughts on this PR before I put some time to fix the tests?
@uptickmetachu - This looks great. Thanks for finding the issue and proposing this fix.
My only question/concern is to understand impact regarding backwards compatibility. Can you test and let me know what happens if the final table already has a timestamp column without timezone? Presumably the staging table will still get the new timezone-aware columns, and postgres will attempt to insert those values into the existing columns.
To be clear: I think it's totally reasonable to ask users to recreate the tables to get the fixed behavior - but I want to make sure the scenario doesn't lead to hard failure.
What do you think?
I had a play around, and PyAirbyte has no issues writing to an existing cache with different settings. If a column was already defined as timestamp without timezone, the new version correctly inserts timestamps in the same (timezone-unaware) format. I'll go ahead and try to fix up the above tests / modify the unit test so that it doesn't download …
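To make that compatibility behavior concrete, here is a minimal sketch (not from the PR) reproducing it with SQLAlchemy against a placeholder local Postgres instance:

```python
import sqlalchemy

# Placeholder connection URL; any reachable Postgres instance works.
engine = sqlalchemy.create_engine("postgresql://user:pass@localhost:5432/db")

with engine.begin() as conn:
    # A table created by an older PyAirbyte version: naive timestamp column.
    conn.execute(sqlalchemy.text(
        "CREATE TABLE products_legacy (created_at TIMESTAMP WITHOUT TIME ZONE)"
    ))
    # Inserting a timezone-aware value still succeeds: Postgres coerces it to
    # the column's type, rotating to the session time zone and dropping the offset.
    conn.execute(sqlalchemy.text(
        "INSERT INTO products_legacy VALUES (TIMESTAMPTZ '2024-01-01 00:00:00+00')"
    ))
```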
Actionable comments posted: 0
🧹 Nitpick comments (3)
tests/integration_tests/test_all_cache_types.py (3)
291-292: Should this test be marked as @pytest.mark.slow?

The test uses an in-memory database and mocks the SQL execution, so it doesn't perform any actual I/O. The slow marker is typically reserved for tests that do network calls, file operations, or run connectors. This test should execute very quickly—wdyt about removing the marker?
294-307: Consider making the SQL assertion more robust.

The expected SQL string is hardcoded with specific whitespace and formatting. If the SQL generation logic changes formatting (e.g., indentation, spacing) without changing semantics, this test will fail.

A few options to make this more maintainable (see the regex sketch below):

- Assert on the presence of key patterns (e.g., using regex to check for TIMESTAMP WITH TIME ZONE for datetime columns)
- Parse and normalize both the expected and actual SQL before comparison
- Use a SQL parser library to compare the AST rather than string matching

The current approach works but might be brittle—wdyt?
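A sketch of the first option, assuming the generated DDL quotes column names and that created_at/updated_at are the datetime columns under test:

```python
import re


def assert_tz_aware_columns(actual_sql: str) -> None:
    """Assert that the datetime columns are rendered timezone-aware in the DDL."""
    for column in ("created_at", "updated_at"):
        pattern = rf'"{column}"\s+TIMESTAMP WITH TIME ZONE'
        assert re.search(pattern, actual_sql), f"{column} should be timezone-aware"


# Usage against the SQL captured from the mocked _execute_sql call:
# assert_tz_aware_columns(str(_execute_sql_mock.call_args[0][0]))
```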
325-331: Consider testing actual behavior instead of just SQL generation.

While mocking _execute_sql verifies the correct SQL is generated, it doesn't confirm that the timestamp columns actually work correctly when writing data. The PR description mentions testing with PostgresCache and verifying backward compatibility with existing timezone-unaware columns.

Would it be valuable to add a complementary test that:

1. Actually creates the table (no mocking)
2. Writes some data with datetime values
3. Reads back the data and verifies timezone information is preserved

This would catch integration issues that pure SQL generation tests might miss—wdyt?
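A round-trip of that shape could look like the following sketch, using an in-memory DuckDB engine so no server is required (assumes the duckdb-engine package is installed; a Postgres engine would exercise the same path):

```python
import datetime

import sqlalchemy

engine = sqlalchemy.create_engine("duckdb:///:memory:")
metadata = sqlalchemy.MetaData()
products = sqlalchemy.Table(
    "products",
    metadata,
    sqlalchemy.Column("created_at", sqlalchemy.TIMESTAMP(timezone=True)),
)
metadata.create_all(engine)  # actually creates the table, no mocking

original = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
with engine.begin() as conn:
    conn.execute(products.insert().values(created_at=original))
    stored = conn.execute(sqlalchemy.select(products.c.created_at)).scalar_one()

assert stored.tzinfo is not None  # timezone info preserved on read-back
assert stored == original  # same instant, regardless of rendered zone
```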
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)

- airbyte/shared/sql_processor.py (3 hunks)
- airbyte/types.py (2 hunks)
- tests/integration_tests/test_all_cache_types.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte/types.py
- airbyte/shared/sql_processor.py
🧰 Additional context used
🧬 Code graph analysis (1)

tests/integration_tests/test_all_cache_types.py (4)
- airbyte/sources/util.py (1): get_source (47-132)
- airbyte/_processors/sql/duckdb.py (2): DuckDBConfig (27-87), DuckDBSqlProcessor (90-184)
- airbyte/shared/catalog_providers.py (1): CatalogProvider (31-222)
- airbyte/shared/sql_processor.py (3): catalog_provider (215-231), sql_config (374-376), _ensure_final_table_exists (615-637)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
tests/integration_tests/test_all_cache_types.py (2)
15-15: LGTM on the new imports!

The added imports (patch, DuckDBConfig, DuckDBSqlProcessor, CatalogProvider) are all necessary for the new test and properly imported from their respective modules.

Also applies to: 20-20, 23-23
308-311: Could you confirm that the products stream schema defines created_at and updated_at with the JSON Schema "date-time" format? The previous script failed due to a missing pandas dependency—please verify this in a fully configured environment. Wdyt?
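For reference, the schema shape being asked about would look like this sketch (illustrative only, not the actual source-faker schema):

```python
# Illustrative JSON Schema fragment for the products stream, expressed as a
# Python dict; "format": "date-time" is what maps these columns to TIMESTAMP.
products_schema_properties = {
    "created_at": {"type": "string", "format": "date-time"},
    "updated_at": {"type": "string", "format": "date-time"},
}
```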
/test-pr
@uptickmetachu - Looks great. Thanks again for the contribution. I'm happy to merge as soon as tests pass.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte/shared/sql_processor.py (1)

638-642: Consider moving engine fetch inside the conditional block?

The engine is currently fetched (line 638) before checking if table creation is needed. Since get_sql_engine() returns a cached instance, the performance impact is negligible. However, for clarity, you might move that line inside the "if not did_exist and create_if_missing:" block, so the engine is only obtained when actually needed. Wdyt?

```diff
 table_name = self.get_sql_table_name(stream_name)
 did_exist = self._table_exists(table_name)
-engine = self.get_sql_engine()
 if not did_exist and create_if_missing:
+    engine = self.get_sql_engine()
     column_definition_str = ",\n  ".join(
         f"{self._quote_identifier(column_name)} {sql_type.compile(engine.dialect)}"
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- airbyte/shared/sql_processor.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)

airbyte/shared/sql_processor.py (4)
- airbyte/cloud/sync_results.py (1): get_sql_engine (419-421)
- airbyte/_processors/sql/duckdb.py (1): get_sql_engine (78-87)
- airbyte/results.py (1): get_sql_engine (80-82)
- airbyte/_processors/sql/bigquery.py (1): _quote_identifier (150-156)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
airbyte/shared/sql_processor.py (2)
588-591: Nice improvement for cross-database compatibility!

Adding dialect-aware SQL type compilation ensures the generated DDL is correct for each database backend. The engine is cached, so there's minimal performance overhead. This aligns well with the PR objective to store timestamps with timezone information in a database-agnostic way.
703-703: Core change looks good - have you verified compatibility across all supported databases?

Setting timezone=True for AB_EXTRACTED_AT_COLUMN directly addresses the PR objective. The dialect-aware compilation (from the earlier changes) will ensure each database gets the appropriate DDL (e.g., TIMESTAMP WITH TIME ZONE for Postgres).

Have you verified that all currently supported SQL databases (DuckDB, BigQuery, Postgres, etc.) handle timezone-aware timestamps correctly with this approach? Based on the test changes mentioned in the AI summary, it seems like at least Postgres is covered.
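One quick way to spot-check per-dialect rendering without connecting to any database is to compile the type against dialect instances directly. A sketch using only dialects bundled with SQLAlchemy (the DuckDB and BigQuery dialects ship as third-party packages):

```python
import sqlalchemy
from sqlalchemy.dialects import postgresql, sqlite

extracted_at_type = sqlalchemy.TIMESTAMP(timezone=True)

for dialect in (postgresql.dialect(), sqlite.dialect()):
    # No DBAPI or connection is needed just to compile DDL fragments.
    print(dialect.name, "->", extracted_at_type.compile(dialect=dialect))
# postgresql -> TIMESTAMP WITH TIME ZONE
# sqlite -> TIMESTAMP
```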
Head branch was pushed to by a user without write access.

Branch force-pushed from da96d76 to f984af1.
/test-pr
This subsequently means type hints are also correct.

Branch force-pushed from f984af1 to 26abdd9.
/test-pr
@uptickmetachu - I see you applied a fix and I've kicked off the …
I might leave this one to you, as it is a bit hairy / involves more refactoring: googleapis/python-bigquery-sqlalchemy#546
There will need to be some dispatch mechanism or helper method to allow this one (similar to the bigquery …
@uptickmetachu - Thanks for your hard work on this. Continuing debug effort here: …
Merged and shipped 🎉 Thank you again, @uptickmetachu, for contributing to PyAirbyte! 🙌
PyAirbyte documentation recommends using the cache instead of the Docker-based destinations. While testing the PostgresCache, I found that timestamps were created without timezone information.

This PR adds timezone information where possible.
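For context, a typical reproduction path looks like this sketch (connection details are placeholders; assumes a local Postgres and the source-faker connector):

```python
import airbyte as ab
from airbyte.caches import PostgresCache

# Placeholder credentials; any reachable Postgres instance works.
cache = PostgresCache(
    host="localhost",
    port=5432,
    username="postgres",
    password="postgres",
    database="airbyte",
)

source = ab.get_source("source-faker", config={"count": 100})
source.select_streams(["products"])
source.read(cache=cache)

# Before this fix, datetime columns (e.g., created_at) were created as
# TIMESTAMP WITHOUT TIME ZONE; with it, they are TIMESTAMP WITH TIME ZONE.
```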
Summary by CodeRabbit

- New Features
- Bug Fixes
- Tests