11import json
22from pydantic import BaseModel , Field
3- from typing import List
3+ from typing import List , Optional
44from langchain_core .messages import SystemMessage , HumanMessage
55from langchain_core .output_parsers import PydanticOutputParser
66from watchtower .core .state import AgentState
77from watchtower .agents .planner import get_llm
88
9+
10+ # ── Schema ────────────────────────────────────────────────────────────────────
11+
912class IDORReasoningOutput (BaseModel ):
10- confidence_score : int = Field (description = "Confidence score from 1-10 that IDOR vulnerabilities exist here." , ge = 1 , le = 10 )
11- potential_business_impact : str = Field (description = "The potential business impact if these endpoints contain IDOR vulnerabilities." )
12- testing_plan : List [str ] = Field (description = "A comprehensive step-by-step token/UserID swapping testing plan." )
13+ confidence_score : int = Field (
14+ description = "Confidence score from 1-10 that IDOR vulnerabilities exist here." ,
15+ ge = 1 ,
16+ le = 10 ,
17+ )
18+ potential_business_impact : str = Field (
19+ description = "The potential business impact if these endpoints contain IDOR vulnerabilities."
20+ )
21+ testing_plan : List [str ] = Field (
22+ description = "A comprehensive step-by-step token/UserID swapping testing plan."
23+ )
24+ affected_endpoints : Optional [List [str ]] = Field (
25+ default = None ,
26+ description = "Specific endpoints or parameters flagged as high-risk for IDOR."
27+ )
28+
29+
30+ # ── Constants ─────────────────────────────────────────────────────────────────
31+
32+ RELEVANT_TOOLS = {"httpx" , "kiterunner" , "arjun" , "gobuster" , "ffuf" }
33+
34+ SYSTEM_PROMPT = """\
35+ You are a Senior Security Researcher specialising in Insecure Direct Object \
36+ Reference (IDOR) and business logic flaws.
37+
38+ Your responsibilities:
39+ - Analyse discovered endpoints and parameters for IDOR risk.
40+ - Assess likelihood of token/UserID swapping vulnerabilities.
41+ - Produce a concrete, step-by-step testing methodology.
42+ - Identify specific high-risk endpoints when visible in the data.
43+
44+ Rules:
45+ - Base every conclusion strictly on the supplied recon data.
46+ - Never fabricate endpoints or parameters not present in the input.
47+ - Output ONLY valid JSON matching the requested schema — no prose, no markdown fences.\
48+ """
49+
50+
51+ # ── Helpers ───────────────────────────────────────────────────────────────────
52+
53+ def _build_recon_summary (observations : list ) -> str :
54+ """Concatenate output from relevant recon tools into a single block."""
55+ lines = []
56+ for obs in observations :
57+ if obs .get ("tool" ) in RELEVANT_TOOLS :
58+ lines .append (
59+ f"[Tool: { obs ['tool' ]} ]\n { obs .get ('output' , '(no output)' )} "
60+ )
61+ return "\n \n " .join (lines ) if lines else ""
62+
63+
64+ def _build_finding (result : IDORReasoningOutput ) -> dict :
65+ plan_text = "\n " .join (f" { i + 1 } . { step } " for i , step in enumerate (result .testing_plan ))
66+ endpoints_text = (
67+ "\n " .join (f" - { ep } " for ep in result .affected_endpoints )
68+ if result .affected_endpoints
69+ else " None flagged explicitly."
70+ )
71+
72+ return {
73+ "title" : "IDOR Logic Analysis Plan" ,
74+ "severity" : "Info" ,
75+ "confidence" : result .confidence_score ,
76+ "description" : (
77+ f"Confidence: { result .confidence_score } /10\n \n "
78+ f"Business Impact:\n { result .potential_business_impact } \n \n "
79+ f"High-Risk Endpoints:\n { endpoints_text } \n \n "
80+ f"Testing Plan:\n { plan_text } "
81+ ),
82+ "evidence" : "Generated by Logic Analysis Agent" ,
83+ }
84+
85+
86+ # ── Node ──────────────────────────────────────────────────────────────────────
1387
1488def logic_analysis_node (state : AgentState ) -> dict :
15- observations = state .get ("observations" , [])
89+ observations : list = state .get ("observations" , [])
90+
1691 if not observations :
1792 return {"messages" : ["No observations found for logic analysis." ]}
18-
19- # Gather output from recent tools, especially web recon tools like httpx, kiterunner, arjun
20- recon_data = "\n " .join (
21- f"Tool: { obs .get ('tool' )} \n Output: { obs .get ('output' )} "
22- for obs in observations if obs .get ("tool" ) in ["httpx" , "kiterunner" , "arjun" , "gobuster" , "ffuf" ]
23- )
24-
93+
94+ latest_tool = observations [- 1 ].get ("tool" )
95+ if latest_tool not in RELEVANT_TOOLS :
96+ return {
97+ "messages" : [f"Skipping logic analysis — last tool '{ latest_tool } ' is not a recon trigger." ]
98+ }
99+
100+ recon_summary = _build_recon_summary (observations )
101+ if not recon_summary :
102+ return {"messages" : ["Relevant recon tools ran but produced no usable output." ]}
103+
25104 parser = PydanticOutputParser (pydantic_object = IDORReasoningOutput )
26-
27- prompt = f"""
28- You are a Senior Security Researcher specializing in Insecure Direct Object Reference (IDOR) and business logic flaws.
29- Review the following web recon endpoints and parameters discovered on the target.
30105
31- Web Recon Data:
32- { recon_data }
106+ user_prompt = (
107+ "Analyse the following web recon data for IDOR vulnerabilities "
108+ "and respond with a JSON object that matches the schema exactly.\n \n "
109+ f"--- RECON DATA ---\n { recon_summary } \n --- END RECON DATA ---\n \n "
110+ f"{ parser .get_format_instructions ()} "
111+ )
112+
113+ messages = [
114+ SystemMessage (content = SYSTEM_PROMPT ),
115+ HumanMessage (content = user_prompt ),
116+ ]
33117
34- Analyze the data for potential IDOR vulnerabilities. Generate a strict JSON output matching the expected schema.
35- Assess the likelihood of IDOR, describe the potential business impact, and provide a detailed step-by-step methodology to test for token and UserID swapping.
36- { parser .get_format_instructions ()}
37- """
38-
39118 try :
40119 llm = get_llm ()
41120 chain = llm | parser
42- result = chain .invoke ([HumanMessage (content = prompt )])
43-
44- finding = {
45- "title" : "IDOR Logic Analysis Plan" ,
46- "severity" : "Info" ,
47- "description" : f"Confidence: { result .confidence_score } /10\n Impact: { result .potential_business_impact } \n Plan: { ', ' .join (result .testing_plan )} " ,
48- "evidence" : "Generated by Logic Analysis Agent"
49- }
50- return {"findings" : [finding ]}
121+ result : IDORReasoningOutput = chain .invoke (messages )
122+
123+ finding = _build_finding (result )
124+ existing_findings : list = state .get ("findings" , [])
125+ return {"findings" : existing_findings + [finding ]}
126+
51127 except Exception as e :
52- return {"messages" : [f"Logic analysis error: { e } " ]}
128+ return {"messages" : [f"Logic analysis error: { e } " ]}
0 commit comments