|
| 1 | +from mythril.laser.ethereum.svm import LaserEVM |
| 2 | +from mythril.laser.plugin.interface import LaserPlugin |
| 3 | +from mythril.laser.plugin.builder import PluginBuilder |
| 4 | +from mythril.laser.ethereum.state.global_state import GlobalState |
| 5 | +from .coverage_data import ( |
| 6 | + CoverageTimeSeries, |
| 7 | + InstructionCoverageInfo, |
| 8 | +) |
| 9 | +from .constants import BATCH_OF_STATES |
| 10 | +from typing import Dict, Tuple, List |
| 11 | + |
| 12 | +import time |
| 13 | +import logging |
| 14 | + |
| 15 | +log = logging.getLogger(__name__) |
| 16 | + |
| 17 | + |
| 18 | +class CoverageMetricsPluginBuilder(PluginBuilder): |
| 19 | + """CoveragePlugin |
| 20 | + Checks Instruction and branch coverage and puts it to data.json file |
| 21 | + which appears in the directory in which mythril is run. |
| 22 | + """ |
| 23 | + |
| 24 | + plugin_default_enabled = True |
| 25 | + enabled = True |
| 26 | + |
| 27 | + author = "MythX Development Team" |
| 28 | + plugin_name = "MythX Coverage Metrics" |
| 29 | + plugin_license = "All rights reserved." |
| 30 | + plugin_type = "Laser Plugin" |
| 31 | + plugin_description = ( |
| 32 | + "This plugin measures coverage throughout symbolic execution," |
| 33 | + " reporting it at the end in the MythX coverage format." |
| 34 | + ) |
| 35 | + |
| 36 | + def __call__(self, *args, **kwargs): |
| 37 | + """Constructs the plugin""" |
| 38 | + return LaserCoveragePlugin() |
| 39 | + |
| 40 | + |
| 41 | +class LaserCoveragePlugin(LaserPlugin): |
| 42 | + def __init__(self): |
| 43 | + self.instruction_coverage_data = {} # type: Dict[str, Tuple[int, Dict[bool]]] |
| 44 | + self.branch_possibilities = {} # type: Dict[str, Dict[int, List]] |
| 45 | + self.tx_id = 0 |
| 46 | + self.state_counter = 0 |
| 47 | + self.coverage = CoverageTimeSeries() |
| 48 | + self.instruction_coverage_info = InstructionCoverageInfo() |
| 49 | + self.start_time = time.time_ns() |
| 50 | + |
| 51 | + def initialize(self, symbolic_vm: LaserEVM) -> None: |
| 52 | + """Initializes the instruction coverage plugin |
| 53 | +
|
| 54 | + Introduces hooks for each instruction |
| 55 | + :param symbolic_vm: The symbolic virtual machine to initialise this plugin for |
| 56 | + """ |
| 57 | + log.info("Initializing coverage metrics plugin") |
| 58 | + |
| 59 | + self.instruction_coverage_data = {} |
| 60 | + self.branch_possibilities = {} |
| 61 | + self.tx_id = 0 |
| 62 | + |
| 63 | + # Add the instruction coverage ExecutionInfo to laser vm for use in reporting |
| 64 | + symbolic_vm.execution_info.append(self.instruction_coverage_info) |
| 65 | + symbolic_vm.execution_info.append(self.coverage) |
| 66 | + |
| 67 | + @symbolic_vm.laser_hook("execute_state") |
| 68 | + def execute_state_hook(global_state: GlobalState): |
| 69 | + self._update_instruction_coverage_data(global_state) |
| 70 | + self._update_branch_coverage_data(global_state) |
| 71 | + self.state_counter += 1 |
| 72 | + if self.state_counter == BATCH_OF_STATES: |
| 73 | + self._record_coverage() |
| 74 | + self.state_counter = 0 |
| 75 | + |
| 76 | + @symbolic_vm.laser_hook("stop_sym_trans") |
| 77 | + def execute_stop_sym_trans_hook(): |
| 78 | + self.tx_id += 1 |
| 79 | + |
| 80 | + # The following is useful for debugging |
| 81 | + # @symbolic_vm.laser_hook("stop_sym_exec") |
| 82 | + # def execute_stop_sym_exec_hook(): |
| 83 | + # self.coverage.output("coverage_data.json") |
| 84 | + # self.instruction_coverage_info.output("instruction_discovery_data.json") |
| 85 | + |
| 86 | + def _update_instruction_coverage_data(self, global_state: GlobalState): |
| 87 | + """Records instruction coverage""" |
| 88 | + code = global_state.environment.code.bytecode |
| 89 | + if code not in self.instruction_coverage_data.keys(): |
| 90 | + number_of_instructions = len(global_state.environment.code.instruction_list) |
| 91 | + self.instruction_coverage_data[code] = (number_of_instructions, {}) |
| 92 | + current_instr = global_state.get_current_instruction()["address"] |
| 93 | + if self.instruction_coverage_info.is_covered(code, current_instr) is False: |
| 94 | + self.instruction_coverage_info.add_data( |
| 95 | + code, current_instr, time.time_ns() - self.start_time |
| 96 | + ) |
| 97 | + self.instruction_coverage_data[code][1][current_instr] = True |
| 98 | + |
| 99 | + def _update_branch_coverage_data(self, global_state: GlobalState): |
| 100 | + """Records branch coverage""" |
| 101 | + code = global_state.environment.code.bytecode |
| 102 | + if code not in self.branch_possibilities: |
| 103 | + self.branch_possibilities[code] = {} |
| 104 | + |
| 105 | + if global_state.get_current_instruction()["opcode"] != "JUMPI": |
| 106 | + return |
| 107 | + addr = global_state.get_current_instruction()["address"] |
| 108 | + jump_addr = global_state.mstate.stack[-1] |
| 109 | + if jump_addr.symbolic: |
| 110 | + log.debug("Encountered a symbolic jump, ignoring it for branch coverage") |
| 111 | + return |
| 112 | + self.branch_possibilities[code][addr] = [addr + 1, jump_addr.value] |
| 113 | + |
| 114 | + def _record_coverage(self): |
| 115 | + for code, code_cov in self.instruction_coverage_data.items(): |
| 116 | + total_branches = 0 |
| 117 | + branches_covered = 0 |
| 118 | + for jumps, branches in self.branch_possibilities[code].items(): |
| 119 | + for branch in branches: |
| 120 | + total_branches += 1 |
| 121 | + branches_covered += branch in code_cov[1] |
| 122 | + self.coverage.add_data( |
| 123 | + code=code, |
| 124 | + instructions_covered=len(code_cov[1]), |
| 125 | + total_instructions=code_cov[0], |
| 126 | + branches_covered=branches_covered, |
| 127 | + tx_id=self.tx_id, |
| 128 | + total_branches=total_branches, |
| 129 | + state_counter=self.state_counter, |
| 130 | + time_elapsed=time.time_ns() - self.start_time, |
| 131 | + ) |
0 commit comments