Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import contextlib
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, ClassVar, Literal, final

Expand All @@ -12,6 +13,7 @@
from pydantic import Field, PrivateAttr
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import text
from typing_extensions import Self

from airbyte_protocol.models import ConfiguredAirbyteCatalog

Expand All @@ -28,6 +30,7 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from types import TracebackType

from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.caches._state_backend_base import StateBackendBase
Expand All @@ -39,7 +42,7 @@
from airbyte.strategies import WriteStrategy


class CacheBase(SqlConfig, AirbyteWriterInterface):
class CacheBase(SqlConfig, AirbyteWriterInterface): # noqa: PLR0904
"""Base configuration for a cache.

Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
Expand Down Expand Up @@ -108,6 +111,47 @@ def __init__(self, **data: Any) -> None: # noqa: ANN401
temp_file_cleanup=self.cleanup,
)

def close(self) -> None:
    """Close all database connections and dispose of connection pools.

    This method ensures that all SQLAlchemy engines created by this cache
    and its processors are properly disposed, releasing all database connections.
    This is especially important for file-based databases like DuckDB, which
    lock the database file until all connections are closed.

    This method is idempotent and can be called multiple times safely.

    Raises:
        Exception: If any engine disposal fails, the exception will propagate
            to the caller. This ensures callers are aware of cleanup failures.
    """
    # Guard every attribute with getattr: close() can run from __del__ on a
    # partially initialized instance (e.g. __init__ raised before these
    # attributes were set), where a bare attribute access would raise
    # AttributeError instead of cleaning up what does exist.
    read_processor = getattr(self, "_read_processor", None)
    if read_processor is not None:
        read_processor.sql_config.dispose_engine()

    for backend in (
        getattr(self, "_catalog_backend", None),
        getattr(self, "_state_backend", None),
    ):
        sql_config = getattr(backend, "_sql_config", None)
        if sql_config is not None:
            sql_config.dispose_engine()  # noqa: SLF001

    # Finally dispose the cache's own engine (CacheBase is itself a SqlConfig).
    self.dispose_engine()

def __enter__(self) -> Self:
    """Enter context manager.

    Returns the cache itself so it can be bound with ``with ... as cache``;
    cleanup happens in ``__exit__`` via ``close()``.
    """
    return self

def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager and clean up resources.

    Always calls ``close()``, even when the ``with`` body raised. Returns
    ``None`` (falsy), so any in-flight exception propagates to the caller.
    """
    self.close()

def __del__(self) -> None:
    """Best-effort cleanup when the cache is garbage collected.

    Finalizers must never raise (exceptions here are ignored by the
    interpreter and only printed), so any failure during close() is
    swallowed deliberately.
    """
    try:
        self.close()
    except Exception:  # noqa: BLE001, S110
        pass

@property
def config_hash(self) -> str | None:
"""Return a hash of the cache configuration.
Expand Down
31 changes: 21 additions & 10 deletions airbyte/shared/sql_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class SqlConfig(BaseModel, abc.ABC):
table_prefix: str | None = ""
"""A prefix to add to created table names."""

_engine: Engine | None = None
"""Cached SQL engine instance."""

@abc.abstractmethod
def get_sql_alchemy_url(self) -> SecretString:
"""Returns a SQL Alchemy URL."""
Expand Down Expand Up @@ -133,16 +136,24 @@ def get_sql_alchemy_connect_args(self) -> dict[str, Any]:
return {}

def get_sql_engine(self) -> Engine:
    """Return a cached SQL engine, creating it lazily on first use.

    The engine is memoized on ``self._engine`` so that repeated callers
    share one connection pool; ``dispose_engine()`` releases it. Callers
    that previously received a fresh engine per call now share pooled
    connections, which is what allows file-locking databases (e.g. DuckDB)
    to be fully released on ``dispose_engine()``.

    NOTE(review): lazy initialization here is not guarded by a lock —
    concurrent first calls could each build an engine; confirm this config
    object is only used from a single thread.
    """
    if self._engine is None:
        self._engine = create_engine(
            url=self.get_sql_alchemy_url(),
            echo=DEBUG_MODE,
            execution_options={
                # Redirect unqualified table references to this config's schema.
                "schema_translate_map": {None: self.schema_name},
            },
            future=True,
            connect_args=self.get_sql_alchemy_connect_args(),
        )
    return self._engine

def dispose_engine(self) -> None:
    """Dispose of the cached SQL engine and release all connections.

    Idempotent: a no-op when no engine has been created (or it was already
    disposed). Resetting ``self._engine`` to ``None`` lets a later
    ``get_sql_engine()`` call transparently rebuild the engine.
    """
    if self._engine is not None:
        self._engine.dispose()
        self._engine = None

def get_vendor_client(self) -> object:
"""Return the vendor-specific client object.
Expand Down
78 changes: 78 additions & 0 deletions tests/integration_tests/test_duckdb_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,81 @@ def duckdb_cache() -> Generator[DuckDBCache, None, None]:
yield cache
# TODO: Delete cache DB file after test is complete.
return


def test_duckdb_connection_cleanup(tmp_path):
    """Test that DuckDB connections are properly released after cache operations.

    This test verifies the fix for issue #807: DuckDB Cache Does Not Release Database Connections.

    Reproduction steps:
    1. Create a DuckDB cache and write data to it
    2. Delete the cache object or call close()
    3. Attempt to open a new connection to the same database file
    4. Verify the new connection succeeds without errors
    """
    import duckdb

    db_path = tmp_path / "test_connection_cleanup.duckdb"

    cache = DuckDBCache(db_path=db_path)

    cache.processor._ensure_schema_exists()

    cache.processor._execute_sql(
        "CREATE TABLE IF NOT EXISTS test_table (id INTEGER, name VARCHAR)"
    )
    cache.processor._execute_sql("INSERT INTO test_table VALUES (1, 'test')")

    cache.close()

    # Keep only the connect() call in the try: `except Exception` also catches
    # AssertionError, so wrapping the asserts would mis-report a data mismatch
    # as a connection failure.
    try:
        conn = duckdb.connect(str(db_path))
    except Exception as e:
        pytest.fail(f"Failed to open new connection after cache cleanup: {e}")

    # Close the verification connection even if an assertion fails.
    try:
        result = conn.execute("SELECT * FROM main.test_table").fetchall()
        assert len(result) == 1
        assert result[0] == (1, "test")
    finally:
        conn.close()


def test_duckdb_context_manager_cleanup(tmp_path):
    """Test that DuckDB connections are cleaned up when using cache as context manager."""
    import duckdb

    db_path = tmp_path / "test_context_manager.duckdb"

    # Exiting the `with` block should invoke close() and release the file lock.
    with DuckDBCache(db_path=db_path) as cache:
        processor = cache.processor
        processor._execute_sql(
            "CREATE TABLE IF NOT EXISTS test_table (id INTEGER)"
        )
        processor._execute_sql("INSERT INTO test_table VALUES (1)")

    # A fresh connection to the same file must now succeed and see the row.
    verification_conn = duckdb.connect(str(db_path))
    rows = verification_conn.execute("SELECT * FROM main.test_table").fetchall()
    assert len(rows) == 1
    verification_conn.close()


def test_duckdb_del_cleanup(tmp_path):
    """Test that DuckDB connections are cleaned up via __del__ when cache is garbage collected."""
    import gc

    import duckdb

    db_path = tmp_path / "test_del_cleanup.duckdb"

    def create_and_use_cache():
        # The cache is local to this function, so it becomes unreachable
        # (and eligible for finalization) as soon as the function returns.
        scoped_cache = DuckDBCache(db_path=db_path)
        scoped_cache.processor._execute_sql(
            "CREATE TABLE IF NOT EXISTS test_table (id INTEGER)"
        )
        scoped_cache.processor._execute_sql("INSERT INTO test_table VALUES (1)")

    create_and_use_cache()

    # Force finalizers to run so __del__ -> close() releases the file lock.
    gc.collect()

    verification_conn = duckdb.connect(str(db_path))
    rows = verification_conn.execute("SELECT * FROM main.test_table").fetchall()
    assert len(rows) == 1
    verification_conn.close()