|
| 1 | +""" |
| 2 | +AttackerKB MISP Module |
| 3 | +
|
| 4 | +Author: R7 Labs |
| 5 | +Description: Enrich CVEs via AttackerKB API and return structured MISP events. |
| 6 | +""" |
| 7 | + |
| 8 | +import json |
| 9 | +import re |
| 10 | +import logging |
| 11 | +from typing import Any |
| 12 | + |
| 13 | +import requests |
| 14 | +from pymisp import MISPEvent, MISPObject |
| 15 | + |
| 16 | +# Configure logging to stdout with standard timestamped format |
| 17 | +logging.basicConfig( |
| 18 | + level=logging.INFO, |
| 19 | + format="%(asctime)s,%(msecs)03d - %(name)s - %(levelname)s - %(message)s", |
| 20 | + datefmt="%Y-%m-%d %H:%M:%S", |
| 21 | + force=True |
| 22 | +) |
| 23 | +logger = logging.getLogger(__name__) |
| 24 | + |
| 25 | +# Constants |
| 26 | +API_BASE_URL = "https://api.attackerkb.com/v1" |
| 27 | +moduleconfig: list[str] = ["api_key"] |
| 28 | +misperrors: dict[str, str] = {"error": "Unable to query AttackerKB API"} |
| 29 | +mispattributes: dict[str, str | list[str]] = { |
| 30 | + "input": ["vulnerability", "comment"], |
| 31 | + "output": ["MISPObject"], |
| 32 | + "format": "misp_standard" |
| 33 | +} |
| 34 | +moduleinfo: dict[str, Any] = { |
| 35 | + "version": "48", |
| 36 | + "author": "R7 Labs", |
| 37 | + "description": "Enrich CVEs via AttackerKB and return structured MISP events. Handles rate limits, regex CVE detection, and markdown cleanup.", |
| 38 | + "module-type": ["expansion", "hover"], |
| 39 | + "name": "r7_akb", |
| 40 | + "requirements": ["pymisp", "requests"], |
| 41 | + "input": "Vulnerability attribute (CVE ID or comment containing CVE).", |
| 42 | + "output": "Structured MISP Objects." |
| 43 | +} |
| 44 | + |
| 45 | +# Global HTTP session (stateless re-use; not an application state container) |
| 46 | +session = requests.Session() |
| 47 | + |
| 48 | + |
| 49 | +# --------------------------- |
| 50 | +# HTTP / API helpers |
| 51 | +# --------------------------- |
| 52 | + |
| 53 | +def fetch_json(path: str, headers: dict[str, str]) -> dict[str, Any]: |
| 54 | + """Send GET request to the AttackerKB API and return JSON object.""" |
| 55 | + url = f"{API_BASE_URL}/{path.lstrip('/')}" |
| 56 | + logger.info(f"GET {url}") |
| 57 | + resp = session.get(url, headers=headers) |
| 58 | + logger.info(f"Response status: {resp.status_code}") |
| 59 | + resp.raise_for_status() |
| 60 | + data = resp.json() |
| 61 | + if not isinstance(data, dict): |
| 62 | + raise TypeError("Expected JSON object at top level") |
| 63 | + return data |
| 64 | + |
| 65 | + |
| 66 | +def get_topic_id(cve_id: str, headers: dict[str, str]) -> str | None: |
| 67 | + """Retrieve the topic ID for a given CVE.""" |
| 68 | + data = fetch_json(f"topics?name={cve_id}", headers) |
| 69 | + topics = data.get("data", []) |
| 70 | + logger.info(f"Topics found for {cve_id}: {len(topics) if isinstance(topics, list) else 0}") |
| 71 | + if isinstance(topics, list) and topics: |
| 72 | + first = topics[0] |
| 73 | + if isinstance(first, dict): |
| 74 | + tid = first.get("id") |
| 75 | + return tid if isinstance(tid, str) else None |
| 76 | + return None |
| 77 | + |
| 78 | + |
| 79 | +def get_detail(topic_id: str, headers: dict[str, str]) -> dict[str, Any]: |
| 80 | + """Get detailed topic data with tags and references.""" |
| 81 | + logger.info(f"Fetching details for topic ID {topic_id}") |
| 82 | + data = fetch_json(f"topics/{topic_id}?expand=tags,references", headers) |
| 83 | + detail = data.get("data", {}) |
| 84 | + return detail if isinstance(detail, dict) else {} |
| 85 | + |
| 86 | + |
| 87 | +def get_assessments(topic_id: str, headers: dict[str, str]) -> list[dict[str, Any]]: |
| 88 | + """Retrieve all assessments for a topic.""" |
| 89 | + logger.info(f"Fetching assessments for topic ID {topic_id}") |
| 90 | + data = fetch_json(f"assessments?topicId={topic_id}", headers) |
| 91 | + items = data.get("data", []) |
| 92 | + return items if isinstance(items, list) else [] |
| 93 | + |
| 94 | + |
| 95 | +def get_contributor_username(editor_id: str, headers: dict[str, str]) -> str: |
| 96 | + data = fetch_json(f"contributors/{editor_id}", headers) |
| 97 | + user = data.get("data", {}) |
| 98 | + if isinstance(user, dict): |
| 99 | + username = user.get("username") |
| 100 | + if isinstance(username, str): |
| 101 | + return username |
| 102 | + return f"Unknown ({editor_id})" |
| 103 | + |
| 104 | + |
| 105 | +# --------------------------- |
| 106 | +# Data mapping / formatting |
| 107 | +# --------------------------- |
| 108 | + |
| 109 | +def map_score_label(value: float | int | str | None) -> str: |
| 110 | + """Map numeric score to descriptive label.""" |
| 111 | + try: |
| 112 | + score = float(value) # type: ignore[arg-type] |
| 113 | + except (TypeError, ValueError): |
| 114 | + return "Unknown" |
| 115 | + if score <= 0: |
| 116 | + return "Unknown" |
| 117 | + for threshold, label in [(1.5, "Very Low"), (2.5, "Low"), (3.5, "Medium"), (4.5, "High")]: |
| 118 | + if score < threshold: |
| 119 | + return label |
| 120 | + return "Very High" |
| 121 | + |
| 122 | + |
| 123 | +def remove_markdown(text: str) -> str: |
| 124 | + """Strip markdown formatting from text.""" |
| 125 | + if not text: |
| 126 | + return '' |
| 127 | + text = re.sub(r'#+\s*', '', text) |
| 128 | + text = re.sub(r'(\*\*?|__|_|`)(.*?)\1', r'\2', text) |
| 129 | + text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text) |
| 130 | + text = re.sub(r'!\[.*?\]\(.*?\)', '', text) |
| 131 | + text = re.sub(r'^>\s*', '', text, flags=re.MULTILINE) |
| 132 | + text = re.sub(r'^(-{3,}|_{3,}|\*{3,})$', '', text, flags=re.MULTILINE) |
| 133 | + text = re.sub(r'<[^>]+>', '', text) |
| 134 | + return text.strip() |
| 135 | + |
| 136 | + |
| 137 | +# --------------------------- |
| 138 | +# MISP object builders |
| 139 | +# --------------------------- |
| 140 | + |
| 141 | +def build_report(detail: dict[str, Any]) -> MISPObject: |
| 142 | + """Create attackerkb-report MISP object.""" |
| 143 | + obj = MISPObject("attackerkb-report") |
| 144 | + meta = detail.get("metadata", {}) if isinstance(detail.get("metadata", {}), dict) else {} |
| 145 | + score_data = detail.get("score", {}) if isinstance(detail.get("score", {}), dict) else {} |
| 146 | + |
| 147 | + obj.add_attribute("cve-id", detail.get("name"), type="text") |
| 148 | + obj.add_attribute("description", detail.get("document", ""), type="text", disable_correlation=True) |
| 149 | + obj.add_attribute("permalink", f"https://attackerkb.com/topics/{detail.get('id')}", type="link") |
| 150 | + |
| 151 | + cvss_metric = meta.get("cvssMetricV31", {}) if isinstance(meta.get("cvssMetricV31", {}), dict) else {} |
| 152 | + cvss_data = cvss_metric.get("cvssData", {}) if isinstance(cvss_metric.get("cvssData", {}), dict) else {} |
| 153 | + obj.add_attribute("cvss-score", cvss_data.get("baseScore"), type="float") |
| 154 | + |
| 155 | + obj.add_attribute("attacker-value", map_score_label(score_data.get("attackerValue")), type="text") |
| 156 | + obj.add_attribute("exploitability", map_score_label(score_data.get("exploitability")), type="text") |
| 157 | + |
| 158 | + return obj |
| 159 | + |
| 160 | + |
| 161 | +def build_references(detail: dict[str, Any]) -> MISPObject | None: |
| 162 | + """Build attackerkb-references MISP object from detail.""" |
| 163 | + refs = detail.get("references", []) |
| 164 | + if not isinstance(refs, list) or not refs: |
| 165 | + return None |
| 166 | + obj = MISPObject("attackerkb-references") |
| 167 | + for ref in refs: |
| 168 | + if isinstance(ref, dict): |
| 169 | + url = ref.get("url") |
| 170 | + if isinstance(url, str) and url: |
| 171 | + obj.add_attribute("reference-url", url, type="link") |
| 172 | + return obj |
| 173 | + |
| 174 | + |
| 175 | +def build_assessments(topic_id: str, headers: dict[str, str]) -> list[MISPObject]: |
| 176 | + """Create attackerkb-assessment MISP objects from assessments.""" |
| 177 | + objs: list[MISPObject] = [] |
| 178 | + for a in get_assessments(topic_id, headers): |
| 179 | + if not isinstance(a, dict): |
| 180 | + continue |
| 181 | + mo = MISPObject("attackerkb-assessment") |
| 182 | + editor_id = a.get("editorId") |
| 183 | + if isinstance(editor_id, str): |
| 184 | + contributor = get_contributor_username(editor_id, headers) |
| 185 | + else: |
| 186 | + contributor = "Unknown" |
| 187 | + mo.add_attribute("contributor", contributor, type="text") |
| 188 | + mo.add_attribute("attacker-value", map_score_label(a.get("score")), type="text") |
| 189 | + |
| 190 | + md = a.get("metadata", {}) |
| 191 | + if not isinstance(md, dict): |
| 192 | + md = {} |
| 193 | + mo.add_attribute("exploitability", map_score_label(md.get("exploitability")), type="text") |
| 194 | + |
| 195 | + doc = a.get("document") |
| 196 | + if isinstance(doc, str) and doc: |
| 197 | + mo.add_attribute("notes", doc, type="text", disable_correlation=True) |
| 198 | + objs.append(mo) |
| 199 | + return objs |
| 200 | + |
| 201 | + |
| 202 | +# --------------------------- |
| 203 | +# Result / error helpers |
| 204 | +# --------------------------- |
| 205 | + |
| 206 | +def build_error_event(message: str) -> MISPEvent: |
| 207 | + """Construct a MISP event containing an error object.""" |
| 208 | + event = MISPEvent() |
| 209 | + obj = MISPObject("attackerkb-error") |
| 210 | + obj.add_attribute("error", message, type="text", disable_correlation=True) |
| 211 | + event.add_object(**obj.to_dict()) |
| 212 | + return event |
| 213 | + |
| 214 | + |
| 215 | +def get_result(event: MISPEvent) -> dict[str, Any]: |
| 216 | + """Serialize and return the MISP event.""" |
| 217 | + ev = json.loads(event.to_json()) |
| 218 | + # Only return keys MISP expects in results |
| 219 | + return {"results": {k: ev[k] for k in ("Attribute", "Object") if ev.get(k)}} |
| 220 | + |
| 221 | + |
| 222 | +# --------------------------- |
| 223 | +# MISP module entrypoints |
| 224 | +# --------------------------- |
| 225 | + |
| 226 | +def handler(q: Any = False) -> dict[str, Any]: |
| 227 | + """Main handler for MISP expansion module.""" |
| 228 | + try: |
| 229 | + payload = json.loads(q.decode()) if isinstance(q, (bytes, bytearray)) else q |
| 230 | + if not isinstance(payload, dict): |
| 231 | + return get_result(build_error_event("Invalid payload")) |
| 232 | + |
| 233 | + attribute = payload.get("attribute", {}) |
| 234 | + attribute_value = attribute.get("value", "") if isinstance(attribute, dict) else "" |
| 235 | + config = payload.get("config", {}) |
| 236 | + api_key = config.get("api_key") if isinstance(config, dict) else None |
| 237 | + |
| 238 | + if not attribute_value or not isinstance(api_key, str): |
| 239 | + return get_result(build_error_event("Missing CVE input or API key")) |
| 240 | + |
| 241 | + cve_matches = set(re.findall(r"CVE-\d{4}-\d{4,7}", attribute_value)) |
| 242 | + if not cve_matches: |
| 243 | + return get_result(build_error_event("No valid CVE found")) |
| 244 | + |
| 245 | + headers: dict[str, str] = {"Authorization": f"Bearer {api_key}"} |
| 246 | + event = MISPEvent() |
| 247 | + |
| 248 | + for cve in sorted(cve_matches): |
| 249 | + logger.info(f"Processing CVE: {cve}") |
| 250 | + topic_id = get_topic_id(cve, headers) |
| 251 | + if not topic_id: |
| 252 | + logger.info(f"CVE not found in AttackerKB: {cve}") |
| 253 | + # Optionally, add a lightweight note object indicating a miss |
| 254 | + miss = MISPObject("attackerkb-miss") |
| 255 | + miss.add_attribute("cve-id", cve, type="text") |
| 256 | + miss.add_attribute("note", "CVE not found in AttackerKB", type="text", disable_correlation=True) |
| 257 | + event.add_object(**miss.to_dict()) |
| 258 | + continue |
| 259 | + |
| 260 | + detail = get_detail(topic_id, headers) |
| 261 | + event.add_object(**build_report(detail).to_dict()) |
| 262 | + |
| 263 | + ref = build_references(detail) |
| 264 | + if ref: |
| 265 | + event.add_object(**ref.to_dict()) |
| 266 | + |
| 267 | + ra = detail.get("rapid7Analysis") |
| 268 | + if isinstance(ra, str) and ra: |
| 269 | + mo = MISPObject("attackerkb-rapid7-analysis") |
| 270 | + mo.add_attribute("rapid7-analysis", remove_markdown(ra), type="text", disable_correlation=True) |
| 271 | + event.add_object(**mo.to_dict()) |
| 272 | + |
| 273 | + for mo in build_assessments(topic_id, headers): |
| 274 | + event.add_object(**mo.to_dict()) |
| 275 | + |
| 276 | + return get_result(event) |
| 277 | + |
| 278 | + except Exception as e: |
| 279 | + logger.error(f"Exception occurred: {e}") |
| 280 | + return get_result(build_error_event(str(e))) |
| 281 | + |
| 282 | + |
| 283 | +def introspection() -> dict[str, Any]: |
| 284 | + """Return MISP module attributes.""" |
| 285 | + return mispattributes |
| 286 | + |
| 287 | + |
| 288 | +def version() -> dict[str, Any]: |
| 289 | + """Return MISP module version and configuration.""" |
| 290 | + moduleinfo["config"] = moduleconfig |
| 291 | + return moduleinfo |
0 commit comments