Skip to content

Commit b383d90

Browse files
committed
feat(community): expose BigQueryLoggerConfig and add dataset check
- Export `BigQueryLoggerConfig` in `langchain_google_community/callbacks/__init__.py` to allow easier configuration. - Add `_ensure_dataset_exists` check to `BigQueryCallbackHandler` and `AsyncBigQueryCallbackHandler` initialization. - The handlers now raise a `ValueError` immediately if the target BigQuery dataset does not exist, ensuring fail-fast behavior.
1 parent 21fdb20 commit b383d90

4 files changed

Lines changed: 70 additions & 8 deletions

File tree

libs/community/langchain_google_community/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from langchain_google_community.callbacks import (
1919
AsyncBigQueryCallbackHandler,
2020
BigQueryCallbackHandler,
21+
BigQueryLoggerConfig,
2122
)
2223
from langchain_google_community.docai import DocAIParser, DocAIParsingResults
2324
from langchain_google_community.documentai_warehouse import DocumentAIWarehouseRetriever
@@ -81,6 +82,7 @@
8182
"AsyncBigQueryCallbackHandler",
8283
"BigQueryCallbackHandler",
8384
"BigQueryLoader",
85+
"BigQueryLoggerConfig",
8486
"BigQueryVectorStore",
8587
"CalendarCreateEvent",
8688
"CalendarDeleteEvent",

libs/community/langchain_google_community/callbacks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
from langchain_google_community.callbacks.bigquery_callback import (
66
AsyncBigQueryCallbackHandler,
77
BigQueryCallbackHandler,
8+
BigQueryLoggerConfig,
89
)
910

1011
__all__ = [
1112
"AsyncBigQueryCallbackHandler",
1213
"BigQueryCallbackHandler",
14+
"BigQueryLoggerConfig",
1315
]

libs/community/langchain_google_community/callbacks/bigquery_callback.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,20 @@ def _get_bigquery_events_schema(bigquery_module: Any) -> list[Any]:
303303
]
304304

305305

306+
def _ensure_dataset_exists(
307+
client: Any, project_id: str, dataset_id: str, cloud_exceptions: Any
308+
) -> None:
309+
if client is None:
310+
raise ValueError("BigQuery client is not initialized.")
311+
try:
312+
client.get_dataset(dataset_id)
313+
except cloud_exceptions.NotFound:
314+
raise ValueError(
315+
f"Dataset '{dataset_id}' does not exist in project '{project_id}'. "
316+
"Please create it before initializing the callback handler."
317+
)
318+
319+
306320
# ==============================================================================
307321
# CONFIGURATION
308322
# ==============================================================================
@@ -1186,7 +1200,10 @@ def __init__(
11861200
self._is_shutting_down: bool = False
11871201
self._setup_lock: asyncio.Lock = asyncio.Lock()
11881202

1189-
self.client: Any = None
1203+
self.client = self.bigquery.Client(project=self.project_id)
1204+
_ensure_dataset_exists(
1205+
self.client, self.project_id, self.dataset_id, self.cloud_exceptions
1206+
)
11901207
self.write_client: Any = None
11911208
self.async_batch_processor: _AsyncBatchProcessor | None = None
11921209
self._executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=1)
@@ -1201,10 +1218,6 @@ async def _ensure_started(self) -> None:
12011218
return
12021219
loop = asyncio.get_running_loop()
12031220

1204-
self.client = await loop.run_in_executor( # type: ignore[func-returns-value]
1205-
self._executor, lambda: self.bigquery.Client(project=self.project_id)
1206-
)
1207-
12081221
full_table_id = (
12091222
f"{self.project_id}.{self.dataset_id}.{self.config.table_id}"
12101223
)
@@ -1717,7 +1730,10 @@ def __init__(
17171730
self._is_shutting_down: bool = False
17181731
self._setup_lock: threading.Lock = threading.Lock()
17191732

1720-
self.client: Any = None
1733+
self.client = self.bigquery.Client(project=self.project_id)
1734+
_ensure_dataset_exists(
1735+
self.client, self.project_id, self.dataset_id, self.cloud_exceptions
1736+
)
17211737
self.write_client: Any = None
17221738
self.batch_processor: _BatchProcessor | None = None
17231739
self._executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=1)
@@ -1731,8 +1747,6 @@ def _ensure_started(self) -> None:
17311747
if self._started:
17321748
return
17331749

1734-
self.client = self.bigquery.Client(project=self.project_id)
1735-
17361750
full_table_id = (
17371751
f"{self.project_id}.{self.dataset_id}.{self.config.table_id}"
17381752
)

libs/community/tests/unit_tests/callbacks/test_bigquery_callbacks.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,3 +671,47 @@ def test_sync_log_parsing_error(
671671
assert (
672672
f"Failed to parse content: {error_message}" in logged_row["error_message"]
673673
)
674+
675+
676+
def test_sync_init_raises_if_dataset_missing(
677+
mock_bigquery_clients: Dict[str, Any],
678+
) -> None:
679+
"""Test that sync initialization raises ValueError if the dataset does not exist."""
680+
mock_bq_client = mock_bigquery_clients["mock_bq_client"]
681+
mock_cloud_exceptions = mock_bigquery_clients["mock_cloud_exceptions"]
682+
683+
# Simulate dataset not found
684+
mock_bq_client.get_dataset.side_effect = mock_cloud_exceptions.NotFound(
685+
"Dataset not found"
686+
)
687+
688+
with pytest.raises(ValueError, match="Dataset 'test_dataset' does not exist"):
689+
BigQueryCallbackHandler(
690+
project_id="test-project",
691+
dataset_id="test_dataset",
692+
table_id="test_table",
693+
)
694+
695+
mock_bq_client.get_dataset.assert_called_with("test_dataset")
696+
697+
698+
def test_async_init_raises_if_dataset_missing(
699+
mock_bigquery_clients: Dict[str, Any],
700+
) -> None:
701+
"""Test that async initialization raises ValueError if the dataset does not exist."""
702+
mock_bq_client = mock_bigquery_clients["mock_bq_client"]
703+
mock_cloud_exceptions = mock_bigquery_clients["mock_cloud_exceptions"]
704+
705+
# Simulate dataset not found
706+
mock_bq_client.get_dataset.side_effect = mock_cloud_exceptions.NotFound(
707+
"Dataset not found"
708+
)
709+
710+
with pytest.raises(ValueError, match="Dataset 'test_dataset' does not exist"):
711+
AsyncBigQueryCallbackHandler(
712+
project_id="test-project",
713+
dataset_id="test_dataset",
714+
table_id="test_table",
715+
)
716+
717+
mock_bq_client.get_dataset.assert_called_with("test_dataset")

0 commit comments

Comments
 (0)