Skip to content

Commit

Permalink
Concurrent index creation, allow -1 for paginated entries
Browse files: browse the repository at this point in the history
  • Loading branch information
NolanTrem committed Oct 9, 2024
1 parent 0a769bc commit 9345566
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 60 deletions.
1 change: 1 addition & 0 deletions py/compose.full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ services:
- POSTGRES_PORT=${POSTGRES_PORT:-5432}
- POSTGRES_DBNAME=${POSTGRES_DBNAME:-postgres}
- R2R_PROJECT_NAME=${R2R_PROJECT_NAME:-r2r_default}
- POSTGRES_MAX_CONNECTIONS=${POSTGRES_MAX_CONNECTIONS:-1024}
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
- OPENAI_API_BASE=${OPENAI_API_BASE:-}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
Expand Down
6 changes: 5 additions & 1 deletion py/core/main/api/management_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,11 @@ async def file_stream():
async def documents_overview_app(
document_ids: list[str] = Query([]),
offset: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
limit: int = Query(
100,
ge=-1,
description="Number of items to return. Use -1 to return all items.",
),
auth_user=Depends(self.service.providers.auth.auth_wrapper),
) -> WrappedDocumentOverviewResponse:
request_user_ids = (
Expand Down
4 changes: 2 additions & 2 deletions py/core/main/services/management_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ async def documents_overview(
user_ids: Optional[list[UUID]] = None,
collection_ids: Optional[list[UUID]] = None,
document_ids: Optional[list[UUID]] = None,
offset: Optional[int] = 0,
limit: Optional[int] = 1000,
offset: Optional[int] = None,
limit: Optional[int] = None,
*args: Any,
**kwargs: Any,
):
Expand Down
111 changes: 54 additions & 57 deletions py/core/providers/database/vecs/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,26 @@ def _drop(self):

return self

def _get_index_options(
    self,
    method: IndexMethod,
    index_arguments: Optional[Union[IndexArgsIVFFlat, IndexArgsHNSW]],
) -> str:
    """Build the ``WITH (...)`` options clause for an index definition.

    Args:
        method: The index method the clause is being built for.
        index_arguments: Tuning arguments for the index, or ``None``.
            They are only honoured when their type matches ``method``
            (``IndexArgsIVFFlat`` for ivfflat, ``IndexArgsHNSW`` for
            hnsw); anything else falls back to the built-in defaults.

    Returns:
        The options clause as a SQL fragment, or an empty string when
        ``method`` is neither ivfflat nor hnsw.
    """
    if method == IndexMethod.ivfflat:
        if not isinstance(index_arguments, IndexArgsIVFFlat):
            # Missing or mismatched arguments: use the fixed default.
            return "WITH (lists=100)"
        return f"WITH (lists={index_arguments.n_lists})"

    if method == IndexMethod.hnsw:
        if not isinstance(index_arguments, IndexArgsHNSW):
            # Missing or mismatched arguments: use the fixed defaults.
            return "WITH (m=16, ef_construction=64)"
        m = index_arguments.m
        ef_construction = index_arguments.ef_construction
        return f"WITH (m={m}, ef_construction={ef_construction})"

    # Any other method takes no options.
    return ""

def upsert(
self,
records: Iterable[Record],
Expand Down Expand Up @@ -941,6 +961,7 @@ def create_index(
Union[IndexArgsIVFFlat, IndexArgsHNSW]
] = None,
replace=True,
concurrently=True,
) -> None:
"""
Creates an index for the collection.
Expand Down Expand Up @@ -1017,69 +1038,45 @@ def create_index(
if ops is None:
raise ArgError("Unknown index measure")

unique_string = str(uuid4()).replace("-", "_")[0:7]

with self.client.Session() as sess:
with sess.begin():
if self.index is not None:
if replace:
sess.execute(
text(
f'drop index {self.client.project_name}."{self.index}";'
)
)
self._index = None
else:
raise ArgError(
"replace is set to False but an index exists"
)

if method == IndexMethod.ivfflat:
if not index_arguments:
n_records: int = sess.execute(func.count(self.table.c.extraction_id)).scalar() # type: ignore
concurrently_sql = "CONCURRENTLY" if concurrently else ""

n_lists = (
int(max(n_records / 1000, 30))
if n_records < 1_000_000
else int(math.sqrt(n_records))
)
else:
# The following mypy error is ignored because mypy
# complains that `index_arguments` is typed as a union
# of IndexArgsIVFFlat and IndexArgsHNSW types,
# which both don't necessarily contain the `n_lists`
# parameter, however we have validated that the
# correct type is being used above.
n_lists = index_arguments.n_lists # type: ignore

sess.execute(
text(
f"""
create index ix_{ops}_ivfflat_nl{n_lists}_{unique_string}
on {self.client.project_name}."{self.table.name}"
using ivfflat (vec {ops}) with (lists={n_lists})
"""
)
# Drop existing index if needed (must be outside of transaction)
if self.index is not None and replace:
drop_index_sql = f'DROP INDEX {concurrently_sql} IF EXISTS {self.client.project_name}."{self.index}";'
try:
with self.client.engine.connect() as connection:
connection = connection.execution_options(
isolation_level="AUTOCOMMIT"
)
connection.execute(text(drop_index_sql))
except Exception as e:
raise Exception(f"Failed to drop existing index: {e}")
self._index = None

if method == IndexMethod.hnsw:
if not index_arguments:
index_arguments = IndexArgsHNSW()
unique_string = str(uuid4()).replace("-", "_")[0:7]
index_name = f"ix_{ops}_{method}__{unique_string}"

# See above for explanation of why the following lines
# are ignored
m = index_arguments.m # type: ignore
ef_construction = index_arguments.ef_construction # type: ignore
create_index_sql = f"""
CREATE INDEX {concurrently_sql} {index_name}
ON {self.client.project_name}."{self.table.name}"
USING {method} (vec {ops}) {self._get_index_options(method, index_arguments)};
"""

sess.execute(
text(
f"""
create index ix_{ops}_hnsw_m{m}_efc{ef_construction}_{unique_string}
on {self.client.project_name}."{self.table.name}"
using hnsw (vec {ops}) WITH (m={m}, ef_construction={ef_construction});
"""
)
try:
if concurrently:
with self.client.engine.connect() as connection:
connection = connection.execution_options(
isolation_level="AUTOCOMMIT"
)
connection.execute(text(create_index_sql))
else:
with self.client.Session() as sess:
sess.execute(text(create_index_sql))
sess.commit()
except Exception as e:
raise Exception(f"Failed to create index: {e}")

self._index = index_name

return None

Expand Down
1 change: 1 addition & 0 deletions py/core/providers/database/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def create_index(
measure=measure,
index_arguments=index_options,
replace=True,
concurrently=True,
)

def delete(
Expand Down

0 comments on commit 9345566

Please sign in to comment.