diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 312fb30..3a6c225 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -9,6 +9,7 @@ import logging import ssl import time +import warnings from unittest.mock import AsyncMock from hashlib import blake2b from typing import ( @@ -531,7 +532,16 @@ def __init__( self._exit_task = None self._open_subscriptions = 0 self._options = options if options else {} - self.last_received = time.time() + try: + now = asyncio.get_running_loop().time() + except RuntimeError: + warnings.warn( + "You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. " + "Verify this is intended." + ) + now = asyncio.new_event_loop().time() + self.last_received = now + self.last_sent = now async def __aenter__(self): async with self._lock: @@ -539,7 +549,14 @@ async def __aenter__(self): await self.connect() return self + @staticmethod + async def loop_time() -> float: + return asyncio.get_running_loop().time() + async def connect(self, force=False): + now = await self.loop_time() + self.last_received = now + self.last_sent = now if self._exit_task: self._exit_task.cancel() if not self._initialized or force: @@ -595,7 +612,7 @@ async def _recv(self) -> None: try: # TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic response = json.loads(await self.ws.recv(decode=False)) - self.last_received = time.time() + self.last_received = await self.loop_time() async with self._lock: # note that these 'subscriptions' are all waiting sent messages which have not received # responses, and thus are not the same as RPC 'subscriptions', which are unique @@ -631,12 +648,12 @@ async def send(self, payload: dict) -> int: Returns: id: the internal ID of the request (incremented int) """ - # async with self._lock: original_id = get_next_id() # self._open_subscriptions += 1 await self.max_subscriptions.acquire() try: await self.ws.send(json.dumps({**payload, **{"id": original_id}})) + self.last_sent = await self.loop_time() return original_id except (ConnectionClosed, ssl.SSLError, EOFError): async with self._lock: @@ -2126,7 +2143,11 @@ async def _make_rpc_request( if request_manager.is_complete: break - if time.time() - self.ws.last_received >= self.retry_timeout: + if ( + (current_time := await self.ws.loop_time()) - self.ws.last_received + >= self.retry_timeout + and current_time - self.ws.last_sent >= self.retry_timeout + ): if attempt >= self.max_retries: logger.warning( f"Timed out waiting for RPC requests {attempt} times. Exiting."