Skip to content

Commit a27f61b

Browse files
authored
enhance: cache reusable format reader metadata (#552)
This change makes repeated reads cheaper by keeping reusable format-reader metadata around for the lifetime of a Reader. The main need here is to avoid reopening the same file metadata over and over when different read paths, projections, predicates, chunk reads, or take calls touch the same column group files.

The implementation adds a typed metadata cache for Parquet, Iceberg, Lance, and Vortex readers. Each format now separates the stable file metadata loading step from the stateful reader creation step, so schema, row group layout, footer data, Lance fragment metadata, Vortex file handles, and Iceberg positional delete information can be reused while each actual reader still gets its own projection and predicate state.

At a high level, the cache now sits behind ReaderImpl and is shared by all column-group read paths. The important part is that we cache only immutable metadata, then create a fresh reader from that metadata whenever the caller brings a different projection or predicate.

```
ReaderImpl
  owns MetadataCache
    owns FormatReaderMetadataCaches<Parquet, Iceberg, Lance, Vortex>
      owns FormatReaderMetadataCache<ReaderT>
        entries_[cache_key] = immutable FormatReaderMetadata<Payload>
        in_flight_loads_[cache_key] = singleflight loader state

read flow
  metadata = cache.get_or_open(key, load_metadata)
  reader = FormatReader::create_from_metadata(metadata, file, projection, predicate)
```

The cache also tracks in-flight loads per key, so concurrent requests for the same file wait on the same metadata load instead of doing duplicate work. It is wired through ReaderImpl, ColumnGroupReader, and ColumnGroupLazyReader, so batch reads, chunk readers, and lazy take reads all use the same metadata source while keeping the caller-facing reader behavior unchanged.

---------

Signed-off-by: jiaqizho <jiaqi.zhou@zilliz.com>
1 parent c09dd7c commit a27f61b

34 files changed

Lines changed: 3065 additions & 371 deletions

cpp/ffi_exports.map

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
loon_properties_reader_record_batch_max_rows;
5858
loon_properties_reader_record_batch_max_size;
5959
loon_properties_reader_logical_chunk_rows;
60+
loon_properties_reader_metadata_cache_enable;
6061
loon_properties_reader_parquet_prebuffer_hole_size_limit;
6162
loon_properties_reader_parquet_prebuffer_range_size_limit;
6263
loon_properties_reader_vortex_split_row_indices;

cpp/ffi_exports_mac.map

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ _loon_properties_writer_vortex_enable_statistics
5555
_loon_properties_reader_record_batch_max_rows
5656
_loon_properties_reader_record_batch_max_size
5757
_loon_properties_reader_logical_chunk_rows
58+
_loon_properties_reader_metadata_cache_enable
5859
_loon_properties_reader_parquet_prebuffer_hole_size_limit
5960
_loon_properties_reader_parquet_prebuffer_range_size_limit
6061
_loon_properties_reader_vortex_split_row_indices

cpp/include/milvus-storage/ffi_c.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ FFI_EXPORT extern const char* loon_properties_writer_vortex_enable_statistics;
125125
FFI_EXPORT extern const char* loon_properties_reader_record_batch_max_rows;
126126
FFI_EXPORT extern const char* loon_properties_reader_record_batch_max_size;
127127
FFI_EXPORT extern const char* loon_properties_reader_logical_chunk_rows;
128+
FFI_EXPORT extern const char* loon_properties_reader_metadata_cache_enable;
128129
FFI_EXPORT extern const char* loon_properties_reader_parquet_prebuffer_hole_size_limit;
129130
FFI_EXPORT extern const char* loon_properties_reader_parquet_prebuffer_range_size_limit;
130131
FFI_EXPORT extern const char* loon_properties_reader_vortex_split_row_indices;
@@ -561,7 +562,10 @@ FFI_EXPORT LoonFFIResult loon_reader_new(const LoonColumnGroups* column_groups,
561562

562563
/**
563564
* @brief Sets a key retriever callback for dynamic key retrieval
564-
* use to the KMS(key management system) integration
565+
* used for KMS (key management system) integration.
566+
*
567+
* This is a setup-only API and is not thread-safe with read operations. Call it
568+
* before creating record batch readers, chunk readers, or calling take.
565569
*/
566570
FFI_EXPORT void loon_reader_set_keyretriever(LoonReaderHandle reader,
567571
const char* (*key_retriever)(const char* metadata));

cpp/include/milvus-storage/format/column_group_lazy_reader.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "milvus-storage/column_groups.h"
2121
#include "milvus-storage/properties.h"
22+
#include "milvus-storage/format/format_reader_cache.h"
2223
#include "milvus-storage/format/format_reader.h"
2324
#include "milvus-storage/thread_pool.h"
2425

@@ -31,7 +32,7 @@ class ColumnGroupLazyReader {
3132
/**
3233
* @brief Take a table from the column group
3334
*
34-
* Thread-safe: each call clones the FormatReader, safe for concurrent use on the same object.
35+
* Thread-safe: each call opens independent FormatReaders from reusable metadata.
3536
*
3637
* @param row_indices the row indices to take, MUST be uniqued and sorted
3738
* @return arrow::Result<std::shared_ptr<arrow::Table>>
@@ -44,7 +45,8 @@ class ColumnGroupLazyReader {
4445
const std::shared_ptr<milvus_storage::api::ColumnGroup>& column_group,
4546
const milvus_storage::api::Properties& properties,
4647
const std::vector<std::string>& needed_columns,
47-
const std::function<std::string(const std::string&)>& key_retriever);
48+
const std::function<std::string(const std::string&)>& key_retriever,
49+
const milvus_storage::MetadataCache& cache = milvus_storage::MetadataCache());
4850
};
4951

50-
}; // namespace milvus_storage::api
52+
}; // namespace milvus_storage::api

cpp/include/milvus-storage/format/column_group_reader.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "milvus-storage/column_groups.h"
2020
#include "milvus-storage/properties.h"
21+
#include "milvus-storage/format/format_reader_cache.h"
2122
#include "milvus-storage/format/format_reader.h"
2223
#include "milvus-storage/thread_pool.h"
2324

@@ -34,7 +35,7 @@ class ColumnGroupReader {
3435
// NOT thread-safe: concurrent calls on the same object may race on the underlying FormatReader.
3536
virtual arrow::Result<std::shared_ptr<arrow::RecordBatch>> get_chunk(int64_t chunk_index) = 0;
3637

37-
// Thread-safe: each call clones the FormatReader, safe for concurrent use on the same object.
38+
// Thread-safe: each call opens an independent FormatReader from reusable metadata.
3839
virtual arrow::Result<std::vector<std::shared_ptr<arrow::RecordBatch>>> get_chunks(
3940
const std::vector<int64_t>& chunk_indices, size_t parallelism = 1) = 0;
4041

@@ -59,7 +60,8 @@ class ColumnGroupReader {
5960
const std::vector<std::string>& needed_columns,
6061
const milvus_storage::api::Properties& properties,
6162
const std::function<std::string(const std::string&)>& key_retriever,
62-
const std::string& predicate = "");
63+
const std::string& predicate = "",
64+
const milvus_storage::MetadataCache& cache = milvus_storage::MetadataCache());
6365
};
6466

6567
} // namespace milvus_storage::api

cpp/include/milvus-storage/format/format_reader.h

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@
1414

1515
#pragma once
1616

17+
#include <concepts>
18+
#include <cstddef>
19+
#include <cstdint>
20+
#include <functional>
1721
#include <memory>
22+
#include <string>
1823
#include <vector>
1924

2025
#include <arrow/status.h>
@@ -27,6 +32,8 @@
2732

2833
namespace milvus_storage {
2934

35+
using KeyRetriever = std::function<std::string(const std::string&)>;
36+
3037
struct RowGroupInfo {
3138
public:
3239
size_t start_offset;
@@ -36,6 +43,16 @@ struct RowGroupInfo {
3643
std::string ToString() const;
3744
};
3845

46+
// Reusable metadata record for one column group file. The common fields below
// are shared by every format; Payload carries the format-specific part.
// FormatReaderWithMetadata requires each reader's MetaTrait::Metadata to be
// exactly FormatReaderMetadata<MetaTrait::Payload>.
template <typename Payload>
47+
struct FormatReaderMetadata {
48+
// Key identifying this record in the metadata cache.
std::string cache_key;
49+
// NOTE(review): presumably the path of the file this metadata was loaded from — confirm.
std::string path;
50+
// Schema of the file, independent of any projection applied by a reader.
std::shared_ptr<arrow::Schema> file_schema;
51+
// Row group layout of the file.
std::vector<RowGroupInfo> row_group_infos;
52+
// NOTE(review): looks like a size figure for cache accounting; units and
// consumers are not visible in this header — confirm.
uint64_t cache_size = 0;
53+
// Format-specific reusable data supplied by the concrete reader.
Payload payload;
54+
};
55+
3956
/**
4057
* FormatReader is a reader to read the format file.
4158
* It exists both blocking and streaming read interfaces.
@@ -49,6 +66,12 @@ struct RowGroupInfo {
4966
*/
5067
class FormatReader {
5168
public:
69+
template <typename ReaderT>
70+
using MetaTrait = typename ReaderT::MetaTrait;
71+
72+
template <typename ReaderT>
73+
using MetadataPtr = typename MetaTrait<ReaderT>::MetadataPtr;
74+
5275
virtual ~FormatReader() = default;
5376

5477
// open the format reader, usage to initialize the reader
@@ -88,6 +111,26 @@ class FormatReader {
88111
// set a predicate string for filtering (default no-op for formats that don't support it)
89112
virtual void set_predicate(const std::string& /*predicate*/) {}
90113

114+
// Load reusable file metadata without applying read-time state such as
115+
// projection or predicate. The returned metadata is safe to share through
116+
// MetadataCache and later reuse to create independent readers.
117+
template <typename ReaderT>
118+
static arrow::Result<MetadataPtr<ReaderT>> load_metadata(const api::ColumnGroupFile& file,
119+
const api::Properties& properties,
120+
const KeyRetriever& key_retriever);
121+
122+
// Create a new stateful reader from cached metadata. The file carries
123+
// manifest-owned values such as file_size and footer_size; read_schema,
124+
// needed_columns, and predicate are applied here so callers can create
125+
// independent readers with different projections or filters from the same
126+
// cached metadata.
127+
template <typename ReaderT>
128+
static arrow::Result<std::shared_ptr<ReaderT>> create_from_metadata(MetadataPtr<ReaderT> metadata,
129+
const api::ColumnGroupFile& file,
130+
const std::shared_ptr<arrow::Schema>& read_schema,
131+
const std::vector<std::string>& needed_columns,
132+
const std::string& predicate);
133+
91134
// create format reader
92135
static arrow::Result<std::shared_ptr<FormatReader>> create(
93136
const std::shared_ptr<arrow::Schema>& read_schema,
@@ -99,4 +142,57 @@ class FormatReader {
99142

100143
}; // class FormatReader
101144

145+
// Compile-time contract for readers that support reusable metadata.
// ReaderT must derive from FormatReader and expose a nested MetaTrait that
// provides the Payload/Metadata/MetadataPtr types plus the static functions
// cache_key, load_metadata, and create_from_metadata with the exact
// signatures checked below.
template <typename ReaderT>
146+
concept FormatReaderWithMetadata =
147+
std::derived_from<ReaderT, FormatReader> && requires(const api::ColumnGroupFile& file,
148+
const api::Properties& properties,
149+
const KeyRetriever& key_retriever,
150+
typename ReaderT::MetaTrait::MetadataPtr metadata,
151+
const api::ColumnGroupFile& metadata_file,
152+
const std::shared_ptr<arrow::Schema>& read_schema,
153+
const std::vector<std::string>& needed_columns,
154+
const std::string& predicate) {
155+
// The three nested type names must exist.
typename ReaderT::MetaTrait::Payload;
156+
typename ReaderT::MetaTrait::Metadata;
157+
typename ReaderT::MetaTrait::MetadataPtr;
158+
159+
// Metadata must be the shared FormatReaderMetadata record instantiated with
// the reader's Payload, and MetadataPtr must be a shared_ptr to a *const*
// Metadata, so metadata handed out through the pointer cannot be mutated.
requires std::same_as<typename ReaderT::MetaTrait::Metadata,
160+
FormatReaderMetadata<typename ReaderT::MetaTrait::Payload>>;
161+
requires std::same_as<typename ReaderT::MetaTrait::MetadataPtr,
162+
std::shared_ptr<const typename ReaderT::MetaTrait::Metadata>>;
163+
164+
// cache_key(file) must yield something convertible to std::string.
{ ReaderT::MetaTrait::cache_key(file) } -> std::convertible_to<std::string>;
165+
// load_metadata must return exactly arrow::Result<MetadataPtr>.
{
166+
ReaderT::MetaTrait::load_metadata(file, properties, key_retriever)
167+
} -> std::same_as<arrow::Result<typename ReaderT::MetaTrait::MetadataPtr>>;
168+
// create_from_metadata must return exactly arrow::Result<std::shared_ptr<ReaderT>>.
{
169+
ReaderT::MetaTrait::create_from_metadata(metadata, metadata_file, read_schema, needed_columns, predicate)
170+
} -> std::same_as<arrow::Result<std::shared_ptr<ReaderT>>>;
171+
// The common metadata fields must be readable through the const pointer.
{ metadata->row_group_infos } -> std::same_as<const std::vector<RowGroupInfo>&>;
172+
{ metadata->file_schema } -> std::same_as<const std::shared_ptr<arrow::Schema>&>;
173+
};
174+
175+
// Typed entry point for loading reusable metadata. Verifies at compile time
// that ReaderT satisfies FormatReaderWithMetadata, then forwards all arguments
// unchanged to ReaderT::MetaTrait::load_metadata and returns its result.
template <typename ReaderT>
176+
arrow::Result<FormatReader::MetadataPtr<ReaderT>> FormatReader::load_metadata(const api::ColumnGroupFile& file,
177+
const api::Properties& properties,
178+
const KeyRetriever& key_retriever) {
179+
// Produces a readable diagnostic when ReaderT lacks a conforming MetaTrait.
static_assert(FormatReaderWithMetadata<ReaderT>,
180+
"ReaderT must derive from FormatReader and define MetaTrait with Payload, Metadata, MetadataPtr, "
181+
"cache_key, load_metadata, and create_from_metadata.");
182+
return ReaderT::MetaTrait::load_metadata(file, properties, key_retriever);
183+
}
184+
185+
// Typed entry point for building a reader from previously loaded metadata.
// Verifies at compile time that ReaderT satisfies FormatReaderWithMetadata,
// then forwards all arguments unchanged to
// ReaderT::MetaTrait::create_from_metadata and returns its result.
template <typename ReaderT>
186+
arrow::Result<std::shared_ptr<ReaderT>> FormatReader::create_from_metadata(
187+
MetadataPtr<ReaderT> metadata,
188+
const api::ColumnGroupFile& file,
189+
const std::shared_ptr<arrow::Schema>& read_schema,
190+
const std::vector<std::string>& needed_columns,
191+
const std::string& predicate) {
192+
// Produces a readable diagnostic when ReaderT lacks a conforming MetaTrait.
static_assert(FormatReaderWithMetadata<ReaderT>,
193+
"ReaderT must derive from FormatReader and define MetaTrait with Payload, Metadata, MetadataPtr, "
194+
"cache_key, load_metadata, and create_from_metadata.");
195+
return ReaderT::MetaTrait::create_from_metadata(metadata, file, read_schema, needed_columns, predicate);
196+
}
197+
102198
} // namespace milvus_storage

cpp/include/milvus-storage/format/format_reader_cache.h

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Copyright 2023 Zilliz
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <concepts>
18+
#include <condition_variable>
19+
#include <functional>
20+
#include <memory>
21+
#include <mutex>
22+
#include <optional>
23+
#include <string>
24+
#include <typeindex>
25+
#include <tuple>
26+
#include <unordered_map>
27+
#include <utility>
28+
29+
#include <arrow/result.h>
30+
#include <arrow/status.h>
31+
32+
#include "milvus-storage/format/format_reader.h"
33+
34+
namespace milvus_storage {
35+
36+
namespace iceberg {
37+
class IcebergFormatReader;
38+
} // namespace iceberg
39+
40+
namespace lance {
41+
class LanceTableReader;
42+
} // namespace lance
43+
44+
namespace parquet {
45+
class ParquetFormatReader;
46+
} // namespace parquet
47+
48+
namespace vortex {
49+
class VortexFormatReader;
50+
} // namespace vortex
51+
52+
// Thread-safe metadata cache for one concrete FormatReader type.
53+
// Cached metadata is immutable and can be reused to create independent
54+
// stateful readers with different projections or predicates.
55+
// String-keyed store of MetadataPtr values for a single reader type.
// Only declarations live here; get/add/get_or_open are defined out of line.
template <typename ReaderT>
56+
class FormatReaderMetadataCache final {
57+
// Reject reader types that do not provide a conforming MetaTrait.
static_assert(FormatReaderWithMetadata<ReaderT>,
58+
"ReaderT must derive from FormatReader and define MetaTrait with Payload, Metadata, MetadataPtr, "
59+
"cache_key, load_metadata, and create_from_metadata.");
60+
61+
public:
62+
using ReaderType = ReaderT;
63+
using Trait = typename FormatReader::template MetaTrait<ReaderT>;
64+
// shared_ptr to const metadata, as enforced by FormatReaderWithMetadata.
using MetadataPtr = typename Trait::MetadataPtr;
65+
// Nullary callback that produces the metadata for a key, or an error.
using MetadataLoader = std::function<arrow::Result<MetadataPtr>()>;
66+
67+
// Look up the metadata stored under key.
// NOTE(review): presumably std::nullopt on a miss — confirm in the definition.
std::optional<MetadataPtr> get(const std::string& key) const;
68+
69+
// Store metadata under key.
// NOTE(review): behavior for an already-present key or a null pointer is
// decided in the out-of-line definition — confirm.
arrow::Status add(std::string key, MetadataPtr metadata);
70+
71+
// Return the metadata for key, invoking load_fn to produce it when needed.
// Per the InFlightLoad notes below, concurrent misses on the same key are
// meant to share a single load_fn run.
arrow::Result<MetadataPtr> get_or_open(const std::string& key, const MetadataLoader& load_fn);
72+
73+
private:
74+
// Value type of entries_; currently holds only the metadata pointer.
struct Entry {
75+
MetadataPtr metadata;
76+
};
77+
78+
// Per-key singleflight state. The first cache miss creates this marker and
79+
// runs load_fn outside mutex_; waiters for the same key block on cv while
80+
// unrelated keys can still load concurrently.
81+
struct InFlightLoad {
82+
std::condition_variable cv;
83+
// NOTE(review): presumably set once the load has finished, with status and
// metadata carrying the outcome to waiters — confirm in get_or_open.
bool done = false;
84+
arrow::Status status = arrow::Status::OK();
85+
MetadataPtr metadata;
86+
};
87+
88+
// Declared mutable so that const members such as get() are able to lock it.
mutable std::mutex mutex_;
89+
// Completed loads, keyed by cache key.
std::unordered_map<std::string, Entry> entries_;
90+
// Loads currently in progress, keyed by cache key.
std::unordered_map<std::string, std::shared_ptr<InFlightLoad>> in_flight_loads_;
91+
};
92+
93+
// Owns one typed metadata cache for each ReaderT in the template list.
94+
// Callers still retrieve caches statically with get<ReaderT>(); this class only
95+
// groups the per-format caches into one value that can be embedded elsewhere.
96+
// Fixed bundle of per-format caches: one FormatReaderMetadataCache<R> for
// every R in ReaderTs, all created eagerly at construction.
template <typename... ReaderTs>
97+
class FormatReaderMetadataCaches final {
98+
public:
99+
// Allocates one cache per listed reader type via pack expansion.
FormatReaderMetadataCaches() : caches_(std::make_shared<FormatReaderMetadataCache<ReaderTs>>()...) {}
100+
101+
// Return the cache for ReaderT. Selection happens at compile time; asking
// for a type that is not in ReaderTs fails the static_assert.
template <typename ReaderT>
102+
[[nodiscard]] std::shared_ptr<FormatReaderMetadataCache<ReaderT>> get() const {
103+
static_assert((std::same_as<ReaderT, ReaderTs> || ...), "ReaderT must be a supported metadata cache reader type");
104+
// Type-based std::get: picks the tuple element by its exact type, so it
// would be ill-formed if the same reader type were listed twice.
return std::get<std::shared_ptr<FormatReaderMetadataCache<ReaderT>>>(caches_);
105+
}
106+
107+
private:
108+
// One shared_ptr per reader type, in the order of ReaderTs.
std::tuple<std::shared_ptr<FormatReaderMetadataCache<ReaderTs>>...> caches_;
109+
};
110+
111+
// Public cache handle carried by ReaderImpl and passed down to column-group
112+
// readers. Concrete reader headers are intentionally not included here, so
113+
// installed consumers can include public reader headers without private bridge
114+
// headers from the source tree.
115+
// Type-erased handle to the per-format metadata caches. The caches live in a
// State held through a shared_ptr, so copies of a MetadataCache refer to the
// same State and therefore the same caches.
class MetadataCache final {
116+
public:
117+
// Defined out of line; enabled defaults to true.
explicit MetadataCache(bool enabled = true);
118+
119+
// Whether this handle was constructed with caching enabled.
[[nodiscard]] bool enabled() const { return enabled_; }
120+
121+
// Return the cache for ReaderT, creating it on first request.
// Note that this function does not consult enabled_.
template <typename ReaderT>
122+
[[nodiscard]] std::shared_ptr<FormatReaderMetadataCache<ReaderT>> get() const {
123+
static_assert(FormatReaderWithMetadata<ReaderT>,
124+
"ReaderT must derive from FormatReader and define MetaTrait with Payload, Metadata, MetadataPtr, "
125+
"cache_key, load_metadata, and create_from_metadata.");
126+
127+
// The map of caches is guarded by the State mutex for lookup and creation.
std::lock_guard<std::mutex> lock(state_->mutex);
128+
// try_emplace inserts an empty shared_ptr<void> slot when the type is new.
auto [it, inserted] = state_->caches.try_emplace(std::type_index(typeid(ReaderT)));
129+
if (inserted || !it->second) {
130+
it->second = std::make_shared<FormatReaderMetadataCache<ReaderT>>();
131+
}
132+
// The slot is keyed by typeid(ReaderT) and only filled above, so casting
// the erased pointer back to the concrete cache type is sound.
return std::static_pointer_cast<FormatReaderMetadataCache<ReaderT>>(it->second);
133+
}
134+
135+
// Map a column-group format string to its reader type and call visitor with
// that type's cache. Because the return type is deduced, visitor must return
// the same type for every cache type; that type must also be constructible
// from arrow::Status, which is used for the unknown-format error below.
template <typename Visitor>
136+
auto dispatch(const std::string& format, Visitor&& visitor) const {
137+
// Unevaluated: only names the visitor's return type.
using ReturnT = decltype(std::forward<Visitor>(visitor)(get<parquet::ParquetFormatReader>()));
138+
139+
if (format == LOON_FORMAT_PARQUET) {
140+
return std::forward<Visitor>(visitor)(get<parquet::ParquetFormatReader>());
141+
}
142+
if (format == LOON_FORMAT_VORTEX) {
143+
return std::forward<Visitor>(visitor)(get<vortex::VortexFormatReader>());
144+
}
145+
if (format == LOON_FORMAT_LANCE_TABLE) {
146+
return std::forward<Visitor>(visitor)(get<lance::LanceTableReader>());
147+
}
148+
if (format == LOON_FORMAT_ICEBERG_TABLE) {
149+
return std::forward<Visitor>(visitor)(get<iceberg::IcebergFormatReader>());
150+
}
151+
152+
return ReturnT(arrow::Status::Invalid("Unknown column group format: ", format));
153+
}
154+
155+
private:
156+
// Shared storage: one type-erased cache pointer per reader type.
struct State {
157+
mutable std::mutex mutex;
158+
std::unordered_map<std::type_index, std::shared_ptr<void>> caches;
159+
};
160+
161+
bool enabled_ = true;
162+
// Held by shared_ptr so a const handle can still create caches lazily and
// copies of the handle observe the same caches.
std::shared_ptr<State> state_ = std::make_shared<State>();
163+
};
164+
165+
} // namespace milvus_storage

0 commit comments

Comments
 (0)