-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathauto_exploiter.py
391 lines (332 loc) · 13.7 KB
/
auto_exploiter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
import os
import time
from itertools import combinations
from exploiter.exploiter import Exploiter
from fuzzlog.fuzzlog import Crash, FuzzedPkt, FuzzLog
from utils import ae_logger, calc_file_sha256, count_mut_dup
class AutoExploiter:
    """
    AutoExploiter class contains the logic to run and coordinate the generated exploits.

    Attributes
    ----------
    fuzzlog : FuzzLog
        The fuzzing logs of a device
    exploiter : Exploiter
        The device-specific exploiter
    session_id : str
        Session ID to identify the different session
    max_fuzzed_pkts : int
        Number of fuzzed packets allowed inside one exploit script
    min_trial_pkts : int
        The minimum number of packets in the fuzzing logs that needs to be tested
    min_trial_iter : int
        The minimum number of iterations in the fuzzing logs that needs to be tested
    max_trial_time : int
        The maximum experiment time
    enable_mutation : bool
        Whether to include the mutated packets inside exploit script
    enable_duplication : bool
        Whether to include the duplicated packets inside exploit script
    enable_flooding : bool
        Whether to include the flooding packets inside exploit script
    """

    def __init__(
        self,
        *,
        fuzzlog: FuzzLog,
        exploiter: Exploiter,
        session_id: str,
        max_fuzzed_pkts: int,
        min_trial_pkts: int,
        min_trial_iter: int,
        max_trial_time: int,
        enable_mutation: bool,
        enable_duplication: bool,
        enable_flooding: bool,
    ) -> None:
        self.fuzzlog = fuzzlog
        self.exploiter = exploiter
        self.session_id = session_id
        self.max_fuzzed_pkts = max_fuzzed_pkts
        self.min_trial_pkts = min_trial_pkts
        self.min_trial_iter = min_trial_iter
        self.max_trial_time = max_trial_time
        self.enable_mutation = enable_mutation
        self.enable_duplication = enable_duplication
        self.enable_flooding = enable_flooding
        # Cache of already-run exploits, keyed by the SHA-256 of the exploit
        # script file: {"8a3de1": (crash_ids, exploit_path)}.  Used both to
        # skip re-running identical scripts and to match earlier crash groups
        # against results produced later in the session.
        self.crash_info_by_exploit_hash: dict[str, tuple[list[str], str]] = (
            {}
        )  # {"8a3de1": (crash_ids, exploit_path )}

    def cause_is_flooding(self, crash: Crash) -> bool:
        """
        Check if one crash belongs to the flooding crash type.

        Flooding exploit script is different from duplication and mutation exploit script.
        A crash might be caused by flooding if there are many identical packets before it
        that are duplicated.

        Only fuzzed packets within 30 iterations before the crash are considered;
        the crash is classified as flooding when at least 80% of them are
        duplicated packets.

        Parameters
        ----------
        crash : Crash
            The crash to classify.

        Returns
        -------
        is_flooding : bool
        """
        dup_count = 0
        mut_count = 0
        # Walk backwards from the crash; stop once packets are too far away.
        for pkt in reversed(crash.fuzzed_pkts):
            if crash.iteration - pkt.iteration > 30:
                # TODO: make the iteration window configurable
                break
            if pkt.type == "duplication":
                dup_count += 1
            else:
                mut_count += 1
        total = dup_count + mut_count
        if total == 0:
            # No fuzzed packets close enough to the crash (or none at all):
            # cannot be flooding.  Guards against ZeroDivisionError.
            return False
        if dup_count / total < 0.8:  # TODO: configurable
            return False
        return True

    def flooding_exploit_script_generator(self, crash: Crash):
        """
        Placeholder function for flooding script generator.

        TODO: Consider clean/remove this function.

        Parameters
        ----------
        crash : Crash
            The crash which the flooding exploits are generated based on

        Yields
        ------
        exploit_name : str
            Name of the generated flooding exploit
        exploit_path : str
            Path to the generated flooding exploit
        """
        # Thin pass-through to the device-specific exploiter.
        return self.exploiter.flood_script_generator(crash)

    def exploit_script_generator(self, crash_group: list[Crash]):
        """
        Exploit scripts generator, including mutation, duplication and flooding.

        Parameters
        ----------
        crash_group : list[Crash]
            Crash group, where crash group refers to a list of crashes that are classified as the same.

        Yields
        ------
        exploit_name : str
            Name of the generated exploit
        exploit_path : str
            Path to the generated exploit
        mut_count : int
            Number of mutated packets inside the generated exploit
        dup_count : int
            Number of duplicated packets inside the generated exploit

        See Also
        --------
        `flooding_exploit_script_generator`
        """
        # time limit is for crash_group
        start_time = time.time()
        for crash in crash_group:
            if self.enable_flooding and self.cause_is_flooding(crash):
                # Flooding crashes get their own generator; mut/dup counts do
                # not apply, so yield zeros for them.
                for i, j in self.flooding_exploit_script_generator(crash):
                    yield i, j, 0, 0
                continue
            # Collect the candidate packets closest to the crash, honouring
            # the mutation/duplication toggles, until both the minimum packet
            # count and the minimum iteration span are satisfied.
            experiment_pkts: list[FuzzedPkt] = []
            # TODO: ensure the sort is correct by sorting by loc
            for fuzzed_pkt in reversed(crash.fuzzed_pkts):
                if (not self.enable_mutation) and fuzzed_pkt.type == "mutation":
                    continue
                if (not self.enable_duplication) and fuzzed_pkt.type == "duplication":
                    continue
                if len(experiment_pkts) >= self.min_trial_pkts and (
                    crash.iteration - fuzzed_pkt.iteration >= self.min_trial_iter
                ):
                    break
                experiment_pkts.append(fuzzed_pkt)
            # Try every combination of candidate packets, smallest first, so
            # minimal exploits are produced before larger ones.
            for num_fuzzed_pkts in range(1, self.max_fuzzed_pkts + 1):
                for comb in combinations(range(len(experiment_pkts)), num_fuzzed_pkts):
                    if time.time() - start_time > self.max_trial_time:
                        ae_logger.error(
                            f"Maximum trial time has reached for crash located at {crash.loc}."
                        )
                        return
                    trial_pkts = [experiment_pkts[i] for i in comb]
                    exploit_name, exploit_path = self.exploiter.gen_script(trial_pkts)
                    mut_count, dup_count = count_mut_dup(exploit_path)
                    yield exploit_name, exploit_path, mut_count, dup_count

    def desired_crash_found(
        self, crash_ids_to_check: list[str | None], desired_crash_group: list[Crash]
    ) -> bool:
        """
        Check if the desired crash has been found.

        Parameters
        ----------
        crash_ids_to_check : list[str | None]
            Crash identifiers to check if any of them are the desired crash identifier
        desired_crash_group : list[Crash]
            Target crash group

        Returns
        -------
        desired_crash : bool
        """
        for crash in desired_crash_group:
            for crash_id_to_check in crash_ids_to_check:
                if self.fuzzlog.is_same_crash_id(crash.identifier, crash_id_to_check):
                    return True
        return False

    def is_crash_group_in_prev_result(self, crash_group: list[Crash]) -> str:
        """
        Check if crash identifiers inside crash_group can be found in the previous results.

        Parameters
        ----------
        crash_group : list[Crash]

        Returns
        -------
        exploit_path : str
            Corresponding exploit path or empty string if nothing is found from previous result
        """
        for crash_ids, exploit_path in self.crash_info_by_exploit_hash.values():
            if self.desired_crash_found(crash_ids, crash_group):
                return exploit_path
        return ""

    def _is_crash_id_duplicate(
        self, crash_id_to_check: str | None, crash_ids: list[str | None]
    ) -> bool:
        """
        Check if a crash identifier can be found in another list of crash identifiers.

        Parameters
        ----------
        crash_id_to_check : str | None
            Crash identifier to check
        crash_ids : list[str | None]
            List of crash identifiers to compare

        Returns
        -------
        is_duplicate : bool
        """
        for crash_id in crash_ids:
            if self.fuzzlog.is_same_crash_id(crash_id_to_check, crash_id):
                return True
        return False

    def run_exploit(self, exploit_name: str, exploit_path: str, target_crash_type: str):
        """
        Exploit runner.

        Runs the exploit up to 3 times to gather every distinct crash state it
        can trigger; stops early when a run triggers no crash or only repeats
        an already-seen crash identifier.

        Parameters
        ----------
        exploit_name : str
            Name of the generated exploit
        exploit_path : str
            Path to the generated exploit
        target_crash_type : str
            Type of the target crash, possible values are `normal`, `timeout` and `flooding`

        Returns
        -------
        crash_found : bool
            Whether the exploit triggered at least one crash
        crash_ids : list[str | None]
            Crash identifiers triggered in the exploit execution
        """
        crash_identifiers = []
        max_try = 3  # TODO: configurable
        ae_logger.debug(f"Run exploit: {exploit_name}")
        for _ in range(max_try):
            # Each trial here is used to gather all possible crash states.
            try:
                crash_triggered, crash_identifier = self.exploiter.run_exploit_once(
                    exploit_name, exploit_path, target_crash_type
                )
            except Exception as e:
                # Best-effort: log and retry rather than aborting the session.
                ae_logger.error(e, exc_info=True)
                continue
            if not crash_triggered:
                break
            if self._is_crash_id_duplicate(crash_identifier, crash_identifiers):
                # Check if crash_identifier generated in this trial occurs previously.
                # Check should be done by using fuzzlog crash compare function instead of vanilla string comparison.
                break
            crash_identifiers.append(crash_identifier)
        if len(crash_identifiers) > 0:
            ae_logger.info(f"Exploit {exploit_name} triggers crash: {crash_identifiers}")
        return len(crash_identifiers) > 0, crash_identifiers

    def run(self) -> None:
        """
        Logic for running and coordinating exploits.

        For every crash group in the fuzzing logs: skip it if a previous
        exploit already reproduced it, otherwise generate and run candidate
        exploits until the desired crash is reproduced or the generator is
        exhausted.  Afterwards, re-check unreproduced groups against the full
        session results, then log aggregate statistics.
        """
        # analysis stat variables
        mut_total = 0
        mut_max = 0
        dup_total = 0
        dup_max = 0
        num_crash = 0
        trial_num = 0
        # Indicate which crash group has been reproduced
        crash_group_reproduced = [False] * len(self.fuzzlog.grouped_crashes)
        start_time = time.time()
        for crash_group_idx, crash_group in enumerate(self.fuzzlog.grouped_crashes):
            ae_logger.info(
                f"Auto exploit for {[crash.loc for crash in crash_group]}, expected: {crash_group[0].identifier}"
            )
            # Skip if crash_group identifiers can be found in previous results
            if (prev_exploit_path := self.is_crash_group_in_prev_result(crash_group)) != "":
                ae_logger.info(
                    f"Same crash found for {[crash.loc for crash in crash_group]} using {prev_exploit_path} from previous results."
                )
                crash_group_reproduced[crash_group_idx] = True
                continue
            crash_ever_happened = False
            for (
                exploit_name,
                exploit_path,
                mut_count,
                dup_count,
            ) in self.exploit_script_generator(crash_group):
                # Skip exploit if already run before by checking file hash
                exploit_hash = calc_file_sha256(exploit_path)
                if exploit_hash in self.crash_info_by_exploit_hash:
                    ae_logger.info(f"Skipped {exploit_path}.")
                    os.remove(exploit_path)
                    continue
                trial_num += 1
                mut_total += mut_count
                mut_max = max(mut_max, mut_count)
                dup_total += dup_count
                dup_max = max(dup_max, dup_count)
                crash_found, crash_ids = self.run_exploit(
                    exploit_name, exploit_path, crash_group[0].type
                )
                # Store result
                self.crash_info_by_exploit_hash[exploit_hash] = (
                    crash_ids,
                    exploit_path,
                )
                if not crash_found:
                    continue
                crash_ever_happened = True
                num_crash += len(crash_ids)
                if self.desired_crash_found(crash_ids, crash_group):
                    ae_logger.info(
                        f"Same crash found for {[crash.loc for crash in crash_group]} using {exploit_path}"
                    )
                    crash_group_reproduced[crash_group_idx] = True
                    break
            if not crash_ever_happened:
                ae_logger.info(
                    f"No crash ever happened when running for {[crash.loc for crash in crash_group]}"
                )
        # Check the crash groups that have not been reproduced against the results from the whole session
        # to see if any later exploits can reproduce the earlier crash group.
        ae_logger.info(
            "AirBugCatcher has finished. Now check crash groups that are not reproduced."
        )
        for crash_group_idx, reproduced in enumerate(crash_group_reproduced):
            if reproduced:
                continue
            crash_group = self.fuzzlog.grouped_crashes[crash_group_idx]
            if (prev_exploit_path := self.is_crash_group_in_prev_result(crash_group)) != "":
                ae_logger.info(
                    f"Same crash found for {[crash.loc for crash in crash_group]} using {prev_exploit_path}"
                )
                crash_group_reproduced[crash_group_idx] = True
        ae_logger.info(f"Total time: {time.time() - start_time} seconds")
        ae_logger.info(f"mut_total: {mut_total}")
        ae_logger.info(f"mut_max: {mut_max}")
        ae_logger.info(f"dup_total: {dup_total}")
        ae_logger.info(f"dup_max: {dup_max}")
        ae_logger.info(f"num_crash: {num_crash}")
        ae_logger.info(f"trial_num: {trial_num}")