Description
Is there an existing issue for this?
- I have searched the existing issues
Environment
- Milvus version: 2.6.0
- Attu version: 2.5.12
- Deployment mode(standalone or cluster): both
- MQ type(rocksmq, pulsar or kafka): both
- SDK version(e.g. pymilvus v2.0.0rc2): 2.6.0
Current Behavior
When I change rate-limit collection properties such as 'collection.insertRate.max.mb' or 'collection.queryRate.max.qps' through the Attu web UI, the new limit is not enforced.
Expected Behavior
When I change rate-limit collection properties such as 'collection.insertRate.max.mb' or 'collection.queryRate.max.qps' through the Attu web UI, my write/query script (implemented with the Python SDK) should start receiving an error such as: message=quota exceeded
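For concreteness, a minimal sketch of the check the script performs, assuming an already-connected and loaded `collection` object (the object name here is hypothetical; the error shape follows the message quoted in step 5 below):

```python
from pymilvus.exceptions import MilvusException

try:
    # With collection.queryRate.max.qps set to 0, every query should be rejected
    collection.query(expr="", output_fields=["count(*)"])
except MilvusException as e:
    # Expected once the limit takes effect:
    # code=9, message=quota exceeded[reason=rate type: DQLQuery]
    print(e.code, e.message)
```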
Steps To Reproduce
1. Start a Milvus 2.6.0 standalone service using docker-compose.yaml (pasted below)
2. Visit Attu at http://localhost:8000/ and create 'test_db' and 'test_collection'
3. Run the write/query script (pasted below):
`python3 insert_and_query.py --uri http://localhost:19530 --token root:Milvus --db test_db --collection test_collection --duration 3600`
4. In Attu, set the test_collection property 'collection.queryRate.max.qps' to 0
5. The write/query script never receives the expected error: <MilvusException: (code=9, message=quota exceeded[reason=rate type: DQLQuery])>
Milvus Log
No response
Anything else?
docker-compose.yaml
version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.18
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    healthcheck:
      test: ["CMD", "etcdctl", "endpoint", "health"]
      interval: 30s
      timeout: 20s
      retries: 3

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2024-05-28T17-19-04Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.6.0
    command: ["milvus", "run", "standalone"]
    security_opt:
      - seccomp:unconfined
    environment:
      MINIO_REGION: us-east-1
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
      interval: 30s
      start_period: 90s
      timeout: 20s
      retries: 3
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - "etcd"
      - "minio"

  attu:
    container_name: milvus-attu
    image: zilliz/attu:v2.5.12
    environment:
      MILVUS_URL: standalone:19530
    ports:
      - "8000:3000"
    depends_on:
      - "standalone"

networks:
  default:
    name: milvus
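With this file saved as docker-compose.yaml, the stack can be started with `docker compose up -d`; Attu is then reachable on host port 8000 (mapped from the container's port 3000) and Milvus on 19530, matching the URIs used in the steps above.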
insert_and_query.py
#!/usr/bin/env python3
import time
import threading
import argparse
import random
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from pymilvus.exceptions import MilvusException

# Global control variables
running = True
write_lock = threading.Lock()
query_lock = threading.Lock()
counter_lock = threading.Lock()
write_fail_count = 0
query_fail_count = 0


def write_data_thread(collection, collection_name, interval=1.0, batch_size=10):
    """
    Writer thread: inserts batch_size entities every `interval` seconds.
    """
    global running, write_fail_count
    thread_id = threading.current_thread().ident
    insert_count = 0
    try:
        while running:
            start_time = time.time()
            # Read the collection schema so matching data can be generated
            schema = collection.schema
            fields = schema.fields
            # Generate test data (as a list of dicts)
            entities = []
            for i in range(batch_size):
                entity = {}
                for field in fields:
                    field_name = field.name
                    field_type = field.dtype
                    # Skip auto-generated primary keys
                    if field.is_primary and field.auto_id:
                        continue
                    # Generate a value based on the field type
                    if field_type == DataType.INT64:
                        entity[field_name] = random.randint(0, 1000000)
                    elif field_type == DataType.INT32:
                        entity[field_name] = random.randint(0, 100000)
                    elif field_type == DataType.VARCHAR:
                        max_length = field.params.get('max_length', 65535)
                        entity[field_name] = f"str_{random.randint(0, 1000000)}_{i}"[:max_length]
                    elif field_type == DataType.FLOAT_VECTOR:
                        dim = field.params.get('dim', 128)  # default dimension 128
                        entity[field_name] = [random.random() for _ in range(dim)]
                    elif field_type == DataType.FLOAT:
                        entity[field_name] = random.random() * 100
                    elif field_type == DataType.DOUBLE:
                        entity[field_name] = random.random() * 1000
                    elif field_type == DataType.BOOL:
                        entity[field_name] = random.choice([True, False])
                    else:
                        # Fall back to a default value for other types
                        entity[field_name] = 0
                entities.append(entity)
            # pymilvus insert accepts a list of dicts directly
            data = entities
            # Perform the insert
            try:
                with write_lock:
                    result = collection.insert(data)
                    insert_count += batch_size
                # A success that exceeds the threshold is also counted as a failure
                elapsed = time.time() - start_time
                if elapsed > 1.0:
                    with counter_lock:
                        write_fail_count += 1
                    ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    print(f"[{ts}] [writer thread {thread_id}] ✗ insert timed out, took {elapsed:.3f}s (threshold 1.000s)")
            except Exception as e:
                with counter_lock:
                    write_fail_count += 1
                ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                elapsed = time.time() - start_time
                print(f"[{ts}] [writer thread {thread_id}] ✗ insert failed, took {elapsed:.3f}s, error: {e}")
            # Sleep until the next cycle
            sleep_time = interval - (time.time() - start_time)
            if sleep_time > 0:
                time.sleep(sleep_time)
    except Exception as e:
        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print(f"[{ts}] [writer thread {thread_id}] ✗ thread exception: {e}")
        import traceback
        traceback.print_exc()
    # No per-thread success summary on exit; failures are reported in the final summary


def query_count_thread(collection, collection_name, interval=1.0):
    """
    Query thread: queries the collection's total row count every `interval` seconds.
    """
    global running, query_fail_count
    thread_id = threading.current_thread().ident
    query_count = 0
    try:
        while running:
            start_time = time.time()
            try:
                with query_lock:
                    # Use select count(*) to get the total row count.
                    # expr="" matches all rows; output_fields=["count(*)"] returns only the count.
                    result = collection.query(
                        expr="",  # empty expression matches all data
                        output_fields=["count(*)"]
                    )
                # A success that exceeds the threshold is also counted as a failure
                elapsed = time.time() - start_time
                if elapsed > 1.0:
                    with counter_lock:
                        query_fail_count += 1
                    ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    print(f"[{ts}] [query thread {thread_id}] ✗ query timed out, took {elapsed:.3f}s (threshold 1.000s)")
                # Nothing else is printed on success
                query_count += 1
            except Exception as e:
                with counter_lock:
                    query_fail_count += 1
                ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                elapsed = time.time() - start_time
                print(f"[{ts}] [query thread {thread_id}] ✗ query failed, took {elapsed:.3f}s, error: {e}")
            # Sleep until the next cycle
            sleep_time = interval - (time.time() - start_time)
            if sleep_time > 0:
                time.sleep(sleep_time)
    except Exception as e:
        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print(f"[{ts}] [query thread {thread_id}] ✗ thread exception: {e}")
        import traceback
        traceback.print_exc()
    # No per-thread success summary on exit; failures are reported in the final summary


def process(uri="", token="", db_name="", collection_name="", duration=60):
    """
    Main driver.
    duration: run time in seconds; 0 means run forever.
    """
    global running, write_fail_count, query_fail_count
    try:
        # Connect
        connections.connect(
            uri=uri,
            token=token,
            db_name=db_name
        )
        print("✓ Connected to Milvus\n")
        # Get the collection
        try:
            collection = Collection(collection_name)
            print(f"✓ Got collection: {collection_name}\n")
        except Exception as e:
            print(f"✗ Collection {collection_name} does not exist")
            print(f"  error: {e}")
            print("Please create the collection first or change the collection_name argument")
            return
        # Make sure the collection is loaded
        try:
            collection.load()
            print("✓ Collection loaded\n")
        except Exception as e:
            print(f"✗ Collection not loaded or failed to load: {e}")
            return
        # Optionally print the collection schema (kept quiet here)
        # ----------------- main logic --------------------------------------
        # Create the writer thread
        write_thread = threading.Thread(
            target=write_data_thread,
            args=(collection, collection_name, 0.1, 1),  # insert 1 row every 0.1s
            daemon=True
        )
        # Create the query thread
        query_thread = threading.Thread(
            target=query_count_thread,
            args=(collection, collection_name, 0.1),  # query once every 0.1s
            daemon=True
        )
        # Start the threads
        write_thread.start()
        time.sleep(0.1)  # small delay so the writer starts first
        query_thread.start()
        # Wait for the requested duration or until interrupted
        try:
            if duration > 0:
                print(f"Running for {duration} seconds, press Ctrl+C to exit early\n")
                time.sleep(duration)
            else:
                print("Running indefinitely, press Ctrl+C to exit\n")
                while True:
                    time.sleep(1)
        except KeyboardInterrupt:
            print("\n\nInterrupt received, stopping...\n")
        # Stop the threads
        running = False
        # Wait for the threads to finish (at most 5 seconds)
        write_thread.join(timeout=5)
        query_thread.join(timeout=5)
        # Summarize failures
        print("=" * 60)
        print("Run finished (failure summary)")
        print(f"  insert failures: {write_fail_count}")
        print(f"  query failures: {query_fail_count}")
        print("=" * 60)
        # --------------------------------------------------------------------
    except Exception as e:
        print(f"\n✗ Test failed: {e}")
        import traceback
        traceback.print_exc()
    finally:
        running = False
        connections.disconnect('default')
        print("\n✓ Disconnected")


if __name__ == "__main__":
    # Usage example:
    # python3 insert_and_query.py --uri http://localhost:19530 --token root:Milvus --db test_db --collection test_collection --duration 3600
    parser = argparse.ArgumentParser(description='Test insert and query operations against a Milvus collection')
    parser.add_argument('--uri', required=True, help='Milvus server address, e.g. http://10.77.94.43:19530')
    parser.add_argument('--token', required=True, help='Authentication token, e.g. root:Milvus')
    parser.add_argument('--db', required=True, help='Database name, e.g. gx_test')
    parser.add_argument('--collection', required=True, help='Name of the collection to test, e.g. test')
    parser.add_argument('--duration', type=int, default=60, help='Run time in seconds; 0 means run forever (default 60)')
    args = parser.parse_args()
    process(
        uri=args.uri,
        token=args.token,
        db_name=args.db,
        collection_name=args.collection,
        duration=args.duration
    )
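As an additional cross-check (not part of the original reproduction), the same property can be applied through the SDK instead of the Attu web UI, which helps distinguish "the limiter itself is broken" from "only the Attu path is broken". A minimal sketch using pymilvus' Collection.set_properties, assuming it accepts the rate-limit keys:

```python
from pymilvus import connections, Collection

connections.connect(uri="http://localhost:19530", token="root:Milvus", db_name="test_db")
collection = Collection("test_collection")

# Apply the same limit via the SDK instead of the Attu web UI
collection.set_properties({"collection.queryRate.max.qps": 0})

# Inspect the collection to confirm the property was persisted
print(collection.describe())
```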