Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ dev = [

langfuse = ["langfuse>=2.0.0,<3"]
otel = ["opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0"]
langchain = [
"langchain>=0.3.0",
"langchain-aws>=0.2.0",
"langgraph>=0.2.0",
"openinference-instrumentation-langchain>=0.1.0",
"opentelemetry-instrumentation-langchain>=0.40.0,<0.50.0",
]

[tool.ruff]
line-length = 120
Expand All @@ -54,7 +61,7 @@ include = ["src/**/*.py", "tests/**/*.py", "tests_integ/**/*.py"]
[tool.hatch.envs.hatch-test]
installer = "uv"
extra-args = ["-n", "auto", "-vv"]
features = ["otel", "langfuse"]
features = ["otel", "langfuse", "langchain"]
dependencies = [
"pytest>=8.0.0,<10.0.0",
"pytest-cov>=7.0.0,<8.0.0",
Expand Down
18 changes: 17 additions & 1 deletion src/strands_evals/mappers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
"""Converters for transforming telemetry data to Session format."""

from .cloudwatch_parser import CloudWatchLogsParser, parse_cloudwatch_logs
from .cloudwatch_session_mapper import CloudWatchSessionMapper
from .langchain_otel_session_mapper import LangChainOtelSessionMapper
from .openinference_session_mapper import OpenInferenceSessionMapper
from .session_mapper import SessionMapper
from .strands_in_memory_session_mapper import GenAIConventionVersion, StrandsInMemorySessionMapper
from .utils import detect_otel_mapper, get_scope_name, readable_spans_to_dicts

__all__ = ["CloudWatchSessionMapper", "GenAIConventionVersion", "SessionMapper", "StrandsInMemorySessionMapper"]
__all__ = [
"CloudWatchLogsParser",
"CloudWatchSessionMapper",
"GenAIConventionVersion",
"LangChainOtelSessionMapper",
"OpenInferenceSessionMapper",
"SessionMapper",
"StrandsInMemorySessionMapper",
"detect_otel_mapper",
"get_scope_name",
"parse_cloudwatch_logs",
"readable_spans_to_dicts",
]
226 changes: 226 additions & 0 deletions src/strands_evals/mappers/cloudwatch_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""Parser for normalizing CloudWatch/ADOT logs to standard span format.

This parser converts raw CloudWatch logs (which may contain spans, events, or both)
into the normalized format expected by mappers - matching the output of
`readable_spans_to_dicts()`.

CloudWatch logs can have:
1. SPAN records (with startTimeUnixNano, endTimeUnixNano) + EVENT records
2. Only EVENT records (common for Strands telemetry)

The parser handles both cases, creating synthetic spans from events when needed.
"""

import logging
from collections import defaultdict

# Module-level logger, named after this module for hierarchical log configuration.
logger = logging.getLogger(__name__)

# Fallback session identifier. NOTE(review): not referenced anywhere in this
# module — presumably consumed by callers importing it; confirm before removing.
DEFAULT_SESSION_ID = "default_session"


class CloudWatchLogsParser:
"""Normalizes CloudWatch/ADOT logs to standard span dict format.

Converts raw CloudWatch logs to match the output format of `readable_spans_to_dicts()`:
- snake_case field names (trace_id, span_id, parent_span_id)
- Events attached to spans as span_events[]
- Consistent structure regardless of input format

Example:
>>> raw_logs = cloudwatch_provider.fetch_logs()
>>> parser = CloudWatchLogsParser(raw_logs)
>>> normalized = parser.parse()
>>> mapper = detect_otel_mapper(normalized)
>>> session = mapper.map_to_session(normalized, session_id)
"""

def __init__(self, raw_logs: list[dict]):
"""Initialize parser with raw CloudWatch logs.

Args:
raw_logs: List of raw JSON records from CloudWatch Logs Insights
"""
self.raw_logs = raw_logs

def parse(self) -> list[dict]:
"""Parse CloudWatch logs to normalized span format.

Returns:
List of normalized span dicts matching readable_spans_to_dicts() format
"""
if not self.raw_logs:
return []

# Separate spans from events
spans_by_id: dict[str, dict] = {}
events_by_span_id: dict[str, list[dict]] = defaultdict(list)

for record in self.raw_logs:
if self._is_event(record):
event = self._normalize_event(record)
if event:
span_id = event.get("span_id", "")
events_by_span_id[span_id].append(event)
elif self._is_span(record):
span = self._normalize_span(record)
if span:
spans_by_id[span["span_id"]] = span
# Skip other log records (application logs, etc.)

# If we have spans, associate events with them
if spans_by_id:
for span_id, span in spans_by_id.items():
span["span_events"] = events_by_span_id.get(span_id, [])
return list(spans_by_id.values())

# No spans - create synthetic spans from events
# Group events by span_id to create one span per event group
result = []
for span_id, events in events_by_span_id.items():
if not events:
continue
# Use first event to create synthetic span
first_event = events[0]
synthetic_span = self._create_synthetic_span(span_id, events, first_event)
result.append(synthetic_span)

return result

def _is_event(self, record: dict) -> bool:
"""Check if record is an OTEL event (has event.name attribute)."""
if not isinstance(record, dict):
return False

# Check various event name locations
if "EventName" in record or "eventName" in record:
return True

attrs = record.get("attributes", {})
if isinstance(attrs, dict) and "event.name" in attrs:
return True

return False

def _is_span(self, record: dict) -> bool:
"""Check if record is an OTEL span (has start/end time)."""
if not isinstance(record, dict):
return False

# Spans have startTimeUnixNano or start_time
has_start = "startTimeUnixNano" in record or "start_time" in record
has_end = "endTimeUnixNano" in record or "end_time" in record

return has_start and has_end

def _normalize_event(self, record: dict) -> dict | None:
"""Normalize a CloudWatch event record."""
try:
# Extract event name from various locations
event_name = (
record.get("EventName") or record.get("eventName") or record.get("attributes", {}).get("event.name", "")
)

# Extract span_id (camelCase in CloudWatch)
span_id = record.get("spanId") or record.get("span_id", "")

# Extract timestamp
timestamp = (
record.get("timeUnixNano") or record.get("time_unix_nano") or record.get("observedTimeUnixNano") or 0
)

return {
"event_name": event_name,
"span_id": span_id,
"timestamp": timestamp,
"attributes": record.get("attributes", {}),
"body": record.get("body", {}),
}
except Exception as e:
logger.warning(f"Failed to normalize event: {e}")
return None

def _normalize_span(self, record: dict) -> dict | None:
"""Normalize a CloudWatch span record to standard format."""
try:
# Normalize field names (camelCase → snake_case)
trace_id = record.get("traceId") or record.get("trace_id", "")
span_id = record.get("spanId") or record.get("span_id", "")
parent_span_id = record.get("parentSpanId") or record.get("parent_span_id")

# Normalize timestamps
start_time = record.get("startTimeUnixNano") or record.get("start_time", 0)
end_time = record.get("endTimeUnixNano") or record.get("end_time", 0)

# Extract scope
scope = record.get("scope", {})
if not isinstance(scope, dict):
scope = {"name": "", "version": ""}

# Extract status
status = record.get("status", {})
if not isinstance(status, dict):
status = {"code": "UNSET"}

return {
"trace_id": trace_id,
"span_id": span_id,
"parent_span_id": parent_span_id,
"name": record.get("name", ""),
"start_time": start_time,
"end_time": end_time,
"attributes": record.get("attributes", {}),
"scope": {
"name": scope.get("name", ""),
"version": scope.get("version", ""),
},
"status": {"code": status.get("code", "UNSET")},
"span_events": [], # Will be populated later
}
except Exception as e:
logger.warning(f"Failed to normalize span: {e}")
return None

def _create_synthetic_span(self, span_id: str, events: list[dict], first_event: dict) -> dict:
"""Create a synthetic span from events when no span records exist."""
# Get trace_id from the raw logs (need to find matching record)
trace_id = ""
for record in self.raw_logs:
if record.get("spanId") == span_id or record.get("span_id") == span_id:
trace_id = record.get("traceId") or record.get("trace_id", "")
break

# Get scope from event name
event_name = first_event.get("event_name", "")
scope_name = first_event.get("attributes", {}).get("event.name", event_name)

# Use event timestamp for span times
timestamp = first_event.get("timestamp", 0)

return {
"trace_id": trace_id,
"span_id": span_id,
"parent_span_id": None,
"name": event_name,
"start_time": timestamp,
"end_time": timestamp,
"attributes": first_event.get("attributes", {}),
"scope": {
"name": scope_name,
"version": "",
},
"status": {"code": "OK"},
"span_events": events,
}


def parse_cloudwatch_logs(raw_logs: list[dict]) -> list[dict]:
    """Parse raw CloudWatch log records in one call.

    Thin convenience wrapper around :class:`CloudWatchLogsParser`.

    Args:
        raw_logs: Raw CloudWatch log records

    Returns:
        Normalized span dicts matching readable_spans_to_dicts() format
    """
    parser = CloudWatchLogsParser(raw_logs)
    return parser.parse()
Loading