Skip to content

enhance: support TTL expiration with queries returning no results #41960

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/core/src/exec/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class QueryContext : public Context {
const milvus::segcore::SegmentInternalInterface* segment,
int64_t active_count,
milvus::Timestamp timestamp,
milvus::Timestamp collection_ttl = 0,
int32_t consistency_level = 0,
std::shared_ptr<QueryConfig> query_config =
std::make_shared<QueryConfig>(),
Expand All @@ -187,6 +188,7 @@ class QueryContext : public Context {
segment_(segment),
active_count_(active_count),
query_timestamp_(timestamp),
collection_ttl_(collection_ttl),
query_config_(query_config),
executor_(executor),
consistency_level_(consistency_level) {
Expand Down Expand Up @@ -222,6 +224,11 @@ class QueryContext : public Context {
return query_timestamp_;
}

milvus::Timestamp
get_collection_ttl() {
return collection_ttl_;
}

int64_t
get_active_count() {
return active_count_;
Expand Down Expand Up @@ -290,7 +297,7 @@ class QueryContext : public Context {
int64_t active_count_;
// timestamp this query generate
milvus::Timestamp query_timestamp_;

milvus::Timestamp collection_ttl_;
// used for vector search
milvus::SearchInfo search_info_;
const query::PlaceholderGroup* placeholder_group_;
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/exec/operator/MvccNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ PhyMvccNode::PhyMvccNode(int32_t operator_id,
query_timestamp_ = query_context->get_query_timestamp();
active_count_ = query_context->get_active_count();
is_source_node_ = mvcc_node->sources().size() == 0;
collection_ttl_ = query_context->get_collection_ttl();
}

void
Expand Down Expand Up @@ -60,7 +61,7 @@ PhyMvccNode::GetOutput() {

TargetBitmapView data(col_input->GetRawData(), col_input->size());
// need to expose null?
segment_->mask_with_timestamps(data, query_timestamp_);
segment_->mask_with_timestamps(data, query_timestamp_, collection_ttl_);
segment_->mask_with_delete(data, active_count_, query_timestamp_);
is_finished_ = true;

Expand Down
1 change: 1 addition & 0 deletions internal/core/src/exec/operator/MvccNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class PhyMvccNode : public Operator {
int64_t active_count_;
bool is_finished_{false};
bool is_source_node_{false};
milvus::Timestamp collection_ttl_;
};

} // namespace exec
Expand Down
9 changes: 7 additions & 2 deletions internal/core/src/query/ExecPlanNodeVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
Timestamp timestamp,
const PlaceholderGroup& placeholder_group,
int32_t consystency_level)
int32_t consystency_level,
Timestamp collection_ttl)
: segment_(segment),
timestamp_(timestamp),
placeholder_group_(placeholder_group),
consystency_level_(consystency_level) {
consystency_level_(consystency_level),
collection_ttl_(collection_ttl_) {
}

SearchResult
Expand All @@ -59,6 +61,7 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
private:
const segcore::SegmentInterface& segment_;
Timestamp timestamp_;
Timestamp collection_ttl_;
const PlaceholderGroup& placeholder_group_;

SearchResultOpt search_result_opt_;
Expand Down Expand Up @@ -134,6 +137,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
segment,
active_count,
timestamp_,
collection_ttl_,
consystency_level_);
query_context->set_search_info(node.search_info_);
query_context->set_placeholder_group(placeholder_group_);
Expand Down Expand Up @@ -189,6 +193,7 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
segment,
active_count,
timestamp_,
collection_ttl_,
consystency_level_);

// Do task execution
Expand Down
9 changes: 7 additions & 2 deletions internal/core/src/query/ExecPlanNodeVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
Timestamp timestamp,
const PlaceholderGroup* placeholder_group,
int32_t consystency_level = 0)
int32_t consystency_level = 0,
Timestamp collection_ttl = 0)
: segment_(segment),
timestamp_(timestamp),
collection_ttl_(collection_ttl),
placeholder_group_(placeholder_group),
consystency_level_(consystency_level) {
}

ExecPlanNodeVisitor(const segcore::SegmentInterface& segment,
Timestamp timestamp,
int32_t consystency_level = 0)
int32_t consystency_level = 0,
Timestamp collection_ttl = 0)
: segment_(segment),
timestamp_(timestamp),
collection_ttl_(collection_ttl),
consystency_level_(consystency_level) {
placeholder_group_ = nullptr;
}
Expand Down Expand Up @@ -105,6 +109,7 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
private:
const segcore::SegmentInterface& segment_;
Timestamp timestamp_;
Timestamp collection_ttl_;
const PlaceholderGroup* placeholder_group_;

SearchResultOpt search_result_opt_;
Expand Down
8 changes: 7 additions & 1 deletion internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1950,13 +1950,19 @@

void
ChunkedSegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const {
Timestamp timestamp,
Timestamp collection_ttl) const {
// TODO change the
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
auto timestamps_data =
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);
if (collection_ttl > 0) {
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
collection_ttl, timestamps_data, timestamps_data_size);
bitset_chunk |= ttl_mask;

Check warning on line 1964 in internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp#L1961-L1964

Added lines #L1961 - L1964 were not covered by tests
}

AssertInfo(timestamps_data_size == get_row_count(),
fmt::format("Timestamp size not equal to row count: {}, {}",
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/ChunkedSegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {

void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const override;
Timestamp timestamp,
Timestamp collection_ttl) const override;

void
vector_search(SearchInfo& search_info,
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ SegmentGrowingImpl::get_active_count(Timestamp ts) const {

void
SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const {
Timestamp timestamp,
Timestamp collection_ttl) const {
// DO NOTHING
}

Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ class SegmentGrowingImpl : public SegmentGrowing {

void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const override;
Timestamp timestamp,
Timestamp ttl = 0) const override;

void
vector_search(SearchInfo& search_info,
Expand Down
11 changes: 7 additions & 4 deletions internal/core/src/segcore/SegmentInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ SegmentInternalInterface::Search(
const query::Plan* plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
int32_t consistency_level) const {
int32_t consistency_level,
Timestamp collection_ttl) const {
std::shared_lock lck(mutex_);
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
check_search(plan);
query::ExecPlanNodeVisitor visitor(
*this, timestamp, placeholder_group, consistency_level);
*this, timestamp, placeholder_group, consistency_level, collection_ttl);
auto results = std::make_unique<SearchResult>();
*results = visitor.get_moved_result(*plan->plan_node_);
results->segment_ = (void*)this;
Expand All @@ -98,11 +99,13 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level) const {
int32_t consistency_level,
Timestamp collection_ttl) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("Retrieve", tracer::GetRootSpan());
auto results = std::make_unique<proto::segcore::RetrieveResults>();
query::ExecPlanNodeVisitor visitor(*this, timestamp, consistency_level);
query::ExecPlanNodeVisitor visitor(
*this, timestamp, consistency_level, collection_ttl);
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
retrieve_results.segment_ = (void*)this;
results->set_has_more_result(retrieve_results.has_more_result);
Expand Down
15 changes: 10 additions & 5 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ class SegmentInterface {
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
int32_t consistency_level = 0) const = 0;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const = 0;

virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level = 0) const = 0;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const = 0;

virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(tracer::TraceContext* trace_ctx,
Expand Down Expand Up @@ -270,7 +272,8 @@ class SegmentInternalInterface : public SegmentInterface {
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp,
int32_t consistency_level = 0) const override;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const override;

void
FillPrimaryKeys(const query::Plan* plan,
Expand All @@ -286,7 +289,8 @@ class SegmentInternalInterface : public SegmentInterface {
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk,
int32_t consistency_level = 0) const override;
int32_t consistency_level = 0,
Timestamp collection_ttl = 0) const override;

std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(tracer::TraceContext* trace_ctx,
Expand Down Expand Up @@ -380,7 +384,8 @@ class SegmentInternalInterface : public SegmentInterface {
// bitset 1 means not hit. 0 means hit.
virtual void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const = 0;
Timestamp timestamp,
Timestamp collection_ttl) const = 0;

// count of chunks
virtual int64_t
Expand Down
11 changes: 9 additions & 2 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1835,14 +1835,19 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const {

void
SegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const {
Timestamp timestamp,
Timestamp collection_ttl) const {
// TODO change the
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
auto timestamps_data =
(const milvus::Timestamp*)insert_record_.timestamps_.get_chunk_data(0);
auto timestamps_data_size = insert_record_.timestamps_.get_chunk_size(0);

if (collection_ttl > 0) {
auto ttl_mask = TimestampIndex::GenerateTTLBitset(
collection_ttl, timestamps_data, timestamps_data_size);
bitset_chunk |= ttl_mask;
}
AssertInfo(timestamps_data_size == get_row_count(),
fmt::format("Timestamp size not equal to row count: {}, {}",
timestamps_data_size,
Expand All @@ -1862,6 +1867,8 @@ SegmentSealedImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
bitset_chunk.set();
return;
}

// Generate bitset for timestamp range and TTL in one pass
auto mask = TimestampIndex::GenerateBitset(
timestamp, range, timestamps_data, timestamps_data_size);
bitset_chunk |= mask;
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ class SegmentSealedImpl : public SegmentSealed {

void
mask_with_timestamps(BitsetTypeView& bitset_chunk,
Timestamp timestamp) const override;
Timestamp timestamp,
Timestamp collection_ttl) const override;

void
vector_search(SearchInfo& search_info,
Expand Down
36 changes: 36 additions & 0 deletions internal/core/src/segcore/TimestampIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,25 @@

#include "TimestampIndex.h"

using namespace std;
using namespace std::chrono;

namespace milvus::segcore {

const int logicalBits = 18;
const uint64_t logicalBitsMask = (1ULL << logicalBits) - 1;

std::pair<system_clock::time_point, uint64_t>
ParseTS(uint64_t ts) {
uint64_t logical = ts & logicalBitsMask;
uint64_t physical = ts >> logicalBits;
auto physicalTime = system_clock::from_time_t(physical / 1000);
auto ms = milliseconds(physical % 1000);
physicalTime += ms;

return make_pair(physicalTime, logical);
}

void
TimestampIndex::set_length_meta(std::vector<int64_t> lengths) {
lengths_ = std::move(lengths);
Expand Down Expand Up @@ -84,6 +101,25 @@ TimestampIndex::GenerateBitset(Timestamp query_timestamp,
return bitset;
}

BitsetType
TimestampIndex::GenerateTTLBitset(Timestamp collection_ttl,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use index to accelerate

const Timestamp* timestamps,
int64_t size) {
BitsetType bitset;
bitset.reserve(size);
bitset.resize(size, false);
auto cur = system_clock::now();
for (int64_t i = 0; i < size; ++i) {
auto [physicalTime, logical] = ParseTS(timestamps[i]);
bitset[i] =
(duration_cast<milliseconds>(physicalTime.time_since_epoch())
.count() +
collection_ttl) >
duration_cast<milliseconds>(cur.time_since_epoch()).count();
}
return bitset;
}

std::vector<int64_t>
GenerateFakeSlices(const Timestamp* timestamps,
int64_t size,
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/segcore/TimestampIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class TimestampIndex {
const Timestamp* timestamps,
int64_t size);

static BitsetType
GenerateTTLBitset(Timestamp collection_ttl,
const Timestamp* timestamps,
int64_t size);

private:
// numSlice
std::vector<int64_t> lengths_;
Expand Down
Loading
Loading