Skip to content

Commit be78fad

Browse files
author
gensecai-dev
committed
Fix 10 broken MCP tools calling non-existent Wazuh Manager API endpoints
- get_agents: filter None params, map agent_id to agents_list - check_agent_health: replace /agents/{id}/health with real agent query - get_agent_configuration: replace /agents/{id}/config with agent+group config - search_security_events: route through Wazuh Indexer instead of /security/events - get_alert_summary: aggregate from Indexer instead of /alerts/summary - analyze_alert_patterns: aggregate from Indexer instead of /alerts/patterns - analyze_security_threat: search Indexer alerts instead of /security/threat/analyze - check_ioc_reputation: search Indexer alert history instead of /security/ioc/reputation - perform_risk_assessment: aggregate real agent+vuln data instead of /security/risk - get_top_security_threats: count alert rules from Indexer instead of /security/threats/top - generate_security_report: aggregate from real endpoints instead of /security/reports/generate - run_compliance_check: use Wazuh SCA endpoint instead of /security/compliance/check
1 parent 24073a1 commit be78fad

File tree

1 file changed

+267
-34
lines changed

1 file changed

+267
-34
lines changed

src/wazuh_mcp_server/api/wazuh_client.py

Lines changed: 267 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""Wazuh API client optimized for Wazuh 4.8.0 to 4.14.1 compatibility with latest features."""
22

33
import asyncio
4+
import json
45
import logging
56
import time
67
from collections import deque
8+
from datetime import datetime, timedelta, timezone
79
from typing import Any, Dict, Optional, Tuple
810

911
import httpx
@@ -14,6 +16,9 @@
1416

1517
logger = logging.getLogger(__name__)
1618

19+
# Time range to hours mapping for indexer-based queries
20+
_TIME_RANGE_HOURS = {"1h": 1, "6h": 6, "12h": 12, "24h": 24, "7d": 168, "30d": 720}
21+
1722

1823
class WazuhClient:
1924
"""Simplified Wazuh API client with rate limiting, circuit breaker, and retry logic."""
@@ -130,9 +135,19 @@ async def get_alerts(self, **params) -> Dict[str, Any]:
130135
timestamp_end=params.get("timestamp_end"),
131136
)
132137

133-
async def get_agents(self, **params) -> Dict[str, Any]:
138+
async def get_agents(self, agent_id=None, status=None, limit=100, **params) -> Dict[str, Any]:
134139
"""Get agents from Wazuh."""
135-
return await self._request("GET", "/agents", params=params)
140+
clean_params: Dict[str, Any] = {}
141+
if agent_id:
142+
clean_params["agents_list"] = agent_id
143+
if status:
144+
clean_params["status"] = status
145+
if limit:
146+
clean_params["limit"] = limit
147+
for k, v in params.items():
148+
if v is not None:
149+
clean_params[k] = v
150+
return await self._request("GET", "/agents", params=clean_params)
136151

137152
async def get_vulnerabilities(self, **params) -> Dict[str, Any]:
138153
"""
@@ -408,28 +423,106 @@ async def get_manager_info(self) -> Dict[str, Any]:
408423
cache_key = "manager_info"
409424
return await self._get_cached(cache_key, "/")
410425

426+
def _time_range_to_start(self, time_range: str) -> str:
427+
"""Convert a time_range string like '24h' or '7d' to an ISO 8601 start timestamp."""
428+
hours = _TIME_RANGE_HOURS.get(time_range, 24)
429+
return (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
430+
411431
async def get_alert_summary(self, time_range: str, group_by: str) -> Dict[str, Any]:
412-
"""Get alert summary grouped by field."""
413-
params = {"time_range": time_range, "group_by": group_by}
414-
return await self._request("GET", "/alerts/summary", params=params)
432+
"""Get alert summary — aggregated from Wazuh Indexer."""
433+
if not self._indexer_client:
434+
raise IndexerNotConfiguredError()
435+
start = self._time_range_to_start(time_range)
436+
result = await self._indexer_client.get_alerts(limit=1000, timestamp_start=start)
437+
alerts = result.get("data", {}).get("affected_items", [])
438+
groups: Dict[str, int] = {}
439+
for alert in alerts:
440+
value: Any = alert
441+
for part in group_by.split("."):
442+
value = value.get(part, {}) if isinstance(value, dict) else "unknown"
443+
key = str(value) if not isinstance(value, dict) else "unknown"
444+
groups[key] = groups.get(key, 0) + 1
445+
return {
446+
"data": {
447+
"time_range": time_range,
448+
"group_by": group_by,
449+
"total_alerts": len(alerts),
450+
"groups": groups,
451+
}
452+
}
415453

416454
async def analyze_alert_patterns(self, time_range: str, min_frequency: int) -> Dict[str, Any]:
417-
"""Analyze alert patterns."""
418-
params = {"time_range": time_range, "min_frequency": min_frequency}
419-
return await self._request("GET", "/alerts/patterns", params=params)
455+
"""Analyze alert patterns — aggregated from Wazuh Indexer."""
456+
if not self._indexer_client:
457+
raise IndexerNotConfiguredError()
458+
start = self._time_range_to_start(time_range)
459+
result = await self._indexer_client.get_alerts(limit=1000, timestamp_start=start)
460+
alerts = result.get("data", {}).get("affected_items", [])
461+
rule_counts: Dict[str, Dict[str, Any]] = {}
462+
for alert in alerts:
463+
rule = alert.get("rule", {})
464+
rule_id = rule.get("id", "unknown")
465+
if rule_id not in rule_counts:
466+
rule_counts[rule_id] = {
467+
"count": 0,
468+
"description": rule.get("description", ""),
469+
"level": rule.get("level", 0),
470+
}
471+
rule_counts[rule_id]["count"] += 1
472+
patterns = [
473+
{"rule_id": k, **v} for k, v in rule_counts.items() if v["count"] >= min_frequency
474+
]
475+
patterns.sort(key=lambda x: x["count"], reverse=True)
476+
return {
477+
"data": {
478+
"time_range": time_range,
479+
"min_frequency": min_frequency,
480+
"patterns": patterns,
481+
"total_patterns": len(patterns),
482+
}
483+
}
420484

421485
async def search_security_events(self, query: str, time_range: str, limit: int) -> Dict[str, Any]:
422-
"""Search security events."""
423-
params = {"q": query, "time_range": time_range, "limit": limit}
424-
return await self._request("GET", "/security/events", params=params)
486+
"""Search security events via the Wazuh Indexer."""
487+
if not self._indexer_client:
488+
raise IndexerNotConfiguredError()
489+
start = self._time_range_to_start(time_range)
490+
return await self._indexer_client.get_alerts(limit=limit, timestamp_start=start)
425491

426492
async def get_running_agents(self) -> Dict[str, Any]:
427493
"""Get running agents."""
428494
return await self._request("GET", "/agents", params={"status": "active"})
429495

430496
async def check_agent_health(self, agent_id: str) -> Dict[str, Any]:
431-
"""Check agent health."""
432-
return await self._request("GET", f"/agents/{agent_id}/health")
497+
"""Check agent health by fetching agent info and extracting status."""
498+
result = await self._request(
499+
"GET",
500+
"/agents",
501+
params={
502+
"agents_list": agent_id,
503+
"select": "id,name,status,ip,os.name,os.version,version,lastKeepAlive,dateAdd,group,node_name",
504+
},
505+
)
506+
agents = result.get("data", {}).get("affected_items", [])
507+
if not agents:
508+
raise ValueError(f"Agent {agent_id} not found")
509+
agent = agents[0]
510+
status = agent.get("status", "unknown")
511+
return {
512+
"data": {
513+
"agent_id": agent.get("id"),
514+
"name": agent.get("name"),
515+
"status": status,
516+
"health": "healthy" if status == "active" else "unhealthy",
517+
"ip": agent.get("ip"),
518+
"os": agent.get("os", {}),
519+
"version": agent.get("version"),
520+
"last_keep_alive": agent.get("lastKeepAlive"),
521+
"date_add": agent.get("dateAdd"),
522+
"group": agent.get("group"),
523+
"node_name": agent.get("node_name"),
524+
}
525+
}
433526

434527
async def get_agent_processes(self, agent_id: str, limit: int) -> Dict[str, Any]:
435528
"""Get agent processes."""
@@ -440,8 +533,26 @@ async def get_agent_ports(self, agent_id: str, limit: int) -> Dict[str, Any]:
440533
return await self._request("GET", f"/syscollector/{agent_id}/ports", params={"limit": limit})
441534

442535
async def get_agent_configuration(self, agent_id: str) -> Dict[str, Any]:
443-
"""Get agent configuration."""
444-
return await self._request("GET", f"/agents/{agent_id}/config")
536+
"""Get agent configuration by fetching agent info and its group config."""
537+
agent_result = await self._request(
538+
"GET",
539+
"/agents",
540+
params={"agents_list": agent_id, "select": "id,name,group,configSum,mergedSum,status,version"},
541+
)
542+
agents = agent_result.get("data", {}).get("affected_items", [])
543+
if not agents:
544+
raise ValueError(f"Agent {agent_id} not found")
545+
agent = agents[0]
546+
config_data: Dict[str, Any] = {"agent": agent, "group_configuration": []}
547+
groups = agent.get("group", [])
548+
if groups:
549+
group_name = groups[0] if isinstance(groups, list) else groups
550+
try:
551+
group_config = await self._request("GET", f"/groups/{group_name}/configuration")
552+
config_data["group_configuration"] = group_config.get("data", {}).get("affected_items", [])
553+
except Exception:
554+
config_data["group_configuration"] = []
555+
return {"data": config_data}
445556

446557
async def get_critical_vulnerabilities(self, limit: int) -> Dict[str, Any]:
447558
"""
@@ -480,36 +591,158 @@ async def get_vulnerability_summary(self, time_range: str) -> Dict[str, Any]:
480591
return await self._indexer_client.get_vulnerability_summary()
481592

482593
async def analyze_security_threat(self, indicator: str, indicator_type: str) -> Dict[str, Any]:
483-
"""Analyze security threat."""
484-
data = {"indicator": indicator, "type": indicator_type}
485-
return await self._request("POST", "/security/threat/analyze", json=data)
594+
"""Analyze security threat by searching alerts for the indicator."""
595+
if not self._indexer_client:
596+
raise IndexerNotConfiguredError()
597+
result = await self._indexer_client.get_alerts(limit=100)
598+
alerts = result.get("data", {}).get("affected_items", [])
599+
matches = []
600+
for alert in alerts:
601+
alert_str = json.dumps(alert)
602+
if indicator.lower() in alert_str.lower():
603+
matches.append(alert)
604+
return {
605+
"data": {
606+
"indicator": indicator,
607+
"type": indicator_type,
608+
"matching_alerts": len(matches),
609+
"alerts": matches[:20],
610+
}
611+
}
486612

487613
async def check_ioc_reputation(self, indicator: str, indicator_type: str) -> Dict[str, Any]:
488-
"""Check IoC reputation."""
489-
params = {"indicator": indicator, "type": indicator_type}
490-
return await self._request("GET", "/security/ioc/reputation", params=params)
614+
"""Check IoC reputation by searching alert history."""
615+
if not self._indexer_client:
616+
raise IndexerNotConfiguredError()
617+
result = await self._indexer_client.get_alerts(limit=500)
618+
alerts = result.get("data", {}).get("affected_items", [])
619+
occurrences = 0
620+
max_level = 0
621+
for alert in alerts:
622+
alert_str = json.dumps(alert)
623+
if indicator.lower() in alert_str.lower():
624+
occurrences += 1
625+
level = alert.get("rule", {}).get("level", 0)
626+
if isinstance(level, int) and level > max_level:
627+
max_level = level
628+
risk = "high" if max_level >= 10 else "medium" if max_level >= 5 else "low"
629+
return {
630+
"data": {
631+
"indicator": indicator,
632+
"type": indicator_type,
633+
"occurrences": occurrences,
634+
"max_alert_level": max_level,
635+
"risk": risk,
636+
}
637+
}
491638

492639
async def perform_risk_assessment(self, agent_id: str = None) -> Dict[str, Any]:
493-
"""Perform risk assessment."""
494-
endpoint = f"/security/risk/{agent_id}" if agent_id else "/security/risk"
495-
return await self._request("GET", endpoint)
640+
"""Perform risk assessment from real agent and vulnerability data."""
641+
risk_factors: list = []
642+
params: Dict[str, Any] = {"select": "id,name,status,os.name,version"}
643+
if agent_id:
644+
params["agents_list"] = agent_id
645+
agents = await self._request("GET", "/agents", params=params)
646+
items = agents.get("data", {}).get("affected_items", [])
647+
disconnected = [a for a in items if a.get("status") != "active"]
648+
if disconnected:
649+
risk_factors.append({"factor": "disconnected_agents", "count": len(disconnected), "severity": "high"})
650+
if self._indexer_client:
651+
try:
652+
vuln_summary = await self._indexer_client.get_vulnerability_summary()
653+
critical = vuln_summary.get("data", {}).get("critical", 0)
654+
if critical > 0:
655+
risk_factors.append(
656+
{"factor": "critical_vulnerabilities", "count": critical, "severity": "critical"}
657+
)
658+
except Exception:
659+
pass
660+
if any(f["severity"] == "critical" for f in risk_factors):
661+
risk_level = "critical"
662+
elif any(f["severity"] == "high" for f in risk_factors):
663+
risk_level = "high"
664+
else:
665+
risk_level = "medium"
666+
return {
667+
"data": {
668+
"total_agents": len(items),
669+
"risk_factors": risk_factors,
670+
"risk_level": risk_level,
671+
}
672+
}
496673

497674
async def get_top_security_threats(self, limit: int, time_range: str) -> Dict[str, Any]:
498-
"""Get top security threats."""
499-
params = {"limit": limit, "time_range": time_range}
500-
return await self._request("GET", "/security/threats/top", params=params)
675+
"""Get top threats by alert rule frequency from Indexer."""
676+
if not self._indexer_client:
677+
raise IndexerNotConfiguredError()
678+
start = self._time_range_to_start(time_range)
679+
result = await self._indexer_client.get_alerts(limit=1000, timestamp_start=start)
680+
alerts = result.get("data", {}).get("affected_items", [])
681+
rule_counts: Dict[str, Dict[str, Any]] = {}
682+
for alert in alerts:
683+
rule = alert.get("rule", {})
684+
rule_id = rule.get("id", "unknown")
685+
if rule_id not in rule_counts:
686+
rule_counts[rule_id] = {
687+
"rule_id": rule_id,
688+
"description": rule.get("description", ""),
689+
"level": rule.get("level", 0),
690+
"count": 0,
691+
"groups": rule.get("groups", []),
692+
}
693+
rule_counts[rule_id]["count"] += 1
694+
threats = sorted(rule_counts.values(), key=lambda x: (-x.get("level", 0), -x["count"]))[:limit]
695+
return {"data": {"time_range": time_range, "threats": threats, "total_unique_rules": len(rule_counts)}}
501696

502697
async def generate_security_report(self, report_type: str, include_recommendations: bool) -> Dict[str, Any]:
503-
"""Generate security report."""
504-
data = {"type": report_type, "include_recommendations": include_recommendations}
505-
return await self._request("POST", "/security/reports/generate", json=data)
698+
"""Generate security report by aggregating data from multiple real endpoints."""
699+
report: Dict[str, Any] = {
700+
"report_type": report_type,
701+
"generated_at": datetime.now(timezone.utc).isoformat(),
702+
"sections": {},
703+
}
704+
try:
705+
agents = await self._request("GET", "/agents", params={"limit": 500})
706+
items = agents.get("data", {}).get("affected_items", [])
707+
active = sum(1 for a in items if a.get("status") == "active")
708+
report["sections"]["agents"] = {"total": len(items), "active": active, "disconnected": len(items) - active}
709+
except Exception as e:
710+
report["sections"]["agents"] = {"error": str(e)}
711+
try:
712+
info = await self._request("GET", "/")
713+
report["sections"]["manager"] = info.get("data", {})
714+
except Exception as e:
715+
report["sections"]["manager"] = {"error": str(e)}
716+
if self._indexer_client:
717+
try:
718+
vuln_summary = await self._indexer_client.get_vulnerability_summary()
719+
report["sections"]["vulnerabilities"] = vuln_summary.get("data", {})
720+
except Exception as e:
721+
report["sections"]["vulnerabilities"] = {"error": str(e)}
722+
return {"data": report}
506723

507724
async def run_compliance_check(self, framework: str, agent_id: str = None) -> Dict[str, Any]:
508-
"""Run compliance check."""
509-
data = {"framework": framework}
725+
"""Run compliance check using Wazuh SCA data."""
510726
if agent_id:
511-
data["agent_id"] = agent_id
512-
return await self._request("POST", "/security/compliance/check", json=data)
727+
try:
728+
return await self._request("GET", f"/sca/{agent_id}")
729+
except Exception:
730+
return await self._request(
731+
"GET", "/agents", params={"agents_list": agent_id, "select": "id,name,status,group,configSum"}
732+
)
733+
agents_result = await self._request(
734+
"GET", "/agents", params={"status": "active", "limit": 10, "select": "id,name"}
735+
)
736+
agents = agents_result.get("data", {}).get("affected_items", [])
737+
sca_results = []
738+
for agent in agents[:5]:
739+
aid = agent.get("id")
740+
try:
741+
sca = await self._request("GET", f"/sca/{aid}")
742+
sca_results.append({"agent_id": aid, "agent_name": agent.get("name"), "sca": sca.get("data", {})})
743+
except Exception:
744+
sca_results.append({"agent_id": aid, "agent_name": agent.get("name"), "sca": {"error": "unavailable"}})
745+
return {"data": {"framework": framework, "agents_checked": len(sca_results), "results": sca_results}}
513746

514747
async def get_wazuh_statistics(self) -> Dict[str, Any]:
515748
"""Get Wazuh statistics."""

0 commit comments

Comments
 (0)