Skip to content

Commit eebd902

Browse files
authored
Merge pull request #45 from TNT-Likely/fix/proxy-and-sqlite-lock
fix: 生产代理拦截行情接口 + 缓解 SQLite database is locked
2 parents 59ef015 + b48804b commit eebd902

11 files changed

Lines changed: 89 additions & 19 deletions

src/collectors/akshare_collector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def _fetch_tencent_quotes(symbols: list[str]) -> list[dict]:
130130
if not symbols:
131131
return []
132132
url = TENCENT_QUOTE_URL + ",".join(symbols)
133-
with httpx.Client() as client:
133+
with httpx.Client(trust_env=False) as client: # 行情直连,绕过 env 代理
134134
resp = client.get(url, timeout=10)
135135
content = resp.content.decode("gbk", errors="ignore")
136136

src/collectors/capital_flow_collector.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ def get_capital_flow(self, symbol: str) -> CapitalFlow | None:
7777
}
7878

7979
try:
80-
with httpx.Client(follow_redirects=True, timeout=8) as client:
80+
with httpx.Client(
81+
follow_redirects=True, timeout=8, trust_env=False
82+
) as client: # 行情直连,绕过 env 代理(生产代理会拦 push2his.eastmoney)
8183
resp = client.get(EASTMONEY_FLOW_URL, params=params, headers=headers)
8284
data = resp.json()
8385

src/collectors/events_collector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ async def fetch_events(
116116
verify=self.verify_ssl,
117117
headers=headers,
118118
follow_redirects=True,
119-
trust_env=True,
119+
trust_env=False, # 不吃 env 代理(Telegram/AI 用),仅用显式配置的 self.proxy
120120
proxy=self.proxy,
121121
) as client:
122122
resp = await client.get(self.API_URL, params=params)

src/collectors/kline_collector.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ def _fetch_stooq_us_klines(symbol: str) -> list[KlineData]:
5252
try:
5353
timeout = 12 + attempt * 6
5454
with httpx.Client(
55-
follow_redirects=True, timeout=timeout, headers=headers
55+
follow_redirects=True,
56+
timeout=timeout,
57+
headers=headers,
58+
trust_env=False, # 行情直连,绕过 env 代理(生产代理会拦行情接口)
5659
) as client:
5760
resp = client.get(url, params=params)
5861
resp.raise_for_status()
@@ -155,6 +158,7 @@ def _fetch_eastmoney_klines(
155158
follow_redirects=True,
156159
timeout=12 + attempt * 6,
157160
headers=headers,
161+
trust_env=False, # 行情直连,绕过 env 代理(生产代理会拦 push2his.eastmoney)
158162
) as client:
159163
resp = client.get(EASTMONEY_KLINE_URL, params=params)
160164
resp.raise_for_status()
@@ -492,7 +496,9 @@ def get_klines(self, symbol: str, days: int = 60) -> list[KlineData]:
492496
}
493497

494498
try:
495-
with httpx.Client(follow_redirects=True, timeout=10) as client:
499+
with httpx.Client(
500+
follow_redirects=True, timeout=10, trust_env=False
501+
) as client: # 行情直连,绕过 env 代理(生产代理会拦行情接口)
496502
resp = client.get(TENCENT_KLINE_URL, params=params)
497503
text = resp.text
498504

@@ -550,7 +556,9 @@ def get_klines(self, symbol: str, days: int = 60) -> list[KlineData]:
550556

551557
# CN/HK: Tencent 在高 days 时可能只返回近几年,尝试 Eastmoney 补全更长历史
552558
if self.market in (MarketCode.CN, MarketCode.HK):
553-
need_em = days >= 500 or len(klines) < max(120, int(days * 0.6))
559+
# 仅当腾讯返回不足时才回退东财。去掉原 `days >= 500` 的无条件触发——
560+
# 否则评估循环(days=600)每只标的都额外打一次东财,把它打到自我限流。
561+
need_em = len(klines) < max(120, int(days * 0.6))
554562
if need_em:
555563
# 额外放大窗口,提升拿到更长历史的概率
556564
em_target_days = min(max(days, 3000), 20000)

src/collectors/news_collector.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ async def fetch_news(self, symbols: list[str] | None = None, since: datetime | N
109109
if self.cookies:
110110
headers["Cookie"] = self.cookies
111111

112-
async with httpx.AsyncClient(timeout=8, headers=headers) as client:
112+
async with httpx.AsyncClient(timeout=8, headers=headers, trust_env=False) as client: # CN 源直连,绕过 env 代理
113113
tasks = [self._fetch_for_symbol(client, symbol, since) for symbol in a_share_symbols]
114114
results = await asyncio.gather(*tasks, return_exceptions=True)
115115

@@ -284,7 +284,7 @@ async def fetch_with_limit(client, symbol, stock_name):
284284
"Referer": "https://so.eastmoney.com/",
285285
"Accept": "*/*",
286286
}
287-
async with httpx.AsyncClient(timeout=8, verify=False, headers=headers) as client:
287+
async with httpx.AsyncClient(timeout=8, verify=False, headers=headers, trust_env=False) as client: # CN 源直连,绕过 env 代理
288288
tasks = [
289289
fetch_with_limit(client, symbol, symbol_names.get(symbol, symbol))
290290
for symbol in symbols
@@ -323,7 +323,7 @@ async def fetch_by_keyword(self, keyword: str) -> list[NewsItem]:
323323
"Referer": "https://so.eastmoney.com/",
324324
"Accept": "*/*",
325325
}
326-
async with httpx.AsyncClient(timeout=8, verify=False, headers=headers) as client:
326+
async with httpx.AsyncClient(timeout=8, verify=False, headers=headers, trust_env=False) as client: # CN 源直连,绕过 env 代理
327327
return await self._fetch_for_symbol(client, keyword, keyword, None)
328328

329329
async def _fetch_for_symbol(self, client: httpx.AsyncClient, symbol: str, stock_name: str, since: datetime | None) -> list[NewsItem]:
@@ -482,7 +482,7 @@ async def fetch_news(self, symbols: list[str] | None = None, since: datetime | N
482482
}
483483

484484
try:
485-
async with httpx.AsyncClient(timeout=5, verify=False) as client:
485+
async with httpx.AsyncClient(timeout=5, verify=False, trust_env=False) as client: # CN 源直连,绕过 env 代理
486486
resp = await client.get(self.API_URL, params=params)
487487
resp.raise_for_status()
488488
data = resp.json()

src/core/entry_candidates.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1709,6 +1709,7 @@ def evaluate_entry_candidate_outcomes(
17091709

17101710
today = date.today()
17111711
kline_cache: dict[tuple[str, str], list] = {}
1712+
pending = 0 # 分批提交计数,缩短写事务窗口
17121713

17131714
for c in candidates:
17141715
snap_day = _parse_day(c.snapshot_date)
@@ -1804,6 +1805,12 @@ def evaluate_entry_candidate_outcomes(
18041805
db.add(row)
18051806
stats["evaluated"] += 1
18061807
existing.add((c.id, horizon))
1808+
pending += 1
1809+
1810+
# 分批提交:累计到阈值即落盘,缩短写事务,避免与 60s 调度器并发写长时间持锁
1811+
if pending >= 50:
1812+
db.commit()
1813+
pending = 0
18071814

18081815
db.commit()
18091816
return stats

src/core/paper_trading_engine.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -470,16 +470,20 @@ def _check_exits(
470470

471471
# 检查信号反转
472472
if pos.signal_run_id:
473-
latest = (
474-
db.query(StrategySignalRun)
475-
.filter(
476-
StrategySignalRun.stock_symbol == pos.stock_symbol,
477-
StrategySignalRun.stock_market == pos.stock_market,
478-
StrategySignalRun.status == "active",
473+
# no_autoflush: 信号查询是只读的,不要把本轮累积的持仓现价更新提前 flush——
474+
# 否则扫描中途会反复抢 SQLite 写锁,与其它调度器并发写时触发 "database is locked"。
475+
# 所有写入统一在本方法末尾 db.commit() 时一次性落盘。
476+
with db.no_autoflush:
477+
latest = (
478+
db.query(StrategySignalRun)
479+
.filter(
480+
StrategySignalRun.stock_symbol == pos.stock_symbol,
481+
StrategySignalRun.stock_market == pos.stock_market,
482+
StrategySignalRun.status == "active",
483+
)
484+
.order_by(StrategySignalRun.created_at.desc())
485+
.first()
479486
)
480-
.order_by(StrategySignalRun.created_at.desc())
481-
.first()
482-
)
483487
if latest and latest.action in ("sell", "reduce"):
484488
trade = self._close_position(db, account, pos, current_price, "signal_reversal")
485489
exit_events.append((pos, trade))

src/core/paper_trading_scheduler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def start(self):
6262
self._scan_job,
6363
"interval",
6464
seconds=self.interval_seconds,
65+
jitter=20, # 抖动错峰,避免与价格提醒扫描每 60s 同刻并发写 SQLite
6566
id="paper_trading_scan",
6667
replace_existing=True,
6768
coalesce=True,

src/core/price_alert_scheduler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def start(self):
5050
self._scan_job,
5151
"interval",
5252
seconds=self.interval_seconds,
53+
jitter=20, # 抖动错峰,避免与模拟盘扫描每 60s 同刻并发写 SQLite
5354
id="price_alert_scan",
5455
replace_existing=True,
5556
coalesce=True,

src/core/strategy_engine.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1567,6 +1567,7 @@ def evaluate_strategy_outcomes(
15671567

15681568
today = date.today()
15691569
kline_cache: dict[tuple[str, str], list] = {}
1570+
pending = 0 # 分批提交计数,缩短写事务窗口
15701571

15711572
for s in signals:
15721573
snap_day = _parse_day(s.snapshot_date)
@@ -1662,6 +1663,12 @@ def evaluate_strategy_outcomes(
16621663
)
16631664
stats["evaluated"] += 1
16641665
existing.add((s.id, horizon))
1666+
pending += 1
1667+
1668+
# 分批提交:累计到阈值即落盘,缩短写事务,避免与 60s 调度器并发写长时间持锁
1669+
if pending >= 50:
1670+
db.commit()
1671+
pending = 0
16651672

16661673
db.commit()
16671674
return stats

0 commit comments

Comments
 (0)