Skip to content

Commit 3ad75df

Browse files
author
r0BIT
committed
refactor: extract task classification logic into dedicated module
- Create taskhound/classification.py with ClassificationResult dataclass
- Add classify_task() as single source of truth for TIER-0/PRIV/TASK logic
- Integrate into both _process_offline_host() and process_target()
- Remove ~200 lines of duplicate code (engine.py: 1244 → 1044 lines)
- Add 19 unit tests for classification module (189 total tests passing)
1 parent eed1ba6 commit 3ad75df

File tree

3 files changed

+663
-338
lines changed

3 files changed

+663
-338
lines changed

taskhound/classification.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
# Task classification logic for determining privilege levels.
2+
#
3+
# This module provides shared classification logic used by both online
4+
# and offline processing modes. It determines whether a task is TIER-0,
5+
# PRIV (high-value), or TASK (normal) based on the runas account.
6+
7+
from dataclasses import dataclass
8+
from typing import Any, Dict, Optional, Tuple
9+
10+
from .utils.logging import warn
11+
from .utils.sid_resolver import looks_like_domain_user
12+
13+
14+
@dataclass
class ClassificationResult:
    """Outcome of classifying a single scheduled task.

    Field order is part of the constructor signature, so positional
    construction such as ``ClassificationResult("TASK")`` keeps working.
    """

    # Privilege tier assigned to the task: "TIER-0", "PRIV", or "TASK".
    task_type: str
    # Explanation attached to the classification; None when there is none.
    reason: Optional[str] = None
    # Password-age analysis text; None when no analysis was produced.
    password_analysis: Optional[str] = None
    # False when the task should be left out of the output.
    should_include: bool = True
22+
23+
24+
def _get_task_date_for_analysis(meta: Dict) -> Tuple[Optional[str], bool]:
    """
    Pick the date to use for password freshness analysis.

    The explicit ``date`` entry wins; when it is missing or empty the
    trigger's ``start_boundary`` is used as a less accurate stand-in for
    the task creation time.

    Args:
        meta: Task metadata dict; the ``date`` and ``start_boundary`` keys are read

    Returns:
        Tuple of (date_value, is_fallback) where:
        - date_value: the first non-empty value found, or None if neither key is set
        - is_fallback: True only when the value came from ``start_boundary``
    """
    # Keys in priority order, each paired with the fallback flag it implies.
    lookup_order = (("date", False), ("start_boundary", True))

    for key, is_fallback in lookup_order:
        value = meta.get(key)
        if value:
            return value, is_fallback

    # Neither key holds a usable value.
    return None, False
47+
48+
49+
def _analyze_password_age(
    hv: Any,
    runas: str,
    meta: Dict,
    rel_path: str,
) -> Optional[str]:
    """
    Run the loader's password-age analysis for the task's runas account.

    Args:
        hv: High-value loader object; must expose ``loaded`` and
            ``analyze_password_age(runas, task_date)``. May be None/falsy.
        runas: The account the task runs as
        meta: Task metadata dict used to look up the reference date
        rel_path: Task path, only used in the warning message

    Returns:
        The analysis string reported by the loader, or None when the loader
        is missing/not loaded or the reported risk level is "UNKNOWN"
    """
    loader_ready = hv and hv.loaded
    if not loader_ready:
        return None

    reference_date, from_trigger = _get_task_date_for_analysis(meta)

    # Tell the user when the date is only a trigger-time approximation.
    if from_trigger and reference_date:
        warn(
            f"Task {rel_path} has no explicit creation date - "
            "using trigger StartBoundary for password analysis (may be inaccurate)"
        )

    risk_level, analysis_text = hv.analyze_password_age(runas, reference_date)

    # An "UNKNOWN" risk level means there is nothing meaningful to report.
    return None if risk_level == "UNKNOWN" else analysis_text
82+
83+
84+
def _build_privileged_result(
    row: Dict[str, Any],
    meta: Dict[str, Any],
    runas: str,
    rel_path: str,
    hv: Any,
    task_type: str,
    reason: str,
    has_no_saved_creds: bool,
) -> ClassificationResult:
    """Finalize a TIER-0 or PRIV match: annotate ``row`` and build the result."""
    password_analysis = None

    if has_no_saved_creds:
        # Without stored credentials there is nothing to analyze; explain why.
        reason = f"{reason} (no saved credentials — DPAPI dump not applicable; manipulation requires an interactive session)"
    else:
        password_analysis = _analyze_password_age(hv, runas, meta, rel_path)

    # Update row in place
    row["type"] = task_type
    row["reason"] = reason
    row["password_analysis"] = password_analysis

    return ClassificationResult(
        task_type=task_type,
        reason=reason,
        password_analysis=password_analysis,
        should_include=True,
    )


def classify_task(
    row: Dict[str, Any],
    meta: Dict[str, Any],
    runas: str,
    rel_path: str,
    hv: Optional[Any],
    show_unsaved_creds: bool,
    include_local: bool,
) -> ClassificationResult:
    """
    Classify a task as TIER-0, PRIV, or TASK based on the runas account.

    Decision order:
      1. Tasks hinted as "no_saved_credentials" are excluded outright unless
         ``show_unsaved_creds`` is set (``row`` is left untouched).
      2. With a loaded ``hv``, a Tier 0 match wins over a high-value match.
      3. Everything else is a regular TASK, included only when the account
         looks like a domain user, credentials are stored, or
         ``include_local`` is set.

    Args:
        row: Task row dict; ``credentials_hint`` is read. Mutated in place:
            TIER-0/PRIV matches set ``type``, ``reason`` and
            ``password_analysis``; an included regular task sets only
            ``password_analysis``.
        meta: Parsed task XML metadata
        runas: The account the task runs as
        rel_path: Task path for display/warnings
        hv: High-value loader (can be None); must expose ``loaded``,
            ``check_tier0`` and ``check_highvalue`` when provided
        show_unsaved_creds: Whether to include tasks without saved credentials
        include_local: Whether to include local system accounts

    Returns:
        ClassificationResult with task_type, reason, password_analysis, should_include
    """
    credentials_hint = row.get("credentials_hint")
    has_no_saved_creds = credentials_hint == "no_saved_credentials"
    has_stored_creds = credentials_hint == "stored_credentials"

    # Skip tasks without saved credentials unless user explicitly requested them
    if has_no_saved_creds and not show_unsaved_creds:
        return ClassificationResult(task_type="TASK", should_include=False)

    hv_ready = bool(hv and hv.loaded)

    if hv_ready:
        # Tier 0 takes precedence; high-value is only consulted when it misses.
        is_tier0, tier0_reasons = hv.check_tier0(runas)
        if is_tier0:
            return _build_privileged_result(
                row,
                meta,
                runas,
                rel_path,
                hv,
                task_type="TIER-0",
                reason="; ".join(tier0_reasons),
                has_no_saved_creds=has_no_saved_creds,
            )

        if hv.check_highvalue(runas):
            return _build_privileged_result(
                row,
                meta,
                runas,
                rel_path,
                hv,
                task_type="PRIV",
                reason="High Value match found (Check BloodHound Outbound Object Control for Details)",
                has_no_saved_creds=has_no_saved_creds,
            )

    # Regular task - still analyze password age if credentials are stored
    password_analysis = None
    if hv_ready and has_stored_creds:
        password_analysis = _analyze_password_age(hv, runas, meta, rel_path)

    # Evaluate the domain-user check once. "include_local and not domain user"
    # collapses to plain include_local because the domain-user case is already
    # covered by the first operand.
    should_include = bool(
        looks_like_domain_user(runas) or has_stored_creds or include_local
    )

    if should_include:
        row["password_analysis"] = password_analysis

    return ClassificationResult(
        task_type="TASK",
        reason=None,
        password_analysis=password_analysis,
        should_include=should_include,
    )

0 commit comments

Comments
 (0)