@@ -1361,17 +1361,46 @@ async def apipeline_enqueue_documents(
13611361 # Exclude IDs of documents that are already enqueued
13621362 unique_new_doc_ids = await self .doc_status .filter_keys (all_new_doc_ids )
13631363
1364- # Log ignored document IDs ( documents that were filtered out because they already exist)
1364+ # Handle duplicate documents - create trackable records with current track_id
13651365 ignored_ids = list (all_new_doc_ids - unique_new_doc_ids )
13661366 if ignored_ids :
1367+ duplicate_docs : dict [str , Any ] = {}
13671368 for doc_id in ignored_ids :
13681369 file_path = new_docs .get (doc_id , {}).get ("file_path" , "unknown_source" )
1369- logger .warning (
1370- f"Ignoring document ID (already exists): { doc_id } ({ file_path } )"
1370+ logger .warning (f"Duplicate document detected: { doc_id } ({ file_path } )" )
1371+
1372+ # Get existing document info for reference
1373+ existing_doc = await self .doc_status .get_by_id (doc_id )
1374+ existing_status = (
1375+ existing_doc .get ("status" , "unknown" ) if existing_doc else "unknown"
1376+ )
1377+ existing_track_id = (
1378+ existing_doc .get ("track_id" , "" ) if existing_doc else ""
13711379 )
1372- if len (ignored_ids ) > 3 :
1373- logger .warning (
1374- f"Total Ignoring { len (ignored_ids )} document IDs that already exist in storage"
1380+
1381+ # Create a new record with unique ID for this duplicate attempt
1382+ dup_record_id = compute_mdhash_id (f"{ doc_id } -{ track_id } " , prefix = "dup-" )
1383+ duplicate_docs [dup_record_id ] = {
1384+ "status" : DocStatus .FAILED ,
1385+ "content_summary" : f"[DUPLICATE] Original document: { doc_id } " ,
1386+ "content_length" : new_docs .get (doc_id , {}).get ("content_length" , 0 ),
1387+ "created_at" : datetime .now (timezone .utc ).isoformat (),
1388+ "updated_at" : datetime .now (timezone .utc ).isoformat (),
1389+ "file_path" : file_path ,
1390+ "track_id" : track_id , # Use current track_id for tracking
1391+ "error_msg" : f"Content already exists. Original doc_id: { doc_id } , Status: { existing_status } " ,
1392+ "metadata" : {
1393+ "is_duplicate" : True ,
1394+ "original_doc_id" : doc_id ,
1395+ "original_track_id" : existing_track_id ,
1396+ },
1397+ }
1398+
1399+ # Store duplicate records in doc_status
1400+ if duplicate_docs :
1401+ await self .doc_status .upsert (duplicate_docs )
1402+ logger .info (
1403+ f"Created { len (duplicate_docs )} duplicate document records with track_id: { track_id } "
13751404 )
13761405
13771406 # Filter new_docs to only include documents with unique IDs
0 commit comments