Skip to content

Commit 02f68c8

Browse files
authored
feat: support read for nested type sub column (#355)
1 parent 6ba9f6c commit 02f68c8

42 files changed

Lines changed: 3834 additions & 217 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

include/paimon/read_context.h

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <string>
2424
#include <vector>
2525

26+
#include "arrow/c/abi.h"
2627
#include "paimon/cache/cache.h"
2728
#include "paimon/predicate/predicate.h"
2829
#include "paimon/result.h"
@@ -44,7 +45,7 @@ class FileSystem;
4445
class PAIMON_EXPORT ReadContext {
4546
public:
4647
ReadContext(const std::string& path, const std::string& branch,
47-
const std::vector<std::string>& read_schema,
48+
const std::vector<std::string>& read_field_names,
4849
const std::vector<int32_t>& read_field_ids,
4950
const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
5051
bool enable_prefetch, uint32_t prefetch_batch_count,
@@ -75,8 +76,8 @@ class PAIMON_EXPORT ReadContext {
7576
return options_;
7677
}
7778

78-
const std::vector<std::string>& GetReadSchema() const {
79-
return read_schema_;
79+
const std::vector<std::string>& GetReadFieldNames() const {
80+
return read_field_names_;
8081
}
8182

8283
const std::vector<int32_t>& GetReadFieldIds() const {
@@ -130,10 +131,26 @@ class PAIMON_EXPORT ReadContext {
130131
return cache_;
131132
}
132133

134+
/// Whether a read schema (C ArrowSchema) for nested column pruning was provided.
135+
bool HasReadSchema() const {
136+
return read_schema_ != nullptr && read_schema_->release != nullptr;
137+
}
138+
139+
/// Get the read schema as a mutable C ArrowSchema pointer.
140+
/// ImportSchema will consume (release) the schema content.
141+
ArrowSchema* GetReadSchema() {
142+
return read_schema_.get();
143+
}
144+
145+
/// Set the read schema from a C ArrowSchema unique_ptr and take ownership of
146+
/// schema resources (released via ArrowSchema::release in destructor).
147+
/// Called internally by ReadContextBuilder.
148+
void SetReadSchema(std::unique_ptr<ArrowSchema> schema);
149+
133150
private:
134151
std::string path_;
135152
std::string branch_;
136-
std::vector<std::string> read_schema_;
153+
std::vector<std::string> read_field_names_;
137154
std::vector<int32_t> read_field_ids_;
138155
std::shared_ptr<Predicate> predicate_;
139156
bool enable_predicate_filter_;
@@ -151,6 +168,8 @@ class PAIMON_EXPORT ReadContext {
151168
PrefetchCacheMode prefetch_cache_mode_;
152169
CacheConfig cache_config_;
153170
std::shared_ptr<Cache> cache_;
171+
// Owns schema resources and releases ArrowSchema::release in destructor.
172+
std::unique_ptr<ArrowSchema> read_schema_;
154173
};
155174

156175
/// `ReadContextBuilder` used to build a `ReadContext`, has input validation.
@@ -173,9 +192,9 @@ class PAIMON_EXPORT ReadContextBuilder {
173192
///
174193
/// @param read_field_names Vector of field names to read from the table.
175194
/// @return Reference to this builder for method chaining.
176-
/// @note Currently supports top-level field selection. Future versions may support
177-
/// nested field selection using ArrowSchema for more granular projection
178-
ReadContextBuilder& SetReadSchema(const std::vector<std::string>& read_field_names);
195+
/// @note Currently supports top-level field selection. For nested field selection
196+
/// use SetReadSchema(std::unique_ptr<ArrowSchema>) instead.
197+
ReadContextBuilder& SetReadFieldNames(const std::vector<std::string>& read_field_names);
179198
/// Set the schema fields to read from the table.
180199
///
181200
/// If not set, all fields from the table schema will be read. This is useful for
@@ -184,12 +203,51 @@ class PAIMON_EXPORT ReadContextBuilder {
184203
///
185204
/// @param read_field_ids Vector of field ids to read from the table.
186205
/// @return Reference to this builder for method chaining.
187-
/// @note Currently supports top-level field selection. Future versions may support
188-
/// nested field selection using ArrowSchema for more granular projection.
189-
/// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
190-
/// Calling both will ignore the read schema set by SetReadSchema().
206+
/// @note Currently supports top-level field selection.
207+
/// @note SetReadFieldIds() and SetReadFieldNames() are mutually exclusive.
208+
/// Calling both will ignore the read schema set by SetReadFieldNames().
191209
ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);
192210

211+
/// Set the read Arrow Schema for nested column pruning.
212+
///
213+
/// The read schema is an Arrow C Data Interface schema where STRUCT types
214+
/// may contain only a subset of the original sub-fields, enabling nested column
215+
/// pruning to reduce I/O. Field matching is based on field name: the system
216+
/// looks up each field by name in the table schema and rebuilds the aligned
217+
/// schema using the table schema's type and metadata. Metadata propagation
218+
/// from the user-provided schema is whitelist-based: currently only
219+
/// "paimon.map.selected-keys" is preserved and merged into the final aligned
220+
/// schema.
221+
///
222+
/// To prune map entries by key, attach metadata "paimon.map.selected-keys"
223+
/// to the target map field in read schema. The value is a comma-separated
224+
/// key list, for example: "k1,k2". Only map fields with string key type
225+
/// (Arrow utf8) are supported.
226+
///
227+
/// Example:
228+
/// @code{.cpp}
229+
/// auto map_field = arrow::field("m", arrow::map(arrow::utf8(), arrow::int32()));
230+
/// auto map_meta = arrow::KeyValueMetadata::Make(
231+
/// {"paimon.map.selected-keys"}, {"k1,k2"});
232+
/// auto projected_schema = arrow::schema({
233+
/// arrow::field("id", arrow::int64()),
234+
/// map_field->WithMetadata(map_meta),
235+
/// });
236+
///
237+
/// auto c_schema = std::make_unique<ArrowSchema>();
238+
/// arrow::ExportSchema(*projected_schema, c_schema.get());
239+
///
240+
/// ReadContextBuilder builder("/path/to/table");
241+
/// builder.SetReadSchema(std::move(c_schema));
242+
/// @endcode
243+
///
244+
/// @param read_schema Arrow C Schema. Ownership of schema resources is transferred
245+
/// to the built ReadContext.
246+
/// @return Reference to this builder for method chaining.
247+
/// @note Priority: read_schema > read_field_ids > read_field_names.
248+
/// When set, read_field_ids and read_field_names are ignored.
249+
ReadContextBuilder& SetReadSchema(std::unique_ptr<ArrowSchema> read_schema);
250+
193251
/// Set a configuration options map to set some option entries which are not defined in the
194252
/// table schema or whose values you want to overwrite.
195253
/// @note The options map will clear the options added by `AddOption()` before.

src/paimon/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ set(PAIMON_CORE_SRCS
350350
core/utils/blob_view_lookup.cpp
351351
core/utils/consumer_manager.cpp
352352
core/utils/field_mapping.cpp
353+
core/utils/nested_projection_utils.cpp
353354
core/utils/file_store_path_factory.cpp
354355
core/utils/file_utils.cpp
355356
core/utils/manifest_meta_reader.cpp
@@ -744,6 +745,7 @@ if(PAIMON_BUILD_TESTS)
744745
core/utils/consumer_manager_test.cpp
745746
core/utils/file_store_path_factory_cache_test.cpp
746747
core/utils/field_mapping_test.cpp
748+
core/utils/nested_projection_utils_test.cpp
747749
core/utils/file_store_path_factory_test.cpp
748750
core/utils/file_utils_test.cpp
749751
core/utils/manifest_meta_reader_test.cpp

src/paimon/common/memory/memory_segment_test.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -555,10 +555,7 @@ TEST(MemorySegmentTest, TestDoubleAccess) {
555555
delete[] occupied;
556556
}
557557

558-
// ------------------------------------------------------------------------
559-
// Bulk Byte Movements
560-
// ------------------------------------------------------------------------
561-
558+
// Bulk Byte Movements
562559
TEST(MemorySegmentTest, TestBulkByteAccess) {
563560
auto pool = paimon::GetDefaultPool();
564561
// test expected correct behavior with default offset / length

src/paimon/common/types/data_field.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,38 @@ Result<std::vector<DataField>> DataField::ProjectFields(
173173
return projected_fields;
174174
}
175175

176+
std::shared_ptr<arrow::Field> DataField::MergeFieldMetadataByWhitelist(
177+
const std::shared_ptr<arrow::Field>& target_field,
178+
const std::shared_ptr<arrow::Field>& source_field,
179+
const std::vector<std::string>& metadata_keys_whitelist) {
180+
if (!source_field || !source_field->HasMetadata() || !source_field->metadata()) {
181+
return target_field;
182+
}
183+
184+
std::unordered_map<std::string, std::string> metadata_map;
185+
for (const auto& key : metadata_keys_whitelist) {
186+
auto metadata_value_result = source_field->metadata()->Get(key);
187+
if (metadata_value_result.ok()) {
188+
metadata_map[key] = metadata_value_result.ValueUnsafe();
189+
}
190+
}
191+
192+
if (metadata_map.empty()) {
193+
return target_field;
194+
}
195+
196+
auto metadata = std::make_shared<arrow::KeyValueMetadata>(metadata_map);
197+
return target_field->WithMergedMetadata(metadata);
198+
}
199+
200+
DataField DataField::MergeFieldMetadataByWhitelist(
201+
const DataField& target_field, const DataField& source_field,
202+
const std::vector<std::string>& metadata_keys_whitelist) {
203+
return DataField(
204+
target_field.Id(),
205+
MergeFieldMetadataByWhitelist(target_field.ArrowField(), source_field.ArrowField(),
206+
metadata_keys_whitelist),
207+
target_field.Description());
208+
}
209+
176210
} // namespace paimon

src/paimon/common/types/data_field.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ class DataField : public Jsonizable<DataField> {
4141

4242
static constexpr char FIELD_ID[] = "paimon.id";
4343
static constexpr char DESCRIPTION[] = "paimon.description";
44+
/// Metadata key for map field selected keys. The value is a comma-separated
45+
/// string of key names, e.g. 'key1,key2'. Only string-keyed maps are supported.
46+
static constexpr char MAP_SELECTED_KEYS[] = "paimon.map.selected-keys";
4447

4548
public:
4649
static std::shared_ptr<arrow::Field> ConvertDataFieldToArrowField(const DataField& field);
@@ -63,6 +66,17 @@ class DataField : public Jsonizable<DataField> {
6366
const std::vector<DataField>& fields,
6467
const std::optional<std::vector<std::string>>& projected_cols);
6568

69+
/// Merge whitelisted metadata from source_field into target_field.
70+
static std::shared_ptr<arrow::Field> MergeFieldMetadataByWhitelist(
71+
const std::shared_ptr<arrow::Field>& target_field,
72+
const std::shared_ptr<arrow::Field>& source_field,
73+
const std::vector<std::string>& metadata_keys_whitelist);
74+
75+
/// Merge whitelisted metadata from source_field into target_field and keep target id/desc.
76+
static DataField MergeFieldMetadataByWhitelist(
77+
const DataField& target_field, const DataField& source_field,
78+
const std::vector<std::string>& metadata_keys_whitelist);
79+
6680
int32_t Id() const {
6781
return id_;
6882
}

src/paimon/core/global_index/global_index_write_task.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ Result<std::unique_ptr<BatchReader>> CreateBatchReader(
8383
.WithFileSystem(core_options.GetFileSystem())
8484
.EnablePrefetch(true)
8585
.WithMemoryPool(pool)
86-
.SetReadSchema({field_name, SpecialFields::RowId().Name()});
86+
.SetReadFieldNames({field_name, SpecialFields::RowId().Name()});
8787
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReadContext> read_context,
8888
read_context_builder.Finish());
8989
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<TableRead> table_read,

0 commit comments

Comments
 (0)