Skip to content

Commit cd24f99

Browse files
Teerapat-Vatpitak and codex
authored and committed
fix(loaders): bound ccxt + okx fetch (timeout, retry, budget) (HKUDS#121)
* fix(loaders): bound ccxt fetch timeout + retry

  ccxt_loader set no request timeout and paginated fetch_ohlcv in an unbounded loop with no retry budget, so a transient disconnect hung get_market_data for 10+ minutes (P12-b — it stalled a crypto E2E). Set an explicit per-call timeout on the exchange, retry the transient ccxt.NetworkError family with bounded backoff, and enforce a hard wall-clock budget that raises a clear TimeoutError instead of hanging. Non-network errors (e.g. bad symbol) are not retried; the happy path still issues one call per page (no behavior change). okx.py has the same shape — noted as a follow-up, out of scope here.

  (cherry picked from commit ef934fbd70209d74a110e81757dbe322a2b717ab)

* fix(loaders): bound okx fetch retry + budget

  okx.py set a per-request timeout but had no retry and no wall-clock budget, so a transient blip dropped the whole symbol via the outer silent skip, and a slow keyless tier could stall roughly max_pages * timeout (P12-b parity with the ccxt loader). Retry the transient requests.RequestException family with bounded backoff and enforce a hard wall-clock budget that raises a clear TimeoutError; non-network errors are not retried; the happy path still issues exactly one request per page (no behavior change).

  (cherry picked from commit e5030bf3edc3e44cc27003b3ac23e9e4fefc3c97)
1 parent 2933056 commit cd24f99

4 files changed

Lines changed: 254 additions & 4 deletions

File tree

agent/backtest/loaders/ccxt_loader.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import logging
1111
import os
12+
import time
1213
from typing import Dict, List, Optional
1314

1415
import pandas as pd
@@ -23,6 +24,15 @@
2324
"1H": "1h", "4H": "4h", "1D": "1d",
2425
}
2526

27+
# P12-b: ccxt had no request timeout and an unbounded paginated fetch with
28+
# no retry budget, so a transient disconnect hung get_market_data for 10+
29+
# minutes. Cap each HTTP call, bound transient retries, and enforce a hard
30+
# wall-clock budget so the fetch fails fast instead of hanging.
31+
_CCXT_TIMEOUT_MS = int(os.getenv("CCXT_TIMEOUT_MS", "15000"))
32+
_CCXT_FETCH_BUDGET_S = float(os.getenv("CCXT_FETCH_BUDGET_S", "60"))
33+
_CCXT_MAX_RETRIES = 3
34+
_CCXT_BACKOFF = (0.5, 1.5, 4.0) # seconds; len == _CCXT_MAX_RETRIES
35+
2636

2737
@register
2838
class DataLoader:
@@ -51,7 +61,7 @@ def _get_exchange(self):
5161
if exchange_cls is None:
5262
logger.warning("Unknown CCXT exchange %s, falling back to binance", exchange_id)
5363
exchange_cls = ccxt.binance
54-
return exchange_cls({"enableRateLimit": True})
64+
return exchange_cls({"enableRateLimit": True, "timeout": _CCXT_TIMEOUT_MS})
5565

5666
def fetch(
5767
self,
@@ -97,12 +107,37 @@ def _fetch_one(
97107
exchange, symbol: str, timeframe: str, since_ms: int, end_ms: int,
98108
) -> Optional[pd.DataFrame]:
99109
"""Paginated OHLCV fetch for one symbol."""
110+
import ccxt
111+
100112
all_rows: list = []
101113
cursor = since_ms
102114
limit = 1000
115+
deadline = time.monotonic() + _CCXT_FETCH_BUDGET_S
103116

104117
for _ in range(200):
105-
ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since=cursor, limit=limit)
118+
if time.monotonic() > deadline:
119+
raise TimeoutError(
120+
f"ccxt fetch for {symbol} exceeded "
121+
f"{_CCXT_FETCH_BUDGET_S:.0f}s budget"
122+
)
123+
ohlcv = None
124+
for attempt in range(_CCXT_MAX_RETRIES + 1):
125+
try:
126+
ohlcv = exchange.fetch_ohlcv(
127+
symbol, timeframe, since=cursor, limit=limit
128+
)
129+
break
130+
except ccxt.NetworkError as exc:
131+
# NetworkError covers RequestTimeout / DDoSProtection /
132+
# ExchangeNotAvailable — the transient family. Anything
133+
# else (e.g. ExchangeError: bad symbol) is not retried.
134+
remaining = deadline - time.monotonic()
135+
if attempt == _CCXT_MAX_RETRIES or remaining <= 0:
136+
raise TimeoutError(
137+
f"ccxt fetch for {symbol} failed after "
138+
f"{attempt + 1} attempt(s): {exc}"
139+
) from exc
140+
time.sleep(min(_CCXT_BACKOFF[attempt], max(0.0, remaining)))
106141
if not ohlcv:
107142
break
108143
all_rows.extend(ohlcv)

agent/backtest/loaders/okx.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
Up to 300 bars per request; paginates with ``after`` for longer history.
66
"""
77

8+
import os
9+
import time
810
from typing import Dict, List, Optional
911

1012
import pandas as pd
@@ -15,6 +17,13 @@
1517

1618
BASE_URL = "https://www.okx.com/api/v5"
1719
_MAX_PER_PAGE = 300
20+
# P12-b parity: OKX already sets a per-request timeout but had no retry
21+
# budget, so a transient blip dropped the whole symbol and a slow tier
22+
# could stall ~max_pages*timeout. Bound it like the ccxt loader.
23+
_OKX_TIMEOUT = int(os.getenv("OKX_TIMEOUT_S", "15"))
24+
_OKX_FETCH_BUDGET_S = float(os.getenv("OKX_FETCH_BUDGET_S", "60"))
25+
_OKX_MAX_RETRIES = 3
26+
_OKX_BACKOFF = (0.5, 1.5, 4.0) # seconds; len == _OKX_MAX_RETRIES
1827

1928

2029
@register
@@ -98,16 +107,38 @@ def _fetch_candles(
98107
"""
99108
all_rows: list = []
100109
after = str(end_ts)
110+
deadline = time.monotonic() + _OKX_FETCH_BUDGET_S
101111

102112
for _ in range(max_pages):
113+
if time.monotonic() > deadline:
114+
raise TimeoutError(
115+
f"OKX fetch for {inst_id} exceeded "
116+
f"{_OKX_FETCH_BUDGET_S:.0f}s budget"
117+
)
103118
params = {
104119
"instId": inst_id,
105120
"bar": bar,
106121
"limit": str(_MAX_PER_PAGE),
107122
"after": after,
108123
}
109-
resp = requests.get(f"{BASE_URL}/market/candles", params=params, timeout=15)
110-
data = resp.json()
124+
data = None
125+
for attempt in range(_OKX_MAX_RETRIES + 1):
126+
try:
127+
resp = requests.get(
128+
f"{BASE_URL}/market/candles",
129+
params=params,
130+
timeout=_OKX_TIMEOUT,
131+
)
132+
data = resp.json()
133+
break
134+
except requests.RequestException as exc:
135+
remaining = deadline - time.monotonic()
136+
if attempt == _OKX_MAX_RETRIES or remaining <= 0:
137+
raise TimeoutError(
138+
f"OKX fetch for {inst_id} failed after "
139+
f"{attempt + 1} attempt(s): {exc}"
140+
) from exc
141+
time.sleep(min(_OKX_BACKOFF[attempt], max(0.0, remaining)))
111142
if data.get("code") != "0" or not data.get("data"):
112143
break
113144

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""Regression tests for P12-b — the ccxt loader must fail fast instead of
2+
hanging on a transient disconnect.
3+
4+
Pre-fix: `_fetch_one` called `exchange.fetch_ohlcv` with no per-call timeout,
5+
no retry, and no wall-clock budget, so a flaky connection hung
6+
`get_market_data` for 10+ minutes. Post-fix: bounded retry on the transient
7+
`ccxt.NetworkError` family + a hard budget that raises a clear `TimeoutError`;
8+
the happy path is unchanged (one call per page).
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import pandas as pd
14+
import pytest
15+
16+
import ccxt
17+
18+
import backtest.loaders.ccxt_loader as cl
19+
from backtest.loaders.ccxt_loader import DataLoader
20+
21+
# Fetch window in epoch milliseconds: 2026-05-01 00:00 through the end of
# 2026-05-05 (i.e. 2026-05-06 00:00).
SINCE = int(pd.Timestamp("2026-05-01").timestamp() * 1_000)
END = int(pd.Timestamp("2026-05-06").timestamp() * 1_000)
23+
24+
25+
def _bars(n: int = 4) -> list:
26+
base = int(pd.Timestamp("2026-05-01").timestamp() * 1000)
27+
day = 86_400_000
28+
return [[base + i * day, 100 + i, 101 + i, 99 + i, 100 + i, 10 + i] for i in range(n)]
29+
30+
31+
class _FakeEx:
32+
"""Scripted exchange: each fetch_ohlcv call consumes the next script item;
33+
an Exception item is raised, a list item is returned."""
34+
35+
def __init__(self, script: list) -> None:
36+
self.script = script
37+
self.calls = 0
38+
39+
def fetch_ohlcv(self, symbol, timeframe, since=None, limit=None):
40+
item = self.script[min(self.calls, len(self.script) - 1)]
41+
self.calls += 1
42+
if isinstance(item, BaseException):
43+
raise item
44+
return item
45+
46+
47+
@pytest.fixture(autouse=True)
def _no_sleep(monkeypatch):
    """Swap ``time.sleep`` (as seen by the loader) for a no-op in every test,
    so retry backoff costs no wall-clock time."""

    def _skip(*_args, **_kwargs):
        return None

    monkeypatch.setattr(cl.time, "sleep", _skip)
50+
51+
52+
def test_transient_networkerror_retried_then_succeeds():
    """Two transient NetworkErrors are retried and the third attempt succeeds."""
    ex = _FakeEx([ccxt.NetworkError("blip"), ccxt.NetworkError("blip"), _bars(), []])
    df = DataLoader._fetch_one(ex, "BTC/USDT", "1d", SINCE, END)
    # Exactly two failed attempts plus one successful short page. A looser
    # ">= 3" would let spurious extra retries or page fetches slip through.
    assert ex.calls == 3
    assert df is not None and not df.empty
57+
58+
59+
def test_persistent_disconnect_is_bounded_not_a_hang():
    """A permanently failing exchange ends in TimeoutError after a bounded
    number of calls instead of hanging."""
    exchange = _FakeEx([ccxt.NetworkError("down")])  # single entry repeats forever
    with pytest.raises(TimeoutError):
        DataLoader._fetch_one(exchange, "BTC/USDT", "1d", SINCE, END)
    # First attempt plus every allowed retry, nothing more.
    expected_calls = 1 + cl._CCXT_MAX_RETRIES
    assert exchange.calls == expected_calls
65+
66+
67+
def test_non_network_error_is_not_retried():
    """An ExchangeError propagates unchanged after a single call."""
    exchange = _FakeEx([ccxt.ExchangeError("bad symbol")])
    with pytest.raises(ccxt.ExchangeError):
        DataLoader._fetch_one(exchange, "BTC/USDT", "1d", SINCE, END)
    attempts = exchange.calls
    assert attempts == 1
72+
73+
74+
def test_happy_path_single_call_unchanged():
    """A healthy exchange returning one short page costs exactly one call."""
    exchange = _FakeEx([_bars(), []])
    frame = DataLoader._fetch_one(exchange, "BTC/USDT", "1d", SINCE, END)
    # The page is shorter than the request limit, so pagination stops at once.
    assert exchange.calls == 1
    expected_columns = ["open", "high", "low", "close", "volume"]
    assert list(frame.columns) == expected_columns
79+
80+
81+
def test_wallclock_budget_enforced(monkeypatch):
    """A blown wall-clock budget aborts the fetch before the retry allowance
    is used up."""
    # Clock reads in order: deadline computation, top-of-page budget check,
    # then the check after the failed call, which lands far past the deadline.
    ticks = iter([1000.0, 1000.0, 1_000_000.0])
    monkeypatch.setattr(cl.time, "monotonic", lambda: next(ticks, 1_000_000.0))
    ex = _FakeEx([ccxt.NetworkError("slow")])
    with pytest.raises(TimeoutError):
        DataLoader._fetch_one(ex, "BTC/USDT", "1d", SINCE, END)
    # Retry exhaustion raises TimeoutError too, so only the call count proves
    # that the budget (not the retry cap) stopped the fetch.
    assert ex.calls == 1
87+
88+
89+
def test_get_exchange_sets_explicit_timeout():
    """The exchange built by the loader carries the configured request timeout."""
    exchange = DataLoader()._get_exchange()
    configured_ms = cl._CCXT_TIMEOUT_MS
    assert exchange.timeout == configured_ms
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""Regression tests for the P12-b okx.py parity fix — the OKX loader must
2+
fail fast on a transient disconnect instead of silently dropping the symbol
3+
or stalling ~max_pages*timeout.
4+
5+
Pre-fix: `_fetch_candles` called requests.get once per page with no retry and
6+
no wall-clock budget. Post-fix: bounded retry on the transient
7+
requests.RequestException family + a hard budget raising a clear TimeoutError;
8+
the happy path still issues one request per page (no behavior change).
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import pandas as pd
14+
import pytest
15+
import requests
16+
17+
import backtest.loaders.okx as okx
18+
from backtest.loaders.okx import DataLoader
19+
20+
# Query window in epoch milliseconds: 2026-05-01 00:00 through the end of
# 2026-05-05 (i.e. 2026-05-06 00:00).
S = int(pd.Timestamp("2026-05-01").timestamp() * 1_000)
E = int(pd.Timestamp("2026-05-06").timestamp() * 1_000)
22+
23+
24+
class _Resp:
25+
def __init__(self, payload):
26+
self._p = payload
27+
28+
def json(self):
29+
return self._p
30+
31+
32+
def _ok_page():
    """Return a successful response double holding a single candle row."""
    # A lone row is fewer than _MAX_PER_PAGE, so the loop breaks after one call.
    candle_ts = int(pd.Timestamp("2026-05-02").timestamp() * 1000)
    candle = [candle_ts, "1", "2", "0.5", "1.5", "10", "0", "0", "1"]
    return _Resp({"code": "0", "data": [candle]})
36+
37+
38+
class _Seq:
39+
def __init__(self, script):
40+
self.script = script
41+
self.calls = 0
42+
43+
def __call__(self, *a, **k):
44+
item = self.script[min(self.calls, len(self.script) - 1)]
45+
self.calls += 1
46+
if isinstance(item, BaseException):
47+
raise item
48+
return item
49+
50+
51+
@pytest.fixture(autouse=True)
def _no_sleep(monkeypatch):
    """Swap ``time.sleep`` (as seen by the OKX loader) for a no-op in every
    test, so retry backoff costs no wall-clock time."""

    def _skip(*_args, **_kwargs):
        return None

    monkeypatch.setattr(okx.time, "sleep", _skip)
54+
55+
56+
def test_transient_then_success(monkeypatch):
    """Two transient connection errors are retried and the third request
    succeeds."""
    seq = _Seq([requests.ConnectionError("blip"), requests.ConnectionError("blip"), _ok_page()])
    monkeypatch.setattr(okx.requests, "get", seq)
    df = DataLoader()._fetch_candles("BTC-USDT", S, E, "1D", 20)
    # Exactly two failed attempts plus one successful short page. A looser
    # ">= 3" would let spurious extra retries or page fetches slip through.
    assert seq.calls == 3
    assert df is not None and not df.empty
62+
63+
64+
def test_persistent_disconnect_is_bounded(monkeypatch):
    """A permanently failing endpoint ends in TimeoutError after a bounded
    number of requests."""
    fake_get = _Seq([requests.ConnectionError("down")])  # single entry repeats forever
    monkeypatch.setattr(okx.requests, "get", fake_get)
    with pytest.raises(TimeoutError):
        DataLoader()._fetch_candles("BTC-USDT", S, E, "1D", 20)
    # First attempt plus every allowed retry, nothing more.
    expected_calls = 1 + okx._OKX_MAX_RETRIES
    assert fake_get.calls == expected_calls
70+
71+
72+
def test_non_network_error_not_retried(monkeypatch):
    """An exception outside the requests family propagates after one request."""
    fake_get = _Seq([KeyError("logic bug")])
    monkeypatch.setattr(okx.requests, "get", fake_get)
    with pytest.raises(KeyError):
        DataLoader()._fetch_candles("BTC-USDT", S, E, "1D", 20)
    attempts = fake_get.calls
    assert attempts == 1
78+
79+
80+
def test_happy_path_single_call(monkeypatch):
    """A healthy endpoint returning one short page costs exactly one request."""
    fake_get = _Seq([_ok_page()])
    monkeypatch.setattr(okx.requests, "get", fake_get)
    frame = DataLoader()._fetch_candles("BTC-USDT", S, E, "1D", 20)
    assert fake_get.calls == 1
    expected_columns = ["open", "high", "low", "close", "volume"]
    assert list(frame.columns) == expected_columns
86+
87+
88+
def test_wallclock_budget_enforced(monkeypatch):
    """A blown wall-clock budget aborts the fetch before the retry allowance
    is used up."""
    # Clock reads in order: deadline computation, top-of-page budget check,
    # then the check after the failed request, which lands far past the deadline.
    ticks = iter([1000.0, 1000.0, 1_000_000.0])
    monkeypatch.setattr(okx.time, "monotonic", lambda: next(ticks, 1_000_000.0))
    fake_get = _Seq([requests.ConnectionError("slow")])
    monkeypatch.setattr(okx.requests, "get", fake_get)
    with pytest.raises(TimeoutError):
        DataLoader()._fetch_candles("BTC-USDT", S, E, "1D", 20)
    # Retry exhaustion raises TimeoutError too, so only the request count
    # proves that the budget (not the retry cap) stopped the fetch.
    assert fake_get.calls == 1

0 commit comments

Comments
 (0)