Skip to content

Handle Incorrect Timeouts #124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
import ssl
import time
import warnings
from unittest.mock import AsyncMock
from hashlib import blake2b
from typing import (
Expand Down Expand Up @@ -531,15 +532,31 @@ def __init__(
self._exit_task = None
self._open_subscriptions = 0
self._options = options if options else {}
self.last_received = time.time()
try:
now = asyncio.get_running_loop().time()
except RuntimeError:
warnings.warn(
"You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. "
"Verify this is intended."
)
now = asyncio.new_event_loop().time()
self.last_received = now
self.last_sent = now

async def __aenter__(self):
async with self._lock:
self._in_use += 1
await self.connect()
return self

@staticmethod
async def loop_time() -> float:
return asyncio.get_running_loop().time()

async def connect(self, force=False):
now = await self.loop_time()
self.last_received = now
self.last_sent = now
if self._exit_task:
self._exit_task.cancel()
if not self._initialized or force:
Expand Down Expand Up @@ -595,7 +612,7 @@ async def _recv(self) -> None:
try:
# TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic
response = json.loads(await self.ws.recv(decode=False))
self.last_received = time.time()
self.last_received = await self.loop_time()
async with self._lock:
# note that these 'subscriptions' are all waiting sent messages which have not received
# responses, and thus are not the same as RPC 'subscriptions', which are unique
Expand Down Expand Up @@ -631,12 +648,12 @@ async def send(self, payload: dict) -> int:
Returns:
id: the internal ID of the request (incremented int)
"""
# async with self._lock:
original_id = get_next_id()
# self._open_subscriptions += 1
await self.max_subscriptions.acquire()
try:
await self.ws.send(json.dumps({**payload, **{"id": original_id}}))
self.last_sent = await self.loop_time()
return original_id
except (ConnectionClosed, ssl.SSLError, EOFError):
async with self._lock:
Expand Down Expand Up @@ -2126,7 +2143,11 @@ async def _make_rpc_request(

if request_manager.is_complete:
break
if time.time() - self.ws.last_received >= self.retry_timeout:
if (
(current_time := await self.ws.loop_time()) - self.ws.last_received
>= self.retry_timeout
and current_time - self.ws.last_sent >= self.retry_timeout
):
if attempt >= self.max_retries:
logger.warning(
f"Timed out waiting for RPC requests {attempt} times. Exiting."
Expand Down
Loading