Skip to content

Commit 1172b12

Browse files
committed
enhance: support TTL expiration with queries returning no results (milvus-io#41960)
Support TTL expiration with queries returning no results.
issue: milvus-io#41959
pr: milvus-io#41720
---------
Signed-off-by: Xianhui.Lin <[email protected]>
fix
Signed-off-by: Xianhui.Lin <[email protected]>
enhancement: query ttl
Signed-off-by: Xianhui.Lin <[email protected]>
fix
Signed-off-by: Xianhui.Lin <[email protected]>
fix
Signed-off-by: Xianhui.Lin <[email protected]>
fix
Signed-off-by: Xianhui.Lin <[email protected]>
feat growing
Signed-off-by: Xianhui.Lin <[email protected]>
fix
Signed-off-by: Xianhui.Lin <[email protected]>
fix
Signed-off-by: Xianhui.Lin <[email protected]>
1 parent 59a6eef commit 1172b12

35 files changed

+718
-546
lines changed

internal/core/src/exec/QueryContext.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class QueryContext : public Context {
176176
const milvus::segcore::SegmentInternalInterface* segment,
177177
int64_t active_count,
178178
milvus::Timestamp timestamp,
179+
milvus::Timestamp collection_ttl = 0,
179180
int32_t consistency_level = 0,
180181
std::shared_ptr<QueryConfig> query_config =
181182
std::make_shared<QueryConfig>(),
@@ -187,6 +188,7 @@ class QueryContext : public Context {
187188
segment_(segment),
188189
active_count_(active_count),
189190
query_timestamp_(timestamp),
191+
collection_ttl_timestamp_(collection_ttl),
190192
query_config_(query_config),
191193
executor_(executor),
192194
consistency_level_(consistency_level) {
@@ -222,6 +224,11 @@ class QueryContext : public Context {
222224
return query_timestamp_;
223225
}
224226

227+
// Returns the collection TTL timestamp stored in this context, i.e. the
// value of the constructor's `collection_ttl` argument (which defaults to 0).
milvus::Timestamp
228+
get_collection_ttl() {
229+
return collection_ttl_timestamp_;
230+
}
231+
225232
int64_t
226233
get_active_count() {
227234
return active_count_;
@@ -290,7 +297,7 @@ class QueryContext : public Context {
290297
int64_t active_count_;
291298
// timestamp this query generate
292299
milvus::Timestamp query_timestamp_;
293-
300+
milvus::Timestamp collection_ttl_timestamp_;
294301
// used for vector search
295302
milvus::SearchInfo search_info_;
296303
const query::PlaceholderGroup* placeholder_group_;

internal/core/src/exec/operator/MvccNode.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ PhyMvccNode::PhyMvccNode(int32_t operator_id,
3333
query_timestamp_ = query_context->get_query_timestamp();
3434
active_count_ = query_context->get_active_count();
3535
is_source_node_ = mvcc_node->sources().size() == 0;
36+
collection_ttl_timestamp_ = query_context->get_collection_ttl();
3637
}
3738

3839
void
@@ -63,7 +64,8 @@ PhyMvccNode::GetOutput() {
6364

6465
TargetBitmapView data(col_input->GetRawData(), col_input->size());
6566
// need to expose null?
66-
segment_->mask_with_timestamps(data, query_timestamp_);
67+
segment_->mask_with_timestamps(
68+
data, query_timestamp_, collection_ttl_timestamp_);
6769
segment_->mask_with_delete(data, active_count_, query_timestamp_);
6870
is_finished_ = true;
6971

internal/core/src/exec/operator/MvccNode.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class PhyMvccNode : public Operator {
7272
int64_t active_count_;
7373
bool is_finished_{false};
7474
bool is_source_node_{false};
75+
milvus::Timestamp collection_ttl_timestamp_;
7576
};
7677

7778
} // namespace exec

internal/core/src/query/ExecPlanNodeVisitor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
3434
// Builds a visitor bound to `segment` for a query at `timestamp`.
// `collection_ttl` is stored in collection_ttl_timestamp_ and later passed
// on to the QueryContext created for each plan node.
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
3535
Timestamp timestamp,
3636
const PlaceholderGroup& placeholder_group,
37-
int32_t consystency_level)
37+
int32_t consystency_level,
38+
Timestamp collection_ttl)
3839
: segment_(segment),
3940
timestamp_(timestamp),
4041
placeholder_group_(placeholder_group),
41-
consystency_level_(consystency_level) {
42+
consystency_level_(consystency_level),
43+
// Initialise from the constructor argument; initialising the member from
// itself would leave it indeterminate and drop the caller's TTL.
collection_ttl_timestamp_(collection_ttl) {
4244
}
4345

4446
SearchResult
@@ -59,6 +61,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
5961
private:
6062
const segcore::SegmentInterface& segment_;
6163
Timestamp timestamp_;
64+
Timestamp collection_ttl_timestamp_;
6265
const PlaceholderGroup& placeholder_group_;
6366

6467
SearchResultOpt search_result_opt_;
@@ -134,6 +137,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
134137
segment,
135138
active_count,
136139
timestamp_,
140+
collection_ttl_timestamp_,
137141
consystency_level_);
138142
query_context->set_search_info(node.search_info_);
139143
query_context->set_placeholder_group(placeholder_group_);
@@ -189,6 +193,7 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
189193
segment,
190194
active_count,
191195
timestamp_,
196+
collection_ttl_timestamp_,
192197
consystency_level_);
193198

194199
// Do task execution

internal/core/src/query/ExecPlanNodeVisitor.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,22 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
4747
// Constructs a visitor that carries a placeholder group.
// `consystency_level` and `collection_ttl` both default to 0; every argument
// is copied verbatim into the matching member.
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
4848
Timestamp timestamp,
4949
const PlaceholderGroup* placeholder_group,
50-
int32_t consystency_level = 0)
50+
int32_t consystency_level = 0,
51+
Timestamp collection_ttl = 0)
5152
: segment_(segment),
5253
timestamp_(timestamp),
54+
collection_ttl_timestamp_(collection_ttl),
5355
placeholder_group_(placeholder_group),
5456
consystency_level_(consystency_level) {
5557
}
5658

5759
// Constructs a visitor without a placeholder group; placeholder_group_ is
// set to nullptr in the body. `consystency_level` and `collection_ttl` both
// default to 0.
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
5860
Timestamp timestamp,
59-
int32_t consystency_level = 0)
61+
int32_t consystency_level = 0,
62+
Timestamp collection_ttl = 0)
6063
: segment_(segment),
6164
timestamp_(timestamp),
65+
collection_ttl_timestamp_(collection_ttl),
6266
consystency_level_(consystency_level) {
6367
placeholder_group_ = nullptr;
6468
}
@@ -108,6 +112,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
108112
private:
109113
const segcore::SegmentInterface& segment_;
110114
Timestamp timestamp_;
115+
Timestamp collection_ttl_timestamp_;
111116
const PlaceholderGroup* placeholder_group_;
112117

113118
SearchResultOpt search_result_opt_;

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1610,13 +1610,27 @@ ChunkedSegmentSealedImpl::get_active_count(Timestamp ts) const {
16101610

16111611
void
16121612
ChunkedSegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
1613-
Timestamp timestamp) const {
1613+
Timestamp timestamp,
1614+
Timestamp collection_ttl) const {
16141615
// TODO change the
16151616
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
16161617
"num chunk not equal to 1 for sealed segment");
16171618
auto timestamps_data =
16181619
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
16191620
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
1621+
if (collection_ttl > 0) {
1622+
auto range =
1623+
insert_record_.timestamp_index_.get_active_range(collection_ttl);
1624+
if (range.first == range.second &&
1625+
range.first == timestamps_data_size) {
1626+
bitset_chunk.set();
1627+
return;
1628+
} else {
1629+
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
1630+
timestamps_data, timestamps_data_size, collection_ttl, range);
1631+
bitset_chunk |= ttl_mask;
1632+
}
1633+
}
16201634

16211635
AssertInfo(timestamps_data_size == get_row_count(),
16221636
fmt::format("Timestamp size not equal to row count: {}, {}",

internal/core/src/segcore/ChunkedSegmentSealedImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
343343

344344
// Sealed-segment override of mask_with_timestamps. `collection_ttl` has no
// default here, so callers must pass it explicitly; the definition only
// applies TTL masking when collection_ttl > 0.
void
345345
mask_with_timestamps(BitsetTypeView& bitset_chunk,
346-
Timestamp timestamp) const override;
346+
Timestamp timestamp,
347+
Timestamp collection_ttl) const override;
347348

348349
void
349350
vector_search(SearchInfo& search_info,

internal/core/src/segcore/SegmentGrowingImpl.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,8 +1074,22 @@ SegmentGrowingImpl::get_active_count(Timestamp ts) const {
10741074

10751075
// Applies the collection-TTL mask to `bitset_chunk` for a growing segment.
// A set bit means the row is filtered out ("not hit").
//   bitset_chunk   - view that receives the mask (OR-ed in place).
//   timestamp      - not read by this implementation.
//   collection_ttl - rows whose timestamp is <= this value are masked;
//                    0 disables TTL masking and leaves the view untouched.
// NOTE(review): the early-out on the last timestamp and the upper_bound
// lookup both rely on the timestamps being in ascending order — confirm.
void
10761076
SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
1077-
Timestamp timestamp) const {
1078-
// DO NOTHING
1077+
Timestamp timestamp,
1078+
Timestamp collection_ttl) const {
1079+
if (collection_ttl > 0) {
1080+
auto& timestamps = get_timestamps();
1081+
auto size = bitset_chunk.size();
// An empty view has nothing to mask; return before timestamps[size - 1]
// is evaluated, which would index out of range when size is 0.
if (size == 0) {
return;
}
1082+
// Last covered row already expired, so every row in the view is expired.
if (timestamps[size - 1] <= collection_ttl) {
1083+
bitset_chunk.set();
1084+
return;
1085+
}
1086+
// presumably the index of the first timestamp greater than collection_ttl
auto pilot = upper_bound(timestamps, 0, size, collection_ttl);
1087+
BitsetType bitset;
1088+
bitset.reserve(size);
1089+
// Rows [0, pilot) are expired (true); rows [pilot, size) stay visible.
bitset.resize(pilot, true);
1090+
bitset.resize(size, false);
1091+
bitset_chunk |= bitset;
1092+
}
10791093
}
10801094

10811095
void

internal/core/src/segcore/SegmentGrowingImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
307307

308308
// Growing-segment override of mask_with_timestamps. The last parameter is
// named `collection_ttl` to match the definition and the base declaration.
// The default of 0 (TTL masking disabled) only takes effect for calls made
// through a SegmentGrowingImpl-typed expression, because default arguments
// are chosen from the static type and the base declaration has none.
void
309309
mask_with_timestamps(BitsetTypeView& bitset_chunk,
310-
Timestamp timestamp) const override;
310+
Timestamp timestamp,
311+
Timestamp collection_ttl = 0) const override;
311312

312313
void
313314
vector_search(SearchInfo& search_info,

internal/core/src/segcore/SegmentInterface.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,13 @@ SegmentInternalInterface::Search(
8484
const query::Plan* plan,
8585
const query::PlaceholderGroup* placeholder_group,
8686
Timestamp timestamp,
87-
int32_t consistency_level) const {
87+
int32_t consistency_level,
88+
Timestamp collection_ttl) const {
8889
std::shared_lock lck(mutex_);
8990
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
9091
check_search(plan);
9192
query::ExecPlanNodeVisitor visitor(
92-
*this, timestamp, placeholder_group, consistency_level);
93+
*this, timestamp, placeholder_group, consistency_level, collection_ttl);
9394
auto results = std::make_unique<SearchResult>();
9495
*results = visitor.get_moved_result(*plan->plan_node_);
9596
results->segment_ = (void*)this;
@@ -102,11 +103,13 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
102103
Timestamp timestamp,
103104
int64_t limit_size,
104105
bool ignore_non_pk,
105-
int32_t consistency_level) const {
106+
int32_t consistency_level,
107+
Timestamp collection_ttl) const {
106108
std::shared_lock lck(mutex_);
107109
tracer::AutoSpan span("Retrieve", tracer::GetRootSpan());
108110
auto results = std::make_unique<proto::segcore::RetrieveResults>();
109-
query::ExecPlanNodeVisitor visitor(*this, timestamp, consistency_level);
111+
query::ExecPlanNodeVisitor visitor(
112+
*this, timestamp, consistency_level, collection_ttl);
110113
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
111114
retrieve_results.segment_ = (void*)this;
112115
results->set_has_more_result(retrieve_results.has_more_result);

internal/core/src/segcore/SegmentInterface.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,17 @@ class SegmentInterface {
6666
Search(const query::Plan* Plan,
6767
const query::PlaceholderGroup* placeholder_group,
6868
Timestamp timestamp,
69-
int32_t consistency_level = 0) const = 0;
69+
int32_t consistency_level = 0,
70+
Timestamp collection_ttl = 0) const = 0;
7071

7172
// Pure-virtual retrieve entry point. `consistency_level` and
// `collection_ttl` both default to 0.
// NOTE(review): default arguments on a virtual function are taken from the
// static type of the call expression, so overrides should repeat the same
// defaults.
virtual std::unique_ptr<proto::segcore::RetrieveResults>
7273
Retrieve(tracer::TraceContext* trace_ctx,
7374
const query::RetrievePlan* Plan,
7475
Timestamp timestamp,
7576
int64_t limit_size,
7677
bool ignore_non_pk,
77-
int32_t consistency_level = 0) const = 0;
78+
int32_t consistency_level = 0,
79+
Timestamp collection_ttl = 0) const = 0;
7880

7981
virtual std::unique_ptr<proto::segcore::RetrieveResults>
8082
Retrieve(tracer::TraceContext* trace_ctx,
@@ -266,7 +268,8 @@ class SegmentInternalInterface : public SegmentInterface {
266268
Search(const query::Plan* Plan,
267269
const query::PlaceholderGroup* placeholder_group,
268270
Timestamp timestamp,
269-
int32_t consistency_level = 0) const override;
271+
int32_t consistency_level = 0,
272+
Timestamp collection_ttl = 0) const override;
270273

271274
void
272275
FillPrimaryKeys(const query::Plan* plan,
@@ -282,7 +285,8 @@ class SegmentInternalInterface : public SegmentInterface {
282285
Timestamp timestamp,
283286
int64_t limit_size,
284287
bool ignore_non_pk,
285-
int32_t consistency_level = 0) const override;
288+
int32_t consistency_level = 0,
289+
Timestamp collection_ttl = 0) const override;
286290

287291
std::unique_ptr<proto::segcore::RetrieveResults>
288292
Retrieve(tracer::TraceContext* trace_ctx,
@@ -367,7 +371,8 @@ class SegmentInternalInterface : public SegmentInterface {
367371
// bitset 1 means not hit. 0 means hit.
368372
// Masks `bitset_chunk` according to the query `timestamp` and the
// `collection_ttl`. The visible implementations only apply TTL masking when
// collection_ttl > 0. No default is declared for `collection_ttl` here.
virtual void
369373
mask_with_timestamps(BitsetTypeView& bitset_chunk,
370-
Timestamp timestamp) const = 0;
374+
Timestamp timestamp,
375+
Timestamp collection_ttl) const = 0;
371376

372377
// count of chunks
373378
virtual int64_t

internal/core/src/segcore/TimestampIndex.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,25 @@ TimestampIndex::GenerateBitset(Timestamp query_timestamp,
8484
return bitset;
8585
}
8686

87+
// Builds a bitset of `size` bits marking rows expired at `expire_ts`.
//   timestamps   - per-row timestamps, indexed by row offset.
//   size         - number of bits in the returned bitset.
//   expire_ts    - rows with timestamp <= expire_ts are marked (bit = true).
//   active_range - [first, second) row range that is checked row by row.
// Rows before active_range.first are marked without inspection; rows at or
// after active_range.second are left unmarked.
// NOTE(review): assumes active_range.second <= size, otherwise the loop
// writes past the bitset — confirm against get_active_range().
BitsetType
88+
TimestampIndex::GenerateTTLBitset(const Timestamp* timestamps,
89+
int64_t size,
90+
Timestamp expire_ts,
91+
std::pair<int64_t, int64_t> active_range) {
92+
auto beg = active_range.first;
93+
auto end = active_range.second;
94+
95+
BitsetType bitset;
96+
bitset.reserve(size);
97+
// First `beg` bits true, remaining bits up to `size` false.
bitset.resize(beg, true);
98+
bitset.resize(size, false);
99+
100+
// Only rows inside the active range need an individual comparison.
for (int64_t i = beg; i < end; ++i) {
101+
bitset[i] = timestamps[i] <= expire_ts;
102+
}
103+
return bitset;
104+
}
105+
87106
std::vector<int64_t>
88107
GenerateFakeSlices(const Timestamp* timestamps,
89108
int64_t size,

internal/core/src/segcore/TimestampIndex.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#include <utility>
1717

1818
#include "common/Schema.h"
19-
2019
namespace milvus::segcore {
2120

2221
class TimestampIndex {
@@ -40,6 +39,12 @@ class TimestampIndex {
4039
const Timestamp* timestamps,
4140
int64_t size);
4241

42+
// Returns a bitset of `size` bits in which a set bit marks a row as expired
// at `expire_ts`: rows before active_range.first are always set, rows in
// [active_range.first, active_range.second) are set when their timestamp is
// <= expire_ts, and the remaining rows are left clear.
static BitsetType
43+
GenerateTTLBitset(const Timestamp* timestamps,
44+
int64_t size,
45+
Timestamp expire_ts,
46+
std::pair<int64_t, int64_t> active_range);
47+
4348
private:
4449
// numSlice
4550
std::vector<int64_t> lengths_;
@@ -56,5 +61,4 @@ std::vector<int64_t>
5661
GenerateFakeSlices(const Timestamp* timestamps,
5762
int64_t size,
5863
int min_slice_length = 1);
59-
6064
} // namespace milvus::segcore

0 commit comments

Comments
 (0)