 import sys
 from datetime import timedelta
 from functools import partial
-from typing import Any
 
 import django_rq
 import redis
-from core.current_request import CurrentContext
 from django.conf import settings
 from django_rq import get_connection
 from rq.command import send_stop_job_command
@@ -82,40 +80,6 @@ def redis_connected():
     return redis_healthcheck()
 
 
-def _is_serializable(value: Any) -> bool:
-    """Check if a value can be serialized for job context."""
-    return isinstance(value, (str, int, float, bool, list, dict, type(None)))
-
-
-def _capture_context() -> dict:
-    """
-    Capture the current context for passing to a job.
-    Returns a dictionary of context data that can be serialized.
-    """
-    context_data = {}
-
-    # Get user information
-    if user := CurrentContext.get_user():
-        context_data['user_id'] = user.id
-
-    # Get organization if set separately
-    if org_id := CurrentContext.get_organization_id():
-        context_data['organization_id'] = org_id
-
-    # If organization_id is not set, try to get it from the user, this ensures that we have an organization_id for the job
-    # And it prefers the original requesting user's organization_id over the current active organization_id of the user which could change during async jobs
-    if not org_id and user and hasattr(user, 'active_organization_id') and user.active_organization_id:
-        context_data['organization_id'] = user.active_organization_id
-
-    # Get any custom context values (exclude non-serializable objects)
-    job_data = CurrentContext.get_job_data()
-    for key, value in job_data.items():
-        if key not in ['user', 'request'] and _is_serializable(value):
-            context_data[key] = value
-
-    return context_data
-
-
 def redis_get(key):
     if not redis_healthcheck():
         return
@@ -148,9 +112,7 @@ def redis_delete(key):
 
 def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
     """
-    Start job async with redis or sync if redis is not connected.
-    Automatically preserves context for async jobs and clears it after completion.
-
+    Start job async with redis or sync if redis is not connected
     :param job: Job function
     :param args: Function arguments
     :param in_seconds: Job will be delayed for in_seconds
@@ -160,29 +122,28 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
 
     redis = redis_connected() and kwargs.get('redis', True)
     queue_name = kwargs.get('queue_name', 'default')
-
     if 'queue_name' in kwargs:
         del kwargs['queue_name']
     if 'redis' in kwargs:
         del kwargs['redis']
-
     job_timeout = None
     if 'job_timeout' in kwargs:
         job_timeout = kwargs['job_timeout']
         del kwargs['job_timeout']
-
     if redis:
-        # Async execution with Redis - wrap job for context management
+        # Auto-capture request_id from thread local and pass it via job meta
         try:
-            context_data = _capture_context()
+            from label_studio.core.current_request import _thread_locals
 
-            if context_data:
+            request_id = getattr(_thread_locals, 'request_id', None)
+            if request_id:
+                # Store in job meta for worker access
                 meta = kwargs.get('meta', {})
-                # Store context data in job meta for worker access
-                meta.update(context_data)
+                meta['request_id'] = request_id
                 kwargs['meta'] = meta
         except Exception:
-            logger.info(f'Failed to capture context for job {job.__name__} on queue {queue_name}')
+            # Fail silently if no request context
+            pass
 
         try:
             args_info = _truncate_args_for_logging(args, kwargs)
@@ -193,7 +154,6 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
             enqueue_method = queue.enqueue
             if in_seconds > 0:
                 enqueue_method = partial(queue.enqueue_in, timedelta(seconds=in_seconds))
-
             job = enqueue_method(
                 job,
                 *args,
@@ -204,10 +164,8 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
             return job
     else:
         on_failure = kwargs.pop('on_failure', None)
-
         try:
-            result = job(*args, **kwargs)
-            return result
+            return job(*args, **kwargs)
         except Exception:
             exc_info = sys.exc_info()
             if on_failure:
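
Note: as a worker-side illustration of this change, here is a minimal sketch of reading the `request_id` back out of the job's meta, assuming RQ's standard `get_current_job()` API; the helper name `current_request_id` is hypothetical and not part of this commit.

```python
# Hypothetical worker-side counterpart (not in this commit): recover the
# request_id that start_job_async_or_sync stored in the enqueued job's meta.
from rq import get_current_job


def current_request_id(default=None):
    """Return the request_id stored in the current RQ job's meta, if any."""
    job = get_current_job()  # None when not running inside an RQ worker
    if job is None:
        return default
    return job.meta.get('request_id', default)
```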
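
And a usage sketch of the helper itself, based only on the signature and kwargs visible in this diff (`in_seconds`, `queue_name`, `redis`, `job_timeout`); `export_snapshot` and its arguments are illustrative, not a real job in the codebase.

```python
# Illustrative caller; the job function and its arguments are made up.
def export_snapshot(project_id, fmt='JSON'):
    ...


# Enqueued on the 'default' queue when Redis is healthy, otherwise run inline.
start_job_async_or_sync(export_snapshot, 42, fmt='CSV')

# Delayed by 60 seconds on a custom queue with a 10-minute RQ job timeout.
start_job_async_or_sync(
    export_snapshot,
    42,
    in_seconds=60,
    queue_name='low',
    job_timeout=600,
)
```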