66
77from core .base import (
88 DocumentChunk ,
9+ DocumentResponse ,
910 GraphConstructionStatus ,
1011 R2RException ,
1112)
@@ -101,6 +102,20 @@ async def ingest_files(input_data):
101102 collection_id = generate_default_user_collection_id (
102103 document_info .owner_id
103104 )
105+ collection_ids = [collection_id ]
106+ else :
107+ collection_ids_uuid = []
108+ for cid in collection_ids :
109+ if isinstance (cid , str ):
110+ collection_ids_uuid .append (UUID (cid ))
111+ elif isinstance (cid , UUID ):
112+ collection_ids_uuid .append (cid )
113+ collection_ids = collection_ids_uuid
114+
115+ await _ensure_collections_exists (
116+ service , document_info , collection_ids
117+ )
118+ for collection_id in collection_ids :
104119 await service .providers .database .collections_handler .assign_document_to_collection_relational (
105120 document_id = document_info .id ,
106121 collection_id = collection_id ,
@@ -119,49 +134,6 @@ async def ingest_files(input_data):
119134 status_type = "graph_cluster_status" ,
120135 status = GraphConstructionStatus .OUTDATED , # NOTE - we should actually check that cluster has been made first, if not it should be PENDING still
121136 )
122- else :
123- for collection_id in collection_ids :
124- try :
125- # FIXME: Right now we just throw a warning if the collection already exists, but we should probably handle this more gracefully
126- name = "My Collection"
127- description = f"A collection started during { document_info .title } ingestion"
128-
129- await service .providers .database .collections_handler .create_collection (
130- owner_id = document_info .owner_id ,
131- name = name ,
132- description = description ,
133- collection_id = collection_id ,
134- )
135- await service .providers .database .graphs_handler .create (
136- collection_id = collection_id ,
137- name = name ,
138- description = description ,
139- graph_id = collection_id ,
140- )
141- except Exception as e :
142- logger .warning (
143- f"Warning, could not create collection with error: { str (e )} "
144- )
145-
146- await service .providers .database .collections_handler .assign_document_to_collection_relational (
147- document_id = document_info .id ,
148- collection_id = collection_id ,
149- )
150-
151- await service .providers .database .chunks_handler .assign_document_chunks_to_collection (
152- document_id = document_info .id ,
153- collection_id = collection_id ,
154- )
155- await service .providers .database .documents_handler .set_workflow_status (
156- id = collection_id ,
157- status_type = "graph_sync_status" ,
158- status = GraphConstructionStatus .OUTDATED ,
159- )
160- await service .providers .database .documents_handler .set_workflow_status (
161- id = collection_id ,
162- status_type = "graph_cluster_status" ,
163- status = GraphConstructionStatus .OUTDATED , # NOTE - we should actually check that cluster has been made first, if not it should be PENDING still
164- )
165137 except Exception as e :
166138 logger .error (
167139 f"Error during assigning document to collection: { str (e )} "
@@ -238,6 +210,79 @@ async def ingest_files(input_data):
238210 status_code = 500 , detail = f"Error during ingestion: { str (e )} "
239211 ) from e
240212
213+ async def _ensure_collections_exists (
214+ service : IngestionService ,
215+ document_info : DocumentResponse ,
216+ collection_ids : list [UUID ],
217+ ):
218+ try :
219+ result = await service .providers .database .collections_handler .get_collections_overview (
220+ offset = 0 ,
221+ limit = len (collection_ids ),
222+ filter_collection_ids = collection_ids ,
223+ )
224+ existing_collections = result .get ("results" , [])
225+ if not isinstance (existing_collections , list ):
226+ logger .error (
227+ "Invalid response format for existing collections retrieval: %s" ,
228+ result ,
229+ )
230+ raise R2RException (
231+ status_code = 500 ,
232+ message = "Error during collection retrieval: Invalid response format." ,
233+ )
234+ existing_collection_ids = [c .id for c in existing_collections ]
235+ user_info = (
236+ await service .providers .database .users_handler .get_user_by_id (
237+ id = document_info .owner_id
238+ )
239+ )
240+ logger .debug (
241+ "existing collection ids: %s" , existing_collection_ids
242+ )
243+ user_collection_ids = user_info .collection_ids or []
244+ logger .debug ("user collection ids: %s" , user_collection_ids )
245+ for collection_id in collection_ids :
246+ if collection_id in existing_collection_ids :
247+ if collection_id in user_collection_ids :
248+ continue
249+ else :
250+ raise R2RException (
251+ status_code = 403 ,
252+ message = f"Collection { collection_id } does not belong to user "
253+ f"{ document_info .owner_id } " ,
254+ )
255+ # create collection if not exist
256+ # (maybe failed is more safe if collection is not exists?)
257+ docname = document_info .title or document_info .id
258+ name = f"Created for ingesting document { docname } "
259+ logger .info (
260+ "Creating collection: %s, %s " , collection_id , name
261+ )
262+ description = name
263+
264+ await service .providers .database .collections_handler .create_collection (
265+ owner_id = document_info .owner_id ,
266+ name = name ,
267+ description = description ,
268+ collection_id = collection_id ,
269+ )
270+ await service .providers .database .users_handler .add_user_to_collection (
271+ id = document_info .owner_id ,
272+ collection_id = collection_id ,
273+ )
274+ await service .providers .database .graphs_handler .create (
275+ collection_id = collection_id ,
276+ name = name ,
277+ description = description ,
278+ )
279+ except Exception as e :
280+ logger .warning (
281+ f"Warning, could not ensure collection: { str (e )} " ,
282+ exc_info = True ,
283+ )
284+ raise e
285+
241286 async def ingest_chunks (input_data ):
242287 document_info = None
243288 try :
0 commit comments