Skip to content

Commit 01093e1

Browse files
committed
doubao-fix
1 parent b14f4e5 commit 01093e1

File tree

4 files changed

+521
-4
lines changed

4 files changed

+521
-4
lines changed

RATE_LIMITER_USAGE.md

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
2+
# API 限流中间件使用说明
3+
4+
## 概述
5+
6+
这是一个可插拔的 API 请求限流中间件,用于保护股票行情数据采集系统免受 IP 封禁、API 配额耗尽或服务商限制。
7+
8+
## 核心特性
9+
10+
- 基于域名的独立频率控制
11+
- 默认 30次/分钟 的限流策略
12+
- 支持自定义特定域名的限流阈值
13+
- 运行时动态调整参数,无需重启
14+
- 超限请求给出明确的等待提示
15+
- 线程安全
16+
- 最小侵入性,对现有业务代码零修改
17+
18+
## 文件位置
19+
20+
- 核心限流模块: `adata/common/utils/rate_limiter.py`
21+
- 集成限流的请求封装: `adata/common/utils/sunrequests.py`
22+
23+
## 快速开始
24+
25+
### 1. 启用限流(一行代码)
26+
27+
```python
28+
from adata.common.utils.rate_limiter import enable_rate_limit
29+
30+
# 启用全局限流
31+
enable_rate_limit(True)
32+
```
33+
34+
### 2. 配置限流参数
35+
36+
```python
37+
from adata.common.utils.rate_limiter import set_domain_limit, set_default_limit
38+
39+
# 设置默认限流(例如:改为 50次/分钟)
40+
set_default_limit(50, 60)
41+
42+
# 设置特定域名的限流(例如:新浪财经 20次/分钟)
43+
set_domain_limit("finance.sina.com.cn", 20, 60)
44+
45+
# 设置东方财富 40次/分钟
46+
set_domain_limit("push2.eastmoney.com", 40, 60)
47+
```
48+
49+
### 3. 完整使用示例
50+
51+
```python
52+
# 导入模块
53+
from adata.common.utils.rate_limiter import enable_rate_limit, set_domain_limit
54+
from adata.common.utils.sunrequests import sun_requests
55+
56+
# 步骤1: 启用限流
57+
enable_rate_limit(True)
58+
59+
# 步骤2: 配置特定域名的限流(可选)
60+
set_domain_limit("api.example.com", 25, 60)
61+
62+
# 步骤3: 正常使用请求,自动限流
63+
url = "https://api.example.com/stock/data"
64+
response = sun_requests.request(url=url)
65+
print(response.text)
66+
```
67+
68+
## API 参考
69+
70+
### `enable_rate_limit(enable=True)`
71+
启用或禁用全局限流功能。
72+
73+
**参数:**
74+
- `enable` (bool): True 启用,False 禁用,默认 True
75+
76+
**返回:**
77+
- SunRequests 实例
78+
79+
---
80+
81+
### `set_domain_limit(domain, limit, window=60)`
82+
设置特定域名的限流阈值。
83+
84+
**参数:**
85+
- `domain` (str): 域名(例如 "api.example.com")
86+
- `limit` (int): 允许的请求次数
87+
- `window` (int): 时间窗口(秒),默认 60 秒
88+
89+
---
90+
91+
### `set_default_limit(limit, window=60)`
92+
设置全局默认限流阈值。
93+
94+
**参数:**
95+
- `limit` (int): 默认允许的请求次数
96+
- `window` (int): 时间窗口(秒),默认 60 秒
97+
98+
---
99+
100+
### `get_rate_limiter()`
101+
获取限流器单例实例,用于高级操作。
102+
103+
**返回:**
104+
- RateLimiter 实例
105+
106+
## 工作原理
107+
108+
### 限流算法
109+
采用**滑动窗口算法**,精确控制请求频率:
110+
111+
1. 记录每个请求的时间戳
112+
2. 清理时间窗口外的过期请求
113+
3. 检查当前窗口内的请求数是否超限
114+
4. 如超限,计算需要等待的时间
115+
116+
### 域名隔离
117+
每个域名拥有独立的限流配置和请求记录,互不干扰。
118+
119+
### 线程安全
120+
使用 `threading.RLock` 保证多线程环境下的安全性。
121+
122+
## 限流提示
123+
124+
当请求超限时时,会在控制台输出提示信息:
125+
126+
```
127+
[RateLimit] 域名 api.example.com 已达到 30 次/60 秒限制,等待 15.32 秒...
128+
```
129+
130+
## 性能测试
131+
132+
### 测试场景1: 20次请求(不触发限流)
133+
134+
期望结果:20次请求快速完成,无等待
135+
136+
```python
137+
from adata.common.utils.rate_limiter import get_rate_limiter
138+
139+
limiter = get_rate_limiter()
140+
limiter.set_default_limit(30, 60)
141+
url = "https://test.example.com/api"
142+
143+
for i in range(20):
144+
wait = limiter.acquire(url)
145+
assert wait == 0 # 无需等待
146+
```
147+
148+
### 测试场景2: 40次请求(触发限流)
149+
150+
期望结果:前30次快速完成,第31次开始触发等待
151+
152+
```python
153+
from adata.common.utils.rate_limiter import get_rate_limiter
154+
155+
limiter = get_rate_limiter()
156+
limiter.set_default_limit(30, 60)
157+
url = "https://test.example.com/api"
158+
159+
wait_triggered = False
160+
for i in range(40):
161+
wait = limiter.acquire(url)
162+
if wait > 0:
163+
wait_triggered = True
164+
165+
assert wait_triggered == True # 应该触发等待
166+
```
167+
168+
## 兼容性说明
169+
170+
- 默认不启用限流,完全向后兼容
171+
- 限流器模块导入失败时不影响现有功能
172+
- 支持 Python 3.6+
173+
174+
## 禁用限流
175+
176+
如需临时禁用限流:
177+
178+
```python
179+
from adata.common.utils.rate_limiter import enable_rate_limit
180+
181+
# 禁用限流
182+
enable_rate_limit(False)
183+
```
184+
185+
## 注意事项
186+
187+
1. 限流是基于内存的,进程重启后计数会重置
188+
2. 多进程场景下,每个进程有独立的限流计数
189+
3. 建议根据实际 API 服务商的限制合理配置阈值
190+
4. 限流不会中止请求,只是延迟执行
191+

adata/common/utils/rate_limiter.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
2+
# -*- coding: utf-8 -*-
3+
"""
4+
@desc: API请求限流中间件
5+
@author: adata
6+
@time: 2025/03/17
7+
"""
8+
import threading
9+
import time
10+
from collections import deque
11+
from urllib.parse import urlparse
12+
13+
14+
class RateLimiter:
15+
"""
16+
基于域名的独立频率控制器
17+
滑动窗口算法实现,线程安全
18+
"""
19+
20+
def __init__(self):
21+
self._lock = threading.RLock()
22+
self._domain_limits = {}
23+
self._domain_windows = {}
24+
self._default_limit = 30
25+
self._default_window = 60
26+
27+
def set_domain_limit(self, domain, limit, window=60):
28+
"""
29+
设置特定域名的限流阈值
30+
:param domain: 域名(不带http/https)
31+
:param limit: 允许的请求次数
32+
:param window: 时间窗口(秒),默认60秒
33+
"""
34+
with self._lock:
35+
self._domain_limits[domain] = (limit, window)
36+
if domain not in self._domain_windows:
37+
self._domain_windows[domain] = deque()
38+
39+
def get_domain_limit(self, domain):
40+
"""
41+
获取特定域名的限流阈值
42+
:param domain: 域名
43+
:return: (limit, window)
44+
"""
45+
with self._lock:
46+
return self._domain_limits.get(domain, (self._default_limit, self._default_window))
47+
48+
def set_default_limit(self, limit, window=60):
49+
"""
50+
设置默认限流阈值
51+
:param limit: 默认允许的请求次数
52+
:param window: 时间窗口(秒)
53+
"""
54+
with self._lock:
55+
self._default_limit = limit
56+
self._default_window = window
57+
58+
def _clean_old_requests(self, domain, window, now):
59+
"""
60+
清理过期的请求记录
61+
"""
62+
window_start = now - window
63+
while len(self._domain_windows[domain]) > 0:
64+
if self._domain_windows[domain][0] <= window_start:
65+
self._domain_windows[domain].popleft()
66+
else:
67+
break
68+
69+
def acquire(self, url):
70+
"""
71+
请求限流,根据URL获取域名进行限制
72+
:param url: 请求的URL
73+
:return: 等待时间(秒),0表示不需要等待
74+
"""
75+
domain = self._extract_domain(url)
76+
77+
with self._lock:
78+
if domain not in self._domain_windows:
79+
self._domain_windows[domain] = deque()
80+
81+
limit, window = self.get_domain_limit(domain)
82+
now = time.time()
83+
84+
self._clean_old_requests(domain, window, now)
85+
86+
if len(self._domain_windows[domain]) < limit:
87+
self._domain_windows[domain].append(now)
88+
return 0
89+
90+
oldest_time = self._domain_windows[domain][0]
91+
wait_time = window - (now - oldest_time)
92+
if wait_time > 0:
93+
return wait_time
94+
else:
95+
self._domain_windows[domain].popleft()
96+
self._domain_windows[domain].append(now)
97+
return 0
98+
99+
def _extract_domain(self, url):
100+
"""
101+
从URL中提取域名
102+
"""
103+
parsed = urlparse(url)
104+
return parsed.netloc
105+
106+
107+
_rate_limiter_instance = None
108+
_instance_lock = threading.Lock()
109+
110+
111+
def get_rate_limiter():
112+
"""
113+
获取单例限流器
114+
:return: RateLimiter 实例
115+
"""
116+
global _rate_limiter_instance
117+
if _rate_limiter_instance is None:
118+
with _instance_lock:
119+
if _rate_limiter_instance is None:
120+
_rate_limiter_instance = RateLimiter()
121+
return _rate_limiter_instance
122+
123+
124+
def set_domain_limit(domain, limit, window=60):
125+
"""
126+
便捷函数:设置特定域名的限流阈值
127+
"""
128+
get_rate_limiter().set_domain_limit(domain, limit, window)
129+
130+
131+
def set_default_limit(limit, window=60):
132+
"""
133+
便捷函数:设置默认限流阈值
134+
"""
135+
get_rate_limiter().set_default_limit(limit, window)
136+
137+
138+
def enable_rate_limit(enable=True):
139+
"""
140+
启用或禁用全局限流功能
141+
:param enable: True启用,False禁用,默认True
142+
"""
143+
from adata.common.utils.sunrequests import get_sun_requests
144+
sun_req = get_sun_requests(enable_rate_limit=enable)
145+
return sun_req
146+

0 commit comments

Comments
 (0)