Skip to content

Commit 614e677

Browse files
author
Alan
authored
Fix bugs and issues found with static type checking (#307)
* Fix issues found with static type checking * Cleanup for CI, finalize for majority of Python code * Update CI configuration for static checking
1 parent f5bdb72 commit 614e677

File tree

11 files changed

+202
-158
lines changed

11 files changed

+202
-158
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ jobs:
2020
- name: Install dependencies
2121
run: |
2222
python -m pip install --upgrade pip
23-
pip install pyflakes
23+
pip install pyflakes mypy
2424
- name: Lint
2525
run: |
2626
pyflakes bin/deepstate/*.py
2727
pyflakes tests/*.py
28+
mypy --strict-optional --config-file mypy.ini bin/deepstate
2829
build:
2930
strategy:
3031
matrix:

bin/deepstate/core/base.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import argparse
2121
import configparser
2222

23-
from typing import Dict, ClassVar, Optional, Union, List
23+
from typing import Dict, ClassVar, Optional, Union, List, Any
2424

2525

2626
L = logging.getLogger("deepstate.core.base")
@@ -41,7 +41,7 @@ class AnalysisBackend(object):
4141
COMPILER: ClassVar[Optional[str]] = None
4242

4343
# temporary attribute for argparsing, and should be used to build up object attributes
44-
_ARGS: ClassVar[Optional[Dict[str, str]]] = None
44+
_ARGS: ClassVar[Optional[argparse.Namespace]] = None
4545

4646
# temporary attribute for parser instantiation, should be used to check if user parsed args
4747
parser: ClassVar[Optional[argparse.ArgumentParser]] = None
@@ -52,20 +52,30 @@ def __init__(self):
5252

5353

5454
@classmethod
55-
def parse_args(cls) -> None:
55+
def parse_args(cls) -> Optional[argparse.Namespace]:
5656
"""
5757
Base root-level argument parser. After the executors initializes its application-specific arguments, and the frontend
5858
builds up further with analysis-specific arguments, this base parse_args finalizes with all other required args every
5959
executor should consume.
6060
"""
61-
parser = cls.parser
61+
62+
if cls._ARGS:
63+
L.debug("Returning already-parsed arguments")
64+
return cls._ARGS
65+
66+
# checks if frontend executor already implements an argparser, since we want to extend on that.
67+
if cls.parser is not None:
68+
parser: argparse.ArgumentParser = cls.parser
69+
else:
70+
parser = argparse.ArgumentParser(description="Use {} as a backend for DeepState".format(cls.NAME))
6271

6372
# Compilation/instrumentation support, only if COMPILER is set
6473
# TODO: extends compilation interface for symex engines that "compile" source to
6574
# binary, IR format, or boolean expressions for symbolic VM to reason with
6675
if cls.COMPILER:
6776
L.debug("Adding compilation support since a compiler was specified")
6877

78+
# type: ignore
6979
compile_group = parser.add_argument_group("Compilation and Instrumentation")
7080
compile_group.add_argument("--compile_test", type=str,
7181
help="Path to DeepState test source for compilation and instrumentation by analysis tool.")
@@ -86,8 +96,8 @@ def parse_args(cls) -> None:
8696

8797
# Analysis-related configurations
8898
parser.add_argument(
89-
"-o", "--output_test_dir", type=str, default="{}_out".format(cls.NAME),
90-
help="Output directory where tests will be saved (default is `{FUZZER}_out`).")
99+
"-o", "--output_test_dir", type=str, default="out",
100+
help="Output directory where tests will be saved (default is `out`).")
91101

92102
parser.add_argument(
93103
"-c", "--config", type=str,
@@ -113,25 +123,27 @@ def parse_args(cls) -> None:
113123
help="Other DeepState flags to pass to harness before execution, in format `--arg=val`.")
114124

115125
args = parser.parse_args()
126+
127+
# from parsed arguments, modify dict copy if configuration is specified
116128
_args: Dict[str, str] = vars(args)
117129

118130
# if configuration is specified, parse and replace argument instantiations
119131
if args.config:
120-
_args.update(cls.build_from_config(args.config))
132+
_args.update(cls.build_from_config(args.config)) # type: ignore
121133

122134
# Cleanup: force --no_exit_compile to be on, meaning if user specifies a `[test]` section,
123135
# execution will continue. Delete config as well
124-
_args["no_exit_compile"] = True
136+
_args["no_exit_compile"] = True # type: ignore
125137
del _args["config"]
126138

127139
cls._ARGS = args
128140
return cls._ARGS
129141

130142

131-
ConfigType = Dict[str, Dict[str, Union[str, List[str]]]]
143+
ConfigType = Dict[str, Dict[str, Any]]
132144

133145
@staticmethod
134-
def build_from_config(config: str, allowed_keys: Optional[List[str]] = None, include_sections: bool = False) -> Union[ConfigType, Dict[str, str]]:
146+
def build_from_config(config: str, allowed_keys: Optional[List[str]] = None, include_sections: bool = False) -> Union[ConfigType, Dict[str, Any]]:
135147
"""
136148
Simple auxiliary helper that does safe and correct parsing of DeepState configurations. This can be used
137149
in the following manners:
@@ -145,7 +157,7 @@ def build_from_config(config: str, allowed_keys: Optional[List[str]] = None, in
145157
:param include_sections: if true, parse all sections, and return a ConfigType where keys are section names
146158
"""
147159

148-
context: ConfigType = dict()
160+
context: ConfigType = dict() # type: ignore
149161

150162
# reserved sections are ignored by executors, but can be used by other auxiliary tools
151163
# to reason about with.
@@ -164,7 +176,7 @@ def build_from_config(config: str, allowed_keys: Optional[List[str]] = None, in
164176
parser = configparser.SafeConfigParser()
165177
parser.read(config)
166178

167-
for section, kv in parser._sections.items():
179+
for section, kv in parser._sections.items(): # type: ignore
168180

169181
# if `include_sections` is not set, parse only from allowed_sections
170182
if not include_sections:
@@ -191,7 +203,7 @@ def build_from_config(config: str, allowed_keys: Optional[List[str]] = None, in
191203
else:
192204
_context[key] = val
193205

194-
return context
206+
return context # type: ignore
195207

196208

197209
def init_from_dict(self, _args: Optional[Dict[str, str]] = None) -> None:

bin/deepstate/core/fuzz.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,17 @@ def __init__(self, envvar: str = "PATH") -> None:
8282
compiler_paths: List[str] = [f"{path}/{compiler}" for path in potential_paths if os.path.isfile(path + '/' + compiler)]
8383
if len(compiler_paths) == 0:
8484

85-
# if not in envvar, check to see if user supplied absolute path
85+
# if not in earlier envvar, check to see if user supplied absolute path
8686
if os.path.isfile(compiler):
8787
self.compiler: str = compiler
8888

8989
# .. or check if in $PATH before tossing exception
9090
else:
91-
for path in os.environ.get("PATH").split(os.pathsep):
91+
path_env: Optional[str] = os.environ.get("PATH")
92+
if path_env is None:
93+
raise FuzzFrontendError("$PATH envvar is not defined.")
94+
95+
for path in path_env.split(os.pathsep):
9296
compiler_path: str = os.path.join(path, compiler)
9397

9498
L.debug(f"Checking if `{compiler_path}` is a valid compiler path")
@@ -106,6 +110,9 @@ def __init__(self, envvar: str = "PATH") -> None:
106110

107111
L.debug(f"Initialized compiler: {self.compiler}")
108112

113+
# store envvar for re-use across API
114+
self.env: str = env
115+
109116
# in case name supplied as `bin/fuzzer`, strip executable name
110117
self.name: str = fuzzer_name.split('/')[-1] if '/' in fuzzer_name else fuzzer_name
111118
L.debug(f"Fuzzer name: {self.name}")
@@ -125,7 +132,7 @@ def __init__(self, envvar: str = "PATH") -> None:
125132
self.num_workers: int = 1
126133

127134
self.compile_test: Optional[str] = None
128-
self.compiler_args: List[str] = []
135+
self.compiler_args: Optional[str] = None
129136
self.out_test_name: str = "out"
130137

131138
self.enable_sync: bool = False
@@ -134,6 +141,7 @@ def __init__(self, envvar: str = "PATH") -> None:
134141
self.sync_dir: str = "out_sync"
135142

136143
self.which_test: Optional[str] = None
144+
self.post_stats: bool = False
137145

138146

139147
def __repr__(self) -> str:
@@ -191,6 +199,12 @@ def parse_args(cls) -> Optional[argparse.Namespace]:
191199
help="Time in seconds the executor should sync to sync directory (default is 5 seconds).")
192200

193201

202+
# Post-processing
203+
post_group = parser.add_argument_group("Execution Post-processing")
204+
post_group.add_argument("--post_stats", action="store_true",
205+
help="Output post-fuzzing statistics to user (if any).")
206+
207+
194208
# Miscellaneous options
195209
parser.add_argument(
196210
"--fuzzer_help", action="store_true",
@@ -201,6 +215,8 @@ def parse_args(cls) -> Optional[argparse.Namespace]:
201215
cls.parser = parser
202216
super(FuzzerFrontend, cls).parse_args()
203217

218+
return None
219+
204220

205221
def print_help(self) -> None:
206222
"""
@@ -214,18 +230,18 @@ def print_help(self) -> None:
214230
##############################################
215231

216232

217-
def compile(self, lib_path: str, flags: List[str], _out_bin: str, env = os.environ.copy()) -> Optional[str]:
233+
def compile(self, lib_path: str, flags: List[str], _out_bin: str, env = os.environ.copy()) -> None:
218234
"""
219235
Provides a simple interface that allows the user to compile a test harness
220236
with instrumentation using the specified compiler. Users should implement an
221237
inherited method that constructs the arguments necessary, and then pass it to the
222-
base object. Returns string of generated binary if successful.
238+
base object.
223239
224240
`compile()` also supports compiling arbitrary harnesses without instrumentation if a compiler
225241
isn't set.
226242
227243
:param lib_path: path to DeepState static library for linking
228-
:param flags: list of compiler flags (TODO: parse from compilation database)
244+
:param flags: list of compiler flags (TODO: support parsing from compilation database path)
229245
:param _out_bin: name of linked test harness binary
230246
:param env: optional envvars to set during compilation
231247
"""
@@ -237,7 +253,7 @@ def compile(self, lib_path: str, flags: List[str], _out_bin: str, env = os.envir
237253
raise FuzzFrontendError(f"User-specified test binary conflicts with compiling from source.")
238254

239255
if not os.path.isfile(lib_path):
240-
raise FuzzFrontendError("No {}-instrumented DeepState static library found in {}".format(cls, lib_path))
256+
raise FuzzFrontendError("No {}-instrumented DeepState static library found in {}".format(self, lib_path))
241257
L.debug(f"Static library path: {lib_path}")
242258

243259
# initialize compiler envvars
@@ -246,8 +262,7 @@ def compile(self, lib_path: str, flags: List[str], _out_bin: str, env = os.envir
246262
L.debug(f"CC={env['CC']} and CXX={env['CXX']}")
247263

248264
# initialize command with prepended compiler
249-
compiler_args = ["-std=c++11", self.compile_test] + flags + \
250-
["-o", _out_bin]
265+
compiler_args: List[str] = ["-std=c++11", self.compile_test] + flags + ["-o", _out_bin] # type: ignore
251266
compile_cmd = [self.compiler] + compiler_args
252267
L.debug(f"Compilation command: {str(compile_cmd)}")
253268

@@ -259,7 +274,7 @@ def compile(self, lib_path: str, flags: List[str], _out_bin: str, env = os.envir
259274
raise FuzzFrontendError(f"{self.compiler} interrupted due to exception:", e)
260275

261276
# extra check if target binary was successfully compiled, and set that as target binary
262-
out_bin = os.path.join(os.environ.get("PWD"), _out_bin)
277+
out_bin = os.path.join(os.getcwd(), _out_bin)
263278
if os.path.exists(out_bin):
264279
self.binary = out_bin
265280

@@ -351,7 +366,7 @@ def _dict_to_cmd(cmd_dict: Dict[str, Optional[str]]) -> List[Optional[str]]:
351366
return cmd_args
352367

353368

354-
def build_cmd(self, cmd_dict: Dict[str, Optional[str]], input_symbol: str = "@@") -> Dict[str, Optional[str]]:
369+
def build_cmd(self, cmd_dict: Dict[str, Any], input_symbol: str = "@@") -> Dict[str, Optional[str]]:
355370
"""
356371
Helper method to be invoked by child fuzzer class's cmd() property method in order
357372
to finalize command called by the fuzzer executable with appropriate arguments for the
@@ -409,15 +424,15 @@ def run(self, compiler: Optional[str] = None, no_exec: bool = False):
409424
self.pre_exec()
410425

411426
# initialize cmd from property
412-
command = [self.fuzzer] + self._dict_to_cmd(self.cmd)
427+
command = [self.fuzzer] + self._dict_to_cmd(self.cmd) # type: ignore
413428

414429
# prepend compiler that invokes fuzzer
415430
if compiler:
416431
command.insert(0, compiler)
417432

418433
results: List[int] = []
419434
pool = multiprocessing.Pool(processes=self.num_workers)
420-
results = pool.apply_async(self._run, args=(command,))
435+
results = pool.apply_async(self._run, args=(command,)) # type: ignore
421436

422437
pool.close()
423438
pool.join()

bin/deepstate/core/symex.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,18 @@
1414

1515
import logging
1616
logging.basicConfig()
17-
logging.addLevelName(15, "TRACE")
1817

18+
import os
19+
import struct
1920
import argparse
20-
#import md5
2121
import hashlib
2222
import functools
23-
import os
24-
import struct
2523

2624
from deepstate.core.base import AnalysisBackend
2725

2826

29-
class TestInfo(object):
30-
"""Represents a `DeepState_TestInfo` data structure from the program, as
31-
well as associated meta-data about the test."""
32-
def __init__(self, ea, name, file_name, line_number):
33-
self.ea = ea
34-
self.name = name
35-
self.file_name = file_name
36-
self.line_number = line_number
37-
27+
LOGGER = logging.getLogger("deepstate")
28+
LOGGER.setLevel(os.environ.get("DEEPSTATE_LOG", "INFO").upper())
3829

3930
LOG_LEVEL_DEBUG = 0
4031
LOG_LEVEL_TRACE = 1
@@ -44,23 +35,29 @@ def __init__(self, ea, name, file_name, line_number):
4435
LOG_LEVEL_EXTERNAL = 5
4536
LOG_LEVEL_FATAL = 6
4637

47-
48-
LOGGER = logging.getLogger("deepstate")
49-
LOGGER.setLevel(logging.DEBUG)
50-
51-
LOGGER.trace = functools.partial(LOGGER.log, 15)
52-
logging.TRACE = 15
38+
LOGGER.trace = functools.partial(LOGGER.log, 15) # type: ignore
39+
logging.TRACE = 15 # type: ignore
5340

5441
LOG_LEVEL_TO_LOGGER = {
5542
LOG_LEVEL_DEBUG: LOGGER.debug,
56-
LOG_LEVEL_TRACE: LOGGER.trace,
43+
LOG_LEVEL_TRACE: LOGGER.trace, # type: ignore
5744
LOG_LEVEL_INFO: LOGGER.info,
5845
LOG_LEVEL_WARNING: LOGGER.warning,
5946
LOG_LEVEL_ERROR: LOGGER.error,
6047
LOG_LEVEL_FATAL: LOGGER.critical
6148
}
6249

6350

51+
class TestInfo(object):
52+
"""Represents a `DeepState_TestInfo` data structure from the program, as
53+
well as associated meta-data about the test."""
54+
def __init__(self, ea, name, file_name, line_number):
55+
self.ea = ea
56+
self.name = name
57+
self.file_name = file_name
58+
self.line_number = line_number
59+
60+
6461
class Stream(object):
6562
def __init__(self, entries):
6663
self.entries = entries
@@ -146,13 +143,13 @@ def parse_args(cls):
146143

147144
@property
148145
def context(self):
149-
"""Gives convenient property-based access to a dictionary holding state-
150-
local varaibles."""
146+
"""Gives convenient property-based access to a dictionary holding state-local variables."""
151147
return self.get_context()
152148

149+
153150
def read_c_string(self, ea, concretize=True, constrain=False):
154-
"""Read a NUL-terminated string from `ea`."""
155-
assert isinstance(ea, (int))
151+
"""Read a NULL-terminated string from address `ea`."""
152+
156153
chars = []
157154
while True:
158155
b, ea = self.read_uint8_t(ea, concretize=concretize, constrain=constrain)
@@ -173,6 +170,7 @@ def read_c_string(self, ea, concretize=True, constrain=False):
173170
else:
174171
return chars, next_ea
175172

173+
176174
def _read_test_info(self, ea):
177175
"""Read in a `DeepState_TestInfo` info structure from memory."""
178176
prev_test_ea, ea = self.read_uintptr_t(ea)

0 commit comments

Comments
 (0)