@@ -285,9 +285,11 @@ async def _log_performance_metric(self, redis_client: AsyncRedis, ttft: int | No
285285 logger .error (f"Failed to log request metrics (latency) in redis (id: { self .id } )" , exc_info = True )
286286 await safe_redis_reset (redis_client )
287287
288- def _start_langfuse_observation (self , request_content : RequestContent ) -> Any | None :
288+ def _start_langfuse_observation (self , request_content : RequestContent , ctx = None ) -> Any | None :
289289 langfuse_obs = None
290290 if global_context .langfuse_client is not None :
291+ if ctx is None :
292+ ctx = request_context .get ()
291293 try :
292294 langfuse_obs = global_context .langfuse_client .start_observation (
293295 as_type = "generation" ,
@@ -299,9 +301,10 @@ def _start_langfuse_observation(self, request_content: RequestContent) -> Any |
299301
300302 return langfuse_obs
301303
302- def _end_langfuse_observation (self , langfuse_obs : Any , latency : int | None , ttft : int | None = None ):
304+ def _end_langfuse_observation (self , langfuse_obs : Any , latency : int | None , ttft : int | None = None , ctx = None ):
303305 if global_context .langfuse_client is not None and langfuse_obs is not None :
304- ctx = request_context .get ()
306+ if ctx is None :
307+ ctx = request_context .get ()
305308 if ctx .usage is not None :
306309 global_context .langfuse_client .update_observation (
307310 langfuse_obs ,
@@ -429,7 +432,8 @@ async def forward_stream(self, request_content: RequestContent, redis_client: As
429432 url = urljoin (base = self .url , url = self .ENDPOINT_TABLE .get_endpoint (endpoint = request_content .endpoint ).lstrip ("/" ))
430433 request_content = self ._format_request (request_content = request_content )
431434
432- langfuse_obs = self ._start_langfuse_observation (request_content = request_content )
435+ ctx = request_context .get ()
436+ langfuse_obs = self ._start_langfuse_observation (request_content = request_content , ctx = ctx )
433437 inflight_key = f"{ PREFIX__REDIS_METRIC_GAUGE } :{ Metric .INFLIGHT .value } :{ self .id } "
434438 inflight_incremented = False
435439
@@ -481,7 +485,7 @@ async def forward_stream(self, request_content: RequestContent, redis_client: As
481485 latency = self ._elapsed_ms (start_time = start_time )
482486 extra_chunk = self ._get_extra_stream_chunk (request_content = request_content , buffer = buffer , latency = latency )
483487
484- self ._end_langfuse_observation (langfuse_obs = langfuse_obs , latency = latency , ttft = ttft )
488+ self ._end_langfuse_observation (langfuse_obs = langfuse_obs , latency = latency , ttft = ttft , ctx = ctx )
485489
486490 if extra_chunk is not None :
487491 yield f"data: { dumps (extra_chunk )} \n \n " , response .status_code
0 commit comments