Skip to content

Commit 83238a8

Browse files
committed
Refactoring
Support forking from transaction hash Support providing IPC path as argument Use `watchdog` to wait for IPC file creation instead of repeatedly pinging provider Convert some attributes to properties
1 parent 9fb4f6d commit 83238a8

File tree

2 files changed

+143
-67
lines changed

2 files changed

+143
-67
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ requires-python = ">=3.10"
1010
dependencies = [
1111
"scipy >=1.14.1, <1.15",
1212
"ujson >= 5.10.0, <6",
13+
"watchdog >= 5.0.3",
1314
"web3 >=7.2.0, <8",
1415
]
1516
license = {text = "MIT"}

src/degenbot/anvil_fork.py

Lines changed: 142 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,30 @@
11
import contextlib
22
import os
3+
import pathlib
34
import shutil
45
import socket
56
import subprocess
67
from collections.abc import Iterable
8+
from queue import Queue
79
from typing import Any, Literal, cast
810

11+
import watchdog.events
12+
import watchdog.observers
913
from eth_typing import HexAddress
1014
from web3 import IPCProvider, Web3
1115
from web3.middleware import Middleware
1216
from web3.types import RPCEndpoint
1317

1418
from .constants import MAX_UINT256
15-
from .exceptions import DegenbotValueError
19+
from .exceptions import DegenbotValueError, InvalidUint256
1620
from .logging import logger
1721

1822

23+
class AnvilNotFound(Exception):
24+
def __init__(self) -> None: # pragma: no cover
25+
super().__init__("Anvil path could not be located.")
26+
27+
1928
class AnvilFork:
2029
"""
2130
Launch an Anvil fork as a separate process and expose methods for commonly-used RPC calls.
@@ -27,14 +36,14 @@ def __init__(
2736
self,
2837
fork_url: str,
2938
fork_block: int | None = None,
39+
fork_transaction_hash: str | None = None,
3040
hardfork: str = "latest",
31-
port: int | None = None,
3241
chain_id: int | None = None,
3342
mining_mode: Literal["auto", "interval", "none"] = "auto",
3443
mining_interval: int = 12,
3544
storage_caching: bool = True,
3645
base_fee: int | None = None,
37-
ipc_path: str | None = None,
46+
ipc_path: pathlib.Path = pathlib.Path("/tmp/"),
3847
mnemonic: str = (
3948
# Default mnemonic used by Brownie for Ganache forks
4049
"patient rude simple dog close planet oval animal hunt sketch suspect slim"
@@ -47,20 +56,22 @@ def __init__(
4756
ipc_provider_kwargs: dict[str, Any] | None = None,
4857
prune_history: bool = False,
4958
):
50-
def build_anvil_command() -> list[str]: # pragma: no cover
59+
def build_anvil_command(path_to_anvil: pathlib.Path) -> list[str]: # pragma: no cover
5160
command = [
52-
"anvil",
61+
str(path_to_anvil),
5362
"--silent",
5463
"--auto-impersonate",
5564
"--no-rate-limit",
5665
f"--fork-url={fork_url}",
5766
f"--hardfork={hardfork}",
5867
f"--port={self.port}",
59-
f"--ipc={ipc_path}",
68+
f"--ipc={self.ipc_filename}",
6069
f"--mnemonic={mnemonic}",
6170
]
6271
if fork_block:
6372
command.append(f"--fork-block-number={fork_block}")
73+
if fork_transaction_hash:
74+
command.append(f"--fork-transaction-hash={fork_transaction_hash}")
6475
if chain_id:
6576
command.append(f"--chain-id={chain_id}")
6677
if base_fee:
@@ -79,44 +90,36 @@ def build_anvil_command() -> list[str]: # pragma: no cover
7990
command.append("--no-mining")
8091
command.append("--order=fifo")
8192
case _:
82-
raise DegenbotValueError(f"Unknown mining mode '{mining_mode}'.")
93+
raise DegenbotValueError(message=f"Unknown mining mode '{mining_mode}'.")
8394

8495
return command
8596

86-
def get_free_port_number() -> int:
87-
with socket.socket() as sock:
88-
sock.bind(("", 0))
89-
_, _port = sock.getsockname()
90-
return cast(int, _port)
91-
92-
if shutil.which("anvil") is None: # pragma: no cover
93-
raise Exception("Anvil is not installed or not accessible in the current path.")
94-
95-
self.port = port if port is not None else get_free_port_number()
96-
97-
ipc_path = f"/tmp/anvil-{self.port}.ipc" if ipc_path is None else ipc_path
97+
_path_to_anvil = shutil.which("anvil")
98+
if _path_to_anvil is None: # pragma: no cover
99+
raise AnvilNotFound
100+
path_to_anvil = pathlib.Path(_path_to_anvil)
98101

99-
self._process = subprocess.Popen(build_anvil_command())
100102
self.fork_url = fork_url
101-
self.http_url = f"http://localhost:{self.port}"
102-
self.ws_url = f"ws://localhost:{self.port}"
103+
self.port = self._get_free_port_number()
103104
self.ipc_path = ipc_path
105+
self.ipc_provider_kwargs = (
106+
ipc_provider_kwargs if ipc_provider_kwargs is not None else dict()
107+
)
104108

105-
if ipc_provider_kwargs is None:
106-
ipc_provider_kwargs = dict()
107-
self.w3 = Web3(IPCProvider(ipc_path=ipc_path, **ipc_provider_kwargs))
108-
109-
if middlewares is not None:
110-
for middleware, layer in middlewares:
111-
self.w3.middleware_onion.inject(middleware, layer=layer)
112-
113-
while self.w3.is_connected() is False:
114-
continue
109+
self._anvil_command = build_anvil_command(path_to_anvil=path_to_anvil)
110+
self._process = self._setup_subprocess(
111+
anvil_command=self._anvil_command, ipc_path=self.ipc_path
112+
)
113+
self.w3 = Web3(IPCProvider(ipc_path=self.ipc_filename, **self.ipc_provider_kwargs))
115114

116115
self._initial_block_number = (
117116
fork_block if fork_block is not None else self.w3.eth.get_block_number()
118117
)
119118

119+
if middlewares is not None:
120+
for middleware, layer in middlewares:
121+
self.w3.middleware_onion.inject(middleware, layer=layer)
122+
120123
if balance_overrides is not None:
121124
for account, balance in balance_overrides:
122125
self.set_balance(account, balance)
@@ -132,59 +135,133 @@ def get_free_port_number() -> int:
132135
if coinbase is not None:
133136
self.set_coinbase(coinbase)
134137

138+
@property
139+
def http_url(self) -> str:
140+
return f"http://localhost:{self.port}"
141+
142+
@property
143+
def ipc_filename(self) -> pathlib.Path:
144+
return self.ipc_path / f"anvil-{self.port}.ipc"
145+
146+
@property
147+
def ws_url(self) -> str:
148+
return f"ws://localhost:{self.port}"
149+
150+
@staticmethod
151+
def _get_free_port_number() -> int:
152+
with socket.socket() as sock:
153+
sock.bind(("", 0))
154+
_, _port = sock.getsockname()
155+
return cast(int, _port)
156+
157+
def _setup_subprocess(
158+
self, anvil_command: list[str], ipc_path: pathlib.Path
159+
) -> subprocess.Popen[Any]:
160+
"""
161+
Launch an Anvil subprocess, waiting for the IPC file to be created.
162+
"""
163+
164+
class WaitForIPCReady(watchdog.events.FileSystemEventHandler):
165+
def __init__(self, queue: Queue[Any], ipc_filename: str):
166+
self.queue = queue
167+
self.ipc_filename = ipc_filename
168+
169+
def on_created(self, event: watchdog.events.FileSystemEvent) -> None:
170+
if event.src_path == self.ipc_filename: # pragma: no branch
171+
self.queue.put(object())
172+
173+
queue: Queue[Any] = Queue()
174+
observer = watchdog.observers.Observer()
175+
observer.schedule(
176+
event_handler=WaitForIPCReady(
177+
queue,
178+
str(self.ipc_filename),
179+
),
180+
path=str(ipc_path),
181+
)
182+
observer.start()
183+
process = subprocess.Popen(anvil_command)
184+
queue.get(timeout=10)
185+
observer.stop()
186+
observer.join()
187+
188+
return process
189+
135190
def __del__(self) -> None:
136191
self._process.terminate()
137192
self._process.wait()
138193
with contextlib.suppress(FileNotFoundError):
139-
os.remove(self.ipc_path)
140-
141-
def create_access_list(self, transaction: dict[Any, Any]) -> Any:
142-
# Exclude transaction values that are irrelevant for the JSON-RPC method
143-
# ref: https://docs.infura.io/networks/ethereum/json-rpc-methods/eth_createaccesslist
144-
keys_to_drop = ("gasPrice", "maxFeePerGas", "maxPriorityFeePerGas", "gas", "chainId")
145-
sanitized_tx = {k: v for k, v in transaction.items() if k not in keys_to_drop}
146-
147-
# Apply int->hex conversion to some transaction values
148-
# ref: https://docs.infura.io/networks/ethereum/json-rpc-methods/eth_createaccesslist
149-
keys_to_hexify = ("value", "nonce")
150-
for key in keys_to_hexify:
151-
if key in sanitized_tx and isinstance(sanitized_tx[key], int):
152-
sanitized_tx[key] = hex(sanitized_tx[key])
153-
154-
return self.w3.provider.make_request(
155-
method=RPCEndpoint("eth_createAccessList"),
156-
params=[sanitized_tx],
157-
)["result"]["accessList"]
194+
os.remove(self.ipc_filename)
158195

159196
def mine(self) -> None:
160197
self.w3.provider.make_request(
161198
method=RPCEndpoint("evm_mine"),
162199
params=[],
163200
)
164201

165-
def reset(
202+
def _reset(
166203
self,
167-
fork_url: str | None = None,
168-
block_number: int | None = None,
169-
base_fee: int | None = None,
204+
fork_url: str,
205+
block_number: int | None,
170206
) -> None:
171-
forking_params: dict[str, Any] = {
172-
"jsonRpcUrl": fork_url if fork_url is not None else self.fork_url,
173-
"blockNumber": block_number if block_number is not None else self._initial_block_number,
174-
}
207+
forking_params: dict[str, Any] = {"jsonRpcUrl": fork_url}
208+
if block_number:
209+
forking_params["blockNumber"] = block_number
210+
175211
self.w3.provider.make_request(
176212
method=RPCEndpoint("anvil_reset"),
177213
params=[{"forking": forking_params}],
178214
)
179215

180-
if fork_url is not None:
216+
def reset(
217+
self,
218+
fork_url: str | None = None,
219+
block_number: int | None = None,
220+
) -> None:
221+
self._reset(
222+
fork_url=fork_url if fork_url is not None else self.fork_url,
223+
block_number=block_number,
224+
)
225+
if fork_url:
181226
self.fork_url = fork_url
182-
if base_fee is not None:
183-
self.set_next_base_fee(base_fee)
227+
228+
def reset_to_transaction_hash(self, transaction_hash: str) -> None:
229+
"""
230+
Reset to the state after a given transaction hash.
231+
232+
This method will launch a new Anvil process since the anvil_reset API endpoint only supports
233+
resetting to a block number.
234+
"""
235+
236+
self._process.terminate()
237+
self._process.wait()
238+
with contextlib.suppress(FileNotFoundError):
239+
os.remove(self.ipc_filename)
240+
241+
self.port = self._get_free_port_number()
242+
243+
anvil_command = []
244+
for option in self._anvil_command:
245+
if any(
246+
(
247+
"--fork-block-number" in option,
248+
"--fork-transaction-hash" in option,
249+
"--ipc" in option,
250+
"--port" in option,
251+
)
252+
):
253+
continue
254+
anvil_command.append(option)
255+
anvil_command.append(f"--fork-transaction-hash={transaction_hash}")
256+
anvil_command.append(f"--port={self.port}")
257+
anvil_command.append(f"--ipc={self.ipc_filename}")
258+
259+
self._process = self._setup_subprocess(anvil_command=anvil_command, ipc_path=self.ipc_path)
260+
self.w3 = Web3(IPCProvider(ipc_path=self.ipc_filename, **self.ipc_provider_kwargs))
184261

185262
def return_to_snapshot(self, id: int) -> bool:
186263
if id < 0:
187-
raise DegenbotValueError("ID cannot be negative")
264+
raise DegenbotValueError(message="ID cannot be negative")
188265
return bool(
189266
self.w3.provider.make_request(
190267
method=RPCEndpoint("evm_revert"),
@@ -194,9 +271,7 @@ def return_to_snapshot(self, id: int) -> bool:
194271

195272
def set_balance(self, address: str, balance: int) -> None:
196273
if not (0 <= balance <= MAX_UINT256):
197-
raise DegenbotValueError(
198-
"Invalid balance, must be within range: 0 <= balance <= 2**256 - 1"
199-
)
274+
raise InvalidUint256
200275

201276
self.w3.provider.make_request(
202277
method=RPCEndpoint("anvil_setBalance"),
@@ -217,7 +292,7 @@ def set_coinbase(self, address: str) -> None:
217292

218293
def set_next_base_fee(self, fee: int) -> None:
219294
if not (0 <= fee <= MAX_UINT256):
220-
raise DegenbotValueError("Fee outside valid range 0 <= fee <= 2**256-1")
295+
raise InvalidUint256
221296
self.w3.provider.make_request(
222297
method=RPCEndpoint("anvil_setNextBlockBaseFeePerGas"),
223298
params=[fee],

0 commit comments

Comments
 (0)