Skip to content

Commit c4a9793

Browse files
author
r0BIT
committed
feat: Add TaskRow dataclass replacing Dict[str, Any] for type safety
- Create taskhound/models/ package with TaskType enum and TaskRow dataclass
- TaskRow has 27 typed fields with from_meta(), failure(), to_dict() methods
- Integrate TaskRow into engine.py (remove _build_row function)
- Update classification.py to use TaskRow attribute access
- Update output/summary.py and writer.py to support TaskRow serialization
- Add cred_* fields to CSV export for credential validation data
- Add 34 new unit tests for TaskRow (223 total tests passing)
- Live test verified: JSON/CSV export working correctly
1 parent 3ad75df commit c4a9793

File tree

10 files changed

+664
-139
lines changed

10 files changed

+664
-139
lines changed

taskhound/classification.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@
55
# PRIV (high-value), or TASK (normal) based on the runas account.
66

77
from dataclasses import dataclass
8-
from typing import Any, Dict, Optional, Tuple
8+
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
99

1010
from .utils.logging import warn
1111
from .utils.sid_resolver import looks_like_domain_user
1212

13+
if TYPE_CHECKING:
14+
from .models.task import TaskRow
15+
16+
from .models.task import TaskType
17+
1318

1419
@dataclass
1520
class ClassificationResult:
@@ -82,7 +87,7 @@ def _analyze_password_age(
8287

8388

8489
def classify_task(
85-
row: Dict[str, Any],
90+
row: "TaskRow",
8691
meta: Dict[str, Any],
8792
runas: str,
8893
rel_path: str,
@@ -97,7 +102,7 @@ def classify_task(
97102
used by both online and offline processing modes.
98103
99104
Args:
100-
row: Task row dict (modified in place with type/reason/password_analysis)
105+
row: TaskRow instance (modified in place with task_type/reason/password_analysis)
101106
meta: Parsed task XML metadata
102107
runas: The account the task runs as
103108
rel_path: Task path for display/warnings
@@ -108,8 +113,8 @@ def classify_task(
108113
Returns:
109114
ClassificationResult with task_type, reason, password_analysis, should_include
110115
"""
111-
has_no_saved_creds = row.get("credentials_hint") == "no_saved_credentials"
112-
has_stored_creds = row.get("credentials_hint") == "stored_credentials"
116+
has_no_saved_creds = row.credentials_hint == "no_saved_credentials"
117+
has_stored_creds = row.credentials_hint == "stored_credentials"
113118

114119
# Skip tasks without saved credentials unless user explicitly requested them
115120
if has_no_saved_creds and not show_unsaved_creds:
@@ -132,9 +137,9 @@ def classify_task(
132137
password_analysis = _analyze_password_age(hv, runas, meta, rel_path)
133138

134139
# Update row in place
135-
row["type"] = "TIER-0"
136-
row["reason"] = reason
137-
row["password_analysis"] = password_analysis
140+
row.task_type = TaskType.TIER0
141+
row.reason = reason
142+
row.password_analysis = password_analysis
138143

139144
return ClassificationResult(
140145
task_type="TIER-0",
@@ -154,9 +159,9 @@ def classify_task(
154159
password_analysis = _analyze_password_age(hv, runas, meta, rel_path)
155160

156161
# Update row in place
157-
row["type"] = "PRIV"
158-
row["reason"] = reason
159-
row["password_analysis"] = password_analysis
162+
row.task_type = TaskType.PRIV
163+
row.reason = reason
164+
row.password_analysis = password_analysis
160165

161166
return ClassificationResult(
162167
task_type="PRIV",
@@ -178,7 +183,7 @@ def classify_task(
178183
)
179184

180185
if should_include:
181-
row["password_analysis"] = password_analysis
186+
row.password_analysis = password_analysis
182187

183188
return ClassificationResult(
184189
task_type="TASK",

taskhound/engine.py

Lines changed: 48 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
LAPSFailure,
2020
get_laps_credential_for_host,
2121
)
22+
from .models.task import TaskRow
2223
from .output.printer import format_block
2324
from .parsers.highvalue import HighValueLoader
2425
from .parsers.task_xml import parse_task_xml
@@ -190,7 +191,7 @@ def _process_offline_host(
190191
hv: Optional[HighValueLoader],
191192
show_unsaved_creds: bool,
192193
include_local: bool,
193-
all_rows: List[Dict],
194+
all_rows: List[TaskRow],
194195
debug: bool,
195196
no_ldap: bool = False,
196197
concise: bool = False,
@@ -243,7 +244,7 @@ def _process_offline_host(
243244
what = f"{what} {meta.get('arguments')}"
244245

245246
# For offline processing, target_ip is not applicable (already offline)
246-
row = _build_row(hostname, rel_path, meta, target_ip=None)
247+
row = TaskRow.from_meta(hostname, rel_path, meta, target_ip=None)
247248

248249
# Determine if the task stores credentials or runs with token/S4U (no saved credentials)
249250
logon_type = (meta.get("logon_type") or "").strip()
@@ -253,7 +254,7 @@ def _process_offline_host(
253254
"interactivetokenorpassword",
254255
)
255256
if no_saved_creds:
256-
row["credentials_hint"] = "no_saved_credentials"
257+
row.credentials_hint = "no_saved_credentials"
257258

258259
# Use shared classification logic
259260
result = classify_task(
@@ -373,58 +374,10 @@ def get_block_priority(block):
373374
return result
374375

375376

376-
def _build_row(
377-
host: str, rel_path: str, meta: Dict[str, str], target_ip: Optional[str] = None, computer_sid: Optional[str] = None
378-
) -> Dict[str, Optional[str]]:
379-
# Create a structured dict for CSV/JSON export representing a task.
380-
#
381-
# Keeps the same keys used by the writer so rows can be dumped directly.
382-
# Now includes both FQDN (host) and IP address (target_ip) for flexibility.
383-
# Also stores computer_sid for efficient BloodHound lookups without LDAP.
384-
385-
# Determine credentials hint based on logon type
386-
logon_type_raw = meta.get("logon_type")
387-
logon_type = logon_type_raw.strip().lower() if logon_type_raw else ""
388-
if logon_type == "password":
389-
credentials_hint = "stored_credentials"
390-
elif logon_type in ("interactive", "interactivetoken", "s4u"):
391-
credentials_hint = "no_saved_credentials"
392-
else:
393-
credentials_hint = None
394-
395-
return {
396-
"host": host,
397-
"target_ip": target_ip, # Store the original target (IP or hostname)
398-
"computer_sid": computer_sid, # Computer account SID from SMB (enables SID-based lookups)
399-
"path": rel_path,
400-
"type": "TASK",
401-
"runas": meta.get("runas"),
402-
"command": meta.get("command"),
403-
"arguments": meta.get("arguments"),
404-
"author": meta.get("author"),
405-
"date": meta.get("date"),
406-
"logon_type": meta.get("logon_type"),
407-
"enabled": meta.get("enabled"),
408-
"trigger_type": meta.get("trigger_type"),
409-
"start_boundary": meta.get("start_boundary"),
410-
"interval": meta.get("interval"),
411-
"duration": meta.get("duration"),
412-
"days_interval": meta.get("days_interval"),
413-
"reason": None,
414-
"credentials_hint": credentials_hint,
415-
# Credential validation fields (populated when --validate-creds is used)
416-
"cred_status": None, # valid, valid_restricted, invalid, blocked, unknown
417-
"cred_password_valid": None, # True/False - key field for DPAPI feasibility
418-
"cred_hijackable": None, # True/False - can the task be hijacked?
419-
"cred_last_run": None, # datetime of last run
420-
"cred_return_code": None, # hex return code from last execution
421-
"cred_detail": None, # human-readable detail
422-
}
423-
424377

425378
def process_target(
426379
target: str,
427-
all_rows: List[Dict],
380+
all_rows: List[TaskRow],
428381
*,
429382
auth: AuthContext,
430383
include_ms: bool = False,
@@ -536,12 +489,11 @@ def process_target(
536489
# No LAPS password for this host - skip target
537490
warn(laps_failure.message)
538491
status(f"[Collecting] {target} [-] (No LAPS password)")
539-
all_rows.append({
540-
"host": discovered_hostname,
541-
"target_ip": target,
542-
"type": "FAILURE",
543-
"reason": f"LAPS: {laps_failure.failure_type}"
544-
})
492+
all_rows.append(TaskRow.failure(
493+
discovered_hostname,
494+
f"LAPS: {laps_failure.failure_type}",
495+
target_ip=target,
496+
))
545497
try:
546498
smb.close()
547499
except Exception:
@@ -574,12 +526,11 @@ def process_target(
574526
laps_user_tried=laps_cred.username,
575527
laps_type_tried=laps_cred.laps_type,
576528
)
577-
all_rows.append({
578-
"host": discovered_hostname,
579-
"target_ip": target,
580-
"type": "FAILURE",
581-
"reason": f"LAPS auth failed: {e}"
582-
})
529+
all_rows.append(TaskRow.failure(
530+
discovered_hostname,
531+
f"LAPS auth failed: {e}",
532+
target_ip=target,
533+
))
583534
try:
584535
smb.close()
585536
except Exception:
@@ -655,11 +606,10 @@ def process_target(
655606
traceback.print_exc()
656607
msg = str(e)
657608
status(f"[Collecting] {target} [-] ({msg})")
658-
all_rows.append({
659-
"host": target,
660-
"type": "FAILURE",
661-
"reason": f"SMB connection failed: {msg}"
662-
})
609+
all_rows.append(TaskRow.failure(
610+
target,
611+
f"SMB connection failed: {msg}",
612+
))
663613
if "STATUS_MORE_PROCESSING_REQUIRED" in msg:
664614
warn(f"{target}: Kerberos auth failed (SPN not found?). Try using FQDNs or switch to NTLM (-k off).")
665615
else:
@@ -688,12 +638,11 @@ def process_target(
688638
laps_user_tried=laps_cred.username if laps_cache else None,
689639
laps_type_tried=laps_type_used,
690640
)
691-
all_rows.append({
692-
"host": discovered_hostname or target,
693-
"target_ip": target,
694-
"type": "FAILURE",
695-
"reason": "Remote UAC (token filtered)"
696-
})
641+
all_rows.append(TaskRow.failure(
642+
discovered_hostname or target,
643+
"Remote UAC (token filtered)",
644+
target_ip=target,
645+
))
697646
return out_lines, laps_failure
698647
else:
699648
warn(f"{target}: Local admin check failed")
@@ -709,22 +658,20 @@ def process_target(
709658
if debug:
710659
traceback.print_exc()
711660
status(f"[Collecting] {target} [-] (Access Denied)")
712-
all_rows.append({
713-
"host": target,
714-
"type": "FAILURE",
715-
"reason": "Access Denied (Failed to crawl tasks)"
716-
})
661+
all_rows.append(TaskRow.failure(
662+
target,
663+
"Access Denied (Failed to crawl tasks)",
664+
))
717665
warn(f"{target}: Failed to Crawl Tasks. Skipping... (Are you Local Admin?)")
718666
return out_lines, laps_result
719667
except Exception as e:
720668
if debug:
721669
traceback.print_exc()
722670
status(f"[Collecting] {target} [-] ({e})")
723-
all_rows.append({
724-
"host": target,
725-
"type": "FAILURE",
726-
"reason": f"Crawling failed: {e}"
727-
})
671+
all_rows.append(TaskRow.failure(
672+
target,
673+
f"Crawling failed: {e}",
674+
))
728675
warn(f"{target}: Unexpected error while crawling tasks: {e}")
729676
return out_lines, laps_result
730677

@@ -902,7 +849,7 @@ def process_target(
902849
# Use resolved FQDN as host, keep original target as IP
903850
# This ensures BloodHound gets proper FQDNs even when connecting via IP
904851
hostname = server_fqdn if server_fqdn else target
905-
row = _build_row(hostname, rel_path, meta, target_ip=target, computer_sid=server_sid)
852+
row = TaskRow.from_meta(hostname, rel_path, meta, target_ip=target, computer_sid=server_sid)
906853

907854
# Enrich row with credential validation data if available
908855
# Task paths need normalization: SMB uses "TaskName", RPC uses "\TaskName"
@@ -913,26 +860,26 @@ def process_target(
913860

914861
task_run_info = cred_validation_results.get(rpc_path) or cred_validation_results.get(rpc_path_alt)
915862
if task_run_info:
916-
row["cred_status"] = task_run_info.credential_status.value
917-
row["cred_password_valid"] = task_run_info.password_valid
918-
row["cred_hijackable"] = task_run_info.task_hijackable
919-
row["cred_last_run"] = task_run_info.last_run.isoformat() if task_run_info.last_run else None
920-
row["cred_return_code"] = f"0x{task_run_info.return_code:08X}" if task_run_info.return_code is not None else None
863+
row.cred_status = task_run_info.credential_status.value
864+
row.cred_password_valid = task_run_info.password_valid
865+
row.cred_hijackable = task_run_info.task_hijackable
866+
row.cred_last_run = task_run_info.last_run.isoformat() if task_run_info.last_run else None
867+
row.cred_return_code = f"0x{task_run_info.return_code:08X}" if task_run_info.return_code is not None else None
921868
# Build human-readable detail
922869
if task_run_info.password_valid:
923870
if task_run_info.task_hijackable:
924-
row["cred_detail"] = "Password VALID - task can be hijacked"
871+
row.cred_detail = "Password VALID - task can be hijacked"
925872
else:
926-
row["cred_detail"] = f"Password VALID but restricted ({task_run_info.credential_status.value})"
873+
row.cred_detail = f"Password VALID but restricted ({task_run_info.credential_status.value})"
927874
elif task_run_info.credential_status == CredentialStatus.INVALID:
928-
row["cred_detail"] = "Password INVALID - DPAPI dump not viable"
875+
row.cred_detail = "Password INVALID - DPAPI dump not viable"
929876
elif task_run_info.credential_status == CredentialStatus.BLOCKED:
930-
row["cred_detail"] = "Account blocked/expired - DPAPI dump not viable"
877+
row.cred_detail = "Account blocked/expired - DPAPI dump not viable"
931878
else:
932-
row["cred_detail"] = f"Unknown status (code: {row['cred_return_code']})"
879+
row.cred_detail = f"Unknown status (code: {row.cred_return_code})"
933880

934881
# Add Credential Guard status to each row
935-
row["credential_guard"] = credguard_status
882+
row.credential_guard = credguard_status
936883
# Determine if the task stores credentials or runs with token/S4U (no saved credentials)
937884
logon_type = (meta.get("logon_type") or "").strip()
938885
no_saved_creds = (not logon_type) or logon_type.lower() in (
@@ -941,9 +888,9 @@ def process_target(
941888
"interactivetokenorpassword",
942889
)
943890
if no_saved_creds:
944-
row["credentials_hint"] = "no_saved_credentials"
891+
row.credentials_hint = "no_saved_credentials"
945892
elif logon_type.lower() == "password":
946-
row["credentials_hint"] = "stored_credentials"
893+
row.credentials_hint = "stored_credentials"
947894

948895
# Use shared classification logic
949896
result = classify_task(
@@ -989,7 +936,7 @@ def process_target(
989936
meta=meta,
990937
decrypted_creds=decrypted_creds,
991938
concise=concise,
992-
cred_validation=row if row.get("cred_status") else None,
939+
cred_validation=row.to_dict() if row.cred_status else None,
993940
)
994941
)
995942
priv_count += 1
@@ -1022,7 +969,7 @@ def process_target(
1022969
meta=meta,
1023970
decrypted_creds=decrypted_creds,
1024971
concise=concise,
1025-
cred_validation=row if row.get("cred_status") else None,
972+
cred_validation=row.to_dict() if row.cred_status else None,
1026973
)
1027974
)
1028975

taskhound/models/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Data models for TaskHound.
2+
#
3+
# This package contains dataclasses and type definitions for
4+
# structured data used throughout the application.
5+
6+
from .task import TaskRow, TaskType
7+
8+
__all__ = ["TaskRow", "TaskType"]

0 commit comments

Comments
 (0)