Skip to content

Commit cfe8260

Browse files
committed
feat(cache): support parquet metadata cache
1 parent 0dbbda6 commit cfe8260

17 files changed

Lines changed: 507 additions & 22 deletions

include/paimon/api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "paimon/factories/factory.h" // IWYU pragma: export
2424
#include "paimon/file_store_commit.h" // IWYU pragma: export
2525
#include "paimon/file_store_write.h" // IWYU pragma: export
26+
#include "paimon/format/parquet.h" // IWYU pragma: export
2627
#include "paimon/fs/file_system_factory.h" // IWYU pragma: export
2728
#include "paimon/memory/memory_pool.h" // IWYU pragma: export
2829
#include "paimon/predicate/predicate.h" // IWYU pragma: export

include/paimon/format/parquet.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
21+
#include "paimon/status.h"
22+
#include "paimon/visibility.h"
23+
24+
namespace paimon::parquet {
25+
26+
/// Resize the process-wide parquet metadata cache. `max_bytes <= 0` disables the
27+
/// cache for subsequently created readers, and shrinks the existing cache down to
28+
/// the new limit immediately (entries evicted in LRU order). The cache is
29+
/// initialized eagerly when the parquet file format factory is registered, so this
30+
/// function is safe to call at any time.
31+
PAIMON_EXPORT Status ResizeParquetMetadataCache(int64_t max_bytes);
32+
33+
} // namespace paimon::parquet

src/paimon/common/utils/generic_lru_cache.h

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include <atomic>
1920
#include <chrono>
2021
#include <cstdint>
2122
#include <functional>
@@ -63,6 +64,8 @@ class GenericLruCache {
6364
/// Configuration options for the cache.
6465
struct Options {
6566
/// Maximum total weight of all entries. Entries are evicted (LRU) when exceeded.
67+
/// Used only as the initial value for the cache's runtime-mutable max weight;
68+
/// after construction, use GetMaxWeight()/SetMaxWeight() to read or update it.
6669
int64_t max_weight = INT64_MAX;
6770

6871
/// Time in milliseconds after last access before an entry expires.
@@ -77,7 +80,8 @@ class GenericLruCache {
7780
RemovalCallback removal_callback = nullptr;
7881
};
7982

80-
explicit GenericLruCache(Options options) : options_(std::move(options)) {}
83+
/// Construct a cache from `options`. The runtime-mutable `max_weight_` is seeded from
/// the stored `options_.max_weight` (not from the moved-from parameter); this relies on
/// `options_` being declared before `max_weight_` so that it is initialized first.
explicit GenericLruCache(Options options)
84+
: options_(std::move(options)), max_weight_(options_.max_weight) {}
8185

8286
/// Look up a key in the cache. On hit, promotes the entry to the front (most recently
8387
/// used) and updates its access time. Returns std::nullopt on miss or if the entry
@@ -101,7 +105,7 @@ class GenericLruCache {
101105
// Cache miss: load via supplier outside the lock
102106
PAIMON_ASSIGN_OR_RAISE(V value, supplier(key));
103107
int64_t weight = ComputeWeight(key, value);
104-
if (weight > options_.max_weight) {
108+
if (weight > max_weight_.load(std::memory_order_relaxed)) {
105109
return value;
106110
}
107111

@@ -122,10 +126,11 @@ class GenericLruCache {
122126
/// @return Status::Invalid if the entry's weight exceeds max_weight, Status::OK otherwise.
123127
Status Put(const K& key, V value) {
124128
int64_t weight = ComputeWeight(key, value);
125-
if (weight > options_.max_weight) {
129+
int64_t max_weight = max_weight_.load(std::memory_order_relaxed);
130+
if (weight > max_weight) {
126131
return Status::Invalid(
127132
fmt::format("Entry weight {} exceeds cache max weight {}, entry will not be cached",
128-
weight, options_.max_weight));
133+
weight, max_weight));
129134
}
130135

131136
std::unique_lock<std::shared_mutex> lock(mutex_);
@@ -176,7 +181,22 @@ class GenericLruCache {
176181

177182
/// @return The current maximum total weight of this cache. The value starts at
/// Options::max_weight and may have been changed since via SetMaxWeight(); it is read
/// with a relaxed atomic load and without taking the cache lock.
178183
int64_t GetMaxWeight() const {
179-
return options_.max_weight;
184+
return max_weight_.load(std::memory_order_relaxed);
185+
}
186+
187+
/// Update the maximum total weight at runtime. The new limit is stored with a
188+
/// relaxed atomic write (not release semantics); readers use relaxed loads and do not
189+
/// need the cache lock to see it. After updating, this method also acquires the write
190+
/// lock and runs EvictIfNeeded() so that:
191+
/// 1. expired entries are reaped opportunistically;
192+
/// 2. when the new limit is smaller than current_weight_, entries are evicted
193+
/// down to the new limit immediately rather than being held until the next
194+
/// insertion (relevant when the cache is being shrunk or disabled and no
195+
/// further inserts are expected).
196+
void SetMaxWeight(int64_t new_max_weight) {
197+
// Store the limit before taking the lock; the eviction below re-reads it under the lock.
max_weight_.store(new_max_weight, std::memory_order_relaxed);
198+
std::unique_lock<std::shared_mutex> lock(mutex_);
199+
EvictIfNeeded();
180200
}
181201

182202
private:
@@ -267,7 +287,8 @@ class GenericLruCache {
267287
/// Evict expired entries from the tail, then evict by weight if still over capacity.
/// The weight limit is re-read from `max_weight_` on every loop iteration.
/// NOTE(review): SetMaxWeight() takes `mutex_` exclusively before calling this; confirm
/// that every other call site also holds the write lock.
268288
void EvictIfNeeded() {
269289
EvictExpired();
270-
while (current_weight_ > options_.max_weight && !lru_list_.empty()) {
290+
while (current_weight_ > max_weight_.load(std::memory_order_relaxed) &&
291+
!lru_list_.empty()) {
271292
// The list tail is the eviction victim; removal is reported with cause SIZE.
RemoveEntry(std::prev(lru_list_.end()), RemovalCause::SIZE);
272293
}
273294
}
@@ -317,6 +338,10 @@ class GenericLruCache {
317338
}
318339

319340
Options options_;
341+
/// Runtime-mutable maximum total weight. Read on the hot path without holding
342+
/// `mutex_` (e.g. in Get/Put before locking, and inside EvictIfNeeded under
343+
/// the write lock). Writes go through SetMaxWeight() which uses relaxed atomics.
344+
std::atomic<int64_t> max_weight_;
320345
int64_t current_weight_ = 0;
321346
EntryList lru_list_;
322347
EntryMap lru_map_;

src/paimon/format/parquet/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ set(PAIMON_PARQUET_FILE_FORMAT
1919
page_filtered_row_group_reader.cpp
2020
parquet_timestamp_converter.cpp
2121
parquet_file_batch_reader.cpp
22+
parquet_file_format.cpp
2223
parquet_file_format_factory.cpp
2324
parquet_format_writer.cpp
25+
parquet_metadata_cache.cpp
2426
parquet_schema_util.cpp
2527
parquet_stats_extractor.cpp
2628
parquet_writer_builder.cpp
@@ -57,6 +59,7 @@ if(PAIMON_BUILD_TESTS)
5759
parquet_field_id_converter_test.cpp
5860
parquet_file_batch_reader_test.cpp
5961
parquet_format_writer_test.cpp
62+
parquet_metadata_cache_test.cpp
6063
parquet_stats_extractor_test.cpp
6164
parquet_writer_builder_test.cpp
6265
predicate_converter_test.cpp

src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test {
120120
options[PARQUET_READ_ENABLE_PAGE_INDEX_FILTER] = "true";
121121
ASSERT_OK_AND_ASSIGN(
122122
auto batch_reader,
123-
ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size));
123+
ParquetFileBatchReader::Create(std::move(in_stream), arrow_pool_, options, batch_size,
124+
/*file_metadata=*/nullptr));
124125
auto c_schema = std::make_unique<ArrowSchema>();
125126
ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok());
126127
ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate,

src/paimon/format/parquet/parquet_file_batch_reader.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ ParquetFileBatchReader::ParquetFileBatchReader(
7272
Result<std::unique_ptr<ParquetFileBatchReader>> ParquetFileBatchReader::Create(
7373
std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
7474
const std::shared_ptr<arrow::MemoryPool>& pool,
75-
const std::map<std::string, std::string>& options, int32_t batch_size) {
75+
const std::map<std::string, std::string>& options, int32_t batch_size,
76+
std::shared_ptr<::parquet::FileMetaData> file_metadata) {
7677
try {
7778
assert(input_stream);
7879
PAIMON_ASSIGN_OR_RAISE(::parquet::ReaderProperties reader_properties,
@@ -82,7 +83,8 @@ Result<std::unique_ptr<ParquetFileBatchReader>> ParquetFileBatchReader::Create(
8283
CreateArrowReaderProperties(pool, options, batch_size));
8384

8485
::parquet::arrow::FileReaderBuilder file_reader_builder;
85-
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(input_stream, reader_properties));
86+
PAIMON_RETURN_NOT_OK_FROM_ARROW(
87+
file_reader_builder.Open(input_stream, reader_properties, std::move(file_metadata)));
8688

8789
std::unique_ptr<::parquet::arrow::FileReader> file_reader;
8890
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(pool.get())

src/paimon/format/parquet/parquet_file_batch_reader.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader {
6464
static Result<std::unique_ptr<ParquetFileBatchReader>> Create(
6565
std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
6666
const std::shared_ptr<arrow::MemoryPool>& pool,
67-
const std::map<std::string, std::string>& options, int32_t batch_size);
67+
const std::map<std::string, std::string>& options, int32_t batch_size,
68+
std::shared_ptr<::parquet::FileMetaData> file_metadata);
6869

6970
// For timestamp type, we return the schema stored in file, e.g., second in parquet file will
7071
// store as milli.
@@ -171,6 +172,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader {
171172
const std::vector<int32_t>& src_row_groups);
172173

173174
private:
175+
friend class ParquetReaderBuilder;
176+
174177
std::map<std::string, std::string> options_;
175178
// hold the lifecycle of arrow memory pool.
176179
std::shared_ptr<arrow::MemoryPool> arrow_pool_;

src/paimon/format/parquet/parquet_file_batch_reader_test.cpp

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ class ParquetFileBatchReaderTest : public ::testing::Test,
143143
const std::optional<RoaringBitmap32>& selection_bitmap, int32_t batch_size) const {
144144
EXPECT_OK_AND_ASSIGN(
145145
auto parquet_batch_reader,
146-
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size));
146+
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size,
147+
/*file_metadata=*/nullptr));
147148
std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
148149
auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
149150
EXPECT_TRUE(arrow_status.ok());
@@ -203,6 +204,38 @@ TEST_F(ParquetFileBatchReaderTest, TestReadBinaryWrittenFromBinaryAndLargeBinary
203204
check_binary_read_result(arrow::large_binary(), "large-binary.parquet");
204205
}
205206

207+
// Writes a one-row parquet file to `file_path_`, reads it back, then overwrites the same
// path with four rows and checks that a newly created reader returns the new contents.
// NOTE(review): presumably guards against stale footer metadata being reused for a
// rewritten file when no cached metadata is supplied -- confirm against the helper.
TEST_F(ParquetFileBatchReaderTest, TestReadRewrittenFileWithoutMetadataCache) {
208+
auto field = arrow::field("f0", arrow::int32());
209+
auto schema = arrow::schema({field});
210+
211+
// First version of the file: a single row, written as a single row group.
auto first_array = std::dynamic_pointer_cast<arrow::StructArray>(
212+
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({field}), R"([[1]])")
213+
.ValueOrDie());
214+
WriteArray(file_path_, first_array, schema, /*write_batch_size=*/first_array->length(),
215+
/*enable_dictionary=*/false, /*max_row_group_length=*/first_array->length());
216+
217+
auto first_reader = PrepareParquetFileBatchReader(
218+
file_path_, schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, batch_size_);
219+
ASSERT_OK_AND_ASSIGN(auto first_result,
220+
paimon::test::ReadResultCollector::CollectResult(first_reader.get()));
221+
auto first_expected = std::make_shared<arrow::ChunkedArray>(first_array);
222+
ASSERT_TRUE(first_result->Equals(first_expected));
223+
224+
// Second version: the same path is rewritten with four rows.
auto second_array = std::dynamic_pointer_cast<arrow::StructArray>(
225+
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({field}),
226+
R"([[1], [2], [3], [4]])")
227+
.ValueOrDie());
228+
WriteArray(file_path_, second_array, schema, /*write_batch_size=*/second_array->length(),
229+
/*enable_dictionary=*/false, /*max_row_group_length=*/second_array->length());
230+
231+
// A fresh reader over the rewritten path must yield the four-row contents.
auto second_reader = PrepareParquetFileBatchReader(
232+
file_path_, schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, batch_size_);
233+
ASSERT_OK_AND_ASSIGN(auto second_result,
234+
paimon::test::ReadResultCollector::CollectResult(second_reader.get()));
235+
auto second_expected = std::make_shared<arrow::ChunkedArray>(second_array);
236+
ASSERT_TRUE(second_result->Equals(second_expected));
237+
}
238+
206239
TEST_F(ParquetFileBatchReaderTest, TestSimple) {
207240
std::string file_name = paimon::test::GetDataDir() +
208241
"/parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
@@ -229,7 +262,8 @@ TEST_F(ParquetFileBatchReaderTest, TestSetReadSchema) {
229262
std::map<std::string, std::string> options;
230263
ASSERT_OK_AND_ASSIGN(
231264
auto parquet_batch_reader,
232-
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_));
265+
ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_,
266+
/*file_metadata=*/nullptr));
233267
// test GetFileSchema()
234268
ASSERT_OK_AND_ASSIGN(auto c_file_schema, parquet_batch_reader->GetFileSchema());
235269
auto arrow_file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOrDie();
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "paimon/factories/factory_creator.h"
18+
#include "paimon/format/parquet.h"
19+
#include "paimon/format/parquet/parquet_file_format_factory.h"
20+
#include "paimon/status.h"
21+
22+
namespace paimon::parquet {
23+
24+
/// Resize the parquet metadata cache by forwarding `max_bytes` to the registered
/// ParquetFileFormatFactory, which is looked up through FactoryCreator by its IDENTIFIER.
///
/// @param max_bytes New cache limit, passed unchanged to
///        ParquetFileFormatFactory::ResizeMetadataCache().
/// @return Status::Invalid if the FactoryCreator instance is null or the looked-up factory
///         is not a ParquetFileFormatFactory; otherwise the status returned by
///         ResizeMetadataCache().
Status ResizeParquetMetadataCache(int64_t max_bytes) {
25+
auto* factory_creator = ::paimon::FactoryCreator::GetInstance();
26+
if (factory_creator == nullptr) {
27+
return Status::Invalid("FactoryCreator is not initialized");
28+
}
29+
// NOTE(review): ownership of the pointer returned by Create() is not visible here;
// confirm it is a non-owning handle to a registered factory and not a new allocation.
auto* factory = dynamic_cast<ParquetFileFormatFactory*>(
30+
factory_creator->Create(ParquetFileFormatFactory::IDENTIFIER));
31+
if (factory == nullptr) {
32+
return Status::Invalid("ParquetFileFormatFactory is not registered");
33+
}
34+
return factory->ResizeMetadataCache(max_bytes);
35+
}
36+
37+
} // namespace paimon::parquet

src/paimon/format/parquet/parquet_file_format.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@
1717
#pragma once
1818

1919
#include <cassert>
20+
#include <cstdint>
2021
#include <map>
2122
#include <memory>
2223
#include <string>
24+
#include <utility>
2325

2426
#include "arrow/c/bridge.h"
25-
#include "arrow/c/helpers.h"
2627
#include "paimon/common/utils/arrow/status_utils.h"
2728
#include "paimon/format/file_format.h"
2829
#include "paimon/format/parquet/parquet_field_id_converter.h"
30+
#include "paimon/format/parquet/parquet_metadata_cache.h"
2931
#include "paimon/format/parquet/parquet_reader_builder.h"
3032
#include "paimon/format/parquet/parquet_stats_extractor.h"
3133
#include "paimon/format/parquet/parquet_writer_builder.h"
34+
#include "paimon/result.h"
3235

3336
struct ArrowSchema;
3437

@@ -42,15 +45,16 @@ namespace parquet {
4245

4346
class ParquetFileFormat : public FileFormat {
4447
public:
45-
explicit ParquetFileFormat(const std::map<std::string, std::string>& options)
46-
: identifier_("parquet"), options_(options) {}
48+
ParquetFileFormat(const std::map<std::string, std::string>& options,
49+
std::shared_ptr<ParquetMetadataCache> metadata_cache)
50+
: identifier_("parquet"), options_(options), metadata_cache_(std::move(metadata_cache)) {}
4751

4852
const std::string& Identifier() const override {
4953
return identifier_;
5054
}
5155

5256
Result<std::unique_ptr<ReaderBuilder>> CreateReaderBuilder(int32_t batch_size) const override {
53-
return std::make_unique<ParquetReaderBuilder>(options_, batch_size);
57+
return std::make_unique<ParquetReaderBuilder>(options_, batch_size, metadata_cache_);
5458
}
5559

5660
Result<std::unique_ptr<WriterBuilder>> CreateWriterBuilder(::ArrowSchema* schema,
@@ -72,6 +76,9 @@ class ParquetFileFormat : public FileFormat {
7276
protected:
7377
std::string identifier_;
7478
std::map<std::string, std::string> options_;
79+
/// Process-wide parquet metadata cache injected by the factory. May be nullptr
80+
/// when the cache is disabled.
81+
std::shared_ptr<ParquetMetadataCache> metadata_cache_;
7582
};
7683

7784
} // namespace parquet

0 commit comments

Comments
 (0)