-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline.py
More file actions
97 lines (86 loc) · 3.91 KB
/
pipeline.py
File metadata and controls
97 lines (86 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""The Arash main pipeline."""
import argparse
from typing import Optional
import logger
from agent.base_agent import BaseAgent
from results import AnaResult, FixResult, PreResult, Result
from stage.analysis_stage import AnalysisStage
from stage.prep_stage import PrepareStage
from stage.repair_stage import RepairStage
class Pipeline():
"""The Arash main pipeline, consisting of three stages:
1. Preparation Stage sets up an environment and reproduces the crash.
2. Analysis Stage analyzes the root cause behind the crash.
3. Repair Stage fixes the fuzz target and validate."""
def __init__(self,
ben_name: str,
trial: int,
args: argparse.Namespace,
analysis_stage_agents: Optional[list[BaseAgent]] = None,
repair_stage_agents: Optional[list[BaseAgent]] = None):
self.ben_name = ben_name
self.trial = trial
self.args = args
self.logger = logger.get_trial_logger(ben_name=ben_name, trial=trial)
self.prep_stage: PrepareStage = PrepareStage(args, ben_name, trial)
self.analysis_stage: AnalysisStage = AnalysisStage(args, ben_name, trial,
analysis_stage_agents)
self.repair_stage: RepairStage = RepairStage(args, ben_name, trial,
repair_stage_agents)
logger.debug('Pipeline Initialized', ben_name=ben_name, trial=trial)
def execute(self, result_history: list[Result]) -> list[Result]:
"""Executes the pipeline."""
logger.debug('Pipeline Starts', ben_name=self.ben_name, trial=self.trial)
result_history.append(
self.prep_stage.execute(result_history=result_history))
if (not isinstance(result_history[-1], PreResult) or
not result_history[-1].success):
logger.error('Pipeline failed at preparation stage',
ben_name=self.ben_name,
trial=self.trial)
self.logger.write_result(
result_status_dir=result_history[-1].work_dirs.status,
result=result_history[-1])
return result_history
result_history.append(
self.analysis_stage.execute(result_history=result_history))
if (not isinstance(result_history[-1], AnaResult) or
not result_history[-1].success):
logger.error('Pipeline failed at analysis stage',
ben_name=self.ben_name,
trial=self.trial)
self.logger.write_result(
result_status_dir=result_history[-1].work_dirs.status,
result=result_history[-1])
return result_history
if isinstance(result_history[-1],
AnaResult) and result_history[-1].true_bug is True:
logger.info('Pipeline ends early due to true bug',
ben_name=self.ben_name,
trial=self.trial)
self.logger.write_result(
result_status_dir=result_history[-1].work_dirs.status,
result=result_history[-1])
return result_history
result_history.append(
self.repair_stage.execute(result_history=result_history))
if (not isinstance(result_history[-1], FixResult) or
not result_history[-1].success):
self.logger.write_result(
result_status_dir=result_history[-2].work_dirs.status,
result=result_history[-2])
self.logger.write_result(
result_status_dir=result_history[-1].work_dirs.status,
result=result_history[-1])
logger.error('Pipeline failed at repair stage',
ben_name=self.ben_name,
trial=self.trial)
return result_history
self.logger.write_result(
result_status_dir=result_history[-2].work_dirs.status,
result=result_history[-2])
self.logger.write_result(
result_status_dir=result_history[-1].work_dirs.status,
result=result_history[-1])
logger.debug('Pipeline Ends', ben_name=self.ben_name, trial=self.trial)
return result_history