-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathMergeNessus.py
More file actions
286 lines (251 loc) · 10.8 KB
/
MergeNessus.py
File metadata and controls
286 lines (251 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#!/usr/bin/env python3
"""
MergeNessus.py
Merge multiple .nessus files into a single .nessus file cleanly.
Features:
- Uses argparse for CLI options.
- -d / --directory: directory containing .nessus files (default: current directory).
- -o / --output: output path for merged .nessus (default: ./Merged.nessus).
- -t / --title: title for the merged <Report> (default: "Merged Scan").
- Prints a startup message with the number of identified .nessus files.
- Shows a live progress bar (alive-progress) with files/hosts/findings counters.
- Deduplicates per host by (pluginID, port, svc_name) across all files.
- Computes overall scan window from HOST_START / HOST_END and records it in <MergeMeta>.
- Reorders ReportHost entries by HOST_START so Nessus displays the correct scan window.
- Carries over <Policy> and <ServerPreferences>/<Preferences> from the first file that has them.
Requires:
pip install alive-progress
"""
import argparse
import glob
import os
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from datetime import datetime
try:
from alive_progress import alive_bar
except ImportError:
print("[!] Missing dependency: alive-progress. Install with: pip install alive-progress", file=sys.stderr)
sys.exit(1)
HOST_START_TAG = "HOST_START"
HOST_END_TAG = "HOST_END"
# Common Nessus time formats
TIME_FORMATS = [
"%a %b %d %H:%M:%S %Y", # e.g., Tue Jun 27 16:22:00 2023
"%Y-%m-%d %H:%M:%S", # fallback
]
def parse_time(s: str):
if not s:
return None
for fmt in TIME_FORMATS:
try:
return datetime.strptime(s, fmt)
except Exception:
continue
return None
def get_hostprops_tag(host_elem, tag_name):
"""Return text for <tag name="..."> within <HostProperties>."""
hp = host_elem.find("HostProperties")
if hp is None:
return None
for tag in hp.findall("tag"):
if tag.get("name") == tag_name:
return tag.text
return None
def ensure_hostproperties(host_elem):
hp = host_elem.find("HostProperties")
if hp is None:
hp = ET.SubElement(host_elem, "HostProperties")
return hp
def clone_element(elem: ET.Element) -> ET.Element:
"""Deep clone for ET Elements."""
new = ET.Element(elem.tag, attrib=dict(elem.attrib))
if elem.text:
new.text = elem.text
if elem.tail:
new.tail = elem.tail
for child in list(elem):
new.append(clone_element(child))
return new
def indent(elem, level=0):
"""Pretty-print XML in-place for readability."""
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
for child in list(elem):
indent(child, level + 1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i
def main():
parser = argparse.ArgumentParser(
description="Merge .nessus files into a single .nessus file."
)
parser.add_argument(
"-d", "--directory",
default=".",
help="Directory containing .nessus files (default: current directory)."
)
parser.add_argument(
"-o", "--output",
default=os.path.join(".", "Merged.nessus"),
help="Output merged .nessus file path (default: ./Merged.nessus)."
)
parser.add_argument(
"-t", "--title",
default="Merged Scan",
help='Title for the merged Nessus <Report name="..."> (default: "Merged Scan").'
)
args = parser.parse_args()
search_dir = os.path.abspath(args.directory)
files = sorted(glob.glob(os.path.join(search_dir, "*.nessus")))
if not files:
print(f"[!] No .nessus files found in: {search_dir}", file=sys.stderr)
sys.exit(1)
# Initial script start message
plural = "file" if len(files) == 1 else "files"
print(f"[+] Starting merge process...")
print(f"[+] Found {len(files)} .nessus {plural} in: {search_dir}")
# Prepare merged XML structure
merged_root = ET.Element("NessusClientData_v2")
merged_policy = None
merged_prefs = None
merged_report = ET.SubElement(merged_root, "Report")
merged_report.set("name", args.title)
# Dedup structures
hosts_map = {} # host name -> ReportHost element in merged
host_item_keys = defaultdict(set) # host name -> set of (pluginID, port, svc_name)
# Track per-host time windows for reordering and global window
host_times = {} # host name -> (min_start, max_end)
earliest_start = None
latest_end = None
# Counters
total_findings = 0 # deduped per host, across all files
with alive_bar(len(files), title="Merging Nessus files") as bar:
for idx, fpath in enumerate(files, start=1):
try:
tree = ET.parse(fpath)
root = tree.getroot()
except Exception as e:
print(f"[!] Error parsing {fpath}: {e}", file=sys.stderr)
bar()
continue
# Copy Policy/Preferences once from first file that has them
if merged_policy is None:
policy = root.find("Policy")
if policy is not None:
merged_policy = clone_element(policy)
merged_root.insert(0, merged_policy)
if merged_prefs is None:
prefs = root.find("ServerPreferences")
if prefs is None:
prefs = root.find("Preferences")
if prefs is not None:
merged_prefs = clone_element(prefs)
insert_index = 1 if merged_policy is not None else 0
merged_root.insert(insert_index, merged_prefs)
# Merge ReportHosts
for report in root.findall("Report"):
for host in report.findall("ReportHost"):
name = host.get("name")
if not name:
continue
# Parse host time window
h_start = get_hostprops_tag(host, HOST_START_TAG)
h_end = get_hostprops_tag(host, HOST_END_TAG)
dt_start = parse_time(h_start) if h_start else None
dt_end = parse_time(h_end) if h_end else None
# Track global window
if dt_start and (earliest_start is None or dt_start < earliest_start):
earliest_start = dt_start
if dt_end and (latest_end is None or dt_end > latest_end):
latest_end = dt_end
# Update per-host min/max
if name in host_times:
cur_s, cur_e = host_times[name]
if dt_start and (cur_s is None or dt_start < cur_s):
cur_s = dt_start
if dt_end and (cur_e is None or dt_end > cur_e):
cur_e = dt_end
host_times[name] = (cur_s, cur_e)
else:
host_times[name] = (dt_start, dt_end)
# Merge items for this host
if name in hosts_map:
existing_host = hosts_map[name]
existing_keys = host_item_keys[name]
for item in host.findall("ReportItem"):
plugin_id = item.get("pluginID", "")
port = item.get("port", "")
svc = item.get("svc_name", "")
key = (plugin_id, port, svc)
if key not in existing_keys:
existing_host.append(clone_element(item))
existing_keys.add(key)
total_findings += 1
else:
# New host
new_host = ET.Element("ReportHost", attrib=dict(host.attrib))
# Clone HostProperties
hp = host.find("HostProperties")
if hp is not None:
new_host.append(clone_element(hp))
# Clone unique items
keys = set()
for item in host.findall("ReportItem"):
plugin_id = item.get("pluginID", "")
port = item.get("port", "")
svc = item.get("svc_name", "")
key = (plugin_id, port, svc)
if key not in keys:
new_host.append(clone_element(item))
keys.add(key)
total_findings += 1
hosts_map[name] = new_host
host_item_keys[name] = keys
merged_report.append(new_host)
bar.text = f"[files:{idx}/{len(files)}] hosts:{len(hosts_map)} findings:{total_findings}"
bar()
# === Reorder ReportHosts by HOST_START so Nessus displays correct scan window ===
if hosts_map:
def sort_key(hname):
dt_start, _ = host_times.get(hname, (None, None))
# Missing starts sort to the end
return (dt_start is None, dt_start or datetime.max, hname)
# Remove existing ReportHost children
for child in list(merged_report):
if child.tag == "ReportHost":
merged_report.remove(child)
# Append hosts sorted by earliest start time
ordered_names = sorted(hosts_map.keys(), key=sort_key)
for name in ordered_names:
merged_report.append(hosts_map[name])
# Add MergeMeta with overall scan window, if available
if earliest_start or latest_end:
meta = ET.SubElement(merged_report, "MergeMeta")
if earliest_start:
ET.SubElement(meta, "EarliestHostStart").text = earliest_start.strftime("%a %b %d %H:%M:%S %Y")
if latest_end:
ET.SubElement(meta, "LatestHostEnd").text = latest_end.strftime("%a %b %d %H:%M:%S %Y")
# Pretty-print XML
indent(merged_root)
# Ensure output dir exists and write file
out_path = os.path.abspath(args.output)
os.makedirs(os.path.dirname(out_path), exist_ok=True)
ET.ElementTree(merged_root).write(out_path, encoding="utf-8", xml_declaration=True)
# Summary
print(f"\nMerged {len(files)} file(s) -> {out_path}")
print(f"Unique hosts: {len(hosts_map)}")
print(f"Total findings (deduped per host): {total_findings}")
if earliest_start or latest_end:
print("Overall scan window:")
if earliest_start:
print(f" Earliest host start: {earliest_start}")
if latest_end:
print(f" Latest host end: {latest_end}")
if __name__ == "__main__":
main()