Skip to content

update for IDA 9.0 Pro, python 3.10 #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ IDA Pro 7.0 or newer is required to use IDA-EVM.

# Installation
* Copy `evm-loader.py` to `%IDA%/loaders`
* Copy `evm-cpu.py` and `known_hashes.py` to `%IDA%/procs`
* Copy `evm-cpu.py` and `evm-cpu/` to `%IDA%/procs`
* Restart IDA
217 changes: 169 additions & 48 deletions evm-cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from idc import *
from idaapi import *
import idautils
import os

import known_hashes

'''
Code from manticore
Expand All @@ -17,11 +17,11 @@ class EVMAsm(object):
Example use::

>>> from manticore.platforms.evm import EVMAsm
>>> EVMAsm.disassemble_one('\\x60\\x10')
>>> EVMAsm.disassemble_one(b'\\x60\\x10')
Instruction(0x60, 'PUSH', 1, 0, 1, 0, 'Place 1 byte item on stack.', 16, 0)
>>> EVMAsm.assemble_one('PUSH1 0x10')
>>> EVMAsm.assemble_one(b'PUSH1 0x10')
Instruction(0x60, 'PUSH', 1, 0, 1, 0, 'Place 1 byte item on stack.', 16, 0)
>>> tuple(EVMAsm.disassemble_all('\\x30\\x31'))
>>> tuple(EVMAsm.disassemble_all(b'\\x30\\x31'))
(Instruction(0x30, 'ADDRESS', 0, 0, 1, 2, 'Get address of currently executing account.', None, 0),
Instruction(0x31, 'BALANCE', 0, 1, 1, 20, 'Get balance of the given account.', None, 1))
>>> tuple(EVMAsm.assemble_all('ADDRESS\\nBALANCE'))
Expand Down Expand Up @@ -144,7 +144,7 @@ def parse_operand(self, buf):
operand = 0
for _ in range(self.operand_size):
operand <<= 8
operand |= ord(next(buf))
operand |= next(buf)
self._operand = operand
except StopIteration:
raise Exception("Not enough data for decoding")
Expand Down Expand Up @@ -199,7 +199,7 @@ def bytes(self):
''' Encoded instruction '''
bytes = []
bytes.append(chr(self._opcode))
for offset in reversed(xrange(self.operand_size)):
for offset in reversed(range(self.operand_size)):
c = (self.operand >> offset*8 ) & 0xff
bytes.append(chr(c))
return ''.join(bytes)
Expand Down Expand Up @@ -450,20 +450,23 @@ def is_arithmetic(self):
0xff: ('SELFDESTRUCT', 0, 1, 0, 5000, 'Halt execution and register account for later deletion.')
}

reverse_table = {}

@staticmethod
#@memoized
def _get_reverse_table():
''' Build an internal table used in the assembler '''
reverse_table = {}
for (opcode, (name, immediate_operand_size, pops, pushes, gas, description)) in EVMAsm._table.items():
mnemonic = name
if name == 'PUSH':
mnemonic = '%s%d'%(name, (opcode&0x1f) + 1)
elif name in ('SWAP', 'LOG', 'DUP'):
mnemonic = '%s%d'%(name, (opcode&0xf) + 1)
if len(EVMAsm.reverse_table) == 0:

for (opcode, (name, immediate_operand_size, pops, pushes, gas, description)) in EVMAsm._table.items():
mnemonic = name
if name == 'PUSH':
mnemonic = '%s%d' % (name, (opcode & 0x1f) + 1)
elif name in ('SWAP', 'LOG', 'DUP'):
mnemonic = '%s%d' % (name, (opcode & 0xf) + 1)

reverse_table[mnemonic] = opcode, name, immediate_operand_size, pops, pushes, gas, description
return reverse_table
EVMAsm.reverse_table[mnemonic] = opcode, name, immediate_operand_size, pops, pushes, gas, description
return EVMAsm.reverse_table

@staticmethod
def assemble_one(assembler, offset=0):
Expand Down Expand Up @@ -542,7 +545,7 @@ def disassemble_one(bytecode, offset=0):

'''
bytecode = iter(bytecode)
opcode = ord(next(bytecode))
opcode = next(bytecode)
invalid = ('INVALID', 0, 0, 0, 0, 'Unknown opcode')
name, operand_size, pops, pushes, gas, description = EVMAsm._table.get(opcode, invalid)
instruction = EVMAsm.Instruction(opcode, name, operand_size, pops, pushes, gas, description, offset=offset)
Expand Down Expand Up @@ -597,7 +600,7 @@ def disassemble(bytecode, offset=0):

Example use::

>>> EVMAsm.disassemble("\x60\x60\x60\x40\x52\x60\x02\x61\x01\x00")
>>> EVMAsm.disassemble(b"\x60\x60\x60\x40\x52\x60\x02\x61\x01\x00")
...
PUSH1 0x60
BLOCKHASH
Expand Down Expand Up @@ -677,7 +680,7 @@ def assemble_hex(asmcode, offset=0):
...
"0x6060604052600261010"
'''
return '0x' + EVMAsm.assemble(asmcode, offset=offset).encode('hex')
return b'0x' + EVMAsm.assemble(asmcode, offset=offset).encode('hex')



Expand All @@ -686,6 +689,7 @@ def assemble_hex(asmcode, offset=0):
# thanks to https://github.com/themadinventor/ida-xtensa/issues/12 for showing all the ida7 sdk changes
# and thanks to Quarkslab for an IDP overview at https://blog.quarkslab.com/ida-processor-module.html

__known_hash__ = None

class EVMProcessor(idaapi.processor_t):
id = 0x8000 + 0x6576
Expand All @@ -699,7 +703,7 @@ class EVMProcessor(idaapi.processor_t):
reg_names = ["SP"]
assembler = {
"header": [".evm"],
"flag": AS_NCHRE | ASH_HEXF0 | ASD_DECF0 | ASO_OCTF0 | ASB_BINF0 | AS_NOTAB,
"flag": AS_NCHRE | ASH_HEXF0 | ASD_DECF0 | ASO_OCTF0 | ASB_BINF0,
"uflag": 0,
"name": "evm assembler",
"origin": ".org",
Expand Down Expand Up @@ -732,23 +736,110 @@ class EVMProcessor(idaapi.processor_t):
"a_sizeof_fmt": "size %s",
}

# dup, swap
def __trace_stop(self, insn, ret_pos):
    """Resolve the static value of the instruction a backward trace ended on.

    Only instructions whose canonical mnemonic starts with ``PUSH`` yield a
    value here: the operand of their first operand slot, read through
    ``self.get_operand``. Every other instruction is reported as unknown.

    :param insn: decoded instruction at which the trace stopped.
    :param ret_pos: passed through to the caller unchanged.
    :return: ``(jump_addr, ret_pos)``; ``jump_addr`` is ``None`` when the
        instruction is not a ``PUSH*``.
    """
    is_push = insn.get_canon_mnem().startswith("PUSH")
    jump_addr = self.get_operand(insn[0]) if is_push else None
    return jump_addr, ret_pos

def add_jump(self, from_ea, to_ea, jp_type):
    """Add a code cross-reference and remember it in ``self.dst2src``.

    :param from_ea: address of the jumping instruction.
    :param to_ea: address of the jump destination.
    :param jp_type: cross-reference type handed to ``add_cref``.
    """
    add_cref(from_ea, to_ea, jp_type)
    # dst2src maps a destination address to the list of source addresses
    # that jump to it; each source is stored at most once.
    sources = self.dst2src.setdefault(to_ea, [])
    if from_ea not in sources:
        sources.append(from_ea)

def add_jumps(self, from_ea, to_ea_list, ret_pos_list, jp_type_list):
    """Annotate a jump instruction with the result of a destination trace.

    With exactly one entry in ``to_ea_list`` the entry is the resolved jump
    destination: a cross-reference is added through ``self.add_jump`` and a
    repeatable ``JUMP TO`` comment is set. With several entries the list
    does not hold destinations but the upstream branches the trace could
    not choose between; they are only recorded in a comment.

    :param from_ea: address of the jump instruction being annotated.
    :param to_ea_list: traced addresses (see above); ``None`` or empty
        means nothing was resolved and nothing is done.
    :param ret_pos_list: currently unused.
    :param jp_type_list: cross-reference types; only the first is used.
    """
    if not to_ea_list:
        return

    if len(to_ea_list) > 1:
        # Python 3 ints carry no trailing 'L', so hex() needs no stripping.
        cmtstr = "cant determine, have multiple upperstream branchs: " + ' '.join(
            hex(ea) for ea in to_ea_list)
        ida_bytes.set_cmt(from_ea, cmtstr, True)
        return

    dst_addr = to_ea_list[0]
    self.add_jump(from_ea, dst_addr, jp_type_list[0])
    ida_bytes.set_cmt(from_ea, "JUMP TO: " + hex(dst_addr), True)

def get_all_preceding_insn_on_controlflow(self, insn):
    """Return the decoded instructions that can execute right before ``insn``.

    For a ``JUMPDEST`` with recorded jump sources in ``self.dst2src`` the
    sources are returned; otherwise the instruction located directly before
    ``insn`` is used.

    :param insn: decoded instruction whose predecessors are wanted.
    :return: list of decoded predecessor instructions. Instructions that
        could not be decoded are left out, so the list may be empty.
    """
    # TODO: for JUMPDEST, also check that the previous insn is not reachable
    if insn.get_canon_mnem() == 'JUMPDEST' and insn.ea in self.dst2src:
        candidates = [idautils.DecodeInstruction(ea) for ea in self.dst2src[insn.ea]]
    else:
        candidates = [idautils.DecodePreviousInstruction(insn.ea)]

    # The decode helpers return None on failure (e.g. nothing precedes the
    # first instruction); callers dereference every entry, so drop those.
    return [prev for prev in candidates if prev is not None]

def trace_jumpdest(self, insn, current_stack_offset):
    """Walk backwards from ``insn`` to find what produced a stack item.

    The item is identified by ``current_stack_offset`` (0 for the top of the
    stack, negative for deeper items). Each predecessor's pops/pushes from
    the EVMAsm reverse table are applied until an instruction is found that
    pushed the item; ``self.__trace_stop`` then decides whether its value
    is statically known.

    :param insn: decoded instruction to start tracing from.
    :param current_stack_offset: offset of the traced item; must be <= 0.
    :return: one of
        * ``([jump_addr], [ret_pos])`` when the value was resolved,
        * ``([ea, ...], [0, ...])`` when several predecessors exist; the
          first list then holds the upstream branch addresses, not a jump
          destination,
        * ``(None, None)`` when the value cannot be determined.
    """
    # TODO: implement stack modeling to resolve actual top value of stack
    table = EVMAsm._get_reverse_table()
    # Addresses already walked through; a repeat means the trace entered a
    # loop via a back-edge and can never terminate.
    visited = set()

    while True:
        predecessors = [
            prev for prev in self.get_all_preceding_insn_on_controlflow(insn)
            if prev is not None  # undecodable predecessors cannot be traced
        ]
        if len(predecessors) > 1:
            # note: here the first list is not the jump dst addr, but the
            # upstream branches
            return [prev.ea for prev in predecessors], [0] * len(predecessors)
        if not predecessors:
            return None, None

        prev_insn = predecessors[0]
        if prev_insn.ea in visited:
            return None, None
        visited.add(prev_insn.ea)

        # Mnemonics absent from the table (presumably pseudo instructions)
        # have unknown stack effect, so the trace gives up.
        info = table.get(prev_insn.get_canon_mnem())
        if info is None:
            return None, None
        pops, pushes = info[3], info[4]

        assert current_stack_offset <= 0, "current_stack_offset > 0, impossible, should be addressed in previous trace_jumpdest call"
        if pushes > -current_stack_offset:
            # prev_insn itself pushed the traced item.
            jump_addr, ret_pos = self.__trace_stop(prev_insn, -current_stack_offset)
            if jump_addr is None:
                return None, None
            return [jump_addr], [ret_pos]

        # The item existed before prev_insn ran: undo its stack effect and
        # keep walking backwards.
        current_stack_offset = current_stack_offset - pops + pushes
        insn = prev_insn

def trace_sp(self, insn):
    """Placeholder for stack-pointer tracing; currently does nothing."""
    return None

@staticmethod
def get_prototype(num):
hash_str = '0x%x' %(num, )
function_prototype = known_hashes.knownHashes.get(hash_str, '').encode('ascii','ignore')
return function_prototype
global __known_hash__
with open(os.path.join(os.path.dirname(__file__), "evm-cpu/knownhash.py")) as f:
__known_hash__ = eval(f.read())
if not __known_hash__:
idaapi.error("[evm-cpu] Failed to load './evm-cpu/knownhash.py'")
EVMProcessor.get_prototype = EVMProcessor.get_prototype_real
return EVMProcessor.get_prototype_real(num)

@staticmethod
def get_prototype_real(num):
    """Look up ``num`` in the module-level ``__known_hash__`` table.

    :param num: key to look up.
    :return: the stored entry, or ``''`` when ``num`` is not in the table.
    """
    # Reading a module-level name needs no ``global`` declaration.
    table = __known_hash__
    return table.get(num, '')

def notify_emu(self, insn):
feature = insn.get_canon_feature()
#print "emulating", insn.get_canon_mnem(), hex(feature)

mnemonic = insn.get_canon_mnem()
if mnemonic == "PUSH4":
function_prototype = self.get_prototype(self.get_operand(insn[0]))
function_prototype = EVMProcessor.get_prototype(self.get_operand(insn[0]))
if function_prototype:
ida_bytes.set_cmt(insn.ea, function_prototype, True)

Expand All @@ -761,7 +852,7 @@ def notify_emu(self, insn):
ida_bytes.set_cmt(insn.ea, "JUMPI", True)

jump_hash = insn[1].value
function_prototype = self.get_prototype(jump_hash)
function_prototype = EVMProcessor.get_prototype(jump_hash)
label = '%s (0x%x)' %(function_prototype, jump_hash)
if not ida_lines.get_extra_cmt(addr, ida_lines.E_PREV + 0): # don't dup
ida_lines.add_extra_cmt(addr, True, label)
Expand All @@ -773,27 +864,44 @@ def notify_emu(self, insn):
# add ref to next instruction for false branch
add_cref(insn.ea, insn.ea + insn.size, fl_JN)

# maybe we have a simple puch
prev_insn = idautils.DecodePreviousInstruction(insn.ea)
if prev_insn:
if prev_insn.get_canon_mnem().startswith("PUSH"):
jump_addr = self.get_operand(prev_insn[0])
add_cref(insn.ea, jump_addr, fl_JN)
# # maybe we have a simple puch
# prev_insn = idautils.DecodePreviousInstruction(insn.ea)
# if prev_insn:
# if prev_insn.get_canon_mnem().startswith("PUSH"):
# jump_addr = self.get_operand(prev_insn[0])
# add_cref(insn.ea, jump_addr, fl_JN)
jump_addr_list, ret_pos_list = self.trace_jumpdest(insn, 0)
if jump_addr_list is not None and len(jump_addr_list)>0:
#TODO: use ret_pos
self.add_jumps(insn.ea, jump_addr_list, ret_pos_list, [fl_JN]*len(jump_addr_list))
else:
pass

elif mnemonic == "JUMP":
prev_insn = idautils.DecodePreviousInstruction(insn.ea)
if prev_insn:
# TODO: implement stack modeling to resolve actual top value of stack
if prev_insn.get_canon_mnem().startswith("PUSH"):
jump_addr = self.get_operand(prev_insn[0])
jump_addr_list, ret_pos_list = self.trace_jumpdest(insn, 0)
if jump_addr_list is not None and len(jump_addr_list)>0:
#TODO: use ret_pos
self.add_jumps(insn.ea, jump_addr_list, ret_pos_list, [fl_JN]*len(jump_addr_list))
else:
pass

# prev_insn = idautils.DecodePreviousInstruction(insn.ea)
# if prev_insn:
# # TODO: implement stack modeling to resolve actual top value of stack
# if prev_insn.get_canon_mnem().startswith("PUSH"):
# jump_addr = self.get_operand(prev_insn[0])
#print "found jump to", hex(jump_addr)
add_cref(insn.ea, jump_addr, fl_JN)
# add_cref(insn.ea, jump_addr, fl_JN)
# print "testxhyu"

# TODO: adjust function boundary to include all code
#func = get_func(insn.ea)
#if func:
# #print "appending new tail"
# #append_func_tail(func, jump_addr, BADADDR)
# #reanalyze_function(func)
# func = get_func(insn.ea)
# if func:
# success = append_func_tail(func.start_ea, jump_addr, BADADDR)#BADADDR
# print "appending new tail", type(insn.ea), success
# print '---', get_func_name(insn.ea), insn.ea, func.start_ea, jump_addr, BADADDR
# print 'insn.ea', insn.ea, 'insn.ip', insn.ip
# reanalyze_function(func)

flows = (feature & CF_STOP) == 0
if flows:
Expand Down Expand Up @@ -830,9 +938,9 @@ def get_operand(op):
# re-read all of the bytes from instruction
buf = ida_bytes.get_bytes(op.addr, op.specval) # specval stores number of bytes for operand

for i in range(len(buf)):
for i in buf:
operand <<= 8
operand |= ord(buf[i])
operand |= i
elif op.type == o_near:
operand = op.addr
return operand
Expand Down Expand Up @@ -869,13 +977,13 @@ def notify_ana(self, insn):
try:
instruction = EVMAsm.disassemble_one(bytecode)
except Exception as e:
print e
print(e)
return

insn.size = instruction.size

#initialize operands to voids
operands = [insn[i] for i in xrange(1, 6)]
operands = [insn[i] for i in range(1, 6)]
for o in operands:
o.type = o_void

Expand All @@ -895,7 +1003,7 @@ def notify_ana(self, insn):
for i in prev_insns:
#print i.get_canon_mnem(),
if i.ea == ida_idaapi.BADADDR:
print 'ERROR'
print('ERROR')

if (prev_insns[0].get_canon_mnem().startswith("PUSH2") and
prev_insns[1].get_canon_mnem().startswith("EQ") and
Expand Down Expand Up @@ -931,7 +1039,7 @@ def notify_assemble(self, ea, cs, ip, use32, line):
try:
asm = EVMAsm.assemble_one(line, 0)
except Exception as e:
print "Error trying to assemble '%s': %s" %(line, e)
print("Error trying to assemble '%s': %s" %(line, e))
return None

return asm.bytes
Expand All @@ -955,7 +1063,7 @@ def __init__(self):
self.instruc.append({'name':"CALLI", 'feature':CF_USE2|CF_STOP|CF_CALL}) # pseudo instruction
self.instruction_index[0x101] = 1
i = len(self.instruc)
for (mnemonic, info) in EVMAsm._get_reverse_table().iteritems(): #_table.iteritems():
for (mnemonic, info) in EVMAsm._get_reverse_table().items(): #_table.iteritems():
features = 0 # initially zero

if info[2] != 0: # has immediate
Expand All @@ -975,7 +1083,20 @@ def __init__(self):
i += 1

self.instruc_end = len(self.instruc)
self.has_rebuild_cf = False
self.dst2src = {}

def notify_out_header(self, outctx):
    """Header-output callback: wait via ``idc.auto_wait()``, then rebuild.

    :param outctx: output context supplied by the caller; not used here.
    """
    idc.auto_wait()
    # rebuild_cf() guards itself, so repeated header output is harmless.
    self.rebuild_cf()

def rebuild_cf(self):
    """Reanalyze every known function once; later calls do nothing.

    ``self.has_rebuild_cf`` is set before the loop so the work is never
    started a second time.
    """
    if self.has_rebuild_cf:
        return
    self.has_rebuild_cf = True

    for func_ea in idautils.Functions():
        func = get_func(func_ea)
        print('rebuild_cf func_ea', func_ea, func, idc.get_func_name(func_ea))
        reanalyze_function(func)


def PROCESSOR_ENTRY():
Expand Down
Loading