Skip to content

Commit d849edf

Browse files
fix(network): harden shared httpx client against stale connections (#1018)
RSS loop runs every 900s but reuses the same httpx.AsyncClient connection pool, well past server keep-alive expiry (60-120s). Reusing dead sockets produced ConnectTimeout errors over time. - Configure httpx.Limits with keepalive_expiry=60s so idle connections are dropped proactively - reset_shared_client() called on retry paths to force a fresh pool after ConnectError - asyncio.Semaphore(5) in refresh_rss() caps concurrent RSS fetches to avoid tripping site rate limits Closes #1008 Related #1010, #742, #701
1 parent 6235892 commit d849edf

4 files changed

Lines changed: 203 additions & 33 deletions

File tree

backend/src/module/network/request_url.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@
1212
_shared_client: httpx.AsyncClient | None = None
1313
_shared_client_proxy_key: str | None = None
1414

15+
# RSS 循环间隔 900s, 远超服务端 keep-alive 超时(60-120s)
16+
# keepalive_expiry=60 让空闲连接在过期前主动丢弃,避免复用过期连接
17+
# max_connections=20 足够覆盖典型订阅数量
18+
_CONNECTION_LIMITS = httpx.Limits(
19+
max_keepalive_connections=5,
20+
max_connections=20,
21+
keepalive_expiry=60.0,
22+
)
23+
1524

1625
def _proxy_config_key() -> str:
1726
if settings.proxy.enable:
@@ -33,22 +42,37 @@ async def get_shared_client() -> httpx.AsyncClient:
3342
proxy_url = f"http://{settings.proxy.username}:{settings.proxy.password}@{settings.proxy.host}:{settings.proxy.port}"
3443
else:
3544
proxy_url = f"http://{settings.proxy.host}:{settings.proxy.port}"
36-
_shared_client = httpx.AsyncClient(proxy=proxy_url, timeout=timeout)
45+
_shared_client = httpx.AsyncClient(
46+
proxy=proxy_url, timeout=timeout, limits=_CONNECTION_LIMITS
47+
)
3748
elif settings.proxy.type == "socks5":
3849
if settings.proxy.username:
3950
socks_url = f"socks5://{settings.proxy.username}:{settings.proxy.password}@{settings.proxy.host}:{settings.proxy.port}"
4051
else:
4152
socks_url = f"socks5://{settings.proxy.host}:{settings.proxy.port}"
4253
transport = AsyncProxyTransport.from_url(socks_url, rdns=True)
43-
_shared_client = httpx.AsyncClient(transport=transport, timeout=timeout)
54+
_shared_client = httpx.AsyncClient(
55+
transport=transport, timeout=timeout, limits=_CONNECTION_LIMITS
56+
)
4457
else:
45-
_shared_client = httpx.AsyncClient(timeout=timeout)
58+
_shared_client = httpx.AsyncClient(
59+
timeout=timeout, limits=_CONNECTION_LIMITS
60+
)
4661
else:
47-
_shared_client = httpx.AsyncClient(timeout=timeout)
62+
_shared_client = httpx.AsyncClient(timeout=timeout, limits=_CONNECTION_LIMITS)
4863
_shared_client_proxy_key = current_key
4964
return _shared_client
5065

5166

67+
async def reset_shared_client():
68+
"""关闭并清除共享客户端,下次请求时自动创建新连接池。"""
69+
global _shared_client, _shared_client_proxy_key
70+
if _shared_client is not None:
71+
await _shared_client.aclose()
72+
_shared_client = None
73+
_shared_client_proxy_key = None
74+
75+
5276
class RequestURL:
5377
# More complete User-Agent to avoid Cloudflare blocking
5478
DEFAULT_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
@@ -91,6 +115,9 @@ async def get_url(self, url, retry=3):
91115
try_time += 1
92116
if try_time >= retry:
93117
break
118+
# 连接错误时重建客户端以清除过期连接
119+
await reset_shared_client()
120+
self._client = await get_shared_client()
94121
await asyncio.sleep(5)
95122
except Exception as e:
96123
logger.warning(f"[Network] Unexpected error for {url}: {e}")
@@ -114,6 +141,9 @@ async def post_url(self, url: str, data: dict, retry=3):
114141
try_time += 1
115142
if try_time >= retry:
116143
break
144+
# 连接错误时重建客户端,清除过期连接
145+
await reset_shared_client()
146+
self._client = await get_shared_client()
117147
await asyncio.sleep(5)
118148
except Exception as e:
119149
logger.debug(e)

backend/src/module/rss/engine.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,15 @@ async def refresh_rss(self, client: DownloadClient, rss_id: Optional[int] = None
149149
else:
150150
rss_item = self.rss.search_id(rss_id)
151151
rss_items = [rss_item] if rss_item else []
152-
# From RSS Items, fetch all torrents concurrently
152+
# From RSS Items, fetch all torrents with concurrency limit
153153
logger.debug("[Engine] Get %s RSS items", len(rss_items))
154-
results = await asyncio.gather(
155-
*[self._pull_rss_with_status(rss_item) for rss_item in rss_items]
156-
)
154+
semaphore = asyncio.Semaphore(5)
155+
156+
async def _limited_pull(item):
157+
async with semaphore:
158+
return await self._pull_rss_with_status(item)
159+
160+
results = await asyncio.gather(*[_limited_pull(item) for item in rss_items])
157161
now = datetime.now(timezone.utc).isoformat()
158162
# Process results sequentially (DB operations)
159163
for rss_item, (new_torrents, error) in zip(rss_items, results):
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
"""Tests for network request_url: shared client configuration and reset."""
2+
3+
import pytest
4+
from unittest.mock import AsyncMock, MagicMock, patch
5+
6+
from module.network.request_url import get_shared_client, reset_shared_client
7+
8+
9+
@pytest.fixture(autouse=True)
10+
async def _clean_shared_client():
11+
"""Ensure shared client is reset after each test."""
12+
yield
13+
import module.network.request_url as mod
14+
15+
if mod._shared_client is not None:
16+
await mod._shared_client.aclose()
17+
mod._shared_client = None
18+
mod._shared_client_proxy_key = None
19+
20+
21+
class TestSharedClientLimits:
22+
async def test_client_has_keepalive_expiry(self):
23+
"""Shared client should use a finite keepalive_expiry."""
24+
client = await get_shared_client()
25+
pool = client._transport._pool
26+
assert pool._keepalive_expiry is not None
27+
assert pool._keepalive_expiry > 0
28+
29+
async def test_client_has_max_connections(self):
30+
"""Shared client should have a connection pool limit."""
31+
client = await get_shared_client()
32+
pool = client._transport._pool
33+
assert pool._max_connections is not None
34+
assert pool._max_connections > 0
35+
36+
37+
class TestResetSharedClient:
38+
async def test_reset_closes_existing_client(self):
39+
"""reset_shared_client should close and clear the shared client."""
40+
client = await get_shared_client()
41+
assert client is not None
42+
43+
await reset_shared_client()
44+
45+
import module.network.request_url as mod
46+
47+
assert mod._shared_client is None
48+
assert mod._shared_client_proxy_key is None
49+
50+
async def test_reset_idempotent_when_no_client(self):
51+
"""reset_shared_client should be safe when no client exists."""
52+
import module.network.request_url as mod
53+
54+
mod._shared_client = None
55+
mod._shared_client_proxy_key = None
56+
await reset_shared_client()
57+
58+
async def test_new_client_after_reset(self):
59+
"""After reset, get_shared_client returns a fresh client."""
60+
old_client = await get_shared_client()
61+
await reset_shared_client()
62+
new_client = await get_shared_client()
63+
assert new_client is not old_client
64+
65+
66+
class TestRetryWithReset:
67+
async def test_get_url_resets_on_connect_error(self):
68+
"""get_url should call reset_shared_client after ConnectTimeout."""
69+
import httpx
70+
from module.network.request_url import RequestURL
71+
72+
call_count = 0
73+
74+
async def mock_get(**kwargs):
75+
nonlocal call_count
76+
call_count += 1
77+
if call_count == 1:
78+
raise httpx.ConnectTimeout("Connection timed out")
79+
resp = MagicMock()
80+
resp.status_code = 200
81+
resp.raise_for_status = MagicMock()
82+
return resp
83+
84+
with (
85+
patch("module.network.request_url.get_shared_client") as mock_get_client,
86+
patch(
87+
"module.network.request_url.reset_shared_client",
88+
new_callable=AsyncMock,
89+
) as mock_reset,
90+
):
91+
mock_client = AsyncMock()
92+
mock_client.get = mock_get
93+
mock_get_client.return_value = mock_client
94+
95+
async with RequestURL() as req:
96+
result = await req.get_url("https://example.com/test", retry=2)
97+
98+
mock_reset.assert_called()
99+
assert call_count == 2

backend/src/test/test_rss_engine_new.py

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Tests for RSS engine: pull_rss, match_torrent, refresh_rss, add_rss."""
22

3+
import asyncio
4+
35
import pytest
46
from unittest.mock import AsyncMock, patch
57

@@ -51,7 +53,9 @@ async def test_returns_only_new_torrents(self, rss_engine):
5153
Torrent(name="new1", url="https://example.com/new1.torrent"),
5254
Torrent(name="new2", url="https://example.com/new2.torrent"),
5355
]
54-
with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get:
56+
with patch.object(
57+
RSSEngine, "_get_torrents", new_callable=AsyncMock
58+
) as mock_get:
5559
mock_get.return_value = all_torrents
5660
result = await rss_engine.pull_rss(rss_item)
5761

@@ -67,7 +71,9 @@ async def test_all_existing_returns_empty(self, rss_engine):
6771
existing = make_torrent(url="https://example.com/only.torrent", rss_id=1)
6872
rss_engine.torrent.add(existing)
6973

70-
with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get:
74+
with patch.object(
75+
RSSEngine, "_get_torrents", new_callable=AsyncMock
76+
) as mock_get:
7177
mock_get.return_value = [
7278
Torrent(name="only", url="https://example.com/only.torrent")
7379
]
@@ -81,7 +87,9 @@ async def test_empty_feed_returns_empty(self, rss_engine):
8187
rss_engine.rss.add(rss_item)
8288
rss_item = rss_engine.rss.search_id(1)
8389

84-
with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get:
90+
with patch.object(
91+
RSSEngine, "_get_torrents", new_callable=AsyncMock
92+
) as mock_get:
8593
mock_get.return_value = []
8694
result = await rss_engine.pull_rss(rss_item)
8795

@@ -99,9 +107,7 @@ def test_matches_by_title_raw_substring(self, rss_engine):
99107
bangumi = make_bangumi(title_raw="Mushoku Tensei", filter="")
100108
rss_engine.bangumi.add(bangumi)
101109

102-
torrent = make_torrent(
103-
name="[Lilith-Raws] Mushoku Tensei - 11 [1080p].mkv"
104-
)
110+
torrent = make_torrent(name="[Lilith-Raws] Mushoku Tensei - 11 [1080p].mkv")
105111
result = rss_engine.match_torrent(torrent)
106112

107113
assert result is not None
@@ -122,9 +128,7 @@ def test_filter_excludes_matching_torrent(self, rss_engine):
122128
bangumi = make_bangumi(title_raw="Mushoku Tensei", filter="720")
123129
rss_engine.bangumi.add(bangumi)
124130

125-
torrent = make_torrent(
126-
name="[Sub] Mushoku Tensei - 01 [720p].mkv"
127-
)
131+
torrent = make_torrent(name="[Sub] Mushoku Tensei - 01 [720p].mkv")
128132
result = rss_engine.match_torrent(torrent)
129133

130134
assert result is None
@@ -134,9 +138,7 @@ def test_empty_filter_allows_match(self, rss_engine):
134138
bangumi = make_bangumi(title_raw="Mushoku Tensei", filter="")
135139
rss_engine.bangumi.add(bangumi)
136140

137-
torrent = make_torrent(
138-
name="[Sub] Mushoku Tensei - 01 [720p].mkv"
139-
)
141+
torrent = make_torrent(name="[Sub] Mushoku Tensei - 01 [720p].mkv")
140142
result = rss_engine.match_torrent(torrent)
141143

142144
assert result is not None
@@ -147,9 +149,7 @@ def test_filter_case_insensitive(self, rss_engine):
147149
rss_engine.bangumi.add(bangumi)
148150

149151
# Torrent has "hevc" in lowercase - should still be filtered
150-
torrent = make_torrent(
151-
name="[Sub] Mushoku Tensei - 01 [1080p][hevc].mkv"
152-
)
152+
torrent = make_torrent(name="[Sub] Mushoku Tensei - 01 [1080p][hevc].mkv")
153153
result = rss_engine.match_torrent(torrent)
154154

155155
assert result is None
@@ -201,7 +201,9 @@ async def test_downloads_matched_torrents(self, rss_engine, mock_qb_client):
201201
name="[Sub] Mushoku Tensei - 12 [1080p].mkv",
202202
url="https://example.com/ep12.torrent",
203203
)
204-
with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get:
204+
with patch.object(
205+
RSSEngine, "_get_torrents", new_callable=AsyncMock
206+
) as mock_get:
205207
mock_get.return_value = [new_torrent]
206208

207209
# Create a mock client
@@ -227,7 +229,9 @@ async def test_unmatched_torrents_stored_not_downloaded(self, rss_engine):
227229
name="[Sub] Unknown Anime - 01 [1080p].mkv",
228230
url="https://example.com/unknown.torrent",
229231
)
230-
with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get:
232+
with patch.object(
233+
RSSEngine, "_get_torrents", new_callable=AsyncMock
234+
) as mock_get:
231235
mock_get.return_value = [unmatched]
232236
client = AsyncMock()
233237
await rss_engine.refresh_rss(client)
@@ -244,7 +248,9 @@ async def test_refresh_specific_rss_id(self, rss_engine):
244248
rss_engine.rss.add(rss1)
245249
rss_engine.rss.add(rss2)
246250

247-
with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get:
251+
with patch.object(
252+
RSSEngine, "_get_torrents", new_callable=AsyncMock
253+
) as mock_get:
248254
mock_get.return_value = []
249255
client = AsyncMock()
250256
await rss_engine.refresh_rss(client, rss_id=2)
@@ -254,7 +260,9 @@ async def test_refresh_specific_rss_id(self, rss_engine):
254260

255261
async def test_refresh_nonexistent_rss_id(self, rss_engine):
256262
"""refresh_rss with non-existent rss_id does nothing."""
257-
with patch.object(RSSEngine, "_get_torrents", new_callable=AsyncMock) as mock_get:
263+
with patch.object(
264+
RSSEngine, "_get_torrents", new_callable=AsyncMock
265+
) as mock_get:
258266
client = AsyncMock()
259267
await rss_engine.refresh_rss(client, rss_id=999)
260268

@@ -284,9 +292,7 @@ async def test_add_with_name(self, rss_engine):
284292

285293
async def test_add_without_name_fetches_title(self, rss_engine):
286294
"""add_rss without name calls get_rss_title to auto-discover title."""
287-
with patch(
288-
"module.rss.engine.RequestContent"
289-
) as MockReq:
295+
with patch("module.rss.engine.RequestContent") as MockReq:
290296
mock_instance = AsyncMock()
291297
mock_instance.get_rss_title = AsyncMock(return_value="Fetched Title")
292298
MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_instance)
@@ -303,9 +309,7 @@ async def test_add_without_name_fetches_title(self, rss_engine):
303309

304310
async def test_add_without_name_fetch_fails(self, rss_engine):
305311
"""add_rss returns error when title fetch fails."""
306-
with patch(
307-
"module.rss.engine.RequestContent"
308-
) as MockReq:
312+
with patch("module.rss.engine.RequestContent") as MockReq:
309313
mock_instance = AsyncMock()
310314
mock_instance.get_rss_title = AsyncMock(return_value=None)
311315
MockReq.return_value.__aenter__ = AsyncMock(return_value=mock_instance)
@@ -332,3 +336,36 @@ async def test_add_duplicate_url_fails(self, rss_engine):
332336

333337
assert result.status is False
334338
assert result.status_code == 406
339+
340+
341+
class TestRefreshRssConcurrency:
342+
async def test_concurrent_requests_limited(self, rss_engine):
343+
"""refresh_rss should limit concurrent requests via semaphore."""
344+
rss_items = [
345+
make_rss_item(name=f"Feed {i}", url=f"https://feed{i}.com/rss")
346+
for i in range(10)
347+
]
348+
for item in rss_items:
349+
rss_engine.rss.add(item)
350+
351+
active_count = 0
352+
max_active = 0
353+
lock = asyncio.Lock()
354+
355+
async def track_concurrency(rss_item):
356+
nonlocal active_count, max_active
357+
async with lock:
358+
active_count += 1
359+
max_active = max(max_active, active_count)
360+
await asyncio.sleep(0.01)
361+
async with lock:
362+
active_count -= 1
363+
return [], None
364+
365+
with patch.object(
366+
rss_engine, "_pull_rss_with_status", side_effect=track_concurrency
367+
):
368+
client = AsyncMock()
369+
await rss_engine.refresh_rss(client)
370+
371+
assert max_active <= 5

0 commit comments

Comments
 (0)