Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ mcp
numpy
schedule
uvloop; sys_platform == 'linux' or sys_platform == 'darwin' # linux or macos only
winloop; sys_platform == 'win32' # windows only
winloop; sys_platform == 'win32' # windows only
# Optional: OpenTelemetry for improved telemetry/tracing. Install if you want tracing export
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
14 changes: 14 additions & 0 deletions swarms/structs/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,20 @@
),
)

# Telemetry: log agent creation/initialization to swarms telemetry platform.

Check failure

Code scanning / Pyre

Incompatible parameter type Error

Incompatible parameter type [6]: In call add_prompt_to_marketplace, for argument use_cases, expected List[Dict[str, str]] but got Optional[List[Dict[str, typing.Any]]].
# This can be disabled by setting environment variable `SWARMS_TELEMETRY_ENABLED` to
# `0`, `false`, or `no`.
try:
telemetry_flag = os.getenv("SWARMS_TELEMETRY_ENABLED", "true").lower()
if telemetry_flag in ("1", "true", "yes"):
try:
log_agent_data(self.to_dict())
except Exception:
pass
except Exception:
# Do not let telemetry errors prevent agent initialization
pass

def _get_agent_workspace_dir(self) -> str:
"""
Get the agent-specific workspace directory path.
Expand Down
165 changes: 145 additions & 20 deletions swarms/telemetry/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
import datetime
import hashlib
import os
import platform
import socket
import threading
import uuid
from typing import Any, Dict

import psutil
import requests
from functools import lru_cache

# Optional OpenTelemetry support (best-effort)
try:
from opentelemetry import trace

Check failure

Code scanning / Pyre

Undefined import Error

Undefined import [21]: Could not find a module corresponding to import opentelemetry.
from opentelemetry.sdk.resources import Resource

Check failure

Code scanning / Pyre

Undefined import Error

Undefined import [21]: Could not find a module corresponding to import opentelemetry.sdk.resources.
from opentelemetry.sdk.trace import TracerProvider

Check failure

Code scanning / Pyre

Undefined import Error

Undefined import [21]: Could not find a module corresponding to import opentelemetry.sdk.trace.
from opentelemetry.sdk.trace.export import BatchSpanProcessor

Check failure

Code scanning / Pyre

Undefined import Error

Undefined import [21]: Could not find a module corresponding to import opentelemetry.sdk.trace.export.
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
Comment on lines +20 to +22

Check failure

Code scanning / Pyre

Undefined import Error

Undefined import [21]: Could not find a module corresponding to import opentelemetry.exporter.otlp.proto.grpc.trace_exporter.

_OPENTELEMETRY_AVAILABLE = True
except Exception:
_OPENTELEMETRY_AVAILABLE = False


# Helper functions
def generate_user_id():
Expand Down Expand Up @@ -92,43 +108,152 @@
return system_data


def _sanitize_agent_payload(data_dict: dict) -> Dict[str, Any]:
"""Return a small, non-sensitive representation of an agent for telemetry.
Avoid shipping secrets (API keys, full llm args, etc.).
"""
if not isinstance(data_dict, dict):
return {"note": "invalid-agent-payload"}

def _safe_list(v):
if isinstance(v, (list, tuple)):
return list(v)[:50]
return v

# Try common attribute names used in Agent.to_dict()
agent_name = data_dict.get("agent_name") or data_dict.get("name")
description = data_dict.get("agent_description") or data_dict.get("description")
tags = _safe_list(data_dict.get("tags") or data_dict.get("capabilities") or [])
capabilities = _safe_list(data_dict.get("capabilities") or [])
tools = data_dict.get("tools") or data_dict.get("tools_list_dictionary") or []
handoffs = data_dict.get("handoffs")

# Extract handoff agent names if possible
handoff_names = []
try:
if isinstance(handoffs, dict):
handoff_names = list(handoffs.keys())
elif isinstance(handoffs, (list, tuple)):
for h in handoffs:
if isinstance(h, dict) and "agent_name" in h:
handoff_names.append(h.get("agent_name"))
else:
handoff_names.append(getattr(h, "agent_name", str(h)))
except Exception:
handoff_names = []

sanitized = {
"id": data_dict.get("id"),
"agent_name": agent_name,
"description": (description[:240] if isinstance(description, str) else description),
"role": data_dict.get("role"),
"tags": tags,
"capabilities": capabilities,
"model_name": data_dict.get("model_name") or data_dict.get("model"),
"tools_count": len(tools) if tools is not None else 0,
"handoff_agents": handoff_names,
"workspace_dir_present": bool(data_dict.get("workspace_dir")),
"telemetry_version": "v1",
"reported_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
}

return sanitized


def _log_agent_data(data_dict: dict):
"""Simple function to log agent data using requests library"""

url = "https://swarms.world/api/get-agents/log-agents"
# Allow telemetry endpoint and API key to be configured via environment
url = os.getenv(
"SWARMS_TELEMETRY_URL", "https://swarms.world/api/get-agents/log-agents"
)
api_key = os.getenv("SWARMS_TELEMETRY_API_KEY")

# Build a structured, sanitized telemetry payload for agents
agent_payload = _sanitize_agent_payload(data_dict)

log = {
"data": data_dict,
"agent": agent_payload,
"system_data": get_comprehensive_system_info(),
"timestamp": datetime.datetime.now(
datetime.timezone.utc
).isoformat(),
}

payload = {
"data": log,
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
}

key = "Bearer sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24"
payload = {"data": log}

headers = {
"Content-Type": "application/json",
"Authorization": key,
}

response = requests.post(
url, json=payload, headers=headers, timeout=10
)
headers = {"Content-Type": "application/json"}
if api_key:
# Allow both raw key or already prefixed Bearer
headers["Authorization"] = api_key if api_key.startswith("Bearer ") else f"Bearer {api_key}"

# If OpenTelemetry is available, emit an event/span so observability systems
# can pick up and index agent creation/updates quickly. This is best-effort.
try:
if response.status_code == 200:
return
if _OPENTELEMETRY_AVAILABLE:
tracer = _init_tracer()
with tracer.start_as_current_span("swarms.agent.telemetry") as span:
# Add attributes for quick search in tracing backends
try:
span.set_attribute("agent.id", str(data_dict.get("id")))
span.set_attribute("agent.name", str(data_dict.get("agent_name") or data_dict.get("name")))
span.set_attribute(
"agent.type", str(data_dict.get("type", "unknown"))
)
span.add_event("agent.logged", attributes={"payload": "shallow"})
except Exception:
pass

except Exception:
# OpenTelemetry failures should not block normal telemetry
pass

# Send HTTP POST in background to avoid blocking agent creation paths
def _post():
try:
response = requests.post(url, json=payload, headers=headers, timeout=8)
# Silently ignore non-200 responses, don't raise in library code
return response.status_code
except Exception:
return None

t = threading.Thread(target=_post, daemon=True)
t.start()


def log_agent_data(data_dict: dict):
try:
_log_agent_data(data_dict)
except Exception:
pass


def _init_tracer():
"""Initialize and return an OpenTelemetry tracer (cached)."""
# Use a simple caching approach on the module
global _TRACER
try:
_TRACER # type: ignore
return _TRACER
except Exception:
pass

if not _OPENTELEMETRY_AVAILABLE:
# Fallback to noop tracer
return trace.get_tracer(__name__)

try:
# Configure OTLP exporter if endpoint provided
otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
resource = Resource.create({"service.name": "swarms-telemetry"})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

if otlp_endpoint:
exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
span_processor = BatchSpanProcessor(exporter)
provider.add_span_processor(span_processor)

_TRACER = trace.get_tracer(__name__)
return _TRACER
except Exception:
return trace.get_tracer(__name__)
Loading