|
16 | 16 |
|
17 | 17 | #include <cstdint> |
18 | 18 | #include <cstdlib> |
19 | | -#include <filesystem> |
20 | | -#include <fstream> |
| 19 | +#include <initializer_list> |
21 | 20 | #include <map> |
22 | 21 | #include <memory> |
23 | 22 | #include <numeric> |
| 23 | +#include <optional> |
24 | 24 | #include <set> |
25 | 25 | #include <string> |
26 | 26 | #include <utility> |
|
40 | 40 | #include "paimon/common/data/binary_array_writer.h" |
41 | 41 | #include "paimon/common/data/binary_row.h" |
42 | 42 | #include "paimon/common/data/binary_row_writer.h" |
| 43 | +#include "paimon/common/data/blob_descriptor.h" |
43 | 44 | #include "paimon/common/data/blob_view_struct.h" |
44 | 45 | #include "paimon/common/factories/io_hook.h" |
45 | 46 | #include "paimon/common/table/special_fields.h" |
@@ -345,19 +346,58 @@ class BlobTableInteTest : public testing::Test, public ::testing::WithParamInter |
345 | 346 | }); |
346 | 347 | } |
347 | 348 |
|
/// Settings used to re-root blob descriptor URIs onto a local copy of a table.
struct BlobDescriptorPathRewrite {
    /// Path the rewritten URI is joined onto. When empty, no rewrite is attempted.
    std::string table_path;
    /// Directory names that are searched for as "/<dir>/" inside a descriptor URI.
    std::vector<std::string> table_relative_blob_dirs;
};
| 353 | + |
| 354 | + static std::optional<std::string> TryRewriteDescriptorUri( |
| 355 | + const std::string& descriptor_uri, const BlobDescriptorPathRewrite& rewrite, |
| 356 | + const std::shared_ptr<LocalFileSystem>& fs) { |
| 357 | + if (rewrite.table_path.empty()) { |
| 358 | + return std::nullopt; |
| 359 | + } |
| 360 | + |
| 361 | + for (const auto& blob_dir : rewrite.table_relative_blob_dirs) { |
| 362 | + const std::string marker = "/" + blob_dir + "/"; |
| 363 | + auto marker_pos = descriptor_uri.find(marker); |
| 364 | + if (marker_pos != std::string::npos) { |
| 365 | + std::string relative_blob_path = descriptor_uri.substr(marker_pos + 1); |
| 366 | + return PathUtil::JoinPath(rewrite.table_path, relative_blob_path); |
| 367 | + } |
| 368 | + } |
| 369 | + return std::nullopt; |
| 370 | + } |
| 371 | + |
348 | 372 | /// Convert a StructArray with serialized BlobDescriptor bytes back to a StructArray |
349 | 373 | /// with raw blob bytes. Only blob fields are resolved; other columns (including |
350 | 374 | /// _VALUE_KIND) are kept as-is. |
351 | 375 | Result<std::shared_ptr<arrow::StructArray>> ConvertDescriptorToRawBlob( |
352 | 376 | const std::shared_ptr<arrow::StructArray>& desc_array, |
353 | | - const std::set<std::string>& blob_fields) const { |
| 377 | + const std::set<std::string>& blob_fields, |
| 378 | + const BlobDescriptorPathRewrite& rewrite = {}) const { |
354 | 379 | auto fs = std::make_shared<LocalFileSystem>(); |
355 | 380 | return TransformBlobFields( |
356 | 381 | desc_array, blob_fields, |
357 | 382 | [&](const std::string_view& descriptor_bytes, |
358 | 383 | arrow::LargeBinaryBuilder* builder) -> Status { |
359 | | - PAIMON_ASSIGN_OR_RAISE(auto blob, Blob::FromDescriptor(descriptor_bytes.data(), |
360 | | - descriptor_bytes.size())); |
| 384 | + PAIMON_ASSIGN_OR_RAISE( |
| 385 | + auto descriptor, |
| 386 | + BlobDescriptor::Deserialize(descriptor_bytes.data(), descriptor_bytes.size())); |
| 387 | + std::string descriptor_uri = descriptor->Uri(); |
| 388 | + auto rewritten_uri = TryRewriteDescriptorUri(descriptor_uri, rewrite, fs); |
| 389 | + if (rewritten_uri.has_value()) { |
| 390 | + descriptor_uri = rewritten_uri.value(); |
| 391 | + } |
| 392 | + |
| 393 | + PAIMON_ASSIGN_OR_RAISE( |
| 394 | + auto rewritten_descriptor, |
| 395 | + BlobDescriptor::Create(descriptor->Version(), descriptor_uri, |
| 396 | + descriptor->Offset(), descriptor->Length())); |
| 397 | + auto rewritten_descriptor_bytes = rewritten_descriptor->Serialize(pool_); |
| 398 | + PAIMON_ASSIGN_OR_RAISE(auto blob, |
| 399 | + Blob::FromDescriptor(rewritten_descriptor_bytes->data(), |
| 400 | + rewritten_descriptor_bytes->size())); |
361 | 401 | PAIMON_ASSIGN_OR_RAISE(auto data, blob->ToData(fs, pool_)); |
362 | 402 | PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(data->data(), data->size())); |
363 | 403 | return Status::OK(); |
@@ -3008,4 +3048,44 @@ TEST_P(BlobTableInteTest, TestBlobViewWithFallbackPath) { |
3008 | 3048 | << "expected:" << expected_with_rk->ToString(); |
3009 | 3049 | } |
3010 | 3050 |
|
| 3051 | +TEST_P(BlobTableInteTest, TestReadBlobDescriptorFieldFromJava) { |
| 3052 | + auto file_format = GetParam(); |
| 3053 | + if (file_format != "orc" && file_format != "parquet") { |
| 3054 | + return; |
| 3055 | + } |
| 3056 | + std::string table_path = |
| 3057 | + GetDataDir() + "/" + file_format + |
| 3058 | + "/blob_desc_field_with_external_path.db/blob_desc_field_with_external_path"; |
| 3059 | + arrow::FieldVector fields = { |
| 3060 | + arrow::field("f0", arrow::int32()), BlobUtils::ToArrowField("b0", true), |
| 3061 | + BlobUtils::ToArrowField("b1", true), BlobUtils::ToArrowField("b2", true), |
| 3062 | + BlobUtils::ToArrowField("b3", true)}; |
| 3063 | + auto schema = arrow::schema(fields); |
| 3064 | + // b0: all non-null, b1: has nulls, b2: all non-null, b3: has nulls |
| 3065 | + std::string raw_json = R"([ |
| 3066 | + [1, "img_0", null, "raw_2_0", "raw_3_0"], |
| 3067 | + [2, "img_1", "vid_1", "raw_2_1", null ], |
| 3068 | + [3, "img_2", null, "raw_2_2", "raw_3_2" ] |
| 3069 | + ])"; |
| 3070 | + auto raw_array = std::dynamic_pointer_cast<arrow::StructArray>( |
| 3071 | + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), raw_json).ValueOrDie()); |
| 3072 | + |
| 3073 | + ASSERT_OK_AND_ASSIGN(auto plan, ScanTable(table_path)); |
| 3074 | + std::map<std::string, std::string> read_options = {{Options::BLOB_AS_DESCRIPTOR, "false"}}; |
| 3075 | + ASSERT_OK_AND_ASSIGN(auto result, ReadTable(table_path, schema->field_names(), plan, |
| 3076 | + /*predicate=*/nullptr, read_options)); |
| 3077 | + ASSERT_TRUE(result.chunked_array); |
| 3078 | + auto read_concat = arrow::Concatenate(result.chunked_array->chunks()).ValueOrDie(); |
| 3079 | + auto read_struct = std::dynamic_pointer_cast<arrow::StructArray>(read_concat); |
| 3080 | + |
| 3081 | + // After read, b0 and b1 are both descriptor-stored; resolve all back to raw bytes. |
| 3082 | + // Java-generated descriptors may contain absolute paths from the generation machine. |
| 3083 | + // Rewrite them to the portable blob directories inside the copied table path. |
| 3084 | + BlobDescriptorPathRewrite rewrite{table_path, {"raw_blob", "external_blob"}}; |
| 3085 | + ASSERT_OK_AND_ASSIGN(auto resolved, |
| 3086 | + ConvertDescriptorToRawBlob(read_struct, {"b0", "b1"}, rewrite)); |
| 3087 | + ASSERT_OK_AND_ASSIGN(auto expected_with_rk, PrependRowKindColumn(raw_array)); |
| 3088 | + ASSERT_TRUE(resolved->Equals(expected_with_rk)); |
| 3089 | +} |
| 3090 | + |
3011 | 3091 | } // namespace paimon::test |
0 commit comments