Commit 1b6fee9

feat: add AsyncBigtableByteStore Class for the async-only key value store implementation. (#169)
* feat: add BigtableEngine class for managing the client and execution context. Added the engine class that manages the client and execution context, handling async/sync conversion via a background event loop. Added a test file for the engine class.
* fix(engine): update BigtableEngine setup by internalizing client creation. Removes the option to pre-instantiate and pass a BigtableDataClientAsync to the BigtableEngine. The engine now creates its own client instance, making it more self-contained.
* test(engine): update tests to match BigtableEngine class changes. Adjusted test cases and assertions to align with the recent modifications in the BigtableEngine class API and behavior.
* style(engine): update test_engine.py and engine.py format.
* test(engine): add tests for the async_initialize workflow.
* feat(engine): add async_initialize factory and shared loop management. This change introduces BigtableEngine.async_initialize for non-blocking asynchronous initialization.
* style(engine): update test_engine.py.
* feat: add `AsyncBigtableByteStore` class for the async-only implementation of LangChain's key-value store. This class handles setting, getting, deleting, and yielding keys asynchronously. It will be used by the main public-facing `BigtableByteStore` for all data operations on the table. It also includes a test suite.
* fix: ensure the loop is closed in BigtableEngine.shutdown_default_loop. The `shutdown_default_loop` method was not closing the asyncio event loop; only the class-level variables were reassigned. This commit adds a line that closes the loop when the thread is successfully terminated.
* feat: add BigtableEngine class for managing the client and execution context (#163), whose squashed message repeats the engine commits listed above.
* fix: import MutationsExceptionGroup and FailedMutationEntryError.
* style: remove type annotation from variable `loop`.
* fix: add a test case for testing mutation errors.
* fix: address review comments left in this PR.
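The async/sync bridge the commit message describes — a dedicated background event loop on its own thread, to which synchronous callers submit coroutines — can be sketched roughly as follows. This is an illustrative sketch, not the actual BigtableEngine API; the names `BackgroundLoop` and `run_coroutine` are invented for the example:

```python
import asyncio
import threading


class BackgroundLoop:
    """Runs an asyncio event loop on a daemon thread so synchronous
    callers can submit coroutines and block on their results."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run_coroutine(self, coro):
        # Submit the coroutine to the background loop and wait for its result.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def shutdown(self):
        # Stop the loop from its own thread, join the thread, then close the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=10.0)
        self._loop.close()


async def add(a, b):
    return a + b


engine = BackgroundLoop()
print(engine.run_coroutine(add(2, 3)))  # 5
engine.shutdown()
```

The `shutdown` sequence here mirrors the fix described above: the loop is only closed after the thread running it has been joined, since closing a still-running loop raises an error.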
1 parent: 05932d2 · commit: 1b6fee9

File tree

4 files changed: +524 −2 lines changed

Lines changed: 215 additions & 0 deletions
@@ -0,0 +1,215 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
    cast,
)

from google.cloud import bigtable
from google.cloud.bigtable.data import (
    DeleteAllFromRow,
    ReadRowsQuery,
    RowMutationEntry,
    RowRange,
    TableAsync,
)
from langchain_core.stores import BaseStore


class AsyncBigtableByteStore(BaseStore[str, bytes]):
    """
    Async-only LangChain ByteStore implementation for Bigtable.

    This class provides asynchronous methods for storing and retrieving
    byte values in a Bigtable table. It uses row keys for keys and stores
    values in a specified column family and qualifier.

    Attributes:
        table: The `TableAsync` instance used for Bigtable operations.
        value_column_family: The column family where values are stored.
        value_column_qualifier: The column qualifier where values are stored.
    """

    DEFAULT_COLUMN_FAMILY = "kv"
    DEFAULT_COLUMN_QUALIFIER = "val".encode("utf-8")

    def __init__(
        self,
        async_table: TableAsync,
        column_family: str = DEFAULT_COLUMN_FAMILY,
        column_qualifier: bytes = DEFAULT_COLUMN_QUALIFIER,
    ):
        """
        Initializes a new AsyncBigtableByteStore.

        Args:
            async_table: The `TableAsync` instance to use for Bigtable operations.
            column_family: The column family to store values in.
            column_qualifier: The column qualifier to store values in.
        """
        self._table = async_table
        self._column_family = column_family
        self._column_qualifier = column_qualifier

    @property
    def table(self):
        """Returns the underlying Bigtable table instance."""
        return self._table

    @property
    def value_column_family(self):
        """Returns the column family used to store values."""
        return self._column_family

    @property
    def value_column_qualifier(self):
        """Returns the column qualifier used to store values."""
        return self._column_qualifier

    async def amget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
        """
        Asynchronously retrieves values for a sequence of keys.

        It only reads the most recent version for each key.

        Args:
            keys: A sequence of keys to retrieve values for.

        Returns:
            A list of byte values corresponding to the input keys. If a key is
            not found, `None` is returned for that key's position in the list.
        """
        row_keys = [key.encode() for key in keys]
        results = {}
        row_filter = bigtable.data.row_filters.CellsColumnLimitFilter(1)

        query = bigtable.data.ReadRowsQuery(
            row_keys=cast(List[Union[str, bytes]], row_keys), row_filter=row_filter
        )

        # The filter above limits the read to the most recent cell version per row.
        rows_read = await self.table.read_rows(query)
        for row in rows_read:
            cells = row.get_cells(
                family=self.value_column_family, qualifier=self.value_column_qualifier
            )
            if cells:
                results[row.row_key] = cells[0].value

        res = []
        for key in keys:
            if key.encode() in results:
                res.append(results[key.encode()])
            else:
                res.append(None)

        return res

    async def amset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None:
        """
        Asynchronously stores key-value pairs in Bigtable.

        Args:
            key_value_pairs: A sequence of (key, value) tuples to store.

        Raises:
            TypeError: If any key is not a string or any value is not bytes.
        """
        mutations = []
        for key, value in key_value_pairs:
            if not isinstance(key, str):
                raise TypeError("Keys must be of type 'str'.")
            if not isinstance(value, bytes):
                raise TypeError("Values must be of type 'bytes'.")

            mutation = bigtable.data.SetCell(
                family=self.value_column_family,
                qualifier=self.value_column_qualifier,
                new_value=value,
            )

            row_mutation = bigtable.data.RowMutationEntry(
                row_key=key, mutations=[mutation]
            )
            mutations.append(row_mutation)

        await self.table.bulk_mutate_rows(mutations)

    async def amdelete(self, keys: Sequence[str]) -> None:
        """
        Asynchronously deletes key-value pairs from Bigtable.

        Args:
            keys: A sequence of keys to delete.
        """
        mutations = []
        for key in keys:
            mutation = DeleteAllFromRow()
            row_mutation = RowMutationEntry(key, [mutation])
            mutations.append(row_mutation)

        if mutations:
            await self.table.bulk_mutate_rows(mutations)

    async def ayield_keys(self, *, prefix: Optional[str] = None) -> AsyncIterator[str]:
        """
        Asynchronously yields keys matching a given prefix.

        Args:
            prefix: An optional prefix to filter keys by. If `None` or an
                empty string, all keys are yielded.

        Yields:
            Keys from the table that match the given prefix.
        """
        row_filter = bigtable.data.row_filters.StripValueTransformerFilter(True)
        if not prefix:
            # Return all keys.
            query = ReadRowsQuery(row_filter=row_filter)
            async for row in await self.table.read_rows_stream(query):
                yield row.row_key.decode("utf-8")
        else:
            # Return keys matching the prefix.
            end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1)
            prefix_range = RowRange(start_key=prefix, end_key=end_key)
            query = ReadRowsQuery(row_ranges=[prefix_range], row_filter=row_filter)

            async for row in await self.table.read_rows_stream(query):
                yield row.row_key.decode("utf-8")

    # Sync methods are not implemented in the async-only version.
    def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
        raise NotImplementedError("Use amget for async operations.")

    def mset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None:
        raise NotImplementedError("Use amset for async operations.")

    def mdelete(self, keys: Sequence[str]) -> None:
        raise NotImplementedError("Use amdelete for async operations.")

    def yield_keys(self, *, prefix: Optional[str] = None) -> Iterator[str]:
        raise NotImplementedError("Use ayield_keys for async operations.")
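The prefix scan in `ayield_keys` derives an exclusive end key by incrementing the last character of the prefix, so the row range covers exactly the keys that share the prefix. That computation can be exercised in isolation; the helper name `prefix_end_key` is illustrative, and note this simple form does not handle a final character at the maximum code point:

```python
def prefix_end_key(prefix: str) -> str:
    """Return the smallest string greater than every string starting
    with `prefix`, by incrementing the prefix's final character."""
    return prefix[:-1] + chr(ord(prefix[-1]) + 1)


# All keys k with "user#" <= k < "user$" start with "user#".
print(prefix_end_key("user#"))  # user$
```

Any key beginning with the prefix sorts strictly below this end key, which is why a half-open `RowRange(start_key=prefix, end_key=end_key)` captures the prefix exactly.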

src/langchain_google_bigtable/engine.py

Lines changed: 14 additions & 2 deletions
@@ -249,7 +249,16 @@ async def close(self) -> None:
 
     @classmethod
     async def shutdown_default_loop(cls) -> None:
-        """Stops the default class-level shared loop and thread"""
+        """
+        Closes the default class-level shared loop and terminates the thread associated with it.
+
+        Note: Calling this method will prevent any new BigtableEngine instances
+        from using the shared event loop. Additionally, after this method is
+        called, it will not be possible to run more coroutines in the previous loop.
+
+        Raises:
+            Exception: If the thread does not terminate within the timeout period.
+        """
         loop = cls._default_loop
         thread = cls._default_thread

@@ -261,9 +270,12 @@ async def shutdown_default_loop(cls) -> None:
         loop.call_soon_threadsafe(loop.stop)
         if thread:
             try:
-                thread.join(timeout=10.0)
+                thread.join(timeout=20.0)
             finally:
                 if thread.is_alive():
                     raise Exception(
                         "Warning: BigtableEngine default thread did not terminate."
                     )
+                else:
+                    if loop:
+                        loop.close()  # Close the loop for resource cleanup.
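The fix in this hunk — closing the loop only after the thread running it has actually terminated — can be demonstrated in isolation. This is a minimal sketch of the shutdown pattern, not the BigtableEngine code itself:

```python
import asyncio
import threading

# A background loop running on its own thread, as BigtableEngine maintains.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# Ask the loop to stop from another thread, then wait for the thread to exit.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=20.0)

if thread.is_alive():
    raise Exception("Warning: default thread did not terminate.")
else:
    loop.close()  # Only safe once run_forever has returned.

print(loop.is_closed())  # True
```

Calling `loop.close()` while `run_forever` is still executing raises a `RuntimeError`, which is why the close is gated on the join succeeding.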
