Skip to content

Commit 8d3ae19

Browse files
committed
refactor(loaders): extract bounded retry/budget pattern into base.py
ccxt_loader and okx each carried their own copy of the same 25-line retry+budget loop introduced in HKUDS#121. Hoist the pattern into two helpers — check_budget() and retry_with_budget() — so any future loader (yfinance, akshare, tushare) inherits the same guarantees: per-call transient retries, a wall-clock budget, non-transient exceptions left unwrapped, and the original exception preserved as __cause__. ccxt_loader / okx now declare only what is loader-specific (the exception class for transient errors and the budget env var); the scheduling, sleep math, and TimeoutError wrapping live in base.py. Adds 11 helper-level tests; the 11 pre-existing integration tests still pass against the refactored loaders. Follow-up to HKUDS#121.
1 parent d0d78e1 commit 8d3ae19

6 files changed

Lines changed: 295 additions & 58 deletions

File tree

agent/backtest/loaders/base.py

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
1-
"""DataLoader Protocol and shared exceptions for all data source loaders."""
1+
"""DataLoader Protocol, shared exceptions, and bounded-retry helpers.
2+
3+
The retry/budget helpers are the canonical pattern for any loader that calls
4+
a flaky external API: a wall-clock deadline plus a small backoff schedule
5+
applied only to a declared transient exception class. New loaders should
6+
import :func:`check_budget` and :func:`retry_with_budget` rather than
7+
re-implementing the loop.
8+
"""
29

310
from __future__ import annotations
411

5-
from typing import Protocol, runtime_checkable
12+
import time
13+
from typing import Callable, Protocol, TypeVar, runtime_checkable
614

715
import pandas as pd
816

@@ -30,6 +38,91 @@ def validate_date_range(start_date: str, end_date: str) -> None:
3038
raise ValueError(f"start_date ({start_date}) > end_date ({end_date})")
3139

3240

41+
# ---------------------------------------------------------------------------
42+
# Bounded retry / budget helpers (shared by ccxt_loader, okx, and any future
43+
# loader calling a flaky external API).
44+
# ---------------------------------------------------------------------------
45+
46+
# Default sleep schedule (seconds) between retries, and the default number of
# retries after the first call. ``DEFAULT_BACKOFF`` must hold at least
# ``DEFAULT_MAX_RETRIES`` entries (enforced per call by ``retry_with_budget``).
DEFAULT_BACKOFF: tuple[float, ...] = (0.5, 1.5, 4.0)
DEFAULT_MAX_RETRIES = 3


def check_budget(deadline: float, label: str, budget_s: float | None = None) -> None:
    """Raise :class:`TimeoutError` if the monotonic clock has crossed ``deadline``.

    Use this between pages of a paginated fetch to fail fast instead of
    grinding through more requests once the wall-clock budget is gone.

    Args:
        deadline: ``time.monotonic()`` instant past which we abort.
        label: Free-form label used in the exception message
            (e.g. ``"ccxt fetch for BTC/USDT"``).
        budget_s: Original budget in seconds. When given it is rendered
            rounded to whole seconds (``:.0f``) in the message.

    Raises:
        TimeoutError: ``time.monotonic()`` is strictly greater than
            ``deadline``.
    """
    if time.monotonic() > deadline:
        suffix = f" exceeded {budget_s:.0f}s budget" if budget_s is not None else " exceeded budget"
        raise TimeoutError(f"{label}{suffix}")


_T = TypeVar("_T")


def retry_with_budget(
    fn: Callable[[], _T],
    *,
    transient: type[BaseException] | tuple[type[BaseException], ...],
    deadline: float,
    label: str,
    max_retries: int = DEFAULT_MAX_RETRIES,
    backoff: tuple[float, ...] = DEFAULT_BACKOFF,
) -> _T:
    """Call ``fn`` with a bounded retry budget on declared transient errors.

    Between attempts sleeps ``min(backoff[attempt], remaining_budget)`` so a
    short remaining budget never spends the full backoff. The terminal
    transient failure — whether ``max_retries`` is exhausted OR the deadline
    has passed — is wrapped in :class:`TimeoutError`, preserving the original
    exception as ``__cause__``. Anything not in ``transient`` propagates
    unchanged on the first occurrence (we never retry an exception class
    the caller didn't opt in to).

    The deadline is only consulted *after* a transient failure. When the
    sleep is truncated to the remaining budget, the following attempt may
    therefore start at or just past the deadline; if that attempt also fails
    transiently it is the terminal failure.

    Args:
        fn: Zero-arg callable producing the result.
        transient: Exception class(es) considered transient and retryable.
        deadline: ``time.monotonic()`` instant past which retries are aborted.
        label: Free-form label used in the TimeoutError message
            (e.g. ``"OKX fetch for BTC-USDT"``).
        max_retries: Additional attempts after the first call. Total
            attempts = ``max_retries + 1``. Must be non-negative.
        backoff: Per-retry sleep seconds. Must have at least
            ``max_retries`` entries.

    Returns:
        Whatever ``fn`` returns.

    Raises:
        ValueError: ``max_retries`` is negative, or ``backoff`` is shorter
            than ``max_retries``.
        TimeoutError: All retries exhausted or the deadline crossed.
        Any non-transient exception: Propagated unchanged from ``fn``.
    """
    # A negative retry count would make ``range(max_retries + 1)`` empty, so
    # ``fn`` would never run and the caller would hit the "unreachable"
    # assertion below. Reject it up front as the argument error it is.
    if max_retries < 0:
        raise ValueError(f"max_retries must be >= 0; got {max_retries}")
    if len(backoff) < max_retries:
        raise ValueError(
            f"backoff has {len(backoff)} entries; need >= max_retries ({max_retries})"
        )
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except transient as exc:
            remaining = deadline - time.monotonic()
            if attempt == max_retries or remaining <= 0:
                raise TimeoutError(
                    f"{label} failed after {attempt + 1} attempt(s): {exc}"
                ) from exc
            # ``remaining`` is strictly positive here, so it needs no clamping.
            time.sleep(min(backoff[attempt], remaining))
    raise AssertionError("unreachable: retry loop must return or raise")  # pragma: no cover
124+
125+
33126
@runtime_checkable
34127
class DataLoaderProtocol(Protocol):
35128
"""Interface that every data source loader must satisfy."""

agent/backtest/loaders/ccxt_loader.py

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414

1515
import pandas as pd
1616

17-
from backtest.loaders.base import validate_date_range
17+
from backtest.loaders.base import (
18+
check_budget,
19+
retry_with_budget,
20+
validate_date_range,
21+
)
1822
from backtest.loaders.registry import register
1923

2024
logger = logging.getLogger(__name__)
@@ -27,11 +31,10 @@
2731
# P12-b: ccxt had no request timeout and an unbounded paginated fetch with
2832
# no retry budget, so a transient disconnect hung get_market_data for 10+
2933
# minutes. Cap each HTTP call, bound transient retries, and enforce a hard
30-
# wall-clock budget so the fetch fails fast instead of hanging.
34+
# wall-clock budget so the fetch fails fast instead of hanging. Retry
35+
# scheduling is delegated to :mod:`backtest.loaders.base`.
3136
_CCXT_TIMEOUT_MS = int(os.getenv("CCXT_TIMEOUT_MS", "15000"))
3237
_CCXT_FETCH_BUDGET_S = float(os.getenv("CCXT_FETCH_BUDGET_S", "60"))
33-
_CCXT_MAX_RETRIES = 3
34-
_CCXT_BACKOFF = (0.5, 1.5, 4.0) # seconds; len == _CCXT_MAX_RETRIES
3538

3639

3740
@register
@@ -113,31 +116,19 @@ def _fetch_one(
113116
cursor = since_ms
114117
limit = 1000
115118
deadline = time.monotonic() + _CCXT_FETCH_BUDGET_S
119+
label = f"ccxt fetch for {symbol}"
116120

117121
for _ in range(200):
118-
if time.monotonic() > deadline:
119-
raise TimeoutError(
120-
f"ccxt fetch for {symbol} exceeded "
121-
f"{_CCXT_FETCH_BUDGET_S:.0f}s budget"
122-
)
123-
ohlcv = None
124-
for attempt in range(_CCXT_MAX_RETRIES + 1):
125-
try:
126-
ohlcv = exchange.fetch_ohlcv(
127-
symbol, timeframe, since=cursor, limit=limit
128-
)
129-
break
130-
except ccxt.NetworkError as exc:
131-
# NetworkError covers RequestTimeout / DDoSProtection /
132-
# ExchangeNotAvailable — the transient family. Anything
133-
# else (e.g. ExchangeError: bad symbol) is not retried.
134-
remaining = deadline - time.monotonic()
135-
if attempt == _CCXT_MAX_RETRIES or remaining <= 0:
136-
raise TimeoutError(
137-
f"ccxt fetch for {symbol} failed after "
138-
f"{attempt + 1} attempt(s): {exc}"
139-
) from exc
140-
time.sleep(min(_CCXT_BACKOFF[attempt], max(0.0, remaining)))
122+
check_budget(deadline, label, budget_s=_CCXT_FETCH_BUDGET_S)
123+
# ``ccxt.NetworkError`` covers RequestTimeout / DDoSProtection /
124+
# ExchangeNotAvailable — the transient family. Anything else
125+
# (e.g. ``ExchangeError`` for a bad symbol) is not retried.
126+
ohlcv = retry_with_budget(
127+
lambda: exchange.fetch_ohlcv(symbol, timeframe, since=cursor, limit=limit),
128+
transient=ccxt.NetworkError,
129+
deadline=deadline,
130+
label=label,
131+
)
141132
if not ohlcv:
142133
break
143134
all_rows.extend(ohlcv)

agent/backtest/loaders/okx.py

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,21 @@
1212
import pandas as pd
1313
import requests
1414

15-
from backtest.loaders.base import validate_date_range
15+
from backtest.loaders.base import (
16+
check_budget,
17+
retry_with_budget,
18+
validate_date_range,
19+
)
1620
from backtest.loaders.registry import register
1721

1822
BASE_URL = "https://www.okx.com/api/v5"
1923
_MAX_PER_PAGE = 300
2024
# P12-b parity: OKX already sets a per-request timeout but had no retry
2125
# budget, so a transient blip dropped the whole symbol and a slow tier
22-
# could stall ~max_pages*timeout. Bound it like the ccxt loader.
26+
# could stall ~max_pages*timeout. Bound it like the ccxt loader; retry
27+
# scheduling is delegated to :mod:`backtest.loaders.base`.
2328
_OKX_TIMEOUT = int(os.getenv("OKX_TIMEOUT_S", "15"))
2429
_OKX_FETCH_BUDGET_S = float(os.getenv("OKX_FETCH_BUDGET_S", "60"))
25-
_OKX_MAX_RETRIES = 3
26-
_OKX_BACKOFF = (0.5, 1.5, 4.0) # seconds; len == _OKX_MAX_RETRIES
2730

2831

2932
@register
@@ -108,37 +111,31 @@ def _fetch_candles(
108111
all_rows: list = []
109112
after = str(end_ts)
110113
deadline = time.monotonic() + _OKX_FETCH_BUDGET_S
114+
label = f"OKX fetch for {inst_id}"
111115

112116
for _ in range(max_pages):
113-
if time.monotonic() > deadline:
114-
raise TimeoutError(
115-
f"OKX fetch for {inst_id} exceeded "
116-
f"{_OKX_FETCH_BUDGET_S:.0f}s budget"
117-
)
117+
check_budget(deadline, label, budget_s=_OKX_FETCH_BUDGET_S)
118118
params = {
119119
"instId": inst_id,
120120
"bar": bar,
121121
"limit": str(_MAX_PER_PAGE),
122122
"after": after,
123123
}
124-
data = None
125-
for attempt in range(_OKX_MAX_RETRIES + 1):
126-
try:
127-
resp = requests.get(
128-
f"{BASE_URL}/market/candles",
129-
params=params,
130-
timeout=_OKX_TIMEOUT,
131-
)
132-
data = resp.json()
133-
break
134-
except requests.RequestException as exc:
135-
remaining = deadline - time.monotonic()
136-
if attempt == _OKX_MAX_RETRIES or remaining <= 0:
137-
raise TimeoutError(
138-
f"OKX fetch for {inst_id} failed after "
139-
f"{attempt + 1} attempt(s): {exc}"
140-
) from exc
141-
time.sleep(min(_OKX_BACKOFF[attempt], max(0.0, remaining)))
124+
125+
def _do_request() -> dict:
126+
resp = requests.get(
127+
f"{BASE_URL}/market/candles",
128+
params=params,
129+
timeout=_OKX_TIMEOUT,
130+
)
131+
return resp.json()
132+
133+
data = retry_with_budget(
134+
_do_request,
135+
transient=requests.RequestException,
136+
deadline=deadline,
137+
label=label,
138+
)
142139
if data.get("code") != "0" or not data.get("data"):
143140
break
144141

agent/tests/test_ccxt_loader_bounded.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import ccxt
1717

1818
import backtest.loaders.ccxt_loader as cl
19+
from backtest.loaders.base import DEFAULT_MAX_RETRIES
1920
from backtest.loaders.ccxt_loader import DataLoader
2021

2122
SINCE = int(pd.Timestamp("2026-05-01").timestamp() * 1000)
@@ -61,7 +62,7 @@ def test_persistent_disconnect_is_bounded_not_a_hang():
6162
ex = _FakeEx([ccxt.NetworkError("down")]) # always fails
6263
with pytest.raises(TimeoutError):
6364
DataLoader._fetch_one(ex, "BTC/USDT", "1d", SINCE, END)
64-
assert ex.calls == cl._CCXT_MAX_RETRIES + 1 # bounded, not range(200)/forever
65+
assert ex.calls == DEFAULT_MAX_RETRIES + 1 # bounded, not range(200)/forever
6566

6667

6768
def test_non_network_error_is_not_retried():

0 commit comments

Comments
 (0)