Skip to content

Commit b25e1ab

Browse files
committed
enhance: support TTL expiration with queries returning no results (milvus-io#41960)
Support TTL expiration with queries returning no results. Issue: milvus-io#41959. PR: milvus-io#41720. --------- Signed-off-by: Xianhui.Lin <[email protected]>
1 parent f84650e commit b25e1ab

35 files changed

+676
-518
lines changed

internal/core/src/exec/QueryContext.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class QueryContext : public Context {
176176
const milvus::segcore::SegmentInternalInterface* segment,
177177
int64_t active_count,
178178
milvus::Timestamp timestamp,
179+
milvus::Timestamp collection_ttl = 0,
179180
int32_t consistency_level = 0,
180181
std::shared_ptr<QueryConfig> query_config =
181182
std::make_shared<QueryConfig>(),
@@ -187,6 +188,7 @@ class QueryContext : public Context {
187188
segment_(segment),
188189
active_count_(active_count),
189190
query_timestamp_(timestamp),
191+
collection_ttl_(collection_ttl),
190192
query_config_(query_config),
191193
executor_(executor),
192194
consistency_level_(consistency_level) {
@@ -222,6 +224,11 @@ class QueryContext : public Context {
222224
return query_timestamp_;
223225
}
224226

227+
// Returns the collection TTL value that was passed to the constructor
// (the constructor parameter defaults to 0). Read-only accessor, so it is
// const-qualified and can be called through const references as well.
milvus::Timestamp
get_collection_ttl() const {
    return collection_ttl_;
}
231+
225232
int64_t
226233
get_active_count() {
227234
return active_count_;
@@ -290,7 +297,7 @@ class QueryContext : public Context {
290297
int64_t active_count_;
291298
// timestamp this query generate
292299
milvus::Timestamp query_timestamp_;
293-
300+
milvus::Timestamp collection_ttl_;
294301
// used for vector search
295302
milvus::SearchInfo search_info_;
296303
const query::PlaceholderGroup* placeholder_group_;

internal/core/src/exec/operator/MvccNode.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ PhyMvccNode::PhyMvccNode(int32_t operator_id,
3333
query_timestamp_ = query_context->get_query_timestamp();
3434
active_count_ = query_context->get_active_count();
3535
is_source_node_ = mvcc_node->sources().size() == 0;
36+
collection_ttl_ = query_context->get_collection_ttl();
3637
}
3738

3839
void
@@ -63,7 +64,7 @@ PhyMvccNode::GetOutput() {
6364

6465
TargetBitmapView data(col_input->GetRawData(), col_input->size());
6566
// need to expose null?
66-
segment_->mask_with_timestamps(data, query_timestamp_);
67+
segment_->mask_with_timestamps(data, query_timestamp_, collection_ttl_);
6768
segment_->mask_with_delete(data, active_count_, query_timestamp_);
6869
is_finished_ = true;
6970

internal/core/src/exec/operator/MvccNode.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class PhyMvccNode : public Operator {
7272
int64_t active_count_;
7373
bool is_finished_{false};
7474
bool is_source_node_{false};
75+
milvus::Timestamp collection_ttl_;
7576
};
7677

7778
} // namespace exec

internal/core/src/query/ExecPlanNodeVisitor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
3434
// Constructs a visitor bound to `segment` for one plan execution.
//
// segment            segment the plan will be executed against
// timestamp          query timestamp forwarded to the QueryContext
// placeholder_group  placeholder group used for vector search
// consystency_level  consistency level forwarded to the QueryContext
// collection_ttl     collection TTL forwarded to the QueryContext
//
// The initializer list follows the member declaration order, and
// collection_ttl_ is initialized from the constructor argument (not from
// the member itself, which would leave it with an indeterminate value).
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
                    Timestamp timestamp,
                    const PlaceholderGroup& placeholder_group,
                    int32_t consystency_level,
                    Timestamp collection_ttl)
    : segment_(segment),
      timestamp_(timestamp),
      collection_ttl_(collection_ttl),
      placeholder_group_(placeholder_group),
      consystency_level_(consystency_level) {
}
4345

4446
SearchResult
@@ -59,6 +61,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
5961
private:
6062
const segcore::SegmentInterface& segment_;
6163
Timestamp timestamp_;
64+
Timestamp collection_ttl_;
6265
const PlaceholderGroup& placeholder_group_;
6366

6467
SearchResultOpt search_result_opt_;
@@ -134,6 +137,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
134137
segment,
135138
active_count,
136139
timestamp_,
140+
collection_ttl_,
137141
consystency_level_);
138142
query_context->set_search_info(node.search_info_);
139143
query_context->set_placeholder_group(placeholder_group_);
@@ -189,6 +193,7 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
189193
segment,
190194
active_count,
191195
timestamp_,
196+
collection_ttl_,
192197
consystency_level_);
193198

194199
// Do task execution

internal/core/src/query/ExecPlanNodeVisitor.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,22 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
4747
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
4848
Timestamp timestamp,
4949
const PlaceholderGroup* placeholder_group,
50-
int32_t consystency_level = 0)
50+
int32_t consystency_level = 0,
51+
Timestamp collection_ttl = 0)
5152
: segment_(segment),
5253
timestamp_(timestamp),
54+
collection_ttl_(collection_ttl),
5355
placeholder_group_(placeholder_group),
5456
consystency_level_(consystency_level) {
5557
}
5658

5759
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
5860
Timestamp timestamp,
59-
int32_t consystency_level = 0)
61+
int32_t consystency_level = 0,
62+
Timestamp collection_ttl = 0)
6063
: segment_(segment),
6164
timestamp_(timestamp),
65+
collection_ttl_(collection_ttl),
6266
consystency_level_(consystency_level) {
6367
placeholder_group_ = nullptr;
6468
}
@@ -108,6 +112,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
108112
private:
109113
const segcore::SegmentInterface& segment_;
110114
Timestamp timestamp_;
115+
Timestamp collection_ttl_;
111116
const PlaceholderGroup* placeholder_group_;
112117

113118
SearchResultOpt search_result_opt_;

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1610,13 +1610,19 @@ ChunkedSegmentSealedImpl::get_active_count(Timestamp ts) const {
16101610

16111611
void
16121612
ChunkedSegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
1613-
Timestamp timestamp) const {
1613+
Timestamp timestamp,
1614+
Timestamp collection_ttl) const {
16141615
// TODO change the
16151616
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
16161617
"num chunk not equal to 1 for sealed segment");
16171618
auto timestamps_data =
16181619
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
16191620
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
1621+
if (collection_ttl > 0) {
1622+
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
1623+
collection_ttl, timestamps_data, timestamps_data_size);
1624+
bitset_chunk |= ttl_mask;
1625+
}
16201626

16211627
AssertInfo(timestamps_data_size == get_row_count(),
16221628
fmt::format("Timestamp size not equal to row count: {}, {}",

internal/core/src/segcore/ChunkedSegmentSealedImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
343343

344344
void
345345
mask_with_timestamps(BitsetTypeView& bitset_chunk,
346-
Timestamp timestamp) const override;
346+
Timestamp timestamp,
347+
Timestamp collection_ttl) const override;
347348

348349
void
349350
vector_search(SearchInfo& search_info,

internal/core/src/segcore/SegmentGrowingImpl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1074,7 +1074,8 @@ SegmentGrowingImpl::get_active_count(Timestamp ts) const {
10741074

10751075
void
10761076
SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
1077-
Timestamp timestamp) const {
1077+
Timestamp timestamp,
1078+
Timestamp collection_ttl) const {
10781079
// DO NOTHING
10791080
}
10801081

internal/core/src/segcore/SegmentGrowingImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
307307

308308
void
309309
mask_with_timestamps(BitsetTypeView& bitset_chunk,
310-
Timestamp timestamp) const override;
310+
Timestamp timestamp,
311+
Timestamp ttl = 0) const override;
311312

312313
void
313314
vector_search(SearchInfo& search_info,

internal/core/src/segcore/SegmentInterface.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,13 @@ SegmentInternalInterface::Search(
8484
const query::Plan* plan,
8585
const query::PlaceholderGroup* placeholder_group,
8686
Timestamp timestamp,
87-
int32_t consistency_level) const {
87+
int32_t consistency_level,
88+
Timestamp collection_ttl) const {
8889
std::shared_lock lck(mutex_);
8990
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
9091
check_search(plan);
9192
query::ExecPlanNodeVisitor visitor(
92-
*this, timestamp, placeholder_group, consistency_level);
93+
*this, timestamp, placeholder_group, consistency_level, collection_ttl);
9394
auto results = std::make_unique<SearchResult>();
9495
*results = visitor.get_moved_result(*plan->plan_node_);
9596
results->segment_ = (void*)this;
@@ -102,11 +103,13 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
102103
Timestamp timestamp,
103104
int64_t limit_size,
104105
bool ignore_non_pk,
105-
int32_t consistency_level) const {
106+
int32_t consistency_level,
107+
Timestamp collection_ttl) const {
106108
std::shared_lock lck(mutex_);
107109
tracer::AutoSpan span("Retrieve", tracer::GetRootSpan());
108110
auto results = std::make_unique<proto::segcore::RetrieveResults>();
109-
query::ExecPlanNodeVisitor visitor(*this, timestamp, consistency_level);
111+
query::ExecPlanNodeVisitor visitor(
112+
*this, timestamp, consistency_level, collection_ttl);
110113
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
111114
retrieve_results.segment_ = (void*)this;
112115
results->set_has_more_result(retrieve_results.has_more_result);

internal/core/src/segcore/SegmentInterface.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,17 @@ class SegmentInterface {
6666
Search(const query::Plan* Plan,
6767
const query::PlaceholderGroup* placeholder_group,
6868
Timestamp timestamp,
69-
int32_t consistency_level = 0) const = 0;
69+
int32_t consistency_level = 0,
70+
Timestamp collection_ttl = 0) const = 0;
7071

7172
virtual std::unique_ptr<proto::segcore::RetrieveResults>
7273
Retrieve(tracer::TraceContext* trace_ctx,
7374
const query::RetrievePlan* Plan,
7475
Timestamp timestamp,
7576
int64_t limit_size,
7677
bool ignore_non_pk,
77-
int32_t consistency_level = 0) const = 0;
78+
int32_t consistency_level = 0,
79+
Timestamp collection_ttl = 0) const = 0;
7880

7981
virtual std::unique_ptr<proto::segcore::RetrieveResults>
8082
Retrieve(tracer::TraceContext* trace_ctx,
@@ -266,7 +268,8 @@ class SegmentInternalInterface : public SegmentInterface {
266268
Search(const query::Plan* Plan,
267269
const query::PlaceholderGroup* placeholder_group,
268270
Timestamp timestamp,
269-
int32_t consistency_level = 0) const override;
271+
int32_t consistency_level = 0,
272+
Timestamp collection_ttl = 0) const override;
270273

271274
void
272275
FillPrimaryKeys(const query::Plan* plan,
@@ -282,7 +285,8 @@ class SegmentInternalInterface : public SegmentInterface {
282285
Timestamp timestamp,
283286
int64_t limit_size,
284287
bool ignore_non_pk,
285-
int32_t consistency_level = 0) const override;
288+
int32_t consistency_level = 0,
289+
Timestamp collection_ttl = 0) const override;
286290

287291
std::unique_ptr<proto::segcore::RetrieveResults>
288292
Retrieve(tracer::TraceContext* trace_ctx,
@@ -367,7 +371,8 @@ class SegmentInternalInterface : public SegmentInterface {
367371
// bitset 1 means not hit. 0 means hit.
368372
virtual void
369373
mask_with_timestamps(BitsetTypeView& bitset_chunk,
370-
Timestamp timestamp) const = 0;
374+
Timestamp timestamp,
375+
Timestamp collection_ttl) const = 0;
371376

372377
// count of chunks
373378
virtual int64_t

internal/core/src/segcore/TimestampIndex.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,25 @@
1111

1212
#include "TimestampIndex.h"
1313

14+
using namespace std;
15+
using namespace std::chrono;
16+
1417
namespace milvus::segcore {
1518

19+
// Width, in bits, of the low-order field that ParseTS returns as the
// "logical" part of a timestamp; the remaining high-order bits are returned
// as the "physical" part.
constexpr int logicalBits = 18;
constexpr uint64_t logicalBitsMask = (1ULL << logicalBits) - 1;

// Splits a packed 64-bit timestamp into its two components.
//
// The low `logicalBits` bits are returned unchanged as the second element.
// The remaining high bits are interpreted as a count of milliseconds and
// converted to a system_clock time point: whole seconds go through
// system_clock::from_time_t, then the sub-second remainder is added.
//
// All standard-library names are fully qualified so this function does not
// rely on any file-scope using-directive.
std::pair<std::chrono::system_clock::time_point, uint64_t>
ParseTS(uint64_t ts) {
    const uint64_t logical = ts & logicalBitsMask;
    const uint64_t physical = ts >> logicalBits;

    auto physical_time =
        std::chrono::system_clock::from_time_t(physical / 1000);
    physical_time += std::chrono::milliseconds(physical % 1000);

    return std::make_pair(physical_time, logical);
}
32+
1633
void
1734
TimestampIndex::set_length_meta(std::vector<int64_t> lengths) {
1835
lengths_ = std::move(lengths);
@@ -84,6 +101,25 @@ TimestampIndex::GenerateBitset(Timestamp query_timestamp,
84101
return bitset;
85102
}
86103

104+
// Builds a per-row bitmap from row timestamps and a collection TTL.
//
// collection_ttl  TTL value; it is compared against millisecond counts, so
//                 it is interpreted here as milliseconds
// timestamps      array of `size` packed timestamps (decoded with ParseTS)
// size            number of entries in `timestamps`; also the bitmap length
//
// Bit i of the result is set when
//     physical_ms(timestamps[i]) + collection_ttl > now_ms
// where now_ms is std::chrono::system_clock::now() expressed in
// milliseconds since the clock's epoch. All other bits are cleared.
//
// NOTE(review): the sealed-segment caller ORs this result into a mask in
// which a set bit means "not hit". With the predicate above the set bits are
// the rows whose TTL has NOT yet elapsed -- confirm that this polarity (and
// the unit of collection_ttl) is what the caller intends.
BitsetType
TimestampIndex::GenerateTTLBitset(Timestamp collection_ttl,
                                  const Timestamp* timestamps,
                                  int64_t size) {
    BitsetType bitset;
    bitset.reserve(size);
    bitset.resize(size, false);

    // The current time is loop-invariant: sample and convert it once.
    const auto now_ms = static_cast<uint64_t>(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch())
            .count());
    const auto ttl_ms = static_cast<uint64_t>(collection_ttl);

    for (int64_t i = 0; i < size; ++i) {
        const auto physical_time = ParseTS(timestamps[i]).first;
        const auto physical_ms = static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::milliseconds>(
                physical_time.time_since_epoch())
                .count());
        // Same predicate as (physical_ms + ttl_ms > now_ms), rearranged so
        // the unsigned addition cannot wrap around for very large TTLs.
        bitset[i] = physical_ms > now_ms || (now_ms - physical_ms) < ttl_ms;
    }
    return bitset;
}
122+
87123
std::vector<int64_t>
88124
GenerateFakeSlices(const Timestamp* timestamps,
89125
int64_t size,

internal/core/src/segcore/TimestampIndex.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ class TimestampIndex {
4040
const Timestamp* timestamps,
4141
int64_t size);
4242

43+
static BitsetType
44+
GenerateTTLBitset(Timestamp collection_ttl,
45+
const Timestamp* timestamps,
46+
int64_t size);
47+
4348
private:
4449
// numSlice
4550
std::vector<int64_t> lengths_;

internal/core/src/segcore/segment_c.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ AsyncSearch(CTraceContext c_trace,
104104
CSearchPlan c_plan,
105105
CPlaceholderGroup c_placeholder_group,
106106
uint64_t timestamp,
107-
int32_t consistency_level) {
107+
int32_t consistency_level,
108+
uint64_t collection_ttl) {
108109
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
109110
auto plan = (milvus::query::Plan*)c_plan;
110111
auto phg_ptr = reinterpret_cast<const milvus::query::PlaceholderGroup*>(
@@ -113,8 +114,13 @@ AsyncSearch(CTraceContext c_trace,
113114
auto future = milvus::futures::Future<milvus::SearchResult>::async(
114115
milvus::futures::getGlobalCPUExecutor(),
115116
milvus::futures::ExecutePriority::HIGH,
116-
[c_trace, segment, plan, phg_ptr, timestamp, consistency_level](
117-
milvus::futures::CancellationToken cancel_token) {
117+
[c_trace,
118+
segment,
119+
plan,
120+
phg_ptr,
121+
timestamp,
122+
consistency_level,
123+
collection_ttl](milvus::futures::CancellationToken cancel_token) {
118124
// save trace context into search_info
119125
auto& trace_ctx = plan->plan_node_->search_info_.trace_ctx_;
120126
trace_ctx.traceID = c_trace.traceID;
@@ -126,8 +132,8 @@ AsyncSearch(CTraceContext c_trace,
126132

127133
segment->LazyCheckSchema(plan->schema_);
128134

129-
auto search_result =
130-
segment->Search(plan, phg_ptr, timestamp, consistency_level);
135+
auto search_result = segment->Search(
136+
plan, phg_ptr, timestamp, consistency_level, collection_ttl);
131137
if (!milvus::PositivelyRelated(
132138
plan->plan_node_->search_info_.metric_type_)) {
133139
for (auto& dis : search_result->distances_) {
@@ -176,7 +182,8 @@ AsyncRetrieve(CTraceContext c_trace,
176182
uint64_t timestamp,
177183
int64_t limit_size,
178184
bool ignore_non_pk,
179-
int32_t consistency_level) {
185+
int32_t consistency_level,
186+
uint64_t collection_ttl) {
180187
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
181188
auto plan = static_cast<const milvus::query::RetrievePlan*>(c_plan);
182189

@@ -189,7 +196,8 @@ AsyncRetrieve(CTraceContext c_trace,
189196
timestamp,
190197
limit_size,
191198
ignore_non_pk,
192-
consistency_level](milvus::futures::CancellationToken cancel_token) {
199+
consistency_level,
200+
collection_ttl](milvus::futures::CancellationToken cancel_token) {
193201
auto trace_ctx = milvus::tracer::TraceContext{
194202
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
195203
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);
@@ -201,7 +209,8 @@ AsyncRetrieve(CTraceContext c_trace,
201209
timestamp,
202210
limit_size,
203211
ignore_non_pk,
204-
consistency_level);
212+
consistency_level,
213+
collection_ttl);
205214

206215
return CreateLeakedCRetrieveResultFromProto(
207216
std::move(retrieve_result));

0 commit comments

Comments
 (0)