@@ -195,21 +195,33 @@ def check_user_file_processing(self: Task, *, tenant_id: str) -> None:
195195 for user_file_id in user_file_ids :
196196 # --- Protection 2: per-file queued guard ---
197197 queued_key = _user_file_queued_key (user_file_id )
198- if redis_client .exists (queued_key ):
198+ guard_set = redis_client .set (
199+ queued_key ,
200+ 1 ,
201+ ex = CELERY_USER_FILE_PROCESSING_TASK_EXPIRES ,
202+ nx = True ,
203+ )
204+ if not guard_set :
199205 skipped_guard += 1
200206 continue
201207
202- # --- Protection 3: task expiry + set guard ---
203- self .app .send_task (
204- OnyxCeleryTask .PROCESS_SINGLE_USER_FILE ,
205- kwargs = {"user_file_id" : str (user_file_id ), "tenant_id" : tenant_id },
206- queue = OnyxCeleryQueues .USER_FILE_PROCESSING ,
207- priority = OnyxCeleryPriority .HIGH ,
208- expires = CELERY_USER_FILE_PROCESSING_TASK_EXPIRES ,
209- )
210- redis_client .setex (
211- queued_key , CELERY_USER_FILE_PROCESSING_TASK_EXPIRES , 1
212- )
208+ # --- Protection 3: task expiry ---
209+ # If task submission fails, clear the guard immediately so the
210+ # next beat cycle can retry enqueuing this file.
211+ try :
212+ self .app .send_task (
213+ OnyxCeleryTask .PROCESS_SINGLE_USER_FILE ,
214+ kwargs = {
215+ "user_file_id" : str (user_file_id ),
216+ "tenant_id" : tenant_id ,
217+ },
218+ queue = OnyxCeleryQueues .USER_FILE_PROCESSING ,
219+ priority = OnyxCeleryPriority .HIGH ,
220+ expires = CELERY_USER_FILE_PROCESSING_TASK_EXPIRES ,
221+ )
222+ except Exception :
223+ redis_client .delete (queued_key )
224+ raise
213225 enqueued += 1
214226
215227 finally :
0 commit comments