Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2478,6 +2478,7 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
->Register()),
insert_record_(*schema, MAX_ROW_COUNT),
schema_(schema),
schema_version_(schema->get_schema_version()),
id_(segment_id),
col_index_meta_(index_meta),
is_sorted_by_pk_(is_sorted_by_pk),
Expand Down Expand Up @@ -4266,12 +4267,8 @@ ChunkedSegmentSealedImpl::LazyCheckSchema(SchemaPtr sch,
return;
}

uint64_t current_schema_version;
{
std::shared_lock lck(mutex_);
current_schema_version = schema_->get_schema_version();
}

auto current_schema_version =
schema_version_.load(std::memory_order_acquire);
if (sch->get_schema_version() > current_schema_version) {
LOG_INFO(
"lazy check schema segment {} found newer schema version, "
Expand Down Expand Up @@ -4449,15 +4446,22 @@ ChunkedSegmentSealedImpl::ApplySchemaForReopen(SchemaPtr sch) {
return;
}

auto incoming_schema_version = sch->get_schema_version();
if (incoming_schema_version <=
schema_version_.load(std::memory_order_acquire)) {
return;
}

std::unique_lock lck(mutex_);
if (sch->get_schema_version() <= schema_->get_schema_version()) {
if (incoming_schema_version <= schema_->get_schema_version()) {
return;
}

field_data_ready_bitset_.resize(sch->get_field_id_bitset_size());
index_ready_bitset_.resize(sch->get_field_id_bitset_size());
binlog_index_bitset_.resize(sch->get_field_id_bitset_size());
schema_ = std::move(sch);
schema_version_.store(incoming_schema_version, std::memory_order_release);
}

void
Expand All @@ -4472,6 +4476,12 @@ ChunkedSegmentSealedImpl::Reopen(milvus::OpContext* op_ctx, SchemaPtr sch) {
return;
}

auto incoming_schema_version = sch->get_schema_version();
if (incoming_schema_version <=
schema_version_.load(std::memory_order_acquire)) {
return;
}

std::lock_guard<std::mutex> reopen_guard(reopen_mutex_);
SchemaPtr current_schema;
{
Expand All @@ -4481,7 +4491,7 @@ ChunkedSegmentSealedImpl::Reopen(milvus::OpContext* op_ctx, SchemaPtr sch) {
// Schema-only reopen carries no load-info updates, so equal-version input
// cannot produce work. Reopen(load_info, schema) still accepts equal schema
// versions because load info may have changed independently.
if (sch->get_schema_version() <= current_schema->get_schema_version()) {
if (incoming_schema_version <= current_schema->get_schema_version()) {
return;
}

Expand Down Expand Up @@ -4521,6 +4531,21 @@ ChunkedSegmentSealedImpl::Reopen(
milvus::OpContext* op_ctx,
const milvus::proto::segcore::SegmentLoadInfo& new_load_info,
SchemaPtr new_schema) {
auto incoming_schema_version =
new_schema ? std::optional<uint64_t>(new_schema->get_schema_version())
: std::nullopt;
if (incoming_schema_version.has_value() &&
incoming_schema_version.value() <
schema_version_.load(std::memory_order_acquire)) {
LOG_WARN(
"Skip stale reopen segment {}, current schema version {}, incoming "
"schema version {}",
id_,
schema_version_.load(std::memory_order_relaxed),
incoming_schema_version.value());
return;
}

// reopen_mutex_ serializes top-level writers of segment_load_info_.
// It is held across ApplyLoadDiff so two Reopens never interleave their
// resource mutations. Readers are unaffected — they snapshot via
Expand All @@ -4532,19 +4557,29 @@ ChunkedSegmentSealedImpl::Reopen(
std::shared_lock lck(mutex_);
current_schema = schema_;
}
if (new_schema && new_schema->get_schema_version() <
current_schema->get_schema_version()) {
if (incoming_schema_version.has_value() &&
incoming_schema_version.value() <
current_schema->get_schema_version()) {
LOG_WARN(
"Skip stale reopen segment {}, current schema version {}, incoming "
"schema version {}",
id_,
current_schema->get_schema_version(),
new_schema->get_schema_version());
incoming_schema_version.value());
return;
}

if (!new_schema) {
new_schema = current_schema;
}
if (incoming_schema_version.has_value() &&
incoming_schema_version.value() >
current_schema->get_schema_version()) {
current_schema = new_schema;
}

auto current = std::atomic_load(&segment_load_info_);
auto target_schema = new_schema ? std::move(new_schema) : current_schema;
auto target_schema = std::move(new_schema);

SegmentLoadInfo current_mutable(*current);
SegmentLoadInfo new_local(new_load_info, target_schema);
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
std::mutex reopen_mutex_;

SchemaPtr schema_;
std::atomic<uint64_t> schema_version_;
int64_t id_;
// commit_ts_ is set for import segments to prevent rows with old historical
// timestamps from being visible to queries before T_commit.
Expand Down
Loading