Skip to content

Commit 73ed1a6

Browse files
authored
test: [2.5] Add TTL tests to verify expiration in read (#42190)
related issue: #42182 pr: #42189 --------- Signed-off-by: yanliang567 <[email protected]>
1 parent d2ff390 commit 73ed1a6

File tree

2 files changed

+191
-16
lines changed

2 files changed

+191
-16
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import pytest
2+
import time
3+
from common.common_type import CaseLabel, CheckTasks
4+
from common import common_func as cf
5+
from common import common_type as ct
6+
from utils.util_log import test_log as log
7+
from utils.util_pymilvus import *
8+
from base.client_v2_base import TestMilvusClientV2Base
9+
from pymilvus import DataType, AnnSearchRequest, WeightedRanker
10+
11+
12+
class TestMilvusClientTTL(TestMilvusClientV2Base):
    """ Test cases covering collection Time To Live (TTL) on the read path """

    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.parametrize("flush_enable", [True, False])
    @pytest.mark.parametrize("on_insert", [True, False])
    def test_milvus_client_ttl_default(self, flush_enable, on_insert):
        """
        target: verify that rows stop being returned by reads once the collection ttl has elapsed
        method: 1. create a collection with "collection.ttl.seconds", index and load it
                2. write rows (insert or upsert, per on_insert) with visible=False,
                   optionally flush (per flush_enable)
                3. poll search / query count(*) / hybrid search until all three return nothing
                4. write a second set of rows with visible=True and read again
                5. alter the ttl to 1000s and read the visible=False rows again
        expected: the first rows disappear within [ttl-2, ttl+5] seconds, only the second
                  set is readable afterwards, and after raising the ttl both sets are counted
        """
        client = self._client()
        dim = 65
        ttl = 10
        nb = 1000
        insert_times = 2
        collection_name = cf.gen_collection_name_by_testcase_name()

        # insert and upsert share the same call shape, so pick the writer once
        write_rows = self.insert if on_insert else self.upsert

        def write_batches(first_batch, visible):
            # Write insert_times batches of nb rows each; primary keys continue
            # from first_batch * nb so that separate calls never reuse an id.
            for batch in range(first_batch, first_batch + insert_times):
                vectors = cf.gen_vectors(nb, dim=dim)
                vectors_2 = cf.gen_vectors(nb, dim=dim)
                base_id = batch * nb
                rows = [
                    {
                        "id": base_id + offset,
                        "embeddings": list(vectors[offset]),
                        "embeddings_2": list(vectors_2[offset]),
                        "visible": visible,
                    }
                    for offset in range(nb)
                ]
                write_rows(client, collection_name, rows)

        def flush_if_enabled():
            # Flush only for the flush_enable=True parametrization, logging how long it took.
            if not flush_enable:
                return
            flush_started = time.time()
            self.flush(client, collection_name)
            log.info(f"flush completed in {time.time() - flush_started}s")

        # create collection with ttl and check the property is stored
        schema = self.create_schema(client, enable_dynamic_field=False)[0]
        schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
        schema.add_field("embeddings", DataType.FLOAT_VECTOR, dim=dim)
        schema.add_field("embeddings_2", DataType.FLOAT_VECTOR, dim=dim)
        schema.add_field("visible", DataType.BOOL, nullable=True)
        self.create_collection(client, collection_name, schema=schema,
                               properties={"collection.ttl.seconds": ttl})
        described = self.describe_collection(client, collection_name)[0]
        assert described['properties']["collection.ttl.seconds"] == str(ttl)

        # create index on both vector fields
        index_params = self.prepare_index_params(client)[0]
        for vector_field in ("embeddings", "embeddings_2"):
            index_params.add_index(field_name=vector_field, index_type="IVF_FLAT",
                                   metric_type="COSINE", nlist=128)
        self.create_index(client, collection_name, index_params=index_params)

        # load collection
        self.load_collection(client, collection_name)

        # write the first rows, all marked visible=False
        write_batches(0, False)

        # the clock starts after the writes; the optional flush below counts towards it
        start_time = time.time()
        timeout = ttl * 5
        nq = 1
        search_expired = False
        query_expired = False
        hybrid_expired = False
        search_vectors = cf.gen_vectors(nq, dim=dim)
        sub_search1 = AnnSearchRequest(search_vectors, "embeddings", {"level": 1}, 20)
        sub_search2 = AnnSearchRequest(search_vectors, "embeddings_2", {"level": 1}, 20)
        ranker = WeightedRanker(0.2, 0.8)

        flush_if_enabled()

        # poll once per second until every read path returns nothing or the timeout is hit;
        # a read path is no longer called once it has been seen empty
        while time.time() - start_time < timeout:
            if not search_expired:
                res1 = self.search(client, collection_name, search_vectors, anns_field='embeddings',
                                   search_params={}, limit=10, consistency_level='Strong')[0]
            if not query_expired:
                res2 = self.query(client, collection_name, filter='',
                                  output_fields=["count(*)"], consistency_level='Strong')[0]
            if not hybrid_expired:
                res3 = self.hybrid_search(client, collection_name, [sub_search1, sub_search2], ranker,
                                          limit=10, consistency_level='Strong')[0]

            if not search_expired and len(res1[0]) == 0:
                log.info(f"search ttl effects in {round(time.time() - start_time, 4)}s")
                search_expired = True
            if not query_expired and res2[0].get('count(*)') == 0:
                log.info(f"query ttl effects in {round(time.time() - start_time, 4)}s")
                query_expired = True
            if not hybrid_expired and len(res3[0]) == 0:
                log.info(f"hybrid search ttl effects in {round(time.time() - start_time, 4)}s")
                hybrid_expired = True

            if search_expired and query_expired and hybrid_expired:
                break
            time.sleep(1)

        # if the loop ran into the timeout, delta_tt exceeds ttl+5 and the assert fails
        delta_tt = round(time.time() - start_time, 4)
        log.info(f"ttl effects in {delta_tt}s")
        assert ttl - 2 <= delta_tt <= ttl + 5

        # query count(*)
        count_res = self.query(client, collection_name, filter='', output_fields=["count(*)"])[0]
        assert count_res[0].get('count(*)') == 0

        # write more rows, this time marked visible=True, with ids after the first rows
        write_batches(insert_times, True)
        flush_if_enabled()

        # search again: the new rows must be returned
        search_res = self.search(client, collection_name, search_vectors,
                                 search_params={}, anns_field='embeddings',
                                 limit=10, consistency_level='Strong')[0]
        assert len(search_res[0]) > 0

        # the first (visible=False) rows must still be gone
        hidden_count = self.query(client, collection_name, filter='visible==False',
                                  output_fields=["count(*)"], consistency_level='Strong')[0]
        assert hidden_count[0].get('count(*)') == 0

        # hybrid search
        hybrid_res = self.hybrid_search(client, collection_name, [sub_search1, sub_search2], ranker,
                                        limit=10, consistency_level='Strong')[0]
        assert len(hybrid_res[0]) > 0

        # query count of the visible=True rows
        visible_count = self.query(client, collection_name, filter='visible==True',
                                   output_fields=["count(*)"], consistency_level='Strong')[0]
        assert visible_count[0].get('count(*)') > 0

        # alter ttl to 1000s
        self.alter_collection_properties(client, collection_name,
                                         properties={"collection.ttl.seconds": 1000})

        # search after alter ttl: the visible=False rows are expected to be returned again
        search_res = self.search(client, collection_name, search_vectors,
                                 search_params={}, anns_field='embeddings',
                                 filter='visible==False', limit=10, consistency_level='Strong')[0]
        assert len(search_res[0]) > 0

        # hybrid search after alter ttl, restricted to the visible=False rows
        sub_search1 = AnnSearchRequest(search_vectors, "embeddings", {"level": 1}, 20,
                                       expr='visible==False')
        sub_search2 = AnnSearchRequest(search_vectors, "embeddings_2", {"level": 1}, 20,
                                       expr='visible==False')
        hybrid_res = self.hybrid_search(client, collection_name, [sub_search1, sub_search2], ranker,
                                        limit=10, consistency_level='Strong')[0]
        assert len(hybrid_res[0]) > 0

        # query count(*): every first-set row is counted again, and the total covers both sets
        hidden_count = self.query(client, collection_name, filter='visible==False',
                                  output_fields=["count(*)"], consistency_level='Strong')[0]
        assert hidden_count[0].get('count(*)') == insert_times * nb
        total_count = self.query(client, collection_name, filter='',
                                 output_fields=["count(*)"], consistency_level='Strong')[0]
        assert total_count[0].get('count(*)') == insert_times * nb * 2

tests/python_client/utils/api_request.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@
55
from check.func_check import ResponseChecker, Error
66
from utils.util_log import test_log as log
77

8-
# enable_traceback = os.getenv('ENABLE_TRACEBACK', "True")
9-
# log.info(f"enable_traceback:{enable_traceback}")
10-
11-
128
log_row_length = 300
139

1410

@@ -20,7 +16,6 @@ def inner_wrapper(*args, **kwargs):
2016
if "enable_traceback" in _kwargs:
2117
del _kwargs["enable_traceback"]
2218
res = func(*args, **_kwargs)
23-
# if enable_traceback == "True":
2419
if kwargs.get("enable_traceback", True):
2520
res_str = str(res)
2621
log_res = res_str[0:log_row_length] + '......' if len(res_str) > log_row_length else res_str
@@ -30,7 +25,6 @@ def inner_wrapper(*args, **kwargs):
3025
except Exception as e:
3126
e_str = str(e)
3227
log_e = e_str[0:log_row_length] + '......' if len(e_str) > log_row_length else e_str
33-
# if enable_traceback == "True":
3428
if kwargs.get("enable_traceback", True):
3529
log.error(traceback.format_exc())
3630
log.error("(api_response) : %s" % log_e)
@@ -44,29 +38,30 @@ def api_request(_list, **kwargs):
4438
if isinstance(_list, list):
4539
func = _list[0]
4640
if callable(func):
47-
arg = _list[1:]
48-
arg_str = str(arg)
49-
log_arg = arg_str[0:log_row_length] + '......' if len(arg_str) > log_row_length else arg_str
50-
# if enable_traceback == "True":
5141
if kwargs.get("enable_traceback", True):
52-
log.debug("(api_request) : [%s] args: %s, kwargs: %s" % (func.__qualname__, log_arg, str(kwargs)))
42+
arg = _list[1:]
43+
arg_str = str(arg)
44+
log_arg = arg_str[0:log_row_length] + '...' if len(arg_str) > log_row_length else arg_str
45+
log_kwargs = str(kwargs)[0:log_row_length] + '...' if len(str(kwargs)) > log_row_length else str(kwargs)
46+
log.debug("(api_request) : [%s] args: %s, kwargs: %s" % (func.__qualname__, log_arg, log_kwargs))
5347
return func(*arg, **kwargs)
5448
return False, False
5549

5650

5751
def logger_interceptor():
5852
def wrapper(func):
5953
def log_request(*arg, **kwargs):
60-
arg = arg[1:]
61-
arg_str = str(arg)
62-
log_arg = arg_str[0:log_row_length] + '......' if len(arg_str) > log_row_length else arg_str
6354
if kwargs.get("enable_traceback", True):
64-
log.debug("(api_request) : [%s] args: %s, kwargs: %s" % (func.__name__, log_arg, str(kwargs)))
55+
arg = arg[1:]
56+
arg_str = str(arg)
57+
log_arg = arg_str[0:log_row_length] + '...' if len(arg_str) > log_row_length else arg_str
58+
log_kwargs = str(kwargs)[0:log_row_length] + '...' if len(str(kwargs)) > log_row_length else str(kwargs)
59+
log.debug("(api_request) : [%s] args: %s, kwargs: %s" % (func.__name__, log_arg, log_kwargs))
6560

6661
def log_response(res, **kwargs):
6762
if kwargs.get("enable_traceback", True):
6863
res_str = str(res)
69-
log_res = res_str[0:log_row_length] + '......' if len(res_str) > log_row_length else res_str
64+
log_res = res_str[0:log_row_length] + '...' if len(res_str) > log_row_length else res_str
7065
log.debug("(api_response) : [%s] %s " % (func.__name__, log_res))
7166
return res, True
7267

0 commit comments

Comments
 (0)