3 changes: 3 additions & 0 deletions pyproject.toml
@@ -76,6 +76,9 @@ httpx = "*"
mcp = "*"
aiohttp = "*"
schedule = "*"
opentelemetry-api = ">=1.20.0"
opentelemetry-sdk = ">=1.20.0"
opentelemetry-exporter-otlp = ">=1.20.0"
uvloop = {version = "*", markers = "sys_platform == 'linux' or sys_platform == 'darwin'"}
winloop = {version = "*", markers = "sys_platform == 'win32'"}

3 changes: 3 additions & 0 deletions requirements.txt
@@ -26,3 +26,6 @@ numpy
schedule
uvloop; sys_platform == 'linux' or sys_platform == 'darwin' # linux or macos only
winloop; sys_platform == 'win32' # windows only
opentelemetry-api>=1.20.0
opentelemetry-sdk>=1.20.0
opentelemetry-exporter-otlp>=1.20.0
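
Reviewer note (not part of the diff): with these three packages pinned, a minimal exporter bootstrap for local testing could look like the sketch below. The service name is a placeholder, and the OTLP endpoint defaults to localhost:4317 unless OTEL_EXPORTER_OTLP_ENDPOINT points elsewhere.

# Illustrative sketch only -- assumes a local OTLP collector; not part of this PR.
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# "swarms-agent" is a placeholder service name.
provider = TracerProvider(resource=Resource.create({"service.name": "swarms-agent"}))
# OTLPSpanExporter reads OTEL_EXPORTER_OTLP_ENDPOINT; the default is http://localhost:4317.
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

with trace.get_tracer(__name__).start_as_current_span("smoke-test"):
    pass  # spans emitted by the agent would be exported the same way
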
237 changes: 234 additions & 3 deletions swarms/structs/agent.py
@@ -75,6 +75,19 @@
handle_transforms,
)
from swarms.telemetry.main import log_agent_data

try:
from swarms.telemetry.opentelemetry_integration import (
trace_span,
record_metric,
log_event,
)
Check failure on lines +80 to +84 (Code scanning / Pyre): Undefined import [21]: Could not find a module corresponding to import swarms.telemetry.opentelemetry_integration.
_OTEL_AVAILABLE = True
except ImportError:
_OTEL_AVAILABLE = False
trace_span = None
record_metric = lambda *args, **kwargs: None
log_event = lambda *args, **kwargs: None
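
Reviewer note on the Pyre finding above: swarms/telemetry/opentelemetry_integration.py is not part of this diff, so the sketch below is only a guess at the interface the agent code appears to expect (trace_span as a context manager, record_metric with a metric_type keyword, and log_event). Every name and signature here is an assumption inferred from the call sites, not the actual module.

# Hypothetical sketch of swarms/telemetry/opentelemetry_integration.py -- inferred
# from the call sites in agent.py; the real module is not included in this diff.
from contextlib import contextmanager
from typing import Any, Dict, Optional

from opentelemetry import metrics, trace

_tracer = trace.get_tracer("swarms")
_meter = metrics.get_meter("swarms")
_instruments: Dict[str, Any] = {}


def _get_instrument(name: str, metric_type: str):
    # Create each counter/histogram once and cache it to avoid duplicate instruments.
    if name not in _instruments:
        factory = _meter.create_counter if metric_type == "counter" else _meter.create_histogram
        _instruments[name] = factory(name)
    return _instruments[name]


@contextmanager
def trace_span(name: str, attributes: Optional[Dict[str, Any]] = None):
    # Used as a context manager in Agent.run and the LLM call path.
    with _tracer.start_as_current_span(name, attributes=attributes or {}) as span:
        yield span


def record_metric(name, value, attributes=None, metric_type="histogram"):
    instrument = _get_instrument(name, metric_type)
    if metric_type == "counter":
        instrument.add(value, attributes or {})
    else:
        instrument.record(value, attributes or {})


def log_event(message, level="INFO", attributes=None):
    # Attach the event to the currently active span, if any.
    span = trace.get_current_span()
    if span.is_recording():
        span.add_event(message, {"level": level, **(attributes or {})})
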
from swarms.tools.base_tool import BaseTool
from swarms.tools.mcp_client_tools import (
execute_multiple_tools_on_multiple_mcp_servers_sync,
@@ -470,6 +483,7 @@
mode: Literal["interactive", "fast", "standard"] = "standard",
publish_to_marketplace: bool = False,
use_cases: Optional[List[Dict[str, Any]]] = None,
enable_telemetry: Optional[bool] = None,
*args,
**kwargs,
):
@@ -623,6 +637,15 @@
self.mode = mode
self.publish_to_marketplace = publish_to_marketplace

if enable_telemetry is None:
import os
self.enable_telemetry = (
_OTEL_AVAILABLE
and os.getenv("OTEL_ENABLED", "true").lower() == "true"
)
else:
self.enable_telemetry = enable_telemetry and _OTEL_AVAILABLE
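
Reviewer note: as written, telemetry is on by default whenever the optional import succeeded and OTEL_ENABLED is unset or literally "true" (case-insensitive); any other value turns it off, and an explicit constructor argument always wins. A minimal illustration (agent_name is a placeholder and other Agent arguments are omitted for brevity):

# Illustration only -- not part of this diff.
import os
from swarms import Agent

os.environ["OTEL_ENABLED"] = "false"                                 # anything but "true" disables the default
auto = Agent(agent_name="auto-example")                              # enable_telemetry=None -> resolves to False here
forced = Agent(agent_name="forced-example", enable_telemetry=True)   # True only if the OTel import succeeded
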

# Initialize transforms
if transforms is None:
self.transforms = None
@@ -1215,7 +1238,42 @@
agent(task="Summarize this document.", img="path/to/image.jpg")
agent(task="Analyze this image.", img="path/to/image.jpg", is_last=True)
"""
span_attributes = {}
if self.enable_telemetry and trace_span:
span_attributes = {
"agent.id": self.id,
"agent.name": self.agent_name,
"agent.model": self.get_current_model(),
"agent.max_loops": str(self.max_loops),
"task.length": len(str(task)) if task else 0,
}

span_context = (
trace_span(
f"agent.run.{self.agent_name}",
attributes=span_attributes,
)
if (self.enable_telemetry and trace_span)
else None
)

if span_context:
span_manager = span_context.__enter__()
else:
from contextlib import nullcontext
span_manager = nullcontext()
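
Reviewer note: the span here is entered by hand and closed separately on the success, Exception, and KeyboardInterrupt paths, while the nullcontext fallback is created but never entered. A generic alternative (not part of the diff; the function and parameter names below are invented for illustration) is to let contextlib.ExitStack own the optional span so every exit path closes it automatically:

# Generic sketch only -- run_with_optional_span, make_span, and body are invented names.
from contextlib import ExitStack


def run_with_optional_span(enabled, make_span, body):
    """Enter the span only when telemetry is enabled; ExitStack closes it on any exit path."""
    with ExitStack() as stack:
        if enabled and make_span is not None:
            stack.enter_context(make_span())
        return body()
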

try:
if self.enable_telemetry and record_metric:
record_metric(
"agent.executions.total",
1,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
metric_type="counter",
)

self.check_if_no_prompt_then_autogenerate(task)

@@ -1250,6 +1308,17 @@
):
loop_count += 1

if self.enable_telemetry and record_metric:
record_metric(
"agent.loops",
1,
{
"agent_name": self.agent_name,
"loop_number": str(loop_count),
},
metric_type="counter",
)

# Handle RAG query every loop
if (
self.long_term_memory is not None
@@ -1457,15 +1526,63 @@

self.save()

# Output formatting based on output_type
return history_output_formatter(
if self.enable_telemetry and record_metric:
record_metric(
"agent.executions.success",
1,
{"agent_name": self.agent_name},
metric_type="counter",
)

result = history_output_formatter(
self.short_memory, type=self.output_type
)

if span_context:
try:
span_context.__exit__(None, None, None)
except Exception:
pass

return result

except Exception as error:
if self.enable_telemetry:
if record_metric:
record_metric(
"agent.executions.errors",
1,
{
"agent_name": self.agent_name,
"error_type": type(error).__name__,
},
metric_type="counter",
)
if log_event:
log_event(
f"Agent {self.agent_name} execution failed",
level="ERROR",
attributes={
"agent_id": self.id,
"error_type": type(error).__name__,
"error_message": str(error)[:200],
},
)

if span_context:
try:
span_context.__exit__(type(error), error, None)
except Exception:
pass

self._handle_run_error(error)

except KeyboardInterrupt as error:
if span_context:
try:
span_context.__exit__(type(error), error, None)
except Exception:
pass
self._handle_run_error(error)

def _handle_run_error(self, error: any):
@@ -2565,6 +2682,33 @@
if "is_last" in kwargs:
del kwargs["is_last"]

span_attrs = {}
if self.enable_telemetry and trace_span:
span_attrs = {
"agent.name": self.agent_name,
"agent.model": self.get_current_model(),
"agent.loop": str(current_loop),
"task.length": len(task),
"has_image": str(img is not None),
}

llm_span_context = (
trace_span(
"agent.llm.call",
attributes=span_attrs,
)
if (self.enable_telemetry and trace_span)
else None
)

if llm_span_context:
llm_span_manager = llm_span_context.__enter__()
else:
from contextlib import nullcontext
llm_span_manager = nullcontext()

start_time = time.time()

try:
# Set streaming parameter in LLM if streaming is enabled
if self.streaming_on and hasattr(self.llm, "stream"):
@@ -2640,11 +2784,57 @@
# Restore original stream setting
self.llm.stream = original_stream

# Return the complete response for further processing
if self.enable_telemetry and record_metric:
execution_time = time.time() - start_time
record_metric(
"agent.llm.call.duration",
execution_time,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
)
record_metric(
"agent.llm.calls.total",
1,
{"model": self.get_current_model()},
metric_type="counter",
)

if llm_span_context:
try:
llm_span_context.__exit__(None, None, None)
except Exception:
pass

return complete_response
else:
# Restore original stream setting
self.llm.stream = original_stream

if self.enable_telemetry and record_metric:
execution_time = time.time() - start_time
record_metric(
"agent.llm.call.duration",
execution_time,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
)
record_metric(
"agent.llm.calls.total",
1,
{"model": self.get_current_model()},
metric_type="counter",
)

if llm_span_context:
try:
llm_span_context.__exit__(None, None, None)
except Exception:
pass

return streaming_response
else:
args = {
@@ -2656,6 +2846,29 @@

out = self.llm.run(**args, **kwargs)

if self.enable_telemetry and record_metric:
execution_time = time.time() - start_time
record_metric(
"agent.llm.call.duration",
execution_time,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
},
)
record_metric(
"agent.llm.calls.total",
1,
{"model": self.get_current_model()},
metric_type="counter",
)

if llm_span_context:
try:
llm_span_context.__exit__(None, None, None)
except Exception:
pass

return out
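
Reviewer note: the duration-plus-counter sequence above now appears three times (streaming with a callback, plain streaming, and the non-streaming call); a small hypothetical helper like the one below (the name and placement are assumptions, not part of this diff) would keep the three branches in sync:

# Hypothetical helper -- not part of this diff; _record_llm_call_metrics is an invented name.
import time


def _record_llm_call_metrics(agent_name, model, start_time, record_metric):
    """Emit the duration histogram and call counter shared by all three LLM branches."""
    if record_metric is None:
        return
    record_metric(
        "agent.llm.call.duration",
        time.time() - start_time,
        {"agent_name": agent_name, "model": model},
    )
    record_metric(
        "agent.llm.calls.total",
        1,
        {"model": model},
        metric_type="counter",
    )
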

except (
@@ -2665,6 +2878,24 @@
AuthenticationError,
Exception,
) as e:
if self.enable_telemetry:
if record_metric:
record_metric(
"agent.llm.call.errors",
1,
{
"agent_name": self.agent_name,
"model": self.get_current_model(),
"error_type": type(e).__name__,
},
metric_type="counter",
)
if llm_span_context:
try:
llm_span_context.__exit__(type(e), e, None)
except Exception:
pass

logger.error(
f"Error calling LLM with model '{self.get_current_model()}': {e}. "
f"Task: {task}, Args: {args}, Kwargs: {kwargs} Traceback: {traceback.format_exc()}"