14
14
from langflow .services .deps import get_variable_service
15
15
16
16
if TYPE_CHECKING :
17
- from langchain .callbacks .base import BaseCallbackHandler
18
17
from collections .abc import AsyncGenerator
18
+
19
+ from langchain .callbacks .base import BaseCallbackHandler
19
20
from sqlmodel .ext .asyncio .session import AsyncSession
20
21
21
22
from langflow .custom .custom_component .component import Component
@@ -142,16 +143,17 @@ async def _start(self, trace_context: TraceContext) -> None:
142
143
except Exception : # noqa: BLE001
143
144
logger .exception ("Error starting tracing service" )
144
145
145
- def _initialize_langsmith_tracer (self , trace_context : TraceContext ) -> None :
146
+ def _initialize_langsmith_tracer (self , trace_context : TraceContext , variables : dict ) -> None :
146
147
langsmith_tracer = _get_langsmith_tracer ()
147
148
trace_context .tracers ["langsmith" ] = langsmith_tracer (
148
149
trace_name = trace_context .run_name ,
149
150
trace_type = "chain" ,
150
151
project_name = trace_context .project_name ,
151
152
trace_id = trace_context .run_id ,
153
+ global_vars = variables ,
152
154
)
153
155
154
- def _initialize_langwatch_tracer (self , trace_context : TraceContext ) -> None :
156
+ def _initialize_langwatch_tracer (self , trace_context : TraceContext , variables : dict ) -> None :
155
157
if (
156
158
"langwatch" not in trace_context .tracers
157
159
or trace_context .tracers ["langwatch" ].trace_id != trace_context .run_id
@@ -162,35 +164,32 @@ def _initialize_langwatch_tracer(self, trace_context: TraceContext) -> None:
162
164
trace_type = "chain" ,
163
165
project_name = trace_context .project_name ,
164
166
trace_id = trace_context .run_id ,
167
+ global_vars = variables ,
165
168
)
166
169
167
- async def _initialize_langfuse_tracer (self , trace_context : TraceContext , session : AsyncSession ) -> None :
170
+ def _initialize_langfuse_tracer (self , trace_context : TraceContext , variables : dict ) -> None :
168
171
langfuse_tracer = _get_langfuse_tracer ()
169
- variable_names = langfuse_tracer .get_required_variable_names ()
170
- variables = await self .get_varaibles_from_db (session , trace_context .user_id , variable_names )
171
-
172
172
trace_context .tracers ["langfuse" ] = langfuse_tracer (
173
173
trace_name = trace_context .run_name ,
174
174
trace_type = "chain" ,
175
175
project_name = trace_context .project_name ,
176
176
trace_id = trace_context .run_id ,
177
- public_key = variables .get ("LANGFUSE_PUBLIC_KEY" ),
178
- secret_key = variables .get ("LANGFUSE_SECRET_KEY" ),
179
- host = variables .get ("LANGFUSE_HOST" ),
180
177
user_id = trace_context .user_id ,
181
178
session_id = trace_context .session_id ,
179
+ global_vars = variables ,
182
180
)
183
181
184
- def _initialize_arize_phoenix_tracer (self , trace_context : TraceContext ) -> None :
182
+ def _initialize_arize_phoenix_tracer (self , trace_context : TraceContext , variables : dict ) -> None :
185
183
arize_phoenix_tracer = _get_arize_phoenix_tracer ()
186
184
trace_context .tracers ["arize_phoenix" ] = arize_phoenix_tracer (
187
185
trace_name = trace_context .run_name ,
188
186
trace_type = "chain" ,
189
187
project_name = trace_context .project_name ,
190
188
trace_id = trace_context .run_id ,
189
+ global_vars = variables ,
191
190
)
192
191
193
- def _initialize_opik_tracer (self , trace_context : TraceContext ) -> None :
192
+ def _initialize_opik_tracer (self , trace_context : TraceContext , variables : dict ) -> None :
194
193
opik_tracer = _get_opik_tracer ()
195
194
trace_context .tracers ["opik" ] = opik_tracer (
196
195
trace_name = trace_context .run_name ,
@@ -199,6 +198,7 @@ def _initialize_opik_tracer(self, trace_context: TraceContext) -> None:
199
198
trace_id = trace_context .run_id ,
200
199
user_id = trace_context .user_id ,
201
200
session_id = trace_context .session_id ,
201
+ global_vars = variables ,
202
202
)
203
203
204
204
async def get_varaibles_from_db (self , session_scope , user_id , variable_names ):
@@ -227,17 +227,37 @@ async def start_tracers(
227
227
trace_context = TraceContext (run_id , run_name , project_name , user_id , session_id )
228
228
trace_context_var .set (trace_context )
229
229
230
- await self ._start (trace_context , session , user_id )
231
-
232
- self ._initialize_langsmith_tracer (trace_context )
233
-
234
- self ._initialize_langwatch_tracer (trace_context )
235
-
236
- await self ._initialize_langfuse_tracer (trace_context , session )
237
-
238
- self ._initialize_arize_phoenix_tracer (trace_context )
230
+ tracers = {
231
+ "langsmith" : {
232
+ "names" : _get_langsmith_tracer ().get_required_variable_names (),
233
+ "init_fn" : self ._initialize_langsmith_tracer ,
234
+ },
235
+ "langwatch" : {
236
+ "names" : _get_langwatch_tracer ().get_required_variable_names (),
237
+ "init_fn" : self ._initialize_langwatch_tracer ,
238
+ },
239
+ "langfuse" : {
240
+ "names" : _get_langfuse_tracer ().get_required_variable_names (),
241
+ "init_fn" : self ._initialize_langfuse_tracer ,
242
+ },
243
+ "arize_phoenix" : {
244
+ "names" : _get_arize_phoenix_tracer ().get_required_variable_names (),
245
+ "init_fn" : self ._initialize_arize_phoenix_tracer ,
246
+ },
247
+ "opik" : {
248
+ "names" : _get_opik_tracer ().get_required_variable_names (),
249
+ "init_fn" : self ._initialize_opik_tracer ,
250
+ },
251
+ }
252
+
253
+ all_variable_names = list ({name for t in tracers .values () for name in t ["names" ]})
254
+ variables = await self .get_varaibles_from_db (session_scope , user_id , all_variable_names )
255
+ await self ._start (trace_context )
256
+
257
+ for tracer in tracers .values ():
258
+ filtered_vars = {k : v for k , v in variables .items () if k in tracer ["names" ]}
259
+ tracer ["init_fn" ](trace_context , filtered_vars )
239
260
240
- self ._initialize_opik_tracer (trace_context )
241
261
except Exception as e : # noqa: BLE001
242
262
logger .debug (f"Error initializing tracers: { e } " )
243
263
0 commit comments