Skip to content

Commit ff3db67

Browse files
committed
fix: replace progressive eth_getLogs scan with Multicall3 to avoid BSC rate limits
1 parent f506a53 commit ff3db67

3 files changed

Lines changed: 568 additions & 159 deletions

File tree

bnbagent/apex/server/job_ops.py

Lines changed: 79 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ def __init__(
101101
self._deliverable_urls: dict[int, str] = {} # job_id → data_url
102102
self._last_scanned_block: int | None = None
103103
self._startup_scan_done: bool = False
104+
self._last_known_next_id: int = 0
105+
self._pending_open_ids: set[int] = set() # OPEN jobs assigned to this agent
104106

105107
def _get_client(self) -> APEXClient:
106108
"""Get or create APEXClient instance (sync — no I/O on first call beyond ABI load)."""
@@ -324,6 +326,58 @@ async def get_job_status(self, job_id: int) -> dict[str, Any]:
324326
logger.error(f"[APEXJobOps] get_job_status({job_id}) failed: {e}")
325327
return {"success": False, "error": str(e)}
326328

329+
async def _multicall_scan(self, job_ids: list[int]) -> dict[str, Any]:
330+
"""Scan a list of job IDs via Multicall3 and return funded jobs for this agent.
331+
332+
Uses a single ``eth_call`` (via ``get_jobs_batch``) instead of
333+
``eth_getLogs``, avoiding BSC public node rate limits on log queries.
334+
335+
Also tracks OPEN jobs assigned to this agent in ``_pending_open_ids``
336+
so they can be re-checked on subsequent polls (a job may transition
337+
from OPEN → FUNDED between polls without changing ``next_job_id``).
338+
339+
Args:
340+
job_ids: Job IDs to check.
341+
342+
Returns:
343+
``{"success": True, "jobs": [...]}`` with funded, non-expired jobs
344+
assigned to this agent.
345+
"""
346+
if not job_ids:
347+
return {"success": True, "jobs": []}
348+
349+
client = self._get_client()
350+
my_address = self.agent_address.lower()
351+
352+
all_jobs = await asyncio.to_thread(client.get_jobs_batch, list(job_ids))
353+
354+
now = int(time.time())
355+
pending_jobs = []
356+
for job in all_jobs:
357+
if job is None:
358+
continue
359+
provider = job.get("provider", "").lower()
360+
status = job.get("status")
361+
expired_at = job.get("expiredAt", 0)
362+
job_id = job.get("jobId")
363+
364+
if provider != my_address:
365+
# Not our job — stop tracking if we were
366+
self._pending_open_ids.discard(job_id)
367+
continue
368+
369+
if status == APEXStatus.FUNDED and expired_at > now:
370+
pending_jobs.append({"success": True, **job})
371+
self._pending_open_ids.discard(job_id)
372+
elif status == APEXStatus.OPEN:
373+
# Track OPEN jobs so we re-check them next poll
374+
self._pending_open_ids.add(job_id)
375+
else:
376+
# Terminal state (COMPLETED, REJECTED, EXPIRED, etc.)
377+
self._pending_open_ids.discard(job_id)
378+
379+
return {"success": True, "jobs": pending_jobs}
380+
327381
async def _startup_scan(self) -> dict[str, Any]:
328382
"""One-time batch scan of all jobs via Multicall3.
329383
@@ -333,13 +387,12 @@ async def _startup_scan(self) -> dict[str, Any]:
333387
334388
After completing, sets ``_startup_scan_done = True`` and records the
335389
block number at the time of the snapshot so that subsequent calls can
336-
do progressive event scanning from that point forward.
390+
do progressive Multicall3 scanning from that point forward.
337391
338392
If the Multicall3 batch read fails, falls back to the original
339393
event-based scan for this one call.
340394
"""
341395
client = self._get_client()
342-
my_address = self.agent_address.lower()
343396

344397
# Record block BEFORE scanning so progressive scanning picks up
345398
# any events emitted during or after the batch read.
@@ -352,40 +405,24 @@ async def _startup_scan(self) -> dict[str, Any]:
352405
self._startup_scan_done = True
353406
return {"success": True, "jobs": []}
354407

355-
all_jobs = await asyncio.to_thread(
356-
client.get_jobs_batch, list(range(next_id))
357-
)
358-
359-
now = int(time.time())
360-
pending_jobs = []
361-
for job in all_jobs:
362-
if job is None:
363-
continue
364-
provider = job.get("provider", "").lower()
365-
status = job.get("status")
366-
expired_at = job.get("expiredAt", 0)
367-
if (
368-
provider == my_address
369-
and status == APEXStatus.FUNDED
370-
and expired_at > now
371-
):
372-
# Add success key for consistency with get_job() wrapper
373-
pending_jobs.append({"success": True, **job})
408+
result = await self._multicall_scan(list(range(next_id)))
374409

375410
self._last_scanned_block = snapshot_block
411+
self._last_known_next_id = next_id
376412
self._startup_scan_done = True
377413
logger.info(
378-
f"[APEXJobOps] Startup scan complete: {len(pending_jobs)} pending"
414+
f"[APEXJobOps] Startup scan complete: {len(result['jobs'])} pending"
379415
f" out of {next_id} total jobs (snapshot block {snapshot_block})"
380416
)
381-
return {"success": True, "jobs": pending_jobs}
417+
return result
382418

383419
except Exception as e:
384420
logger.warning(
385421
f"[APEXJobOps] Multicall startup scan failed ({e}),"
386422
" falling back to event scan"
387423
)
388424
# Fall back to original event-based scan
425+
my_address = self.agent_address.lower()
389426
try:
390427
latest_block = snapshot_block
391428
from_block = max(0, latest_block - 45000)
@@ -446,19 +483,19 @@ async def get_pending_jobs(
446483
"""
447484
Get funded jobs assigned to this agent.
448485
449-
Uses a two-phase hybrid approach to avoid ``eth_getLogs`` rate limits:
486+
Uses Multicall3 ``eth_call`` exclusively to avoid ``eth_getLogs`` rate
487+
limits on BSC public nodes:
450488
451489
1. **Startup** (first call): Multicall3 batch scan of all existing jobs.
452-
Uses cheap ``eth_call`` instead of ``eth_getLogs`` over a large range.
453-
2. **Runtime** (subsequent calls): Progressive event scanning — only
454-
queries new blocks since the last poll (~3 blocks per 10s interval).
490+
2. **Runtime** (subsequent calls): Check ``next_job_id()`` — if unchanged,
491+
no new jobs exist (0 extra RPCs). If new jobs exist, scan only the
492+
new ID range via Multicall3 (1 ``eth_call``).
455493
456-
If the caller explicitly passes ``from_block``, the value is honored
457-
directly and ``_last_scanned_block`` is NOT updated (preserving the
458-
progressive scanning state for the next automatic call).
494+
If the caller explicitly passes ``from_block``, the original event-based
495+
scan is used instead (for backwards compatibility).
459496
460497
Args:
461-
from_block: Starting block number. When None, uses progressive
498+
from_block: Starting block number. When None, uses Multicall3
462499
scanning (startup scan on first call, then incremental).
463500
to_block: Ending block number or "latest"
464501
max_block_range: Maximum block range for fallback queries
@@ -477,34 +514,22 @@ async def get_pending_jobs(
477514
if not self._startup_scan_done:
478515
return await self._startup_scan()
479516

480-
# Subsequent calls: progressive event scanning
517+
# Subsequent calls: progressive Multicall3 scanning
481518
client = self._get_client()
482-
my_address = self.agent_address.lower()
483-
latest_block = await asyncio.to_thread(lambda: client.w3.eth.block_number)
519+
next_id = await asyncio.to_thread(client.next_job_id)
484520

485-
# 5-block overlap for reorg safety
486-
scan_from = max(0, self._last_scanned_block - 5)
521+
# Collect IDs to scan: new job IDs + previously-seen OPEN jobs
522+
scan_set: set[int] = set()
523+
if next_id > self._last_known_next_id:
524+
scan_set.update(range(self._last_known_next_id, next_id))
525+
scan_set.update(self._pending_open_ids)
487526

488-
if scan_from >= latest_block:
527+
if not scan_set:
489528
return {"success": True, "jobs": []}
490529

491-
try:
492-
result = await self._event_scan(scan_from, to_block, my_address)
493-
self._consecutive_scan_failures = 0
494-
return result
495-
except Exception as e:
496-
self._consecutive_scan_failures = getattr(
497-
self, "_consecutive_scan_failures", 0
498-
) + 1
499-
logger.warning(
500-
f"[APEXJobOps] Progressive scan failed ({e}),"
501-
f" will retry after backoff"
502-
f" (failures={self._consecutive_scan_failures})"
503-
)
504-
return {"success": False, "error": str(e), "jobs": []}
505-
finally:
506-
# Always advance so we don't re-scan the same range
507-
self._last_scanned_block = latest_block
530+
result = await self._multicall_scan(sorted(scan_set))
531+
self._last_known_next_id = next_id
532+
return result
508533

509534
except Exception as e:
510535
logger.error(f"[APEXJobOps] get_pending_jobs failed: {e}")
@@ -660,20 +685,8 @@ async def on_job(job: dict) -> tuple[str, dict] # async + per-job metadata
660685

661686
skipped_jobs: set[int] = set()
662687

663-
max_backoff = poll_interval * 12 # cap at ~2 minutes for 10s base
664-
665688
while True:
666689
try:
667-
# Backoff when rate-limited: exponential up to max_backoff
668-
consecutive_failures = getattr(job_ops, "_consecutive_scan_failures", 0)
669-
if consecutive_failures > 0:
670-
backoff = min(poll_interval * (2 ** consecutive_failures), max_backoff)
671-
logger.debug(
672-
f"[JobRunner] Backing off {backoff}s"
673-
f" (consecutive failures: {consecutive_failures})"
674-
)
675-
await asyncio.sleep(backoff)
676-
677690
result = await job_ops.get_pending_jobs()
678691

679692
if not result.get("success"):

0 commit comments

Comments
 (0)