Commit d5a8b87

feat: Add Azure Data Explorer integration for enhanced logging capabilities
1 parent: 0bc21bd

5 files changed: 252 additions & 19 deletions


src/agents/observability/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -103,6 +103,9 @@
     WORKSPACE_ID_HEADER,
 )
 
+# Azure Data Explorer integration
+from src.agents.observability.adx import ADXHandler
+
 
 __all__ = [
     # Context - Classes
@@ -154,4 +157,6 @@
     "CORRELATION_ID_HEADER",
     "CONVERSATION_ID_HEADER",
     "WORKSPACE_ID_HEADER",
+    # Azure Data Explorer
+    "ADXHandler",
 ]

src/agents/observability/adx.py

Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Azure Data Explorer (Kusto) integration for service logs.
"""

from __future__ import annotations
import json
import logging
import os
import threading
from collections import deque
from datetime import datetime, timezone
from typing import Any, Optional, Deque
from src.agents.observability import get_logger

try:
    from src.modules.send_telemetry_data import TelemetryDataSender
except ImportError:
    from modules.send_telemetry_data import TelemetryDataSender

logger = get_logger(__name__)


class ADXHandler(logging.Handler):
    """
    Python logging handler that ships logs to Azure Data Explorer.
    """

    def __init__(
        self,
        cluster_fqdn: str,
        database_name: str,
        table_name: str,
        client_id: str,
        batch_size: int = 100,
        flush_interval: float = 5.0,
        max_queue_size: int = 10000,
    ) -> None:
        """
        Initialize ADX handler.

        :param cluster_fqdn: ADX cluster FQDN (e.g., https://cluster.region.kusto.windows.net)
        :param database_name: ADX database name
        :param table_name: Table name for logs
        :param client_id: Managed identity client ID for authentication
        :param batch_size: Records to batch before sending
        :param flush_interval: Seconds between auto-flushes
        :param max_queue_size: Max queue size before dropping oldest
        """
        super().__init__()
        self.cluster_fqdn = cluster_fqdn
        self.database_name = database_name
        self.table_name = table_name
        self.client_id = client_id
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._queue: Deque[dict[str, Any]] = deque(maxlen=max_queue_size)
        self._lock = threading.Lock()
        self._shutdown = threading.Event()
        self._flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._flush_thread.start()

    def send(
        self,
        data: dict[str, Any] | list[dict[str, Any]],
        table_name: Optional[str] = None,
    ) -> bool:
        """
        Send data to Azure Data Explorer in a single ingestion call.

        :param data: Single record or list of records to send
        :type data: dict | list[dict]
        :param table_name: Override table name for this call
        :type table_name: Optional[str]
        :returns: True if successful
        :rtype: bool
        """
        records = [data] if isinstance(data, dict) else data
        if not records:
            return True

        try:
            TelemetryDataSender(
                module_params={
                    "test_group_json_data": records[0] if len(records) == 1 else records,
                    "telemetry_data_destination": "azuredataexplorer",
                    "adx_cluster_fqdn": self.cluster_fqdn,
                    "adx_database_name": self.database_name,
                    "adx_client_id": self.client_id,
                    "telemetry_table_name": table_name or self.table_name,
                    "workspace_directory": "/tmp",
                }
            ).send_telemetry_data_to_azuredataexplorer(json.dumps(records))
            return True
        except Exception as e:
            logger.error(f"Failed to send to ADX: {e}")
            return False

    def emit(self, record: logging.LogRecord) -> None:
        """Emit a log record to the queue."""
        try:
            log_entry = self._format_record(record)
            with self._lock:
                self._queue.append(log_entry)
                if len(self._queue) >= self.batch_size:
                    self._flush_queue()
        except Exception:
            self.handleError(record)

    def _format_record(self, record: logging.LogRecord) -> dict[str, Any]:
        """Format a log record as a dictionary."""
        entry: dict[str, Any] = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        if record.exc_info and self.formatter:
            entry["exception"] = self.formatter.formatException(record.exc_info)

        standard_attrs = {
            "name",
            "msg",
            "args",
            "created",
            "filename",
            "funcName",
            "levelname",
            "levelno",
            "lineno",
            "module",
            "msecs",
            "pathname",
            "process",
            "processName",
            "relativeCreated",
            "stack_info",
            "exc_info",
            "exc_text",
            "thread",
            "threadName",
            "message",
            "asctime",
        }
        for key, value in record.__dict__.items():
            if key not in standard_attrs and not key.startswith("_"):
                if isinstance(value, (str, int, float, bool, type(None))):
                    entry[key] = value
                else:
                    try:
                        entry[key] = str(value)
                    except Exception:
                        pass

        return entry

    def _flush_queue(self) -> None:
        """Flush queued records (must hold lock)."""
        if not self._queue:
            return

        records = list(self._queue)
        self._queue.clear()

        threading.Thread(
            target=self._send_records,
            args=(records,),
            daemon=True,
        ).start()

    def _send_records(self, records: list[dict[str, Any]]) -> None:
        """Send records to ADX."""
        if self.send(records, self.table_name):
            logger.debug(f"Sent {len(records)} logs to ADX")

    def _flush_loop(self) -> None:
        """Background loop for periodic flushing."""
        while not self._shutdown.wait(self.flush_interval):
            with self._lock:
                self._flush_queue()

    def flush(self) -> None:
        """Flush all pending logs synchronously."""
        with self._lock:
            records = list(self._queue)
            self._queue.clear()

        if records:
            self.send(records, self.table_name)

    def close(self) -> None:
        """Close handler and flush remaining logs."""
        self._shutdown.set()
        self.flush()
        super().close()

    @classmethod
    def from_env(
        cls,
        table_name: Optional[str] = None,
        batch_size: int = 100,
        flush_interval: float = 5.0,
    ) -> Optional["ADXHandler"]:
        """
        Create handler from environment variables.

        :param table_name: Override table name
        :param batch_size: Records to batch before sending
        :param flush_interval: Seconds between auto-flushes
        :returns: ADXHandler if configured, None otherwise
        """
        cluster = os.environ.get("ADX_CLUSTER_FQDN")
        database = os.environ.get("ADX_DATABASE_NAME")
        table = table_name or os.environ.get("ADX_TABLE_NAME", "SAPQAServiceLogs")
        client_id = os.environ.get("ADX_CLIENT_ID")

        if not cluster or not database or not client_id:
            logger.info("ADX not configured - handler not created")
            return None

        logger.info(f"Creating ADX handler: cluster={cluster}, database={database}, table={table}")
        return cls(
            cluster_fqdn=cluster,
            database_name=database,
            table_name=table,
            client_id=client_id,
            batch_size=batch_size,
            flush_interval=flush_interval,
        )
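
For reviewers, a minimal usage sketch of the new handler. This is not part of the commit; the environment values below are placeholders, and it assumes the package is importable as src.agents.observability:

    import logging
    import os

    # Placeholder configuration -- substitute a real cluster FQDN, database,
    # and managed-identity client ID before running.
    os.environ["ADX_CLUSTER_FQDN"] = "https://mycluster.westeurope.kusto.windows.net"
    os.environ["ADX_DATABASE_NAME"] = "ServiceLogs"
    os.environ["ADX_CLIENT_ID"] = "00000000-0000-0000-0000-000000000000"

    from src.agents.observability import ADXHandler

    handler = ADXHandler.from_env(batch_size=50, flush_interval=2.0)
    if handler:
        logging.getLogger().addHandler(handler)
        logging.getLogger().setLevel(logging.INFO)
        # Extra fields survive _format_record and become columns in ADX.
        logging.getLogger(__name__).info("ADX smoke test", extra={"job_id": "demo-123"})
        handler.close()  # flushes queued records synchronously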

src/agents/observability/logger.py

Lines changed: 1 addition & 14 deletions
@@ -1,21 +1,9 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.
 """
-Structured logging with OOP design.
-
-This module provides:
-- LogFormatter: Abstract base for log formatters (JSON, Console)
-- StructuredLogger: Logger wrapper with context injection and event support
-- LoggerFactory: Factory for creating configured loggers
-
-Design patterns used:
-- Factory: LoggerFactory creates configured loggers
-- Strategy: Formatters implement different output strategies
-- Adapter: StructuredLogger adapts stdlib logging
+Structured logging for agents
 """
-
 from __future__ import annotations
-
 import json
 import logging
 import os
@@ -25,7 +13,6 @@
 from logging.handlers import RotatingFileHandler
 from pathlib import Path
 from typing import Any, Optional, Union
-
 from src.agents.observability.context import ObservabilityContextManager
 from src.agents.observability.events import (
     ServiceEvent,

src/api/app.py

Lines changed: 10 additions & 0 deletions
@@ -19,6 +19,7 @@
     initialize_logging,
     get_logger,
     add_observability_middleware,
+    ADXHandler,
 )
 from src.agents.persistence import ConversationManager
 from src.agents.execution import JobStore
@@ -55,6 +56,11 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     """Application lifespan manager for startup/shutdown."""
     logger.info("Initializing application...")
 
+    adx_handler = ADXHandler.from_env()
+    if adx_handler:
+        logging.getLogger().addHandler(adx_handler)
+        logger.info("ADX logging enabled - logs will be shipped to Azure Data Explorer")
+
     kernel = None
     try:
         kernel = create_kernel()
@@ -132,6 +138,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
 
     logger.info("Shutting down application...")
 
+    if adx_handler:
+        adx_handler.close()
+        logger.info("ADX handler closed - logs flushed")
+
     if scheduler_service:
         try:
             await scheduler_service.stop()
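
One behavior worth noting in the wiring above: ADXHandler.from_env returns None when any of ADX_CLUSTER_FQDN, ADX_DATABASE_NAME, or ADX_CLIENT_ID is unset, so the service boots without ADX rather than failing. A quick sanity check, assuming a bare environment:

    import os

    # With the ADX_* variables absent, from_env logs
    # "ADX not configured - handler not created" and returns None,
    # so the lifespan hook skips addHandler/close entirely.
    for var in ("ADX_CLUSTER_FQDN", "ADX_DATABASE_NAME", "ADX_CLIENT_ID"):
        os.environ.pop(var, None)

    from src.agents.observability import ADXHandler

    assert ADXHandler.from_env() is None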

src/modules/send_telemetry_data.py

Lines changed: 5 additions & 5 deletions
@@ -235,17 +235,17 @@ def send_telemetry_data_to_azuredataexplorer(self, telemetry_json_data: str) ->
         """
         Sends telemetry data to Azure Data Explorer.
 
-        :param telemetry_json_data: The JSON data to be sent to Azure Data Explorer.
+        :param telemetry_json_data: JSON string containing a single record (dict) or
+            multiple records (list of dicts) to send to Azure Data Explorer.
         :type telemetry_json_data: str
         :return: The response from the Kusto API.
         :rtype: Any
         """
         import pandas as pd
 
-        telemetry_json_dict = json.loads(telemetry_json_data)
-        data_frame = pd.DataFrame(
-            [telemetry_json_dict.values()], columns=telemetry_json_dict.keys()
-        )
+        telemetry_data = json.loads(telemetry_json_data)
+        records = telemetry_data if isinstance(telemetry_data, list) else [telemetry_data]
+        data_frame = pd.DataFrame(records)
         ingestion_properties = IngestionProperties(
             database=self.module_params["adx_database_name"],
             table=self.module_params["telemetry_table_name"],