Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/zenml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __getattr__(name: str) -> Any:
from zenml.pipelines import get_pipeline_context, pipeline
from zenml.steps import step, get_step_context
from zenml.steps.utils import log_step_metadata
from zenml.trace_collectors.utils import get_trace_collector
from zenml.utils.metadata_utils import log_metadata
from zenml.utils.tag_utils import Tag, add_tags, remove_tags

Expand All @@ -71,6 +72,7 @@ def __getattr__(name: str) -> Any:
"ExternalArtifact",
"get_pipeline_context",
"get_step_context",
"get_trace_collector",
"load_artifact",
"log_metadata",
"log_artifact_metadata",
Expand Down
12 changes: 12 additions & 0 deletions src/zenml/cli/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,14 @@ def register_stack(
type=str,
required=False,
)
@click.option(
"-t",
"--trace_collector",
"trace_collector",
help="Name of the trace collector for this stack.",
type=str,
required=False,
)
def update_stack(
stack_name_or_id: Optional[str] = None,
artifact_store: Optional[str] = None,
Expand All @@ -673,6 +681,7 @@ def update_stack(
data_validator: Optional[str] = None,
image_builder: Optional[str] = None,
model_registry: Optional[str] = None,
trace_collector: Optional[str] = None,
) -> None:
"""Update a stack.

Expand All @@ -691,6 +700,7 @@ def update_stack(
data_validator: Name of the new data validator for this stack.
image_builder: Name of the new image builder for this stack.
model_registry: Name of the new model registry for this stack.
trace_collector: Name of the new trace collector for this stack.
"""
client = Client()

Expand Down Expand Up @@ -718,6 +728,8 @@ def update_stack(
updates[StackComponentType.MODEL_REGISTRY] = [model_registry]
if image_builder:
updates[StackComponentType.IMAGE_BUILDER] = [image_builder]
if trace_collector:
updates[StackComponentType.TRACE_COLLECTOR] = [trace_collector]
if model_deployer:
updates[StackComponentType.MODEL_DEPLOYER] = [model_deployer]
if orchestrator:
Expand Down
17 changes: 17 additions & 0 deletions src/zenml/config/step_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class StepConfigurationUpdate(StrictBaseModel):
enable_step_logs: Optional[bool] = None
step_operator: Optional[Union[bool, str]] = None
experiment_tracker: Optional[Union[bool, str]] = None
trace_collector: Optional[Union[bool, str]] = None
parameters: Dict[str, Any] = {}
settings: Dict[str, SerializeAsAny[BaseSettings]] = {}
extra: Dict[str, Any] = {}
Expand Down Expand Up @@ -190,6 +191,22 @@ def uses_experiment_tracker(self, name: str) -> bool:
else:
return False

def uses_trace_collector(self, name: str) -> bool:
"""Checks if the step configuration uses the given trace collector.

Args:
name: The name of the trace collector.

Returns:
If the step configuration uses the given trace collector.
"""
if self.trace_collector is True:
return True
elif isinstance(self.trace_collector, str):
return self.trace_collector == name
else:
return False


class PartialStepConfiguration(StepConfigurationUpdate):
"""Class representing a partial step configuration."""
Expand Down
1 change: 1 addition & 0 deletions src/zenml/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class StackComponentType(StrEnum):
ORCHESTRATOR = "orchestrator"
STEP_OPERATOR = "step_operator"
MODEL_REGISTRY = "model_registry"
TRACE_COLLECTOR = "trace_collector"

@property
def plural(self) -> str:
Expand Down
1 change: 1 addition & 0 deletions src/zenml/integrations/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
KUBERNETES = "kubernetes"
LABEL_STUDIO = "label_studio"
LANGCHAIN = "langchain"
LANGFUSE = "langfuse"
LIGHTGBM = "lightgbm"
# LLAMA_INDEX = "llama_index"
MLFLOW = "mlflow"
Expand Down
49 changes: 49 additions & 0 deletions src/zenml/integrations/langfuse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""LangFuse integration for ZenML.

The LangFuse integration allows ZenML to collect and query traces from LangFuse,
an open-source LLM observability platform. This enables monitoring, debugging,
and analysis of LLM applications through ZenML's trace collector interface.
"""

from typing import List, Type

from zenml.integrations.constants import LANGFUSE
from zenml.integrations.integration import Integration


class LangFuseIntegration(Integration):
"""Definition of LangFuse integration for ZenML."""

NAME = LANGFUSE
REQUIREMENTS = ["langfuse>=3.2.0"]

@classmethod
def flavors(cls) -> List[Type["Flavor"]]:
"""Declare the flavors for the LangFuse integration.

Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.langfuse.flavors import (
LangFuseTraceCollectorFlavor,
)

return [
LangFuseTraceCollectorFlavor,
]


LangFuseIntegration.check_installation()
20 changes: 20 additions & 0 deletions src/zenml/integrations/langfuse/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""LangFuse integration constants."""

# Environment variables for trace context propagation
ZENML_LANGFUSE_TRACE_ID = "ZENML_LANGFUSE_TRACE_ID"
ZENML_LANGFUSE_SESSION_ID = "ZENML_LANGFUSE_SESSION_ID"
ZENML_LANGFUSE_PIPELINE_NAME = "ZENML_LANGFUSE_PIPELINE_NAME"
ZENML_LANGFUSE_USER_ID = "ZENML_LANGFUSE_USER_ID"
24 changes: 24 additions & 0 deletions src/zenml/integrations/langfuse/flavors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""LangFuse flavors."""

from zenml.integrations.langfuse.flavors.langfuse_trace_collector_flavor import (
LangFuseTraceCollectorConfig,
LangFuseTraceCollectorFlavor,
)

__all__ = [
"LangFuseTraceCollectorConfig",
"LangFuseTraceCollectorFlavor",
]
Loading