Skip to content

feat: tracing configuration via global variables #7183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/backend/base/langflow/api/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ async def create_graph(fresh_session, flow_id_str: str, flow_name: str | None) -
flow_name = result.first()

return await build_graph_from_data(
session=fresh_session,
flow_id=flow_id_str,
payload=data.model_dump(),
user_id=str(current_user.id),
Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async def build_graph_from_data(flow_id: uuid.UUID | str, payload: dict, **kwarg
vertex.update_raw_params({"session_id": session_id}, overwrite=True)

graph.session_id = session_id
await graph.initialize_run()
await graph.initialize_run(session_scope)
return graph


Expand Down
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ async def build_vertex(
)
else:
graph = cache.get("result")
await graph.initialize_run()
await graph.initialize_run(session_scope)
vertex = graph.get_vertex(vertex_id)

try:
Expand Down
7 changes: 5 additions & 2 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

if TYPE_CHECKING:
from collections.abc import Callable, Generator, Iterable
from contextlib import AbstractAsyncContextManager

from langflow.api.v1.schemas import InputValueRequest
from langflow.custom.custom_component.component import Component
Expand Down Expand Up @@ -651,12 +652,14 @@ def set_run_id(self, run_id: uuid.UUID | None = None) -> None:

self._run_id = str(run_id)

async def initialize_run(self) -> None:
async def initialize_run(self, session_scope: Callable[[], AbstractAsyncContextManager[Any]] | None = None) -> None:
if not self._run_id:
self.set_run_id()
if self.tracing_service:

if self.tracing_service and session_scope:
run_name = f"{self.flow_name} - {self.flow_id}"
await self.tracing_service.start_tracers(
session_scope=session_scope,
run_id=uuid.UUID(self._run_id),
run_name=run_name,
user_id=self.user_id,
Expand Down
41 changes: 35 additions & 6 deletions src/backend/base/langflow/services/tracing/arize_phoenix.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,24 @@ class ArizePhoenixTracer(BaseTracer):
chat_input_value: str
chat_output_value: str

@staticmethod
def get_required_variable_names():
return [
"ARIZE_API_KEY",
"ARIZE_SPACE_ID",
"ARIZE_COLLECTOR_ENDPOINT",
"PHOENIX_API_KEY",
"PHOENIX_COLLECTOR_ENDPOINT",
]

def __init__(
self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID, session_id: str | None = None
self,
trace_name: str,
trace_type: str,
project_name: str,
trace_id: UUID,
session_id: str | None = None,
global_vars: dict | None = None,
):
"""Initializes the ArizePhoenixTracer instance and sets up a root span."""
self.trace_name = trace_name
Expand All @@ -52,6 +68,7 @@ def __init__(
self.chat_input_value = ""
self.chat_output_value = ""
self.session_id = session_id
self.global_vars = global_vars or {}

try:
self._ready = self.setup_arize_phoenix()
Expand Down Expand Up @@ -97,9 +114,15 @@ def setup_arize_phoenix(self) -> bool:
}

# Arize Config
arize_api_key = os.getenv("ARIZE_API_KEY", None)
arize_space_id = os.getenv("ARIZE_SPACE_ID", None)
arize_collector_endpoint = os.getenv("ARIZE_COLLECTOR_ENDPOINT", "https://otlp.arize.com")
if self.global_vars:
arize_api_key = self.global_vars["ARIZE_API_KEY"]
arize_space_id = self.global_vars["ARIZE_SPACE_ID"]
arize_collector_endpoint = self.global_vars.get("ARIZE_COLLECTOR_ENDPOINT", "https://otlp.arize.com")
else:
arize_api_key = os.getenv("ARIZE_API_KEY", None)
arize_space_id = os.getenv("ARIZE_SPACE_ID", None)
arize_collector_endpoint = os.getenv("ARIZE_COLLECTOR_ENDPOINT", "https://otlp.arize.com")

enable_arize_tracing = bool(arize_api_key and arize_space_id)
arize_endpoint = f"{arize_collector_endpoint}/v1"
arize_headers = {
Expand All @@ -109,8 +132,14 @@ def setup_arize_phoenix(self) -> bool:
}

# Phoenix Config
phoenix_api_key = os.getenv("PHOENIX_API_KEY", None)
phoenix_collector_endpoint = os.getenv("PHOENIX_COLLECTOR_ENDPOINT", "https://app.phoenix.arize.com")
default_endpoint = "https://app.phoenix.arize.com"
if self.global_vars:
phoenix_api_key = self.global_vars["PHOENIX_API_KEY"]
phoenix_collector_endpoint = self.global_vars.get("PHOENIX_COLLECTOR_ENDPOINT", default_endpoint)
else:
phoenix_api_key = os.getenv("PHOENIX_API_KEY", None)
phoenix_collector_endpoint = os.getenv("PHOENIX_COLLECTOR_ENDPOINT", default_endpoint)

enable_phoenix_tracing = bool(phoenix_api_key)
phoenix_endpoint = f"{phoenix_collector_endpoint}/v1/traces"
phoenix_headers = {
Expand Down
5 changes: 5 additions & 0 deletions src/backend/base/langflow/services/tracing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ def end(
@abstractmethod
def get_langchain_callback(self) -> BaseCallbackHandler | None:
raise NotImplementedError

@abstractmethod
def get_required_variable_names(self) -> list[str]:
"""Returns a list of variable names required to configure the service."""
raise NotImplementedError
26 changes: 21 additions & 5 deletions src/backend/base/langflow/services/tracing/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@
class LangFuseTracer(BaseTracer):
flow_id: str

@staticmethod
def get_required_variable_names():
return [
"LANGFUSE_SECRET_KEY",
"LANGFUSE_PUBLIC_KEY",
"LANGFUSE_HOST",
]

def __init__(
self,
trace_name: str,
Expand All @@ -31,6 +39,7 @@ def __init__(
trace_id: UUID,
user_id: str | None = None,
session_id: str | None = None,
global_vars: dict | None = None,
) -> None:
self.project_name = project_name
self.trace_name = trace_name
Expand All @@ -40,8 +49,9 @@ def __init__(
self.session_id = session_id
self.flow_id = trace_name.split(" - ")[-1]
self.spans: dict = OrderedDict() # spans that are not ended
self.global_vars = global_vars or {}

config = self._get_config()
config = self._get_config(self.global_vars)
self._ready: bool = self.setup_langfuse(config) if config else False

@property
Expand Down Expand Up @@ -161,10 +171,16 @@ def get_langchain_callback(self) -> BaseCallbackHandler | None:
return stateful_client.get_langchain_handler()

@staticmethod
def _get_config() -> dict:
secret_key = os.getenv("LANGFUSE_SECRET_KEY", None)
public_key = os.getenv("LANGFUSE_PUBLIC_KEY", None)
host = os.getenv("LANGFUSE_HOST", None)
def _get_config(global_vars) -> dict:
if global_vars:
secret_key = global_vars.get("LANGFUSE_SECRET_KEY", None)
public_key = global_vars.get("LANGFUSE_PUBLIC_KEY", None)
host = global_vars.get("LANGFUSE_HOST", None)
else:
secret_key = os.getenv("LANGFUSE_SECRET_KEY", None)
public_key = os.getenv("LANGFUSE_PUBLIC_KEY", None)
host = os.getenv("LANGFUSE_HOST", None)

if secret_key and public_key and host:
return {"secret_key": secret_key, "public_key": public_key, "host": host}
return {}
19 changes: 16 additions & 3 deletions src/backend/base/langflow/services/tracing/langsmith.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,18 @@


class LangSmithTracer(BaseTracer):
def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
@staticmethod
def get_required_variable_names():
return [
"LANGCHAIN_API_KEY",
]

def __init__(
self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID, global_vars: dict | None = None
):
try:
self.global_vars = global_vars or {}

self._ready = self.setup_langsmith()
if not self._ready:
return
Expand All @@ -51,12 +61,15 @@ def ready(self):
return self._ready

def setup_langsmith(self) -> bool:
if os.getenv("LANGCHAIN_API_KEY") is None:
if os.getenv("LANGCHAIN_API_KEY") is None and "LANGCHAIN_API_KEY" not in self.global_vars:
return False
try:
from langsmith import Client

self._client = Client()
if "LANGCHAIN_API_KEY" in self.global_vars:
self._client = Client(api_key=self.global_vars["LANGCHAIN_API_KEY"])
else:
self._client = Client()
except ImportError:
logger.exception("Could not import langsmith. Please install it with `pip install langsmith`.")
return False
Expand Down
14 changes: 12 additions & 2 deletions src/backend/base/langflow/services/tracing/langwatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@
class LangWatchTracer(BaseTracer):
flow_id: str

def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
@staticmethod
def get_required_variable_names():
return ["LANGWATCH_API_KEY"]

def __init__(
self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID, global_vars: dict | None = None
) -> None:
self.trace_name = trace_name
self.trace_type = trace_type
self.project_name = project_name
self.trace_id = trace_id
self.flow_id = trace_name.split(" - ")[-1]
self.global_vars = global_vars or {}

try:
self._ready: bool = self.setup_langwatch()
Expand Down Expand Up @@ -59,12 +66,15 @@ def ready(self):
return self._ready

def setup_langwatch(self) -> bool:
if "LANGWATCH_API_KEY" not in os.environ:
if "LANGWATCH_API_KEY" not in os.environ and "LANGWATCH_API_KEY" not in self.global_vars:
return False
try:
import langwatch

self._client = langwatch
if self.global_vars.get("LANGWATCH_API_KEY", None):
self._client.api_key = self.global_vars["LANGWATCH_API_KEY"]

except ImportError:
logger.exception("Could not import langwatch. Please install it with `pip install langwatch`.")
return False
Expand Down
25 changes: 20 additions & 5 deletions src/backend/base/langflow/services/tracing/opik.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ def get_distributed_trace_headers(trace_id, span_id):
class OpikTracer(BaseTracer):
flow_id: str

@staticmethod
def get_required_variable_names():
return [
"OPIK_API_KEY",
"OPIK_URL_OVERRIDE",
"OPIK_WORKSPACE",
]

def __init__(
self,
trace_name: str,
Expand All @@ -39,6 +47,7 @@ def __init__(
trace_id: UUID,
user_id: str | None = None,
session_id: str | None = None,
global_vars: dict | None = None,
):
self._project_name = project_name
self.trace_name = trace_name
Expand All @@ -48,8 +57,9 @@ def __init__(
self.session_id = session_id
self.flow_id = trace_name.split(" - ")[-1]
self.spans: dict = {}
self.global_vars = global_vars or {}

config = self._get_config()
config = self._get_config(self.global_vars)
self._ready: bool = self._setup_opik(config, trace_id) if config else False
self._distributed_headers = None

Expand Down Expand Up @@ -224,10 +234,15 @@ def _convert_to_opik_type(self, value):
return value

@staticmethod
def _get_config() -> dict:
host = os.getenv("OPIK_URL_OVERRIDE", None)
api_key = os.getenv("OPIK_API_KEY", None)
workspace = os.getenv("OPIK_WORKSPACE", None)
def _get_config(global_vars) -> dict:
if global_vars:
host = global_vars.get("OPIK_URL_OVERRIDE", None)
api_key = global_vars.get("OPIK_API_KEY", None)
workspace = global_vars.get("OPIK_WORKSPACE", None)
else:
host = os.getenv("OPIK_URL_OVERRIDE", None)
api_key = os.getenv("OPIK_API_KEY", None)
workspace = os.getenv("OPIK_WORKSPACE", None)

# API Key is mandatory for Opik Cloud and URL is mandatory for Open-Source Opik Server
if host or api_key:
Expand Down
Loading
Loading