
Commit 5dad883

Merge pull request #459 from SciPhi-AI/Nolan/ReaddBlast
Reapply changes from merge conflict
2 parents 2a387b9 + ab271fe

1 file changed: 137 additions, 27 deletions

r2r/main/r2r_app.py

@@ -28,6 +28,7 @@
     to_async_generator,
 )
 from r2r.pipes import R2REvalPipe
+from r2r.telemetry.telemetry_decorator import telemetry_event

 from .r2r_abstractions import R2RPipelines, R2RProviders
 from .r2r_config import R2RConfig
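
The newly imported telemetry_event decorator is applied to each public endpoint throughout the rest of this diff. Its implementation lives in r2r/telemetry/telemetry_decorator.py and is not shown here; the sketch below only illustrates the usual shape of such a decorator (the capture_event sink is a hypothetical stand-in, not R2R's actual backend): record the event name, and never let a telemetry failure break the endpoint itself.

    import functools
    import logging

    logger = logging.getLogger(__name__)

    def capture_event(name: str) -> None:
        # Hypothetical sink; a real implementation would forward to an analytics service.
        logger.info(f"telemetry event: {name}")

    def telemetry_event(event_name: str):
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    capture_event(event_name)
                except Exception as e:
                    # Telemetry must never take the endpoint down with it.
                    logger.debug(f"Telemetry capture failed: {e}")
                return await func(*args, **kwargs)
            return wrapper
        return decorator

(The real decorator presumably also handles the one synchronous endpoint, openapi_spec_app, which this async-only sketch does not.)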
@@ -258,6 +259,7 @@ class UpdatePromptRequest(BaseModel):
         template: Optional[str] = None
         input_types: Optional[dict[str, str]] = None

+    @telemetry_event("UpdatePrompt")
     async def update_prompt_app(self, request: UpdatePromptRequest):
         """Update a prompt's template and/or input types."""
         try:
@@ -289,7 +291,27 @@ async def aingest_documents(
             )

         document_infos = []
+        skipped_documents = []
+        processed_documents = []
+        existing_document_ids = [
+            str(doc_info.document_id)
+            for doc_info in self.providers.vector_db.get_documents_info()
+        ]
+
         for iteration, document in enumerate(documents):
+            if (
+                version is not None
+                and str(document.id) in existing_document_ids
+            ):
+                logger.error(f"Document with ID {document.id} already exists.")
+                if len(documents) == 1:
+                    raise HTTPException(
+                        status_code=409,
+                        detail=f"Document with ID {document.id} already exists.",
+                    )
+                skipped_documents.append(document.title or str(document.id))
+                continue
+
             document_metadata = (
                 metadatas[iteration] if metadatas else document.metadata
             )
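
This guard skips documents whose IDs are already known to the vector store, raising 409 immediately when the batch contains exactly one document. One design note: existing_document_ids is a list, so each in test is a linear scan. A minimal standalone sketch of the same partitioning (hypothetical names, with a set for O(1) membership):

    from dataclasses import dataclass

    @dataclass
    class Doc:
        # Stand-in for r2r's Document model; illustration only.
        id: str
        title: str

    def partition_by_existing(documents, existing_ids):
        """Split a batch into (new, skipped) against already-ingested IDs."""
        existing = set(existing_ids)  # built once, so each lookup is O(1)
        new, skipped = [], []
        for doc in documents:
            (skipped if str(doc.id) in existing else new).append(doc)
        return new, skipped

    new, skipped = partition_by_existing(
        [Doc("1", "fresh"), Doc("2", "dup")], existing_ids=["2"]
    )
    assert [d.title for d in new] == ["fresh"]
    assert [d.title for d in skipped] == ["dup"]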
@@ -319,24 +341,62 @@ async def aingest_documents(
                 )
             )

+            processed_documents.append(document.title or str(document.id))
+
+        if skipped_documents and len(skipped_documents) == len(documents):
+            logger.error("All provided documents already exist.")
+            raise HTTPException(
+                status_code=409,
+                detail="All provided documents already exist. Use the update endpoint to update these documents.",
+            )
+
+        if skipped_documents:
+            logger.warning(
+                f"Skipped ingestion for the following documents since they already exist: {', '.join(skipped_documents)}. Use the update endpoint to update these documents."
+            )
+
         await self.ingestion_pipeline.run(
-            input=to_async_generator(documents),
-            versions=versions,
+            input=to_async_generator(
+                [
+                    doc
+                    for doc in documents
+                    if str(doc.id) not in existing_document_ids
+                ]
+            ),
+            versions=[
+                info.version
+                for info in document_infos
+                if info.created_at == info.updated_at
+            ],
             run_manager=self.run_manager,
         )

         self.providers.vector_db.upsert_documents_info(document_infos)
-        return {"results": "Entries upserted successfully."}
+        return {
+            "processed_documents": [
+                f"Document '{title}' processed successfully."
+                for title in processed_documents
+            ],
+            "skipped_documents": [
+                f"Document '{title}' skipped since it already exists."
+                for title in skipped_documents
+            ],
+        }

     class IngestDocumentsRequest(BaseModel):
         documents: list[Document]

+    @telemetry_event("IngestDocuments")
     async def ingest_documents_app(self, request: IngestDocumentsRequest):
         async with manage_run(
             self.run_manager, "ingest_documents_app"
         ) as run_id:
             try:
                 return await self.aingest_documents(request.documents)
+
+            except HTTPException as he:
+                raise he
+
             except Exception as e:
                 await self.logging_connection.log(
                     log_id=run_id,
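
Three behavioral changes land in this hunk: the pipeline input is filtered so already-existing documents are never re-embedded; versions is now derived from document_infos, keeping only entries whose created_at equals updated_at (i.e. records created by this call rather than updated); and the flat "results" string gives way to a structured report. Under the new shape, a batch with one new and one duplicate document would return something like (titles illustrative):

    {
        "processed_documents": ["Document 'fresh_doc' processed successfully."],
        "skipped_documents": ["Document 'dup_doc' skipped since it already exists."]
    }

The added except HTTPException clause matters too: without it, the deliberate 409 raised above would fall into the generic except Exception handler below.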
@@ -423,6 +483,7 @@ async def aupdate_documents(
     class UpdateDocumentsRequest(BaseModel):
         documents: list[Document]

+    @telemetry_event("UpdateDocuments")
     async def update_documents_app(self, request: UpdateDocumentsRequest):
         async with manage_run(
             self.run_manager, "update_documents_app"
@@ -445,10 +506,7 @@ async def update_documents_app(self, request: UpdateDocumentsRequest):
                 logger.error(
                     f"update_documents_app(documents={request.documents}) - \n\n{str(e)})"
                 )
-                logger.error(
-                    f"update_documents_app(documents={request.documents}) - \n\n{str(e)})"
-                )
-                raise HTTPException(status_code=500, detail=str(e))
+                raise HTTPException(status_code=500, detail=str(e)) from e

     @syncable
     async def aingest_files(
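
The duplicated logger.error call is dropped, and the re-raise now uses from e, which sets __cause__ on the new exception so tracebacks show the underlying failure rather than just the HTTPException. A quick illustration of the chaining (with RuntimeError standing in for HTTPException):

    try:
        try:
            1 / 0
        except Exception as e:
            raise RuntimeError("wrapped") from e
    except RuntimeError as err:
        assert isinstance(err.__cause__, ZeroDivisionError)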
@@ -482,6 +540,12 @@ async def aingest_files(
         try:
             documents = []
             document_infos = []
+            skipped_documents = []
+            processed_documents = []
+            existing_document_ids = [
+                str(doc_info.document_id)
+                for doc_info in self.providers.vector_db.get_documents_info()
+            ]

             for iteration, file in enumerate(files):
                 logger.info(f"Processing file: {file.filename}")
@@ -522,14 +586,27 @@ async def aingest_files(
                         detail=f"{file_extension} is explicitly excluded in the configuration file.",
                     )

-                file_content = await file.read()
-                logger.info(f"File read successfully: {file.filename}")
-
                 document_id = (
                     generate_id_from_label(file.filename)
                     if document_ids is None
                     else document_ids[iteration]
                 )
+                if (
+                    version is not None
+                    and str(document_id) in existing_document_ids
+                ):
+                    logger.error(f"File with ID {document_id} already exists.")
+                    if len(files) == 1:
+                        raise HTTPException(
+                            status_code=409,
+                            detail=f"File with ID {document_id} already exists.",
+                        )
+                    skipped_documents.append(file.filename)
+                    continue
+
+                file_content = await file.read()
+                logger.info(f"File read successfully: {file.filename}")

                 document_metadata = metadatas[iteration] if metadatas else {}
                 document_title = (
                     document_metadata.get("title", None) or file.filename
@@ -567,7 +644,21 @@ async def aingest_files(
                     )
                 )

-            # Run the pipeline asynchronously with filtered documents
+                processed_documents.append(file.filename)
+
+            if skipped_documents and len(skipped_documents) == len(files):
+                logger.error("All uploaded documents already exist.")
+                raise HTTPException(
+                    status_code=409,
+                    detail="All uploaded documents already exist. Use the update endpoint to update these documents.",
+                )
+
+            if skipped_documents:
+                logger.warning(
+                    f"Skipped ingestion for the following documents since they already exist: {', '.join(skipped_documents)}. Use the update endpoint to update these documents."
+                )
+
+            # Run the pipeline asynchronously
             await self.ingestion_pipeline.run(
                 input=to_async_generator(documents),
                 versions=versions,
@@ -578,8 +669,14 @@ async def aingest_files(
             self.providers.vector_db.upsert_documents_info(document_infos)

             return {
-                "results": f"File '{file}' processed successfully."
-                for file in document_infos
+                "processed_documents": [
+                    f"File '{filename}' processed successfully."
+                    for filename in processed_documents
+                ],
+                "skipped_documents": [
+                    f"File '{filename}' skipped since it already exists."
+                    for filename in skipped_documents
+                ],
             }
         except Exception as e:
             raise e
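
With this, the file path mirrors the document path: a single duplicate upload is rejected with 409, while a mixed batch succeeds and reports both lists. A hypothetical client call (host, port, and the /ingest_files route are assumptions, not read from this diff):

    import requests

    resp = requests.post(
        "http://localhost:8000/ingest_files",  # assumed mount point
        files=[("files", ("report.pdf", open("report.pdf", "rb")))],
    )
    if resp.status_code == 409:
        # A lone duplicate (or an all-duplicate batch) surfaces here.
        print("duplicate:", resp.json()["detail"])
    else:
        body = resp.json()
        print(body["processed_documents"], body["skipped_documents"])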
@@ -588,6 +685,7 @@ async def aingest_files(
             for file in files:
                 file.file.close()

+    @telemetry_event("IngestFiles")
     async def ingest_files_app(
         self,
         files: list[UploadFile] = File(...),
@@ -756,6 +854,7 @@ class UpdateFilesRequest(BaseModel):
         metadatas: Optional[str] = Form(None)
         ids: str = Form("")

+    @telemetry_event("UpdateFiles")
     async def update_files_app(
         self,
         files: list[UploadFile] = File(...),
@@ -845,6 +944,7 @@ class SearchRequest(BaseModel):
         search_limit: int = 10
         do_hybrid_search: Optional[bool] = False

+    @telemetry_event("Search")
     async def search_app(self, request: SearchRequest):
         async with manage_run(self.run_manager, "search_app") as run_id:
             try:
@@ -960,6 +1060,7 @@ class RAGRequest(BaseModel):
         rag_generation_config: Optional[str] = None
         streaming: Optional[bool] = None

+    @telemetry_event("RAG")
     async def rag_app(self, request: RAGRequest):
         async with manage_run(self.run_manager, "rag_app") as run_id:
             try:
@@ -1069,6 +1170,7 @@ class EvalRequest(BaseModel):
         context: str
         completion: str

+    @telemetry_event("Evaluate")
     async def evaluate_app(self, request: EvalRequest):
         async with manage_run(self.run_manager, "evaluate_app") as run_id:
             try:
@@ -1110,6 +1212,7 @@ class DeleteRequest(BaseModel):
         keys: list[str]
         values: list[Union[bool, int, str]]

+    @telemetry_event("Delete")
     async def delete_app(self, request: DeleteRequest = Body(...)):
         try:
             return await self.adelete(request.keys, request.values)
@@ -1168,6 +1271,7 @@ async def alogs(

         return {"results": aggregated_logs}

+    @telemetry_event("Logs")
     async def logs_app(
         self,
         log_type_filter: Optional[str] = Query(None),
@@ -1236,27 +1340,27 @@ async def aanalytics(
                 analysis_type = analysis_config[0]
                 if analysis_type == "bar_chart":
                     extract_key = analysis_config[1]
-                    results[
-                        filter_key
-                    ] = AnalysisTypes.generate_bar_chart_data(
-                        filtered_logs[filter_key], extract_key
+                    results[filter_key] = (
+                        AnalysisTypes.generate_bar_chart_data(
+                            filtered_logs[filter_key], extract_key
+                        )
                     )
                 elif analysis_type == "basic_statistics":
                     extract_key = analysis_config[1]
-                    results[
-                        filter_key
-                    ] = AnalysisTypes.calculate_basic_statistics(
-                        filtered_logs[filter_key], extract_key
+                    results[filter_key] = (
+                        AnalysisTypes.calculate_basic_statistics(
+                            filtered_logs[filter_key], extract_key
+                        )
                     )
                 elif analysis_type == "percentile":
                     extract_key = analysis_config[1]
                     percentile = int(analysis_config[2])
-                    results[
-                        filter_key
-                    ] = AnalysisTypes.calculate_percentile(
-                        filtered_logs[filter_key],
-                        extract_key,
-                        percentile,
+                    results[filter_key] = (
+                        AnalysisTypes.calculate_percentile(
+                            filtered_logs[filter_key],
+                            extract_key,
+                            percentile,
+                        )
                     )
                 else:
                     logger.warning(
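
The three rewrites above are formatting-only: instead of splitting the subscript results[filter_key] across lines, the right-hand side is wrapped in parentheses (the layout newer formatters such as Black 23+ produce). Both spellings execute identically:

    results, filtered_logs = {}, {"search": [{"latency": 3}]}
    filter_key = "search"

    # Old layout: subscript split across lines.
    results[
        filter_key
    ] = len(filtered_logs[filter_key])

    # New layout: same assignment, parenthesized right-hand side.
    results[filter_key] = (
        len(filtered_logs[filter_key])
    )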
@@ -1265,6 +1369,7 @@ async def aanalytics(

         return {"results": results}

+    @telemetry_event("Analytics")
     async def analytics_app(
         self,
         filter_criteria: FilterCriteria = Body(...),
@@ -1292,6 +1397,7 @@ async def aapp_settings(self, *args: Any, **kwargs: Any):
             }
         }

+    @telemetry_event("AppSettings")
     async def app_settings_app(self):
         """Return the config.json and all prompts."""
         try:
@@ -1306,6 +1412,7 @@ async def ausers_stats(self, user_ids: Optional[list[uuid.UUID]] = None):
             [str(ele) for ele in user_ids]
         )

+    @telemetry_event("UsersStats")
     async def users_stats_app(
         self, user_ids: Optional[list[uuid.UUID]] = Query(None)
     ):
@@ -1335,6 +1442,7 @@ async def adocuments_info(
             ),
         )

+    @telemetry_event("DocumentsInfo")
     async def documents_info_app(
         self,
         document_ids: Optional[list[str]] = Query(None),
@@ -1355,6 +1463,7 @@ async def documents_info_app(
     async def adocument_chunks(self, document_id: str) -> list[str]:
         return self.providers.vector_db.get_document_chunks(document_id)

+    @telemetry_event("DocumentChunks")
     async def document_chunks_app(self, document_id: str):
         try:
             chunks = await self.adocument_chunks(document_id)
@@ -1365,6 +1474,7 @@ async def document_chunks_app(self, document_id: str):
             )
             raise HTTPException(status_code=500, detail=str(e)) from e

+    @telemetry_event("OpenAPI")
     def openapi_spec_app(self):
         from fastapi.openapi.utils import get_openapi
