
Commit e38ac2b

CristiVlad25 and root authored

Add JS surface mapper tool to Web Pentester agent (#397)

add js mapper

Co-authored-by: root <root@Unknown.localdomain>

1 parent e796efb · commit e38ac2b

2 files changed: +336 −0


src/cai/agents/web_pentester.py
2 additions, 0 deletions

@@ -14,6 +14,7 @@
 from cai.tools.reconnaissance.generic_linux_command import generic_linux_command
 from cai.tools.reconnaissance.exec_code import execute_code
 from cai.tools.web.headers import web_request_framework
+from cai.tools.web.js_surface_mapper import js_surface_mapper

 # Optional OSINT search (requires PERPLEXITY_API_KEY)
 from cai.tools.web.search_web import make_web_search_with_explanation
@@ -29,6 +30,7 @@
     generic_linux_command,     # shell one-liners (httpie/curl/waybackurls/etc if installed)
     execute_code,              # light parsing/diffing/payload crafting
     web_request_framework,     # HTTP request + response/header security analysis
+    js_surface_mapper,         # JS asset surface extraction (endpoints/ops/ws)
 ]

 # Conditional: add web search helper when available
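
The last context line refers to a conditional registration step outside this hunk. A minimal sketch of that pattern, assuming the tool list variable is named tools and the gate is the PERPLEXITY_API_KEY mentioned in the import comment (both are assumptions; neither appears in the diff):

    import os

    # Hypothetical sketch: append the optional OSINT search helper only when
    # its API key is configured. The list name `tools` is assumed; the hunk
    # only shows individual entries and the closing bracket.
    if os.getenv("PERPLEXITY_API_KEY"):
        tools.append(make_web_search_with_explanation)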
src/cai/tools/web/js_surface_mapper.py (new file)
334 additions, 0 deletions

@@ -0,0 +1,334 @@
"""
JS Surface Mapper - extract high-signal attack surface hints from JS assets.

Baseline functionality (framework-agnostic):
- Fetch entry HTML pages and collect JS asset URLs
- Fetch JS assets (bounded)
- Extract API endpoints, full URLs, GraphQL hints, WS/SSE endpoints
- Optionally fetch sourcemaps and extract from sourcesContent

This tool is intentionally deterministic and bounded to avoid crawling noise.
"""

from __future__ import annotations

import json
import re
from dataclasses import dataclass, field
from html.parser import HTMLParser
from typing import Dict, Iterable, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse

import requests  # type: ignore

from cai.sdk.agents import function_tool


_FULL_URL_RE = re.compile(r"https?://[^\s\"'<>\\)]+")
_WS_URL_RE = re.compile(r"wss?://[^\s\"'<>\\)]+")
_GQL_ENDPOINT_RE = re.compile(r"/graphql\b|/gql\b", re.IGNORECASE)
_GQL_OPNAME_RE = re.compile(r"operationName\s*[:=]\s*[\"']([A-Za-z0-9_]{2,})[\"']")
_GQL_OP_RE = re.compile(r"\b(query|mutation|subscription)\s+([A-Za-z0-9_]{2,})")
_PERSISTED_HASH_RE = re.compile(r"sha256Hash\s*[:=]\s*[\"']([a-fA-F0-9]{16,64})[\"']")

# Broad-but-targeted path patterns for endpoints
_PATH_ENDPOINT_RE = re.compile(
    r"(?<![A-Za-z0-9_])/(?:"
    r"api|graphql|gql|v\d+|admin|internal|export|download|uploads|files|"
    r"report|reports|billing|oauth|auth|login|logout|session|sessions|"
    r"token|tokens|users|user|account|accounts|tenant|tenants|org|orgs|"
    r"organization|organizations|project|projects|team|teams|workspace|workspaces|"
    r"invoice|invoices|payment|checkout|order|orders|cart|carts|subscription|subscriptions|"
    r"feature|features|flag|flags|debug|preview|staging"
    r")(?:[A-Za-z0-9_\-./?=&%]*)"
)

_SOURCE_MAP_RE = re.compile(r"^\s*//#\s*sourceMappingURL\s*=\s*(\S+)\s*$", re.MULTILINE)

_HIGH_VALUE_STRINGS = [
    "admin", "entitlement", "featureflag", "feature_flag", "flag", "debug",
    "internal", "staging", "preview", "billing", "invoice", "payment", "export",
    "report", "impersonate", "impersonation", "role", "permission", "rbac",
    "tenant", "organization", "workspace",
]


@dataclass
class _ExtractionResult:
    origins: Set[str] = field(default_factory=set)
    endpoints: Set[str] = field(default_factory=set)
    graphql_endpoints: Set[str] = field(default_factory=set)
    graphql_ops: Set[str] = field(default_factory=set)
    persisted_hashes: Set[str] = field(default_factory=set)
    ws_endpoints: Set[str] = field(default_factory=set)
    high_value: Set[str] = field(default_factory=set)


class _AssetHTMLParser(HTMLParser):
    def __init__(self) -> None:
        super().__init__()
        self.script_srcs: List[str] = []
        self.inline_scripts: List[str] = []
        self._in_script: bool = False
        self._current_inline: List[str] = []
        self.link_hrefs: List[str] = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        attrs_dict = {k.lower(): (v or "") for k, v in attrs}
        if tag.lower() == "script":
            src = attrs_dict.get("src", "").strip()
            if src:
                self.script_srcs.append(src)
            else:
                self._in_script = True
                self._current_inline = []
        elif tag.lower() == "link":
            rel = attrs_dict.get("rel", "").lower()
            href = attrs_dict.get("href", "").strip()
            as_attr = attrs_dict.get("as", "").lower()
            if href and (rel in ("modulepreload", "preload") or as_attr == "script"):
                self.link_hrefs.append(href)

    def handle_endtag(self, tag: str) -> None:
        if tag.lower() == "script" and self._in_script:
            content = "".join(self._current_inline).strip()
            if content:
                self.inline_scripts.append(content)
            self._in_script = False
            self._current_inline = []

    def handle_data(self, data: str) -> None:
        if self._in_script and data:
            self._current_inline.append(data)


def _normalize_base_url(base_url: str) -> str:
    base_url = (base_url or "").strip()
    if not base_url:
        return ""
    parsed = urlparse(base_url)
    if not parsed.scheme:
        base_url = "http://" + base_url
    return base_url.rstrip("/")


def _origin(url: str) -> str:
    p = urlparse(url)
    if not p.scheme or not p.netloc:
        return ""
    return f"{p.scheme}://{p.netloc}"


def _fetch_text(url: str, headers: Optional[Dict[str, str]], cookies: Optional[Dict[str, str]],
                timeout: int, max_bytes: int) -> Tuple[str, Optional[str]]:
    try:
        resp = requests.get(url, headers=headers, cookies=cookies, timeout=timeout, verify=False, stream=True)
        resp.raise_for_status()
        data = bytearray()
        for chunk in resp.iter_content(chunk_size=16384):
            if not chunk:
                continue
            data.extend(chunk)
            if len(data) >= max_bytes:
                break
        # Best-effort decode
        text = data.decode(errors="replace")
        return text, None
    except Exception as exc:  # pylint: disable=broad-except
        return "", f"{url} -> {exc}"


def _extract_from_text(text: str, source_label: str, base_origin: str) -> _ExtractionResult:
    result = _ExtractionResult()
    if not text:
        return result

    for url in _FULL_URL_RE.findall(text):
        result.origins.add(_origin(url))
        if _GQL_ENDPOINT_RE.search(url):
            result.graphql_endpoints.add(url)

    for url in _WS_URL_RE.findall(text):
        result.ws_endpoints.add(url)
        result.origins.add(_origin(url))

    for path in _PATH_ENDPOINT_RE.findall(text):
        if path.startswith("/"):
            result.endpoints.add(path)
            if _GQL_ENDPOINT_RE.search(path):
                result.graphql_endpoints.add(urljoin(base_origin + "/", path))

    for op in _GQL_OPNAME_RE.findall(text):
        result.graphql_ops.add(op)
    for _, op in _GQL_OP_RE.findall(text):
        result.graphql_ops.add(op)

    for h in _PERSISTED_HASH_RE.findall(text):
        result.persisted_hashes.add(h)

    lowered = text.lower()
    for s in _HIGH_VALUE_STRINGS:
        if s in lowered:
            result.high_value.add(s)

    return result


def _merge_result(target: _ExtractionResult, src: _ExtractionResult) -> None:
    target.origins |= src.origins
    target.endpoints |= src.endpoints
    target.graphql_endpoints |= src.graphql_endpoints
    target.graphql_ops |= src.graphql_ops
    target.persisted_hashes |= src.persisted_hashes
    target.ws_endpoints |= src.ws_endpoints
    target.high_value |= src.high_value


@function_tool(strict_mode=False)
def js_surface_mapper(  # pylint: disable=too-many-arguments,too-many-locals
    base_url: str,
    entry_paths: Optional[List[str]] = None,
    headers: Optional[Dict[str, str]] = None,
    cookies: Optional[Dict[str, str]] = None,
    same_origin_only: bool = True,
    max_assets: int = 30,
    max_bytes_per_asset: int = 2_000_000,
    include_sourcemaps: bool = False,
    timeout: int = 10,
) -> str:
    """
    Extract JS-derived attack surface hints from a web application.

    Args:
        base_url: Base URL of the app (e.g., https://example.com)
        entry_paths: HTML entry paths to parse (default ["/"])
        headers: Optional request headers (auth)
        cookies: Optional request cookies (auth)
        same_origin_only: Only fetch JS from base origin (default True)
        max_assets: Cap JS assets fetched (default 30)
        max_bytes_per_asset: Cap bytes per asset (default 2,000,000)
        include_sourcemaps: Fetch and parse sourcemaps (default False)
        timeout: Request timeout (seconds)

    Returns:
        JSON string with extracted surface hints and evidence.
    """
    base_url = _normalize_base_url(base_url)
    if not base_url:
        return json.dumps({"error": "base_url is required"}, ensure_ascii=True)

    base_origin = _origin(base_url)
    entry_paths = entry_paths or ["/"]

    assets: List[str] = []
    inline_sources: List[Tuple[str, str]] = []
    errors: List[str] = []
    evidence: Dict[str, Set[str]] = {}
    sourcemaps_info: List[Dict[str, object]] = []

    # Fetch entry HTML pages
    for path in entry_paths:
        entry_url = path if path.startswith("http") else urljoin(base_url + "/", path.lstrip("/"))
        html, err = _fetch_text(entry_url, headers, cookies, timeout, max_bytes_per_asset)
        if err:
            errors.append(err)
            continue
        parser = _AssetHTMLParser()
        parser.feed(html)

        # Inline script content
        for idx, script in enumerate(parser.inline_scripts):
            inline_sources.append((f"{entry_url}#inline{idx+1}", script))

        # External JS assets
        for src in parser.script_srcs + parser.link_hrefs:
            full = src if src.startswith("http") else urljoin(entry_url, src)
            assets.append(full)

    # De-dup assets and apply limits
    seen: Set[str] = set()
    dedup_assets: List[str] = []
    for a in assets:
        if a in seen:
            continue
        seen.add(a)
        if same_origin_only and _origin(a) and _origin(a) != base_origin:
            continue
        dedup_assets.append(a)
        if len(dedup_assets) >= max_assets:
            break

    extraction = _ExtractionResult(origins={base_origin})

    # Extract from inline scripts
    for label, content in inline_sources:
        res = _extract_from_text(content, label, base_origin)
        _merge_result(extraction, res)

    # Fetch JS assets and extract
    for asset_url in dedup_assets:
        js, err = _fetch_text(asset_url, headers, cookies, timeout, max_bytes_per_asset)
        if err:
            errors.append(err)
            continue
        res = _extract_from_text(js, asset_url, base_origin)
        _merge_result(extraction, res)

        # Evidence mapping
        for ep in res.endpoints:
            evidence.setdefault(ep, set()).add(asset_url)
        for op in res.graphql_ops:
            evidence.setdefault(f"gql_op:{op}", set()).add(asset_url)
        for g in res.graphql_endpoints:
            evidence.setdefault(f"gql_endpoint:{g}", set()).add(asset_url)
        for w in res.ws_endpoints:
            evidence.setdefault(f"ws:{w}", set()).add(asset_url)

        # Sourcemap discovery
        if include_sourcemaps:
            for sm in _SOURCE_MAP_RE.findall(js):
                sm_url = sm if sm.startswith("http") else urljoin(asset_url, sm)
                sm_text, sm_err = _fetch_text(sm_url, headers, cookies, timeout, max_bytes_per_asset)
                if sm_err:
                    errors.append(sm_err)
                    continue
                try:
                    sm_json = json.loads(sm_text)
                    sources_content = sm_json.get("sourcesContent") or []
                    sourcemaps_info.append({
                        "url": sm_url,
                        "sourcesContent": bool(sources_content),
                        "source_count": len(sm_json.get("sources", []) or []),
                    })
                    # Extract from sourcesContent (bounded)
                    for idx, src in enumerate(sources_content[:50]):
                        res_map = _extract_from_text(src or "", f"{sm_url}#src{idx+1}", base_origin)
                        _merge_result(extraction, res_map)
                        for ep in res_map.endpoints:
                            evidence.setdefault(ep, set()).add(sm_url)
                except Exception as exc:  # pylint: disable=broad-except
                    errors.append(f"{sm_url} -> sourcemap parse error: {exc}")

    # Build output
    endpoints_by_origin: Dict[str, List[str]] = {}
    for ep in sorted(extraction.endpoints):
        endpoints_by_origin.setdefault(base_origin, []).append(ep)

    output = {
        "base_url": base_url,
        "origins": sorted(o for o in extraction.origins if o),
        "assets_fetched": dedup_assets,
        "endpoints": endpoints_by_origin,
        "graphql": {
            "endpoints": sorted(extraction.graphql_endpoints),
            "operation_names": sorted(extraction.graphql_ops),
            "persisted_query_hints": sorted(extraction.persisted_hashes),
        },
        "ws_sse": sorted(extraction.ws_endpoints),
        "sourcemaps": sourcemaps_info,
        "high_value_strings": sorted(extraction.high_value),
        "evidence": {k: sorted(list(v))[:3] for k, v in evidence.items()},
        "errors": errors,
    }

    return json.dumps(output, ensure_ascii=True)
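
For reference, a minimal, self-contained sketch of the extraction pass the module applies to each fetched asset: standalone copies of three of the patterns above run over a made-up JS snippet (the sample string and the shortened path pattern are illustrative, not part of the commit). The tool itself is exposed to the agent via @function_tool and returns a JSON document whose top-level keys are base_url, origins, assets_fetched, endpoints, graphql, ws_sse, sourcemaps, high_value_strings, evidence, and errors, as built in the output dict above.

    # Standalone illustration of the endpoint/WebSocket/GraphQL extraction pass.
    # The URL and WS patterns mirror js_surface_mapper.py; PATH_RE is a shortened
    # variant of _PATH_ENDPOINT_RE, and the sample JS is invented for the demo.
    import re

    FULL_URL_RE = re.compile(r"https?://[^\s\"'<>\\)]+")
    WS_URL_RE = re.compile(r"wss?://[^\s\"'<>\\)]+")
    GQL_OP_RE = re.compile(r"\b(query|mutation|subscription)\s+([A-Za-z0-9_]{2,})")
    PATH_RE = re.compile(r"(?<![A-Za-z0-9_])/(?:api|graphql|admin|users)(?:[A-Za-z0-9_\-./?=&%]*)")

    sample_js = """
    fetch("/api/v1/users?limit=10");
    const cdn = "https://cdn.example.com/bundle.js";
    const ws = new WebSocket("wss://app.example.com/live");
    const q = `query GetInvoices { invoices { id } }`;
    """

    print("urls: ", FULL_URL_RE.findall(sample_js))
    print("ws:   ", WS_URL_RE.findall(sample_js))
    print("paths:", PATH_RE.findall(sample_js))
    print("ops:  ", [name for _, name in GQL_OP_RE.findall(sample_js)])
    # Expected hits (roughly):
    #   urls  -> ['https://cdn.example.com/bundle.js']
    #   ws    -> ['wss://app.example.com/live']
    #   paths -> ['/api/v1/users?limit=10']
    #   ops   -> ['GetInvoices']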
