Skip to content

Commit 0375fa8

Browse files
authored
Added a flag to do a consistent read if needed / possible for the DB (#1539)
1 parent 7b4cf4e commit 0375fa8

File tree

4 files changed

+7
-3
lines changed

4 files changed

+7
-3
lines changed

src/datachain/data_storage/db_engine.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,7 @@ def execute(
77 77
query,
78 78
cursor: Any | None = None,
79 79
conn: Any | None = None,
80 +
consistent_read: bool = False,
80 81
) -> Iterator[tuple[Any, ...]]: ...
81 82

82 83
def get_table(self, name: str) -> "Table":

src/datachain/data_storage/sqlite.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -230,6 +230,7 @@ def execute(
230 230
query,
231 231
cursor: sqlite3.Cursor | None = None,
232 232
conn=None,
233 +
consistent_read: bool = False,
233 234
) -> sqlite3.Cursor:
234 235
if self.is_closed:
235 236
# Reconnect in case of being closed previously.

src/datachain/data_storage/warehouse.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -237,9 +237,9 @@ def query_count(self, query: sa.Select) -> int:
237 237
count_query = sa.select(sa.func.count(1)).select_from(query.subquery())
238 238
return next(self.db.execute(count_query))[0]
239 239

240 -
def table_rows_count(self, table) -> int:
240 +
def table_rows_count(self, table, consistent_read: bool = False) -> int:
241 241
count_query = sa.select(sa.func.count(1)).select_from(table)
242 -
return next(self.db.execute(count_query))[0]
242 +
return next(self.db.execute(count_query, consistent_read=consistent_read))[0]
243 243

244 244
def dataset_select_paginated(
245 245
self,
@@ -494,7 +494,7 @@ def dataset_stats(
494 494
if size_columns:
495 495
expressions = (*expressions, sa.func.sum(sum(size_columns)))
496 496
query = sa.select(*expressions)
497 -
((nrows, *rest),) = self.db.execute(query)
497 +
((nrows, *rest),) = self.db.execute(query, consistent_read=True)
498 498
return nrows, rest[0] if rest else 0
499 499

500 500
@abstractmethod

src/datachain/query/dataset.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1509,6 +1509,8 @@ def as_iterable(self, **kwargs) -> Iterator[ResultIter]:
1509 1509
try:
1510 1510
query = self.apply_steps().select()
1511 1511
selected_columns = [c.name for c in query.selected_columns]
1512 +
if "consistent_read" not in kwargs:
1513 +
kwargs["consistent_read"] = True
1512 1514
yield ResultIter(
1513 1515
self.catalog.warehouse.dataset_rows_select(query, **kwargs),
1514 1516
selected_columns,

0 commit comments

Comments
 (0)