fix(cache): Properly dispose DuckDB connections to prevent file locking #808
Conversation
- Add close() method to CacheBase to dispose all SQLAlchemy engines
- Cache SQLAlchemy engine in SqlConfig and add dispose_engine() method
- Implement context manager protocol (__enter__/__exit__) for clean resource management
- Add __del__ for cleanup when cache is garbage collected
- Add comprehensive tests demonstrating connection cleanup works

Fixes #807

Co-Authored-By: AJ Steers <[email protected]>
Original prompt from AJ Steers

🤖 Devin AI Engineer: I'll be helping with this pull request! Here's what you should know: ✅ I will automatically:

Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1759272656-fix-duckdb-connection-leak' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1759272656-fix-duckdb-connection-leak'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.
📝 Walkthrough

Adds lifecycle/resource-management to CacheBase (close, context manager, destructor), caches and exposes disposal for the SqlConfig SQL engine, declares SqlConfig-typed attributes on backend bases, and adds integration tests verifying DuckDB connection cleanup via close, context-manager exit, and GC-triggered __del__.

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
actor Test
participant Cache as CacheBase / DuckDBCache
participant SQL as SqlConfig
participant DB as DuckDB Engine
rect rgb(240,248,255)
note over Test,Cache: Usage phase
Test->>Cache: create or use (with ...)
Cache->>SQL: get_sql_engine()
alt engine not cached
SQL->>DB: create_engine(...)
SQL-->>SQL: cache engine
else engine cached
SQL-->>Cache: return cached engine
end
Test->>Cache: perform read/write
end
rect rgb(245,255,240)
note over Cache,DB: Cleanup phase
Test->>Cache: close() / __exit__() / GC -> __del__()
Cache->>SQL: dispose_engine()
SQL->>DB: engine.dispose()
SQL-->>SQL: clear cached engine
end
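The usage and cleanup phases in the diagram can be sketched as a minimal Python analogue. This is an illustrative toy, not PyAirbyte's API: sqlite3 stands in for the DuckDB engine, and MiniCache, get_conn, and db_path are hypothetical names.

```python
import contextlib
import sqlite3

class MiniCache:
    """Toy stand-in for CacheBase: lazily caches one connection, disposes it on close."""

    def __init__(self, db_path):
        self._db_path = db_path
        self._conn = None  # lazily created and cached (mirrors SqlConfig's cached engine)

    def get_conn(self):
        # "alt engine not cached" branch from the diagram: create once, then reuse.
        if self._conn is None:
            self._conn = sqlite3.connect(self._db_path)
        return self._conn

    def close(self):
        # Idempotent cleanup (mirrors CacheBase.close() -> dispose_engine()).
        if self._conn is not None:
            self._conn.close()
            self._conn = None  # clear the cache so a later access gets a fresh connection

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()  # returns None implicitly, so exceptions are not suppressed

    def __del__(self):
        # Suppress errors during interpreter shutdown, as destructors should.
        with contextlib.suppress(Exception):
            self.close()
```

After the with-block exits, an external sqlite3.connect on the same file succeeds, which is the same observable behavior the PR's DuckDB tests check for.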
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs
Suggested reviewers
Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Actionable comments posted: 1
🧹 Nitpick comments (3)
airbyte/caches/base.py (1)
129-131: Consider checking for disposal method existence.

Currently you're checking that _read_processor exists and is not None, but you're not verifying that sql_config.dispose_engine exists before calling it. While it should always exist based on the SqlConfig base class, defensive programming might suggest checking. Wdyt?

 if hasattr(self, "_read_processor") and self._read_processor is not None:
     with contextlib.suppress(Exception):
-        self._read_processor.sql_config.dispose_engine()
+        if hasattr(self._read_processor.sql_config, "dispose_engine"):
+            self._read_processor.sql_config.dispose_engine()

Similar pattern for lines 134-136.
tests/integration_tests/test_duckdb_cache.py (2)
103-103: The ensure-schema call is redundant but harmless.

Line 103 explicitly calls _ensure_schema_exists(), but line 105's CREATE TABLE IF NOT EXISTS already ensures the schema exists as a side effect of the cache initialization on line 101. The explicit call doesn't hurt, but it's not necessary. Wdyt about removing it for clarity?

 cache = DuckDBCache(db_path=db_path)
-cache.processor._ensure_schema_exists()
 cache.processor._execute_sql(
     "CREATE TABLE IF NOT EXISTS test_table (id INTEGER, name VARCHAR)"
 )
86-161: Consider adding a test for concurrent cleanup robustness.

All three tests verify cleanup works in happy-path scenarios. Given the PR mentions "potential concurrent-usage implications," consider adding a test that exercises concurrent access or verifies cleanup is safe when engines are in use. Wdyt?

Example test sketch:

def test_duckdb_cleanup_with_active_connection(tmp_path):
    """Test that cleanup is safe even with active queries."""
    import duckdb

    db_path = tmp_path / "test_concurrent.duckdb"
    cache = DuckDBCache(db_path=db_path)

    # Get a connection but don't close it yet
    conn = cache.get_sql_engine().connect()

    # Close cache while connection is still active
    cache.close()

    # Connection should still work (engine.dispose() doesn't force-close)
    # but new connections should be rejected or create fresh engines
    conn.close()

    # Verify external access works after cleanup
    ext_conn = duckdb.connect(str(db_path))
    ext_conn.close()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)

- airbyte/caches/base.py (3 hunks)
- airbyte/shared/sql_processor.py (2 hunks)
- tests/integration_tests/test_duckdb_cache.py (1 hunks)
🧰 Additional context used

🧬 Code graph analysis (3)

airbyte/shared/sql_processor.py (5)
- airbyte/_processors/sql/snowflake.py (2): get_sql_alchemy_url (134-149), get_sql_alchemy_connect_args (112-116)
- airbyte/_processors/sql/bigquery.py (1): get_sql_alchemy_url (65-81)
- airbyte/caches/motherduck.py (1): get_sql_alchemy_url (49-63)
- airbyte/_processors/sql/postgres.py (1): get_sql_alchemy_url (29-33)
- airbyte/_processors/sql/duckdb.py (1): get_sql_alchemy_url (41-50)

tests/integration_tests/test_duckdb_cache.py (3)
- airbyte/caches/duckdb.py (1): DuckDBCache (43-54)
- airbyte/caches/base.py (2): processor (193-195), close (119-139)
- airbyte/shared/sql_processor.py (2): _ensure_schema_exists (519-544), _execute_sql (818-833)

airbyte/caches/base.py (1)
- airbyte/shared/sql_processor.py (2): sql_config (385-387), dispose_engine (152-156)
🪛 GitHub Actions: Run Linters
airbyte/caches/base.py
[error] 12-12: Command 'poetry run mypy .' failed: Module 'typing' has no attribute 'Self'. Use 'from typing_extensions import Self' instead.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (10)

airbyte/caches/base.py (3)

119-140: LGTM! Comprehensive cleanup with proper safety measures.

The close() method properly disposes engines from the read processor, both backends, and the cache itself. Using contextlib.suppress(Exception) ensures cleanup continues even if individual disposal operations fail. The idempotent design allows safe repeated calls.

141-153: LGTM! Context manager protocol correctly implemented.

The context manager implementation follows the standard protocol: __enter__ returns self, and __exit__ delegates cleanup to close(). The __exit__ signature correctly accepts exception info but doesn't suppress exceptions (it returns None implicitly), which is the expected behavior.

154-158: LGTM! Destructor safely handles cleanup during garbage collection.

The __del__ method wraps close() in contextlib.suppress(Exception) to prevent exceptions during interpreter shutdown. This is the correct pattern for cleanup in destructors.

airbyte/shared/sql_processor.py (4)
95-96: LGTM! Private engine cache attribute properly declared.

The _engine attribute is correctly typed as Engine | None and initialized to None, following the pattern for lazy initialization.

138-150: LGTM! Engine caching correctly implemented.

The lazy initialization pattern is clean: create and cache the engine on first access, then return the cached instance on subsequent calls. The engine configuration properly includes schema translation, debug echo, and connect args.

152-156: LGTM! Engine disposal properly releases resources.

The dispose_engine() method correctly calls engine.dispose() to release all connections in the pool, then clears the cache to ensure a fresh engine on next access.
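One way to make lazy engine caching of this shape thread-safe is double-checked locking. The sketch below is a hypothetical stand-alone analogue, not PyAirbyte's SqlConfig: sqlite3 stands in for the SQLAlchemy engine, and LockedConfig and its members are invented names.

```python
import sqlite3
import threading

class LockedConfig:
    """Sketch: double-checked locking around lazy engine creation."""

    def __init__(self, db_path):
        self._db_path = db_path
        self._engine = None
        self._engine_lock = threading.Lock()

    def get_sql_engine(self):
        if self._engine is None:  # fast path: no lock once the engine exists
            with self._engine_lock:
                if self._engine is None:  # re-check under the lock to avoid double-create
                    self._engine = sqlite3.connect(
                        self._db_path, check_same_thread=False
                    )
        return self._engine

    def dispose_engine(self):
        # Take the same lock so disposal can't race a concurrent creation.
        with self._engine_lock:
            if self._engine is not None:
                self._engine.close()
                self._engine = None
```

Without the inner re-check, two threads that both pass the outer None test would each create an engine and one would leak; the lock plus re-check guarantees exactly one is cached.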
138-150: Consider guarding get_sql_engine initialization with a lock.

I didn't find any direct multi-threaded calls to SqlConfig, but SnowflakeConfig spawns threads via ThreadPoolExecutor. If a shared SqlConfig hits get_sql_engine simultaneously, you could end up creating multiple engines. Could you confirm whether SqlConfig instances are shared across threads and, if so, wrap _engine initialization in a threading.Lock? Wdyt?

tests/integration_tests/test_duckdb_cache.py (3)
86-120: LGTM! Explicit cleanup test properly verifies connection release.

This test correctly exercises the explicit close() path:

- Creates cache and writes data
- Calls close() to release connections
- Opens new connection to verify data persists and file isn't locked

The schema qualification (main.test_table) is correct for DuckDB.

122-138: LGTM! Context manager test properly verifies exit cleanup.

This test correctly exercises the context manager protocol:

- Uses cache within with block
- Writes data
- Exits context (triggers __exit__ → close())
- Verifies external connection succeeds

140-161: LGTM! Garbage collection test verifies __del__ cleanup.

This test correctly exercises the __del__ path:

- Creates cache in nested function scope (no persistent reference)
- Function exits, making cache eligible for GC
- Forces garbage collection with gc.collect()
- Verifies external connection succeeds

The nested function scope is a good pattern to ensure the cache reference is truly released.
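The nested-scope GC test pattern generalizes beyond DuckDB. A self-contained sketch using sqlite3 and invented names (_GcCache, _use_cache) shows the same structure: create the cache in a nested function, let the reference die on return, force collection, then verify external access.

```python
import contextlib
import gc
import sqlite3

class _GcCache:
    """Toy cache whose __del__ releases its cached connection (hypothetical)."""

    def __init__(self, path):
        self._conn = sqlite3.connect(path)

    def close(self):
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __del__(self):
        # Destructors must never raise, especially during interpreter shutdown.
        with contextlib.suppress(Exception):
            self.close()

def _use_cache(path):
    # Nested scope: the only reference to the cache dies when this returns.
    cache = _GcCache(path)
    cache._conn.execute("CREATE TABLE t (id INTEGER)")
    cache._conn.commit()

def gc_releases_file(path):
    _use_cache(path)
    gc.collect()  # force collection so __del__ runs deterministically
    ext = sqlite3.connect(path)  # external access should now succeed
    count = ext.execute("SELECT COUNT(*) FROM t").fetchone()[0]
    ext.close()
    return count
```

In CPython, refcounting usually frees the cache the moment _use_cache returns; the explicit gc.collect() makes the test robust on interpreters without deterministic refcounting.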
- Use typing_extensions.Self directly instead of a try/except pattern for MyPy compatibility
- Add noqa comment for PLR0904 (too many public methods) since adding close() pushed the class over the limit
- Let Ruff auto-fix import ordering

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 1
📒 Files selected for processing (1)

- airbyte/caches/base.py (5 hunks)

🧰 Additional context used

🧬 Code graph analysis (1)

airbyte/caches/base.py (1)
- airbyte/shared/sql_processor.py (3): SqlConfig (86-167), sql_config (385-387), dispose_engine (152-156)
- Remove contextlib.suppress from close() to let callers know about failures
- Keep exception suppression only in __del__ for safe garbage collection
- Add docstring explaining exception propagation behavior

Addresses code review feedback from coderabbitai[bot]

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte/caches/base.py (1)

131-134: Guard backend disposal when attributes might be missing.

If construction bails out before _catalog_backend or _state_backend are set, an explicit close() will raise AttributeError, which breaks the "idempotent and safe" contract and leaks the underlying engine until GC kicks in. Could we pivot to getattr(..., None) (or similar) so we only attempt dispose_engine() on objects that actually exist, wdyt?

-for backend in [self._catalog_backend, self._state_backend]:
-    if backend is not None and hasattr(backend, "_sql_config"):
+for backend in (
+    getattr(self, "_catalog_backend", None),
+    getattr(self, "_state_backend", None),
+):
+    if backend is not None and hasattr(backend, "_sql_config"):
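The getattr-based guard suggested above can be demonstrated in isolation. Everything here is a toy: _Config, _Backend, and PartialCache are invented stand-ins for SqlConfig, the catalog/state backends, and a cache whose construction failed partway through.

```python
import contextlib

class _Config:
    """Hypothetical stand-in for SqlConfig."""

    def __init__(self):
        self.disposed = False

    def dispose_engine(self):
        self.disposed = True

class _Backend:
    """Hypothetical backend that owns a _sql_config."""

    def __init__(self):
        self._sql_config = _Config()

class PartialCache:
    """Simulates a cache whose __init__ failed before _state_backend was assigned."""

    def __init__(self):
        self._catalog_backend = _Backend()
        # _state_backend is intentionally never set

    def close(self):
        # getattr(..., None) tolerates attributes missing after a failed
        # construction, so close() stays idempotent instead of raising
        # AttributeError on the unset _state_backend.
        for backend in (
            getattr(self, "_catalog_backend", None),
            getattr(self, "_state_backend", None),
        ):
            if backend is not None and hasattr(backend, "_sql_config"):
                with contextlib.suppress(Exception):
                    backend._sql_config.dispose_engine()
```

With the original direct attribute access, the loop header itself would raise before any disposal ran; the guarded form still disposes every backend that does exist.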
📒 Files selected for processing (1)

- airbyte/caches/base.py (5 hunks)

🧰 Additional context used

🧬 Code graph analysis (1)

airbyte/caches/base.py (2)
- airbyte/shared/sql_processor.py (3): SqlConfig (86-167), sql_config (385-387), dispose_engine (152-156)
- airbyte/_writers/base.py (1): AirbyteWriterInterface (20-88)
- Add _sql_config declaration to CatalogBackendBase and StateBackendBase
- Refactor close() to access backends individually for better type inference
- This makes the code cleaner and more Pythonic while maintaining identical behavior

All tests pass and MyPy/Ruff checks pass.

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte/caches/_state_backend_base.py (1)

28-29: Scope _sql_config to SqlStateBackend?

Only SqlStateBackend consumes SqlConfig, so consider moving its declaration (or type annotation) into that subclass, or make it optional on the base if you expect non-SQL implementations. WDYT?

airbyte/caches/_catalog_backend.py (1)

58-59: Consider moving _sql_config to the SQL-specific subclass.

The _sql_config: SqlConfig annotation on CatalogBackendBase currently only applies to SqlCatalogBackend, so relocating it avoids implying that future non-SQL backends must provide this field. WDYT?
📒 Files selected for processing (3)

- airbyte/caches/_catalog_backend.py (1 hunks)
- airbyte/caches/_state_backend_base.py (2 hunks)
- airbyte/caches/base.py (5 hunks)

🚧 Files skipped from review as they are similar to previous changes (1)

- airbyte/caches/base.py

🧰 Additional context used

🧬 Code graph analysis (2)

airbyte/caches/_catalog_backend.py (1)
- airbyte/shared/sql_processor.py (1): SqlConfig (86-167)

airbyte/caches/_state_backend_base.py (1)
- airbyte/shared/sql_processor.py (1): SqlConfig (86-167)
fix(cache): Properly dispose DuckDB connections to prevent file locking
Summary
Resolves issue #807 where PyAirbyte's DuckDB cache does not properly release database connections, preventing external tools from accessing the same database file.
Key Changes:

- SqlConfig now caches SQLAlchemy engines instead of creating new ones each time, and adds a dispose_engine() method for proper cleanup
- A close() method on CacheBase disposes all SQLAlchemy engines from processors and backends
- Context manager protocol (__enter__/__exit__) for clean resource management
- A __del__ method for cleanup during garbage collection

The fix ensures that when a cache object is cleaned up (via close(), context manager, or garbage collection), all underlying SQLAlchemy engines are properly disposed, releasing file locks on DuckDB databases.

Review & Testing Checklist for Human
- External tools (e.g. uvx harlequin test.duckdb) can access the database file without errors
- with DuckDBCache() as cache: usage ensures proper cleanup occurs

Test Plan Recommendation
Notes

- SqlConfig.get_sql_engine() now returns cached engines instead of creating new ones each time - this is a significant behavioral change that could affect concurrent usage patterns
- Cleanup uses contextlib.suppress(Exception) to prevent cleanup failures from breaking application flow, but this could potentially hide real issues during development
- Uses the typing_extensions Self type hint to support Python 3.10

Link to Devin run: https://app.devin.ai/sessions/9333803942584401a53d0f92dcbafa45
Requested by: @aaronsteers