Skip to content

Commit f6c0e96

Browse files
committed
enhance: support TTL expiration with queries returning no results
Signed-off-by: Xianhui.Lin <[email protected]> polish Signed-off-by: Xianhui.Lin <[email protected]>
1 parent 0e75e66 commit f6c0e96

34 files changed

+671
-520
lines changed

internal/core/src/exec/QueryContext.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class QueryContext : public Context {
176176
const milvus::segcore::SegmentInternalInterface* segment,
177177
int64_t active_count,
178178
milvus::Timestamp timestamp,
179+
milvus::Timestamp collection_ttl = 0,
179180
int32_t consistency_level = 0,
180181
std::shared_ptr<QueryConfig> query_config =
181182
std::make_shared<QueryConfig>(),
@@ -187,6 +188,7 @@ class QueryContext : public Context {
187188
segment_(segment),
188189
active_count_(active_count),
189190
query_timestamp_(timestamp),
191+
collection_ttl_(collection_ttl),
190192
query_config_(query_config),
191193
executor_(executor),
192194
consistency_level_(consistency_level) {
@@ -222,6 +224,11 @@ class QueryContext : public Context {
222224
return query_timestamp_;
223225
}
224226

227+
milvus::Timestamp
228+
get_collection_ttl() {
229+
return collection_ttl_;
230+
}
231+
225232
int64_t
226233
get_active_count() {
227234
return active_count_;
@@ -290,7 +297,7 @@ class QueryContext : public Context {
290297
int64_t active_count_;
291298
// timestamp this query generate
292299
milvus::Timestamp query_timestamp_;
293-
300+
milvus::Timestamp collection_ttl_;
294301
// used for vector search
295302
milvus::SearchInfo search_info_;
296303
const query::PlaceholderGroup* placeholder_group_;

internal/core/src/exec/operator/MvccNode.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ PhyMvccNode::PhyMvccNode(int32_t operator_id,
3030
query_timestamp_ = query_context->get_query_timestamp();
3131
active_count_ = query_context->get_active_count();
3232
is_source_node_ = mvcc_node->sources().size() == 0;
33+
collection_ttl_ = query_context->get_collection_ttl();
3334
}
3435

3536
void
@@ -60,7 +61,7 @@ PhyMvccNode::GetOutput() {
6061

6162
TargetBitmapView data(col_input->GetRawData(), col_input->size());
6263
// need to expose null?
63-
segment_->mask_with_timestamps(data, query_timestamp_);
64+
segment_->mask_with_timestamps(data, query_timestamp_, collection_ttl_);
6465
segment_->mask_with_delete(data, active_count_, query_timestamp_);
6566
is_finished_ = true;
6667

internal/core/src/exec/operator/MvccNode.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class PhyMvccNode : public Operator {
7272
int64_t active_count_;
7373
bool is_finished_{false};
7474
bool is_source_node_{false};
75+
milvus::Timestamp collection_ttl_;
7576
};
7677

7778
} // namespace exec

internal/core/src/query/ExecPlanNodeVisitor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
3434
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
3535
Timestamp timestamp,
3636
const PlaceholderGroup& placeholder_group,
37-
int32_t consystency_level)
37+
int32_t consystency_level,
38+
Timestamp collection_ttl)
3839
: segment_(segment),
3940
timestamp_(timestamp),
4041
placeholder_group_(placeholder_group),
41-
consystency_level_(consystency_level) {
42+
consystency_level_(consystency_level),
43+
collection_ttl_(collection_ttl_) {
4244
}
4345

4446
SearchResult
@@ -59,6 +61,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
5961
private:
6062
const segcore::SegmentInterface& segment_;
6163
Timestamp timestamp_;
64+
Timestamp collection_ttl_;
6265
const PlaceholderGroup& placeholder_group_;
6366

6467
SearchResultOpt search_result_opt_;
@@ -134,6 +137,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
134137
segment,
135138
active_count,
136139
timestamp_,
140+
collection_ttl_,
137141
consystency_level_);
138142
query_context->set_search_info(node.search_info_);
139143
query_context->set_placeholder_group(placeholder_group_);
@@ -189,6 +193,7 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
189193
segment,
190194
active_count,
191195
timestamp_,
196+
collection_ttl_,
192197
consystency_level_);
193198

194199
// Do task execution

internal/core/src/query/ExecPlanNodeVisitor.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,22 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
4444
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
4545
Timestamp timestamp,
4646
const PlaceholderGroup* placeholder_group,
47-
int32_t consystency_level = 0)
47+
int32_t consystency_level = 0,
48+
Timestamp collection_ttl = 0)
4849
: segment_(segment),
4950
timestamp_(timestamp),
51+
collection_ttl_(collection_ttl),
5052
placeholder_group_(placeholder_group),
5153
consystency_level_(consystency_level) {
5254
}
5355

5456
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
5557
Timestamp timestamp,
56-
int32_t consystency_level = 0)
58+
int32_t consystency_level = 0,
59+
Timestamp collection_ttl = 0)
5760
: segment_(segment),
5861
timestamp_(timestamp),
62+
collection_ttl_(collection_ttl),
5963
consystency_level_(consystency_level) {
6064
placeholder_group_ = nullptr;
6165
}
@@ -105,6 +109,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
105109
private:
106110
const segcore::SegmentInterface& segment_;
107111
Timestamp timestamp_;
112+
Timestamp collection_ttl_;
108113
const PlaceholderGroup* placeholder_group_;
109114

110115
SearchResultOpt search_result_opt_;

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1950,13 +1950,19 @@ ChunkedSegmentSealedImpl::get_active_count(Timestamp ts) const {
19501950

19511951
void
19521952
ChunkedSegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
1953-
Timestamp timestamp) const {
1953+
Timestamp timestamp,
1954+
Timestamp collection_ttl) const {
19541955
// TODO change the
19551956
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
19561957
"num chunk not equal to 1 for sealed segment");
19571958
auto timestamps_data =
19581959
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
19591960
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
1961+
if (collection_ttl > 0) {
1962+
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
1963+
collection_ttl, timestamps_data, timestamps_data_size);
1964+
bitset_chunk |= ttl_mask;
1965+
}
19601966

19611967
AssertInfo(timestamps_data_size == get_row_count(),
19621968
fmt::format("Timestamp size not equal to row count: {}, {}",

internal/core/src/segcore/ChunkedSegmentSealedImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
333333

334334
void
335335
mask_with_timestamps(BitsetTypeView& bitset_chunk,
336-
Timestamp timestamp) const override;
336+
Timestamp timestamp,
337+
Timestamp collection_ttl) const override;
337338

338339
void
339340
vector_search(SearchInfo& search_info,

internal/core/src/segcore/SegmentGrowingImpl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,8 @@ SegmentGrowingImpl::get_active_count(Timestamp ts) const {
862862

863863
void
864864
SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
865-
Timestamp timestamp) const {
865+
Timestamp timestamp,
866+
Timestamp collection_ttl) const {
866867
// DO NOTHING
867868
}
868869

internal/core/src/segcore/SegmentGrowingImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
268268

269269
void
270270
mask_with_timestamps(BitsetTypeView& bitset_chunk,
271-
Timestamp timestamp) const override;
271+
Timestamp timestamp,
272+
Timestamp ttl = 0) const override;
272273

273274
void
274275
vector_search(SearchInfo& search_info,

internal/core/src/segcore/SegmentInterface.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,13 @@ SegmentInternalInterface::Search(
8080
const query::Plan* plan,
8181
const query::PlaceholderGroup* placeholder_group,
8282
Timestamp timestamp,
83-
int32_t consistency_level) const {
83+
int32_t consistency_level,
84+
Timestamp collection_ttl) const {
8485
std::shared_lock lck(mutex_);
8586
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
8687
check_search(plan);
8788
query::ExecPlanNodeVisitor visitor(
88-
*this, timestamp, placeholder_group, consistency_level);
89+
*this, timestamp, placeholder_group, consistency_level, collection_ttl);
8990
auto results = std::make_unique<SearchResult>();
9091
*results = visitor.get_moved_result(*plan->plan_node_);
9192
results->segment_ = (void*)this;
@@ -98,11 +99,13 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
9899
Timestamp timestamp,
99100
int64_t limit_size,
100101
bool ignore_non_pk,
101-
int32_t consistency_level) const {
102+
int32_t consistency_level,
103+
Timestamp collection_ttl) const {
102104
std::shared_lock lck(mutex_);
103105
tracer::AutoSpan span("Retrieve", tracer::GetRootSpan());
104106
auto results = std::make_unique<proto::segcore::RetrieveResults>();
105-
query::ExecPlanNodeVisitor visitor(*this, timestamp, consistency_level);
107+
query::ExecPlanNodeVisitor visitor(
108+
*this, timestamp, consistency_level, collection_ttl);
106109
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
107110
retrieve_results.segment_ = (void*)this;
108111
results->set_has_more_result(retrieve_results.has_more_result);

internal/core/src/segcore/SegmentInterface.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,17 @@ class SegmentInterface {
6666
Search(const query::Plan* Plan,
6767
const query::PlaceholderGroup* placeholder_group,
6868
Timestamp timestamp,
69-
int32_t consistency_level = 0) const = 0;
69+
int32_t consistency_level = 0,
70+
Timestamp collection_ttl = 0) const = 0;
7071

7172
virtual std::unique_ptr<proto::segcore::RetrieveResults>
7273
Retrieve(tracer::TraceContext* trace_ctx,
7374
const query::RetrievePlan* Plan,
7475
Timestamp timestamp,
7576
int64_t limit_size,
7677
bool ignore_non_pk,
77-
int32_t consistency_level = 0) const = 0;
78+
int32_t consistency_level = 0,
79+
Timestamp collection_ttl = 0) const = 0;
7880

7981
virtual std::unique_ptr<proto::segcore::RetrieveResults>
8082
Retrieve(tracer::TraceContext* trace_ctx,
@@ -270,7 +272,8 @@ class SegmentInternalInterface : public SegmentInterface {
270272
Search(const query::Plan* Plan,
271273
const query::PlaceholderGroup* placeholder_group,
272274
Timestamp timestamp,
273-
int32_t consistency_level = 0) const override;
275+
int32_t consistency_level = 0,
276+
Timestamp collection_ttl = 0) const override;
274277

275278
void
276279
FillPrimaryKeys(const query::Plan* plan,
@@ -286,7 +289,8 @@ class SegmentInternalInterface : public SegmentInterface {
286289
Timestamp timestamp,
287290
int64_t limit_size,
288291
bool ignore_non_pk,
289-
int32_t consistency_level = 0) const override;
292+
int32_t consistency_level = 0,
293+
Timestamp collection_ttl = 0) const override;
290294

291295
std::unique_ptr<proto::segcore::RetrieveResults>
292296
Retrieve(tracer::TraceContext* trace_ctx,
@@ -380,7 +384,8 @@ class SegmentInternalInterface : public SegmentInterface {
380384
// bitset 1 means not hit. 0 means hit.
381385
virtual void
382386
mask_with_timestamps(BitsetTypeView& bitset_chunk,
383-
Timestamp timestamp) const = 0;
387+
Timestamp timestamp,
388+
Timestamp collection_ttl) const = 0;
384389

385390
// count of chunks
386391
virtual int64_t

internal/core/src/segcore/SegmentSealedImpl.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1835,14 +1835,19 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const {
18351835

18361836
void
18371837
SegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
1838-
Timestamp timestamp) const {
1838+
Timestamp timestamp,
1839+
Timestamp collection_ttl) const {
18391840
// TODO change the
18401841
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
18411842
"num chunk not equal to 1 for sealed segment");
18421843
auto timestamps_data =
18431844
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
18441845
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
1845-
1846+
if (collection_ttl > 0) {
1847+
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
1848+
collection_ttl, timestamps_data, timestamps_data_size);
1849+
bitset_chunk |= ttl_mask;
1850+
}
18461851
AssertInfo(timestamps_data_size == get_row_count(),
18471852
fmt::format("Timestamp size not equal to row count: {}, {}",
18481853
timestamps_data_size,
@@ -1862,6 +1867,8 @@ SegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
18621867
bitset_chunk.set();
18631868
return;
18641869
}
1870+
1871+
// Generate bitset for timestamp range and TTL in one pass
18651872
auto mask = TimestampIndex::GenerateBitset(
18661873
timestamp, range, timestamps_data, timestamps_data_size);
18671874
bitset_chunk |= mask;

internal/core/src/segcore/SegmentSealedImpl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,8 @@ class SegmentSealedImpl : public SegmentSealed {
326326

327327
void
328328
mask_with_timestamps(BitsetTypeView& bitset_chunk,
329-
Timestamp timestamp) const override;
329+
Timestamp timestamp,
330+
Timestamp collection_ttl) const override;
330331

331332
void
332333
vector_search(SearchInfo& search_info,

internal/core/src/segcore/TimestampIndex.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,25 @@
1111

1212
#include "TimestampIndex.h"
1313

14+
using namespace std;
15+
using namespace std::chrono;
16+
1417
namespace milvus::segcore {
1518

19+
const int logicalBits = 18;
20+
const uint64_t logicalBitsMask = (1ULL << logicalBits) - 1;
21+
22+
std::pair<system_clock::time_point, uint64_t>
23+
ParseTS(uint64_t ts) {
24+
uint64_t logical = ts & logicalBitsMask;
25+
uint64_t physical = ts >> logicalBits;
26+
auto physicalTime = system_clock::from_time_t(physical / 1000);
27+
auto ms = milliseconds(physical % 1000);
28+
physicalTime += ms;
29+
30+
return make_pair(physicalTime, logical);
31+
}
32+
1633
void
1734
TimestampIndex::set_length_meta(std::vector<int64_t> lengths) {
1835
lengths_ = std::move(lengths);
@@ -84,6 +101,25 @@ TimestampIndex::GenerateBitset(Timestamp query_timestamp,
84101
return bitset;
85102
}
86103

104+
BitsetType
105+
TimestampIndex::GenerateTTLBitset(Timestamp collection_ttl,
106+
const Timestamp* timestamps,
107+
int64_t size) {
108+
BitsetType bitset;
109+
bitset.reserve(size);
110+
bitset.resize(size, false);
111+
auto cur = system_clock::now();
112+
for (int64_t i = 0; i < size; ++i) {
113+
auto [physicalTime, logical] = ParseTS(timestamps[i]);
114+
bitset[i] =
115+
(duration_cast<milliseconds>(physicalTime.time_since_epoch())
116+
.count() +
117+
collection_ttl) >
118+
duration_cast<milliseconds>(cur.time_since_epoch()).count();
119+
}
120+
return bitset;
121+
}
122+
87123
std::vector<int64_t>
88124
GenerateFakeSlices(const Timestamp* timestamps,
89125
int64_t size,

internal/core/src/segcore/TimestampIndex.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ class TimestampIndex {
4040
const Timestamp* timestamps,
4141
int64_t size);
4242

43+
static BitsetType
44+
GenerateTTLBitset(Timestamp collection_ttl,
45+
const Timestamp* timestamps,
46+
int64_t size);
47+
4348
private:
4449
// numSlice
4550
std::vector<int64_t> lengths_;

0 commit comments

Comments
 (0)