
Commit f017f8a

Askir and cevian authored
chore: reduce default batch_size for loading_uri (#651)
* chore: reduce default batch_size for loading_uri

* chore: better batch_size docs.

---------

Co-authored-by: Matvey Arye <[email protected]>
1 parent 8bc2676 commit f017f8a

3 files changed: +19 −4 lines changed

docs/vectorizer/api-reference.md

+1-1
@@ -1284,7 +1284,7 @@ You use `ai.processing_default` to specify the concurrency and batch size for th
 
 |Name| Type | Default | Required | Description |
 |-|------|------------------------------|-|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-|batch_size| int | Determined by the vectorizer |✖| The number of items to process in each batch. The optimal batch size depends on your data and cloud function configuration, larger batch sizes can improve efficiency but may increase memory usage. |
+|batch_size| int | Determined by the vectorizer |✖| The number of items to process in each batch. The optimal batch size depends on your data and cloud function configuration, larger batch sizes can improve efficiency but may increase memory usage. The default is 1 for vectorizers that use document loading (`ai.loading_uri`) and 50 otherwise. |
 |concurrency| int | Determined by the vectorizer |✖| The number of concurrent processing tasks to run. The optimal concurrency depends on your cloud infrastructure and rate limits, higher concurrency can speed up processing but may increase costs and resource usage. |
 
 #### Returns

projects/pgai/pgai/vectorizer/processing.py

+1-1
@@ -26,7 +26,7 @@ class ProcessingDefault(BaseModel):
     """
 
     implementation: Literal["default"]
-    batch_size: Annotated[int, Gt(gt=0), Le(le=2048)] = 50
+    batch_size: int | None = None
     concurrency: Annotated[int, Gt(gt=0), Le(le=10)] = 1
     log_level: Literal[
         "CRITICAL",

projects/pgai/pgai/vectorizer/vectorizer.py

+17-2
@@ -905,6 +905,21 @@ async def _do_batch(self, conn: AsyncConnection) -> int:
 
         return len(items)
 
+    @cached_property
+    def _batch_size(self) -> int:
+        """Returns the batch size for processing.
+        Documents take way longer to process than simple text rows,
+        due to download and parsing overhead.
+        So when the vectorizer is processing documents
+        we use a smaller default batch size."""
+        if self.vectorizer.config.processing.batch_size is not None:
+            return max(1, min(self.vectorizer.config.processing.batch_size, 2048))
+        else:
+            if isinstance(self.vectorizer.config.loading, UriLoading):
+                return 1
+            else:
+                return 50
+
     async def _fetch_work(self, conn: AsyncConnection) -> list[SourceRow]:
         """
         Fetches a batch of tasks from the work queue table. Safe for concurrent use.
@@ -924,15 +939,15 @@ async def _fetch_work(self, conn: AsyncConnection) -> list[SourceRow]:
                 await cursor.execute(
                     self.queries.fetch_work_query_with_retries,
                     (
-                        self.vectorizer.config.processing.batch_size,
+                        self._batch_size,
                         queue_table_oid,
                     ),
                 )
             else:
                 await cursor.execute(
                     self.queries.fetch_work_query,
                     (
-                        self.vectorizer.config.processing.batch_size,
+                        self._batch_size,
                         queue_table_oid,
                     ),
                 )
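
Taken together, the new `_batch_size` property gives this resolution rule: an explicitly configured `batch_size` is clamped into [1, 2048]; when it is unset, the default is 1 for document loading (`UriLoading`, i.e. `ai.loading_uri`) and 50 for everything else. A standalone sketch of that rule, using hypothetical stand-in config classes rather than pgai's real ones:

```python
# A sketch of the resolution rule introduced by _batch_size, not the repo's code.
# UriLoading/RowLoading below are hypothetical stand-ins for pgai's loading configs.
from dataclasses import dataclass


@dataclass
class UriLoading:
    """Stand-in for document loading (ai.loading_uri)."""
    column_name: str


@dataclass
class RowLoading:
    """Stand-in for plain column loading (ai.loading_column)."""
    column_name: str


def effective_batch_size(configured: int | None, loading: object) -> int:
    """Resolve the batch size the worker requests from the work queue."""
    if configured is not None:
        # Explicit values are honored but clamped into [1, 2048].
        return max(1, min(configured, 2048))
    # Unset: documents carry download and parsing overhead, so default to a batch of 1;
    # plain text rows keep the previous default of 50.
    return 1 if isinstance(loading, UriLoading) else 50


assert effective_batch_size(None, UriLoading("uri")) == 1
assert effective_batch_size(None, RowLoading("body")) == 50
assert effective_batch_size(8, UriLoading("uri")) == 8
assert effective_batch_size(5000, RowLoading("body")) == 2048
```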
