Skip to content

Commit 2d8e7cc

Browse files
authored
feat(blob): Support null blob and multiple blob fields write and commit (#286)
1 parent 3e4888b commit 2d8e7cc

27 files changed

Lines changed: 1282 additions & 246 deletions

include/paimon/defs.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,11 @@ struct PAIMON_EXPORT Options {
354354
/// "row-tracking.enabled" - Whether to enable unique row id for append table. Default value is
355355
/// "false".
356356
static const char ROW_TRACKING_ENABLED[];
357+
/// "row-tracking.partition-group-on-commit" - When row-tracking is enabled, whether to group
358+
/// new file metas by partition before commit, so that assigned row IDs are contiguous within
359+
/// each partition. This is useful if you want to build global indices on this table. Default
360+
/// value is "true".
361+
static const char ROW_TRACKING_PARTITION_GROUP_ON_COMMIT[];
357362
/// "data-evolution.enabled" - Whether to enable data evolution for row tracking table. Default
358363
/// value is "false".
359364
static const char DATA_EVOLUTION_ENABLED[];
@@ -363,6 +368,14 @@ struct PAIMON_EXPORT Options {
363368
/// "blob-as-descriptor" - Read and write blob field using blob descriptor rather than blob
364369
/// bytes. Default value is "false".
365370
static const char BLOB_AS_DESCRIPTOR[];
371+
/// "blob-field" - Specifies column names that should be stored as blob type. This is used
372+
/// when you want to treat a BYTES column as a BLOB. Comma-separated field names.
373+
/// Multiple blob fields are supported.
374+
static const char BLOB_FIELD[];
375+
// TODO(xinyu.lxy): support "blob-descriptor-field" - treat fields as BLOB and store as
376+
// BlobDescriptor
377+
// TODO(xinyu.lxy): support "blob-view-field" - treat fields as BLOB and resolve from upstream
378+
// tables
366379
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
367380
static const char GLOBAL_INDEX_ENABLED[];
368381
/// "global-index.thread-num" - The maximum number of concurrent scanners for global index. No

src/paimon/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ set(PAIMON_CORE_SRCS
220220
core/io/key_value_meta_projection_consumer.cpp
221221
core/io/key_value_projection_consumer.cpp
222222
core/io/key_value_projection_reader.cpp
223+
core/io/multiple_blob_file_writer.cpp
223224
core/io/rolling_blob_file_writer.cpp
224225
core/manifest/file_kind.cpp
225226
core/manifest/file_source.cpp

src/paimon/common/data/blob_defs.h

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,45 @@
1616

1717
#pragma once
1818

19+
#include <cstdint>
20+
1921
namespace paimon {
2022

23+
/// Blob file format constants shared between writer and reader.
24+
///
2125
/// A Blob field uses the 'large_binary' type as its underlying physical storage in Apache Arrow
2226
/// Schema, and is marked as the Paimon Blob extension type by attaching specific
23-
/// **KeyValueMetadata**. Only one blob field in one paimon table is allowed.
24-
///
25-
/// To create a Blob field:
26-
/// @code
27-
/// std::unordered_map<std::string, std::string> blob_metadata_map = {
28-
/// {Blob::EXTENSION_TYPE_KEY, Blob::EXTENSION_TYPE_VALUE}
29-
/// };
30-
/// auto field = arrow::field("my_blob_field", arrow::large_binary(), false,
31-
/// std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map));
32-
/// @endcode
33-
constexpr char BLOB_EXTENSION_TYPE_KEY[] = "paimon.extension.type";
34-
constexpr char BLOB_EXTENSION_TYPE_VALUE[] = "paimon.type.blob";
27+
/// **KeyValueMetadata**. Multiple blob fields in one paimon table are supported.
28+
class BlobDefs {
29+
public:
30+
// Constants-only holder: construction and destruction are deleted so it can never be
// instantiated; every member below is a static constexpr value.
BlobDefs() = delete;
31+
~BlobDefs() = delete;
32+
33+
/// To create a Blob field:
34+
/// @code
35+
/// std::unordered_map<std::string, std::string> blob_metadata_map = {
36+
/// {BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}
37+
/// };
38+
/// auto field = arrow::field("my_blob_field", arrow::large_binary(), false,
39+
/// std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map));
40+
/// @endcode
41+
/// Metadata key identifying a Paimon Blob extension type field.
42+
static constexpr char kExtensionTypeKey[] = "paimon.extension.type";
43+
/// Metadata value identifying a Paimon Blob extension type field.
44+
static constexpr char kExtensionTypeValue[] = "paimon.type.blob";
45+
46+
/// A bin_length value of -1 in the index indicates a null blob entry.
47+
static constexpr int64_t kNullBinLength = -1;
48+
/// Blob file format version.
49+
static constexpr int8_t kFileVersion = 1;
50+
/// Magic number identifying the start of each blob bin.
51+
static constexpr int32_t kMagicNumber = 1481511375;
52+
/// Offset from the start of a bin to the actual blob content (magic number size).
53+
static constexpr int32_t kContentStartOffset = 4;
54+
/// Total metadata length per bin: magic(4) + bin_length(8) + crc32(4) = 16.
55+
static constexpr int32_t kTotalMetaLength = 16;
56+
/// Blob file header length: index_len(4) + version(1) = 5.
57+
static constexpr uint32_t kBlobFileHeaderLength = 5;
58+
};
3559

3660
} // namespace paimon

src/paimon/common/data/blob_utils.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,11 @@ bool BlobUtils::IsBlobMetadata(const std::shared_ptr<const arrow::KeyValueMetada
9696
if (!metadata) {
9797
return false;
9898
}
99-
auto extension_name = metadata->Get(BLOB_EXTENSION_TYPE_KEY);
99+
auto extension_name = metadata->Get(BlobDefs::kExtensionTypeKey);
100100
if (!extension_name.ok()) {
101101
return false;
102102
}
103-
return extension_name.ValueUnsafe() == BLOB_EXTENSION_TYPE_VALUE;
103+
return extension_name.ValueUnsafe() == BlobDefs::kExtensionTypeValue;
104104
}
105105

106106
bool BlobUtils::IsBlobFile(const std::string& file_name) {
@@ -110,7 +110,7 @@ bool BlobUtils::IsBlobFile(const std::string& file_name) {
110110
std::shared_ptr<arrow::Field> BlobUtils::ToArrowField(
111111
const std::string& field_name, bool nullable,
112112
std::unordered_map<std::string, std::string> metadata) {
113-
metadata[BLOB_EXTENSION_TYPE_KEY] = BLOB_EXTENSION_TYPE_VALUE;
113+
metadata[BlobDefs::kExtensionTypeKey] = BlobDefs::kExtensionTypeValue;
114114
return arrow::field(field_name, arrow::large_binary(), nullable,
115115
std::make_shared<arrow::KeyValueMetadata>(metadata));
116116
}

src/paimon/common/data/blob_utils_test.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class BlobUtilsTest : public ::testing::Test {
2929
private:
3030
std::shared_ptr<arrow::KeyValueMetadata> CreateBlobMetadata() {
3131
std::unordered_map<std::string, std::string> blob_metadata_map = {
32-
{BLOB_EXTENSION_TYPE_KEY, BLOB_EXTENSION_TYPE_VALUE}};
32+
{BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}};
3333
return std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map);
3434
}
3535
};
@@ -39,11 +39,11 @@ TEST_F(BlobUtilsTest, IsBlobMetadata) {
3939
EXPECT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata));
4040
EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
4141
std::unordered_map<std::string, std::string> wrong_metadata_map = {
42-
{BLOB_EXTENSION_TYPE_KEY, "paimon.type.varchar"}};
42+
{BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}};
4343
auto wrong_metadata = std::make_shared<arrow::KeyValueMetadata>(wrong_metadata_map);
4444
EXPECT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata));
4545
std::unordered_map<std::string, std::string> no_extension_metadata_map = {
46-
{"other_key", BLOB_EXTENSION_TYPE_VALUE}};
46+
{"other_key", BlobDefs::kExtensionTypeValue}};
4747
auto no_extension_metadata =
4848
std::make_shared<arrow::KeyValueMetadata>(no_extension_metadata_map);
4949
EXPECT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata));

src/paimon/common/defs.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,12 @@ const char Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY[] = "data-file.external-pa
8787
const char Options::DATA_FILE_PREFIX[] = "data-file.prefix";
8888
const char Options::INDEX_FILE_IN_DATA_FILE_DIR[] = "index-file-in-data-file-dir";
8989
const char Options::ROW_TRACKING_ENABLED[] = "row-tracking.enabled";
90+
const char Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT[] =
91+
"row-tracking.partition-group-on-commit";
9092
const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled";
9193
const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
9294
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
95+
const char Options::BLOB_FIELD[] = "blob-field";
9396
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
9497
const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num";
9598
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";

src/paimon/core/append/append_only_writer.cpp

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "paimon/core/io/data_file_path_factory.h"
3434
#include "paimon/core/io/data_file_writer.h"
3535
#include "paimon/core/io/data_increment.h"
36+
#include "paimon/core/io/multiple_blob_file_writer.h"
3637
#include "paimon/core/io/rolling_blob_file_writer.h"
3738
#include "paimon/core/io/rolling_file_writer.h"
3839
#include "paimon/core/io/single_file_writer.h"
@@ -212,35 +213,43 @@ AppendOnlyWriter::SingleFileWriterCreator AppendOnlyWriter::GetBlobFileWriterCre
212213

213214
AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingBlobWriter(
214215
const BlobUtils::SeparatedSchemas& schemas) const {
215-
if (schemas.blob_schema->num_fields() > RollingBlobFileWriter::EXPECTED_BLOB_FIELD_COUNT) {
216-
return Status::Invalid("Limit exactly one blob field in one paimon table yet.");
217-
}
218-
// use a specialized writer that writes blob data to a separate rolling file.
219-
::ArrowSchema arrow_schema;
220-
ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); });
221-
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schemas.blob_schema, &arrow_schema));
222-
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileFormat> format,
223-
FileFormatFactory::Get("blob", options_.ToMap()));
224-
PAIMON_ASSIGN_OR_RAISE(
225-
std::shared_ptr<WriterBuilder> writer_builder,
226-
format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize()));
227-
writer_builder->WithMemoryPool(memory_pool_);
228-
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schemas.blob_schema, &arrow_schema));
229-
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FormatStatsExtractor> stats_extractor,
230-
format->CreateStatsExtractor(&arrow_schema));
231-
232-
auto single_blob_file_writer_creator = GetBlobFileWriterCreator(
233-
writer_builder, stats_extractor, schemas.blob_schema->field_names());
234-
auto rolling_blob_file_writer_creator = [this, single_blob_file_writer_creator]()
216+
// Multiple blob fields are supported. Each blob field gets its own rolling file writer
217+
// via MultipleBlobFileWriter.
218+
auto blob_schema = schemas.blob_schema;
219+
// Factory invoked with one blob field name: it looks the field up in blob_schema, builds a
// "blob"-format writer builder and stats extractor for a schema containing only that field,
// and returns a RollingFileWriter sized by options_.GetBlobTargetFileSize().
// Returns Status::Invalid when the name cannot be resolved in blob_schema.
// NOTE(review): the lambda captures `this` (it reads options_ and memory_pool_), so it must
// not be invoked after this AppendOnlyWriter is destroyed — confirm the owner of the lambda
// cannot outlive the writer.
auto blob_writer_creator = [this, blob_schema](const std::string& blob_field_name)
235220
-> Result<
236221
std::unique_ptr<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>> {
222+
// Create a single-field schema for this blob field
223+
auto field = blob_schema->GetFieldByName(blob_field_name);
224+
// A null result means the lookup failed; per Arrow's documentation this presumably also
// covers an ambiguous (duplicated) name — TODO confirm.
if (!field) {
225+
return Status::Invalid(
226+
fmt::format("Blob field '{}' not found in blob schema", blob_field_name));
227+
}
228+
auto single_field_schema = arrow::schema({field});
229+
// NOTE(review): arrow_schema is not value-initialized, yet the guard below releases it on
// every exit path. If the first ExportSchema fails before filling the struct, the guard
// would run on indeterminate contents — confirm ArrowSchemaRelease tolerates that, or
// initialize the struct (e.g. `::ArrowSchema arrow_schema{};`).
::ArrowSchema arrow_schema;
230+
ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); });
231+
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema));
232+
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileFormat> format,
233+
FileFormatFactory::Get("blob", options_.ToMap()));
234+
PAIMON_ASSIGN_OR_RAISE(
235+
std::shared_ptr<WriterBuilder> writer_builder,
236+
format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize()));
237+
writer_builder->WithMemoryPool(memory_pool_);
238+
// The schema is exported a second time into the same struct for the stats extractor.
// NOTE(review): this presumes CreateWriterBuilder consumed (released or moved out) the
// first export; otherwise the first export would be overwritten without release — verify.
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema));
239+
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FormatStatsExtractor> stats_extractor,
240+
format->CreateStatsExtractor(&arrow_schema));
241+
242+
// Each per-field writer records exactly one write column: the blob field itself.
std::vector<std::string> write_cols = {blob_field_name};
243+
auto single_blob_file_writer_creator =
244+
GetBlobFileWriterCreator(writer_builder, stats_extractor, write_cols);
237245
return std::make_unique<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>(
238246
options_.GetBlobTargetFileSize(), single_blob_file_writer_creator);
239247
};
248+
240249
return std::make_unique<RollingBlobFileWriter>(
241250
options_.GetTargetFileSize(/*has_primary_key=*/false),
242251
GetDataFileWriterCreator(schemas.main_schema, schemas.main_schema->field_names()),
243-
rolling_blob_file_writer_creator, arrow::struct_(write_schema_->fields()));
252+
blob_schema, blob_writer_creator, arrow::struct_(write_schema_->fields()));
244253
}
245254

246255
Status AppendOnlyWriter::Sync() {

src/paimon/core/append/append_only_writer_test.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ TEST_F(AppendOnlyWriterTest, TestWriteWithSingleBlobField) {
639639
ASSERT_OK(writer.Close());
640640
}
641641

642-
TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFieldsShouldFail) {
642+
TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFields) {
643643
auto options =
644644
CreateOptions({{Options::FILE_FORMAT, "orc"}, {Options::MANIFEST_FORMAT, "orc"}});
645645
auto dir = UniqueTestDirectory::Create();
@@ -663,9 +663,19 @@ TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFieldsShouldFail) {
663663
ASSERT_TRUE(blob_builder2.Append("b", 1).ok());
664664
auto blob_array2 = blob_builder2.Finish().ValueOrDie();
665665

666-
ASSERT_NOK_WITH_MSG(
667-
writer.Write(CreateStructBatch(schema, {int_array, blob_array1, blob_array2})),
668-
"Limit exactly one blob field in one paimon table yet.");
666+
ASSERT_OK(writer.Write(CreateStructBatch(schema, {int_array, blob_array1, blob_array2})));
667+
ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(/*wait_compaction=*/true));
668+
669+
ASSERT_EQ(inc.GetNewFilesIncrement().NewFiles().size(), 3);
670+
const auto& main_file = inc.GetNewFilesIncrement().NewFiles()[0];
671+
const auto& blob_file1 = inc.GetNewFilesIncrement().NewFiles()[1];
672+
const auto& blob_file2 = inc.GetNewFilesIncrement().NewFiles()[2];
673+
ASSERT_TRUE(
674+
options.GetFileSystem()->Exists(path_factory->ToPath(main_file->file_name)).value());
675+
ASSERT_TRUE(
676+
options.GetFileSystem()->Exists(path_factory->ToPath(blob_file1->file_name)).value());
677+
ASSERT_TRUE(
678+
options.GetFileSystem()->Exists(path_factory->ToPath(blob_file2->file_name)).value());
669679
ASSERT_OK(writer.Close());
670680
}
671681

src/paimon/core/core_options.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ struct CoreOptions::Impl {
374374
ExpireConfig expire_config;
375375
std::vector<std::string> sequence_field;
376376
std::vector<std::string> remove_record_on_sequence_group;
377+
std::vector<std::string> blob_fields;
377378

378379
std::string partition_default_name = "__DEFAULT_PARTITION__";
379380
StartupMode startup_mode = StartupMode::Default();
@@ -430,6 +431,7 @@ struct CoreOptions::Impl {
430431
bool enable_adaptive_prefetch_strategy = true;
431432
bool index_file_in_data_file_dir = false;
432433
bool row_tracking_enabled = false;
434+
bool row_tracking_partition_group_on_commit = true;
433435
bool data_evolution_enabled = false;
434436
bool legacy_partition_name_enabled = true;
435437
bool global_index_enabled = true;
@@ -525,11 +527,17 @@ struct CoreOptions::Impl {
525527
// Parse row-tracking.enabled - whether to enable unique row id for append table
526528
PAIMON_RETURN_NOT_OK(
527529
parser.Parse<bool>(Options::ROW_TRACKING_ENABLED, &row_tracking_enabled));
530+
// Parse row-tracking.partition-group-on-commit - whether to group delta files by partition
531+
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT,
532+
&row_tracking_partition_group_on_commit));
528533
// Parse data-evolution.enabled - whether to enable data evolution for row tracking
529534
PAIMON_RETURN_NOT_OK(
530535
parser.Parse<bool>(Options::DATA_EVOLUTION_ENABLED, &data_evolution_enabled));
531536
// Parse bucket-function - bucket function type, default "DEFAULT"
532537
PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&bucket_function_type));
538+
// Parse blob-field - column names to store as blob type, comma separated
539+
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
540+
Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields));
533541
return Status::OK();
534542
}
535543

@@ -1279,6 +1287,10 @@ bool CoreOptions::RowTrackingEnabled() const {
12791287
return impl_->row_tracking_enabled;
12801288
}
12811289

1290+
/// Returns the "row-tracking.partition-group-on-commit" flag held by impl_. Impl defaults it
/// to true and populates it from Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT while
/// parsing options.
bool CoreOptions::RowTrackingPartitionGroupOnCommit() const {
1291+
return impl_->row_tracking_partition_group_on_commit;
1292+
}
1293+
12821294
/// Returns the "data-evolution.enabled" flag held by impl_. Impl defaults it to false and
/// populates it from Options::DATA_EVOLUTION_ENABLED while parsing options.
bool CoreOptions::DataEvolutionEnabled() const {
12831295
return impl_->data_evolution_enabled;
12841296
}
@@ -1373,6 +1385,10 @@ BucketFunctionType CoreOptions::GetBucketFunctionType() const {
13731385
return impl_->bucket_function_type;
13741386
}
13751387

1388+
/// Returns the field names parsed from the "blob-field" option (Options::BLOB_FIELD, split
/// on Options::FIELDS_SEPARATOR). The list is empty unless parsing filled it.
/// The returned reference aliases impl_->blob_fields rather than a copy.
const std::vector<std::string>& CoreOptions::GetBlobFields() const {
1389+
return impl_->blob_fields;
1390+
}
1391+
13761392
/// Returns impl_->lookup_cache_file_retention_ms unchanged.
/// NOTE(review): presumably a duration in milliseconds, going by the name; the option it is
/// parsed from is not visible in this excerpt — confirm.
int64_t CoreOptions::GetLookupCacheFileRetentionMs() const {
13771393
return impl_->lookup_cache_file_retention_ms;
13781394
}

src/paimon/core/core_options.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class PAIMON_EXPORT CoreOptions {
144144
bool IndexFileInDataFileDir() const;
145145

146146
bool RowTrackingEnabled() const;
147+
bool RowTrackingPartitionGroupOnCommit() const;
147148
bool DataEvolutionEnabled() const;
148149

149150
bool LegacyPartitionNameEnabled() const;
@@ -179,6 +180,8 @@ class PAIMON_EXPORT CoreOptions {
179180
BucketFunctionType GetBucketFunctionType() const;
180181
std::optional<int32_t> GetGlobalIndexThreadNum() const;
181182

183+
const std::vector<std::string>& GetBlobFields() const;
184+
182185
const std::map<std::string, std::string>& ToMap() const;
183186

184187
private:

0 commit comments

Comments
 (0)