Skip to content

Commit 21283b4

Browse files
committed
chore: enable langfuse configuration through global variables
1 parent bdcd76d commit 21283b4

File tree

7 files changed

+66
-10
lines changed

7 files changed

+66
-10
lines changed

src/backend/base/langflow/api/build.py

+1
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ async def create_graph(fresh_session, flow_id_str: str, flow_name: str | None) -
237237
flow_name = result.first()
238238

239239
return await build_graph_from_data(
240+
session=fresh_session,
240241
flow_id=flow_id_str,
241242
payload=data.model_dump(),
242243
user_id=str(current_user.id),

src/backend/base/langflow/api/utils.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async def _get_flow_name(flow_id: uuid.UUID) -> str:
149149
return flow.name
150150

151151

152-
async def build_graph_from_data(flow_id: uuid.UUID | str, payload: dict, **kwargs):
152+
async def build_graph_from_data(session: DbSession, flow_id: uuid.UUID | str, payload: dict, **kwargs):
153153
"""Build and cache the graph."""
154154
# Get flow name
155155
if "flow_name" not in kwargs:
@@ -169,7 +169,7 @@ async def build_graph_from_data(flow_id: uuid.UUID | str, payload: dict, **kwarg
169169
vertex.update_raw_params({"session_id": session_id}, overwrite=True)
170170

171171
graph.session_id = session_id
172-
await graph.initialize_run()
172+
await graph.initialize_run(session)
173173
return graph
174174

175175

@@ -180,7 +180,7 @@ async def build_graph_from_db_no_cache(flow_id: uuid.UUID, session: AsyncSession
180180
msg = "Invalid flow ID"
181181
raise ValueError(msg)
182182
kwargs["user_id"] = kwargs.get("user_id") or str(flow.user_id)
183-
return await build_graph_from_data(flow_id, flow.data, flow_name=flow.name, **kwargs)
183+
return await build_graph_from_data(session, flow_id, flow.data, flow_name=flow.name, **kwargs)
184184

185185

186186
async def build_graph_from_db(flow_id: uuid.UUID, session: AsyncSession, chat_service: ChatService, **kwargs):

src/backend/base/langflow/api/v1/chat.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ async def build_vertex(
248248
inputs: Annotated[InputValueRequest | None, Body(embed=True)] = None,
249249
files: list[str] | None = None,
250250
current_user: CurrentActiveUser,
251+
session: DbSession,
251252
) -> VertexBuildResponse:
252253
"""Build a vertex instead of the entire graph.
253254
@@ -258,6 +259,7 @@ async def build_vertex(
258259
inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
259260
files (List[str], optional): The files to use. Defaults to None.
260261
current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
262+
session (AsyncSession, optional): The session dependency. Defaults to Depends
261263
262264
Returns:
263265
VertexBuildResponse: The response containing the built vertex information.
@@ -291,7 +293,7 @@ async def build_vertex(
291293
)
292294
else:
293295
graph = cache.get("result")
294-
await graph.initialize_run()
296+
await graph.initialize_run(session)
295297
vertex = graph.get_vertex(vertex_id)
296298

297299
try:

src/backend/base/langflow/graph/graph/base.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
if TYPE_CHECKING:
4747
from collections.abc import Callable, Generator, Iterable
4848

49+
from sqlmodel.ext.asyncio.session import AsyncSession
50+
4951
from langflow.api.v1.schemas import InputValueRequest
5052
from langflow.custom.custom_component.component import Component
5153
from langflow.events.event_manager import EventManager
@@ -651,12 +653,13 @@ def set_run_id(self, run_id: uuid.UUID | None = None) -> None:
651653

652654
self._run_id = str(run_id)
653655

654-
async def initialize_run(self) -> None:
656+
async def initialize_run(self, session: AsyncSession) -> None:
655657
if not self._run_id:
656658
self.set_run_id()
657659
if self.tracing_service:
658660
run_name = f"{self.flow_name} - {self.flow_id}"
659661
await self.tracing_service.start_tracers(
662+
session=session,
660663
run_id=uuid.UUID(self._run_id),
661664
run_name=run_name,
662665
user_id=self.user_id,

src/backend/base/langflow/services/tracing/base.py

+5
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,8 @@ def end(
6969
@abstractmethod
7070
def get_langchain_callback(self) -> BaseCallbackHandler | None:
7171
raise NotImplementedError
72+
73+
@abstractmethod
74+
def get_required_variable_names(self) -> list[str]:
75+
"""Returns a list of variable names required to configure the service."""
76+
raise NotImplementedError

src/backend/base/langflow/services/tracing/langfuse.py

+23
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,44 @@ def __init__(
2929
trace_type: str,
3030
project_name: str,
3131
trace_id: UUID,
32+
public_key: str | None = None,
33+
secrect_key: str | None = None,
34+
host: str | None = None,
3235
user_id: str | None = None,
3336
session_id: str | None = None,
3437
) -> None:
3538
self.project_name = project_name
3639
self.trace_name = trace_name
3740
self.trace_type = trace_type
3841
self.trace_id = trace_id
42+
self.public_key = public_key
43+
self.secrect_key = secrect_key
44+
self.host = host
3945
self.user_id = user_id
4046
self.session_id = session_id
4147
self.flow_id = trace_name.split(" - ")[-1]
4248
self.spans: dict = OrderedDict() # spans that are not ended
4349

50+
if self.host is not None:
51+
os.environ["LANGFUSE_HOST"] = self.host
52+
53+
if self.public_key is not None:
54+
os.environ["LANGFUSE_PUBLIC_KEY"] = self.public_key
55+
56+
if self.secrect_key is not None:
57+
os.environ["LANGFUSE_SECRET_KEY"] = self.secrect_key
58+
4459
config = self._get_config()
4560
self._ready: bool = self.setup_langfuse(config) if config else False
4661

62+
@staticmethod
63+
def get_required_variable_names():
64+
return [
65+
"LANGFUSE_SECRET_KEY",
66+
"LANGFUSE_PUBLIC_KEY",
67+
"LANGFUSE_HOST",
68+
]
69+
4770
@property
4871
def ready(self):
4972
return self._ready

src/backend/base/langflow/services/tracing/service.py

+27-5
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66
from contextlib import asynccontextmanager
77
from contextvars import ContextVar
88
from typing import TYPE_CHECKING, Any
9+
from uuid import UUID
910

1011
from loguru import logger
1112

1213
from langflow.services.base import Service
14+
from langflow.services.deps import get_variable_service
1315

1416
if TYPE_CHECKING:
15-
from uuid import UUID
16-
1717
from langchain.callbacks.base import BaseCallbackHandler
18+
from sqlmodel.ext.asyncio.session import AsyncSession
1819

1920
from langflow.custom.custom_component.component import Component
2021
from langflow.graph.vertex.base import Vertex
@@ -162,13 +163,19 @@ def _initialize_langwatch_tracer(self, trace_context: TraceContext) -> None:
162163
trace_id=trace_context.run_id,
163164
)
164165

165-
def _initialize_langfuse_tracer(self, trace_context: TraceContext) -> None:
166+
async def _initialize_langfuse_tracer(self, trace_context: TraceContext, session: AsyncSession) -> None:
166167
langfuse_tracer = _get_langfuse_tracer()
168+
variable_names = langfuse_tracer.get_required_variable_names()
169+
variables = await self.get_varaibles_from_db(session, trace_context.user_id, variable_names)
170+
167171
trace_context.tracers["langfuse"] = langfuse_tracer(
168172
trace_name=trace_context.run_name,
169173
trace_type="chain",
170174
project_name=trace_context.project_name,
171175
trace_id=trace_context.run_id,
176+
public_key=variables.get("LANGFUSE_PUBLIC_KEY"),
177+
secret_key=variables.get("LANGFUSE_SECRET_KEY"),
178+
host=variables.get("LANGFUSE_HOST"),
172179
user_id=trace_context.user_id,
173180
session_id=trace_context.session_id,
174181
)
@@ -193,8 +200,17 @@ def _initialize_opik_tracer(self, trace_context: TraceContext) -> None:
193200
session_id=trace_context.session_id,
194201
)
195202

203+
async def get_varaibles_from_db(self, session, user_id, variable_names):
204+
variable_service = get_variable_service()
205+
result = {}
206+
for var in await variable_service.get_all(UUID(user_id), session):
207+
if var.name in variable_names:
208+
result[var.name] = await variable_service.get_variable(UUID(user_id), var.name, "", session)
209+
return result
210+
196211
async def start_tracers(
197212
self,
213+
session: AsyncSession,
198214
run_id: UUID,
199215
run_name: str,
200216
user_id: str | None,
@@ -213,11 +229,17 @@ async def start_tracers(
213229
project_name = project_name or os.getenv("LANGCHAIN_PROJECT", "Langflow")
214230
trace_context = TraceContext(run_id, run_name, project_name, user_id, session_id)
215231
trace_context_var.set(trace_context)
216-
await self._start(trace_context)
232+
233+
await self._start(trace_context, session, user_id)
234+
217235
self._initialize_langsmith_tracer(trace_context)
236+
218237
self._initialize_langwatch_tracer(trace_context)
219-
self._initialize_langfuse_tracer(trace_context)
238+
239+
await self._initialize_langfuse_tracer(trace_context, session)
240+
220241
self._initialize_arize_phoenix_tracer(trace_context)
242+
221243
self._initialize_opik_tracer(trace_context)
222244
except Exception as e: # noqa: BLE001
223245
logger.debug(f"Error initializing tracers: {e}")

0 commit comments

Comments
 (0)