Skip to content

Commit 63f41d9

Browse files
committed
fix(core): address CodeRabbit runtime-safety findings
Harden the CodeRabbit-reported runtime-safety and API-correctness issues across the disassembly facade, Intel/CIL backends, label providers, loaders, report DTOs, and shared utilities. Grouped fixes: - guard optional timeout callbacks and malformed parser/provider inputs - correct byte/string comparisons, alignment ordering, and provider call contracts - make report DTO construction, statistics addition, and mutable defaults safer - replace fragile/private APIs, direct prints, deprecated logger calls, and assert-only validation - convert malformed Rust demangler inputs to demangler-specific errors - preserve explicit backend selection during DEX autodetect - move regression coverage into subsystem test suites instead of a CodeRabbit-specific test file Validation: - python -m pytest tests/testDalvikDisassembler.py tests/testEscaper.py tests/testRustSymbolProvider.py tests/testFileFormatParsers.py tests/testIntelDisassembler.py tests/testIndirectCallAnalyzer.py -q - make test - make lint - python -m ruff check src/smda tests/testDalvikDisassembler.py - git diff --check
1 parent 6208ee7 commit 63f41d9

44 files changed

Lines changed: 484 additions & 162 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/smda/Disassembler.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def __init__(self, config=None, backend=None):
2626
config = SmdaConfig()
2727
self.config = config
2828
self.disassembler = None
29+
self._explicit_backend = bool(backend)
2930
if backend == "intel":
3031
self.disassembler = IntelDisassembler(self.config)
3132
elif backend == "cil":
@@ -136,8 +137,8 @@ def disassembleFile(self, file_path, pdb_path=""):
136137
self.disassembler.addPdbFile(binary_info, pdb_path)
137138
smda_report = self._disassemble(binary_info, timeout=self.config.TIMEOUT)
138139
if self.config.WITH_STRINGS:
139-
is_go_binary = GoSymbolProvider(None).getPcLntabOffset(binary_info.binary)
140-
string_mode = "go" if is_go_binary else None
140+
go_pclntab_offset = GoSymbolProvider(None).getPcLntabOffset(binary_info.binary)
141+
string_mode = "go" if go_pclntab_offset is not None else None
141142
self._addStringsToReport(smda_report, binary_info.binary, mode=string_mode)
142143
if self.config.STORE_BUFFER:
143144
smda_report.buffer = binary_info.binary
@@ -157,8 +158,8 @@ def disassembleUnmappedBuffer(self, file_content):
157158
self.initDisassembler(binary_info.architecture)
158159
smda_report = self._disassemble(binary_info, timeout=self.config.TIMEOUT)
159160
if self.config.WITH_STRINGS:
160-
is_go_binary = GoSymbolProvider(None).getPcLntabOffset(binary_info.binary)
161-
string_mode = "go" if is_go_binary else None
161+
go_pclntab_offset = GoSymbolProvider(None).getPcLntabOffset(binary_info.binary)
162+
string_mode = "go" if go_pclntab_offset is not None else None
162163
self._addStringsToReport(smda_report, file_content, mode=string_mode)
163164
if self.config.STORE_BUFFER:
164165
smda_report.buffer = file_content
@@ -186,7 +187,7 @@ def disassembleBuffer(
186187
# Auto-detect DEX when the caller did not explicitly override architecture.
187188
# disassembleUnmappedBuffer / disassembleFile already use FileLoader for detection;
188189
# this path bypasses it, so we check the magic bytes manually here.
189-
if architecture == "intel" and DexFileLoader.isCompatible(file_content):
190+
if not self._explicit_backend and architecture == "intel" and DexFileLoader.isCompatible(file_content):
190191
architecture = "dalvik"
191192
if bitness is None:
192193
bitness = DexFileLoader.getBitness(file_content)
@@ -205,8 +206,8 @@ def disassembleBuffer(
205206
try:
206207
smda_report = self._disassemble(binary_info, timeout=self.config.TIMEOUT)
207208
if self.config.WITH_STRINGS:
208-
is_go_binary = GoSymbolProvider(None).getPcLntabOffset(binary_info.binary)
209-
string_mode = "go" if is_go_binary else None
209+
go_pclntab_offset = GoSymbolProvider(None).getPcLntabOffset(binary_info.binary)
210+
string_mode = "go" if go_pclntab_offset is not None else None
210211
self._addStringsToReport(smda_report, file_content, mode=string_mode)
211212
if self.config.STORE_BUFFER:
212213
smda_report.buffer = file_content

src/smda/DisassemblyStatistics.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@ class DisassemblyStatistics:
88
num_function_calls = None
99
num_failed_functions = None
1010
num_failed_instructions = None
11+
_fields = (
12+
"num_functions",
13+
"num_recursive_functions",
14+
"num_leaf_functions",
15+
"num_basic_blocks",
16+
"num_instructions",
17+
"num_api_calls",
18+
"num_function_calls",
19+
"num_failed_functions",
20+
"num_failed_instructions",
21+
)
1122

1223
def __init__(self, disassembly_result=None):
1324
if disassembly_result is not None:
@@ -74,13 +85,14 @@ def toDict(self):
7485
def __add__(self, other):
7586
if not isinstance(other, DisassemblyStatistics):
7687
raise ValueError("Needs another DisassemblyStatistics to perform addition of values")
77-
self.num_functions += other.num_functions
78-
self.num_recursive_functions += other.num_recursive_functions
79-
self.num_leaf_functions += other.num_leaf_functions
80-
self.num_basic_blocks += other.num_basic_blocks
81-
self.num_instructions += other.num_instructions
82-
self.num_api_calls += other.num_api_calls
83-
self.num_function_calls += other.num_function_calls
84-
self.num_failed_functions += other.num_failed_functions
85-
self.num_failed_instructions += other.num_failed_instructions
88+
result = DisassemblyStatistics()
89+
for field in self._fields:
90+
setattr(result, field, (getattr(self, field) or 0) + (getattr(other, field) or 0))
91+
return result
92+
93+
def __iadd__(self, other):
94+
if not isinstance(other, DisassemblyStatistics):
95+
raise ValueError("Needs another DisassemblyStatistics to perform addition of values")
96+
for field in self._fields:
97+
setattr(self, field, (getattr(self, field) or 0) + (getattr(other, field) or 0))
8698
return self

src/smda/cil/CilDisassembler.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -156,21 +156,6 @@ def analyzeFunction(self, pe, start_addr, method_body):
156156
i_size = len(i_bytes)
157157
i_mnemonic = str(insn.opcode)
158158
i_op_str = format_operand(pe, insn.operand)
159-
# debug output for all instructions
160-
if False:
161-
from smda.cil.CilInstructionEscaper import CilInstructionEscaper
162-
from smda.common.SmdaInstruction import SmdaInstruction
163-
164-
smda_ins = SmdaInstruction([i_address, i_bytes.hex(), i_mnemonic, i_op_str])
165-
escaped = CilInstructionEscaper.escapeBinary(smda_ins)
166-
print(
167-
f"{escaped:<20}"
168-
+ f"{insn.offset:04X}"
169-
+ " "
170-
+ f"{' '.join(f'{b:02x}' for b in insn.get_bytes()): <20}"
171-
+ f"{str(insn.opcode): <15}"
172-
+ format_operand(pe, insn.operand)
173-
)
174159
# https://en.wikipedia.org/wiki/List_of_CIL_instructions
175160
if i_mnemonic in ["ret"]:
176161
state.setNextInstructionReachable(False)
@@ -211,12 +196,13 @@ def analyzeFunction(self, pe, start_addr, method_body):
211196
target = int(i_op_str, 16)
212197
state.addCodeRef(i_address, target, by_jump=True)
213198
if i_mnemonic in ["jmp"]:
214-
raise Exception(
215-
"Found unhandled CIL jmp instruction, report back its structure and have Daniel fix it."
216-
)
217199
target = int(i_op_str, 16)
218200
state.addCodeRef(i_address, target, by_jump=True)
219201
state.setNextInstructionReachable(False)
202+
LOGGER.error(
203+
"Found unhandled CIL jmp instruction at 0x%x, report back its structure and have Daniel fix it.",
204+
i_address,
205+
)
220206
if i_mnemonic in ["ldstr"]:
221207
# we possibly want to extract and collect these and put them in the stringref part of SmdaFunction
222208
self.disassembly.addStringRef(start_addr, i_address, i_op_str[1:-1])
@@ -276,6 +262,6 @@ def analyzeBuffer(self, binary_info, cbAnalysisTimeout=None):
276262
self.analyzeFunction(pe, method_body.offset, method_body)
277263
# package up and finish
278264
self.disassembly.analysis_end_ts = datetime.datetime.now(datetime.timezone.utc)
279-
if cbAnalysisTimeout():
265+
if cbAnalysisTimeout and cbAnalysisTimeout():
280266
self.disassembly.analysis_timeout = True
281267
return self.disassembly

src/smda/cil/CilInstructionEscaper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,6 @@ def escapeMnemonic(mnemonic):
303303
mnemonic,
304304
)
305305
return "U"
306-
return mnemonic
307306

308307
@staticmethod
309308
def escapeField(op_field, escape_registers=True, escape_pointers=True, escape_constants=True):
@@ -451,6 +450,7 @@ def escapeBinaryJumpCall(ins, escape_intraprocedural_jumps=False):
451450
"brtrue",
452451
"brtrue.s",
453452
"brzero",
453+
"brzero.s",
454454
"switch",
455455
]:
456456
escaped_sequence = CilInstructionEscaper.escapeToOpcodeOnly(ins)

src/smda/cil/FunctionAnalysisState.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,14 @@ def getBlocks(self):
133133
and i != len(self.instructions) - 1
134134
and any(r != self.instructions[i + 1][0] for r in self.code_refs_from[current[0]])
135135
):
136-
break
137136
# if we can reach a colliding address from here, the block is broken and should end.
138137
reachable_collisions = self.code_refs_from[current[0]].intersection(self.colliding_addresses)
139138
next_addr = current[0] + current[1]
140139
is_next_addr = next_addr in reachable_collisions
141140
if reachable_collisions and is_next_addr:
142141
# we should remove the from/to code references for this collision as there should be no non CFG instruction references between instructions of different functions
143142
self.removeCodeRef(current[0], next_addr)
144-
break
143+
break
145144
if (
146145
i != len(self.instructions) - 1
147146
and self.instructions[i + 1][0] in self.code_refs_to

src/smda/common/BinaryInfo.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class BinaryInfo:
1717
raw_data = b""
1818
binary_size = 0
1919
bitness = None
20-
code_areas = []
20+
code_areas = None
2121
component = ""
2222
family = ""
2323
file_path = ""
@@ -36,6 +36,7 @@ def __init__(self, binary):
3636
self.binary = binary
3737
self.raw_data = binary
3838
self.binary_size = len(binary)
39+
self.code_areas = []
3940
self._lief_binary = None
4041
self.abi = ""
4142

src/smda/common/SmdaBasicBlock.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@ class SmdaBasicBlock:
1616
def __init__(self, instructions, smda_function=None):
1717
assert isinstance(instructions, list)
1818
self.smda_function = smda_function
19+
self.instructions = instructions
20+
self.offset = instructions[0].offset if instructions else None
21+
self.length = len(instructions)
22+
self.picblockhash = None
23+
self.opcblockhash = None
1924
if instructions:
20-
self.instructions = instructions
21-
self.offset = instructions[0].offset
22-
self.length = len(instructions)
2325
self.picblockhash = self.getPicBlockHash()
2426
self.opcblockhash = self.getOpcBlockHash()
2527

2628
def getInstructions(self) -> Iterator["SmdaInstruction"]:
29+
if self.instructions is None:
30+
return
2731
yield from self.instructions
2832

2933
def getPicBlockHash(self):
@@ -90,12 +94,11 @@ def getSuccessors(self):
9094

9195
@classmethod
9296
def fromDict(cls, block_dict, smda_function=None) -> "SmdaBasicBlock":
93-
smda_block = cls(None)
94-
smda_block.smda_function = smda_function
95-
smda_block.instructions = [SmdaInstruction.fromDict(d, smda_function=smda_function) for d in block_dict]
96-
return smda_block
97+
return cls([SmdaInstruction.fromDict(d, smda_function=smda_function) for d in block_dict], smda_function)
9798

9899
def toDict(self) -> dict:
100+
if self.instructions is None:
101+
return []
99102
return [smda_ins.toDict() for smda_ins in self.instructions]
100103

101104
def __int__(self):

src/smda/common/SmdaFunction.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ class SmdaFunction:
3636
function_name = ""
3737
pic_hash = None
3838
opc_hash = None
39+
nesting_depth = 0
3940
strongly_connected_components = None
4041
tfidf = None
4142

4243
def __init__(self, disassembly=None, function_offset=None, config=None, smda_report=None):
4344
self.smda_report = smda_report
45+
self.nesting_depth = 0
4446
if disassembly is not None and function_offset is not None:
4547
self._escaper = IntelInstructionEscaper if disassembly.binary_info.architecture in ["intel"] else None
4648
self.offset = function_offset
@@ -72,14 +74,11 @@ def __init__(self, disassembly=None, function_offset=None, config=None, smda_rep
7274
# DEX strings are part of the parsed file structure, so they're always
7375
# populated for Dalvik regardless of WITH_STRINGS — no extra extraction
7476
# cost. For other architectures, honor WITH_STRINGS as usual.
75-
if (
76-
config
77-
and config.WITH_STRINGS
78-
or (
79-
disassembly.binary_info.architecture == "dalvik"
80-
and disassembly.getStringRefsForFunction(function_offset)
81-
)
82-
):
77+
is_dalvik_with_strings = (
78+
disassembly.binary_info.architecture == "dalvik"
79+
and disassembly.getStringRefsForFunction(function_offset)
80+
)
81+
if (config and config.WITH_STRINGS) or is_dalvik_with_strings:
8382
self.stringrefs = (
8483
self._normalizeDalvikStringRefs(disassembly.getStringRefsForFunction(function_offset))
8584
if disassembly.binary_info.architecture == "dalvik"
@@ -145,7 +144,7 @@ def getPicHashAsLong(self):
145144
return self.pic_hash
146145

147146
def getPicHashAsHex(self):
148-
return struct.pack("l", self.pic_hash).hex()
147+
return struct.pack("Q", self.pic_hash).hex()
149148

150149
def getInstructions(self):
151150
for block in self.getBlocks():

src/smda/common/SmdaInstruction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def getDetailed(self):
7575
# WAIT instruction before the wait version and the NOP instruction
7676
# before the no-wait version.
7777
if len(with_details) > 1:
78-
LOGGER.warn(
78+
LOGGER.warning(
7979
f"Sequence {self.bytes} disassembles to {len(with_details)} instructions but expected one - taking the last instruction only!"
8080
)
8181
self.detailed = with_details[-1]

src/smda/common/TailcallAnalyzer.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import bisect
22
import json
3+
import logging
34
from collections import defaultdict
45
from operator import itemgetter
56

67
from smda.common.ExceptionHandling import reraise_non_operational_exception
78

9+
LOGGER = logging.getLogger(__name__)
10+
811

912
class TailcallAnalyzer:
1013
def __init__(self):
@@ -87,15 +90,15 @@ def __printIntervals(self, intervals):
8790
# return
8891
if len(intervals) < 25:
8992
for one, two in intervals:
90-
print(f" 0x{one:x} -> 0x{two:x}")
93+
LOGGER.debug(" 0x%x -> 0x%x", one, two)
9194
else:
92-
print("Function has too many intervals to display")
95+
LOGGER.debug("Function has too many intervals to display")
9396

9497
def resolveTailcalls(self, disassembler, verbose=False):
9598
newly_created_functions = set()
9699
for tailcall in self.getTailcalls():
97100
if verbose:
98-
print(f"Processing tailcall:\n{json.dumps(tailcall, indent=2, sort_keys=True)}")
101+
LOGGER.debug("Processing tailcall:\n%s", json.dumps(tailcall, indent=2, sort_keys=True))
99102
# remove the information from the function-analysis state of the disassembly
100103
function = self.__getFunctionByStartAddr(tailcall["destination_function"])
101104
if not function or function.is_tailcall_function:
@@ -105,7 +108,7 @@ def resolveTailcalls(self, disassembler, verbose=False):
105108
self.__functions.remove(function)
106109
if function:
107110
if verbose:
108-
print("Old function:")
111+
LOGGER.debug("Old function:")
109112
self.__printIntervals(self.__getFunctionIntervals(function))
110113
function.revertAnalysis()
111114

@@ -123,7 +126,7 @@ def resolveTailcalls(self, disassembler, verbose=False):
123126
reraise_non_operational_exception(exc)
124127
# print ("0x{:x} -> 0x{:x}".format(tailcall["destination_function"], tailcall["destination_addr"]))
125128
elif verbose:
126-
print(
129+
LOGGER.debug(
127130
"**** 0x{:x} IS NOW PART OF 0x{:x}".format(
128131
tailcall["destination_function"], tailcall["destination_addr"]
129132
)
@@ -132,10 +135,10 @@ def resolveTailcalls(self, disassembler, verbose=False):
132135
if verbose:
133136
function = self.__getFunctionByStartAddr(tailcall["destination_function"])
134137
new_function = self.__getFunctionByStartAddr(tailcall["destination_addr"])
135-
print("New function:")
138+
LOGGER.debug("New function:")
136139
if new_function:
137140
self.__printIntervals(self.__getFunctionIntervals(new_function))
138-
print("Re-disassembled old function:")
141+
LOGGER.debug("Re-disassembled old function:")
139142
if function:
140143
self.__printIntervals(self.__getFunctionIntervals(function))
141144
return sorted(newly_created_functions)

0 commit comments

Comments
 (0)