Skip to content

Commit 500b760

Browse files
committed
Add diagnostics
1 parent 7dbd015 commit 500b760

File tree

6 files changed

+482
-0
lines changed

6 files changed

+482
-0
lines changed

acquire/acquire.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from dissect.util.stream import RunlistStream
3232

3333
from acquire.collector import Collector, get_full_formatted_report, get_report_summary
34+
from acquire.diagnostics.common import diagnostics_info_json
3435
from acquire.dynamic.windows.named_objects import NamedObjectType
3536
from acquire.esxi import esxi_memory_context_manager
3637
from acquire.gui import GUI
@@ -2398,6 +2399,30 @@ def main() -> None:
23982399
exit_failure(args.config.get("arguments"))
23992400
exit_success(args.config.get("arguments"))
24002401

2402+
# Check for diagnostics argument here. Process and execute.
2403+
if args.diagnostics:
2404+
print("\nWARNING: Gathering diagnostics may destroy forensic evidence.")
2405+
confirm = input("Do you want to continue? [y/N]: ").strip().lower()
2406+
if confirm not in ("y", "yes"):
2407+
print("Aborted diagnostics.")
2408+
exit_success(args.config.get("arguments"))
2409+
try:
2410+
# Determine output path
2411+
if isinstance(args.diagnostics, str):
2412+
diag_path = Path(args.diagnostics)
2413+
elif log_file:
2414+
diag_path = Path(log_file).with_name(Path(log_file).stem + "_diag.json")
2415+
else:
2416+
diag_path = Path("diag.json")
2417+
2418+
diagnostics_info_json(diag_path)
2419+
log.info("Diagnostics written to file %s", diag_path.resolve())
2420+
except Exception:
2421+
acquire_gui.message("Failed to upload files")
2422+
log.exception("")
2423+
exit_failure(args.config.get("arguments"))
2424+
exit_success(args.config.get("arguments"))
2425+
24012426
target_paths = []
24022427
for target_path in args.targets:
24032428
target_path = args_to_uri([target_path], args.loader, rest)[0] if args.loader else target_path

acquire/diagnostics/__init__.py

Whitespace-only changes.

acquire/diagnostics/common.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import json
2+
import platform
3+
from pathlib import Path
4+
5+
6+
def diagnostics_info_json(output: Path) -> None:
7+
# Dynamic imports are required to avoid import errors on unsupported platforms
8+
system = platform.system().lower()
9+
if system == "windows":
10+
from .windows import diagnostics_info # noqa: PLC0415
11+
elif system == "linux":
12+
from .linux import diagnostics_info # noqa: PLC0415
13+
else:
14+
raise NotImplementedError(f"Diagnostics not implemented for OS: {system}")
15+
data = diagnostics_info()
16+
with output.open("w") as f:
17+
json.dump(data, f, default=str, indent=2)

acquire/diagnostics/linux.py

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
import json
2+
import os
3+
import platform
4+
import stat
5+
import subprocess
6+
from pathlib import Path
7+
8+
9+
def check_sudo() -> bool:
10+
return os.geteuid() == 0
11+
12+
13+
def get_disk_info() -> dict:
14+
info = {}
15+
# RAID
16+
try:
17+
with Path("/proc/mdstat").open() as f:
18+
info["raid"] = f.read()
19+
except Exception as e:
20+
info["raid"] = f"Error: {e}"
21+
# LVM: Distinguish logical volumes and backing devices
22+
logical_volumes = []
23+
backing_devices = []
24+
lvm_error = None
25+
try:
26+
# Get logical volumes
27+
try:
28+
lvs_result = subprocess.run(
29+
["lvs", "--noheadings", "-o", "lv_path"], capture_output=True, text=True, check=False
30+
)
31+
if lvs_result.returncode == 0:
32+
logical_volumes = [line.strip() for line in lvs_result.stdout.splitlines() if line.strip()]
33+
else:
34+
lvm_error = f"lvs failed: {lvs_result.stderr.strip()}"
35+
except FileNotFoundError:
36+
lvm_error = "lvs not found"
37+
# Get backing devices
38+
try:
39+
pvs_result = subprocess.run(
40+
["pvs", "--noheadings", "-o", "pv_name"], capture_output=True, text=True, check=False
41+
)
42+
if pvs_result.returncode == 0:
43+
backing_devices = [line.strip() for line in pvs_result.stdout.splitlines() if line.strip()]
44+
else:
45+
lvm_error = (lvm_error or "") + f"; pvs failed: {pvs_result.stderr.strip()}"
46+
except FileNotFoundError:
47+
lvm_error = (lvm_error or "") + "; pvs not found"
48+
except Exception as e:
49+
lvm_error = f"Error: {e}"
50+
info["lvm"] = {
51+
"logical_volumes": logical_volumes,
52+
"backing_devices": backing_devices,
53+
"error": lvm_error,
54+
}
55+
56+
luks_devices = []
57+
# Check /dev/mapper for dm-crypt devices
58+
try:
59+
luks_devices.extend(
60+
[
61+
Path("/dev/mapper") / entry.name
62+
for entry in Path("/dev/mapper").iterdir()
63+
if entry.name.startswith(("dm_crypt", "crypt"))
64+
]
65+
)
66+
except Exception as e:
67+
luks_devices.append(f"Error: {e}")
68+
# Parse /etc/crypttab for configured LUKS devices
69+
try:
70+
if Path("/etc/crypttab").exists():
71+
with Path("/etc/crypttab").open() as f:
72+
for line in f:
73+
line = line.strip()
74+
if line and not line.startswith("#"):
75+
parts = line.split()
76+
if len(parts) > 1:
77+
luks_devices.append(parts[1])
78+
except Exception as e:
79+
luks_devices.append(f"Error: {e}")
80+
info["luks"] = luks_devices
81+
return info
82+
83+
84+
def walk_dev() -> list[str]:
85+
dev_tree = []
86+
for root, _, files in os.walk("/dev"):
87+
for name in files:
88+
dev_path = str(Path(root) / name)
89+
info = {"path": dev_path}
90+
try:
91+
st = Path.stat(dev_path)
92+
if stat.S_ISBLK(st.st_mode):
93+
info["type"] = "block"
94+
elif stat.S_ISCHR(st.st_mode):
95+
info["type"] = "char"
96+
else:
97+
info["type"] = "other"
98+
info["major"] = os.major(st.st_rdev)
99+
info["minor"] = os.minor(st.st_rdev)
100+
info["mode"] = oct(st.st_mode)
101+
info["owner"] = st.st_uid
102+
info["group"] = st.st_gid
103+
if info["type"] == "block":
104+
try:
105+
blkid = subprocess.run(["blkid", dev_path], capture_output=True, text=True)
106+
blkid_str = blkid.stdout.strip()
107+
info["blkid"] = parse_blkid_output(blkid_str) if blkid_str else None
108+
except Exception:
109+
info["blkid"] = None
110+
except Exception as e:
111+
info["error"] = str(e)
112+
dev_tree.append(info)
113+
return dev_tree
114+
115+
116+
def get_dmesg() -> list[str]:
117+
try:
118+
with os.popen("dmesg | tail -n 100") as f:
119+
return f.read().splitlines()
120+
except Exception as e:
121+
return [f"Error: {e}"]
122+
123+
124+
def get_hardware_info() -> dict:
125+
info = {}
126+
# CPU
127+
try:
128+
with Path("/proc/cpuinfo").open() as f:
129+
cpuinfo_raw = f.read()
130+
# Parse into list of dicts (one per processor)
131+
cpu_blocks = cpuinfo_raw.strip().split("\n\n")
132+
cpuinfo = []
133+
for block in cpu_blocks:
134+
cpu = parse_key_value_lines(block.splitlines())
135+
if cpu:
136+
cpuinfo.append(cpu)
137+
info["cpuinfo"] = cpuinfo
138+
except Exception as e:
139+
info["cpuinfo"] = {"error": str(e)}
140+
# Memory
141+
try:
142+
with Path("/proc/meminfo").open() as f:
143+
meminfo_raw = f.read()
144+
meminfo = parse_key_value_lines(meminfo_raw.splitlines())
145+
info["meminfo"] = meminfo
146+
except Exception as e:
147+
info["meminfo"] = {"error": str(e)}
148+
# DMI
149+
dmi_path = Path("/sys/class/dmi/id/")
150+
dmi_info = {}
151+
if dmi_path.is_dir():
152+
for fpath in dmi_path.iterdir():
153+
if fpath.is_file() and os.access(fpath, os.R_OK):
154+
with fpath.open() as f:
155+
dmi_info[fpath.name] = f.read().strip()
156+
info["dmi"] = dmi_info
157+
return info
158+
159+
160+
def get_os_info() -> dict:
161+
info = {}
162+
try:
163+
with Path("/etc/os-release").open() as f:
164+
info["os-release"] = f.read()
165+
except Exception as e:
166+
info["os-release"] = f"Error: {e}"
167+
info["platform"] = platform.platform()
168+
info["uname"] = platform.uname()
169+
return info
170+
171+
172+
def diagnostics_info() -> dict:
173+
info = {}
174+
info["running_as_root"] = check_sudo()
175+
info["disk_info"] = get_disk_info()
176+
devs = walk_dev()
177+
info["devices"] = devs
178+
info["dmesg"] = get_dmesg()
179+
info["hardware_info"] = get_hardware_info()
180+
info["os_info"] = get_os_info()
181+
return info
182+
183+
184+
def diagnostics_info_json(output: Path) -> None:
185+
data = diagnostics_info()
186+
with output.open("w") as f:
187+
json.dump(data, f, default=str, indent=2)
188+
189+
190+
def parse_key_value_lines(lines: list[str]) -> dict[str, str]:
191+
dev_tree = []
192+
for root, _, files in os.walk("/dev"):
193+
for name in files:
194+
path_obj = Path(root) / name
195+
dev_path = str(path_obj)
196+
info = {"path": dev_path}
197+
try:
198+
st = path_obj.stat()
199+
if stat.S_ISBLK(st.st_mode):
200+
info["type"] = "block"
201+
elif stat.S_ISCHR(st.st_mode):
202+
info["type"] = "char"
203+
else:
204+
info["type"] = "other"
205+
info["major"] = os.major(st.st_rdev)
206+
info["minor"] = os.minor(st.st_rdev)
207+
info["mode"] = oct(st.st_mode)
208+
info["owner"] = st.st_uid
209+
info["group"] = st.st_gid
210+
if info["type"] == "block":
211+
try:
212+
blkid = subprocess.run(["blkid", dev_path], capture_output=True, text=True)
213+
info["blkid"] = blkid.stdout.strip()
214+
except Exception:
215+
info["blkid"] = None
216+
except Exception as e:
217+
info["error"] = str(e)
218+
dev_tree.append(info)
219+
return dev_tree
220+
221+
222+
def parse_blkid_output(blkid_str: str) -> dict:
223+
"""Parse blkid output string into a dictionary of key-value pairs."""
224+
# Example: /dev/sda1: UUID="abcd-1234" TYPE="ext4" PARTUUID="efgh-5678"
225+
parts = blkid_str.split(None, 1)
226+
blkid_info = {}
227+
if len(parts) == 2:
228+
for item in parts[1].split():
229+
if "=" in item:
230+
k, v = item.split("=", 1)
231+
blkid_info[k] = v.strip('"')
232+
return blkid_info

0 commit comments

Comments
 (0)