Skip to content

Commit fb7e319

Browse files
committed
test: add blob compatible test
1 parent 23a87fa commit fb7e319

31 files changed

Lines changed: 289 additions & 7 deletions

test/inte/blob_table_inte_test.cpp

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
#include <cstdint>
1818
#include <cstdlib>
19-
#include <filesystem>
20-
#include <fstream>
19+
#include <initializer_list>
2120
#include <map>
2221
#include <memory>
2322
#include <numeric>
23+
#include <optional>
2424
#include <set>
2525
#include <string>
2626
#include <utility>
@@ -40,6 +40,7 @@
4040
#include "paimon/common/data/binary_array_writer.h"
4141
#include "paimon/common/data/binary_row.h"
4242
#include "paimon/common/data/binary_row_writer.h"
43+
#include "paimon/common/data/blob_descriptor.h"
4344
#include "paimon/common/data/blob_view_struct.h"
4445
#include "paimon/common/factories/io_hook.h"
4546
#include "paimon/common/table/special_fields.h"
@@ -345,19 +346,68 @@ class BlobTableInteTest : public testing::Test, public ::testing::WithParamInter
345346
});
346347
}
347348

349+
/// Describes how blob descriptor URIs are redirected onto a local copy of a table.
///
/// A default-constructed value (empty `table_path`) makes TryRewriteDescriptorUri return
/// std::nullopt, i.e. descriptors are left untouched.
struct BlobDescriptorPathRewrite {
    /// Root path of the table that descriptor URIs are re-rooted under.
    std::string table_path;
    /// Blob directory names relative to `table_path`, tried in the order given.
    std::vector<std::string> table_relative_blob_dirs;
};
353+
354+
static std::optional<std::string> TryRewriteDescriptorUri(
355+
const std::string& descriptor_uri, const BlobDescriptorPathRewrite& rewrite,
356+
const std::shared_ptr<LocalFileSystem>& fs) {
357+
if (rewrite.table_path.empty()) {
358+
return std::nullopt;
359+
}
360+
361+
for (const auto& blob_dir : rewrite.table_relative_blob_dirs) {
362+
const std::string marker = "/" + blob_dir + "/";
363+
auto marker_pos = descriptor_uri.find(marker);
364+
if (marker_pos != std::string::npos) {
365+
std::string relative_blob_path = descriptor_uri.substr(marker_pos + 1);
366+
return PathUtil::JoinPath(rewrite.table_path, relative_blob_path);
367+
}
368+
}
369+
370+
std::string file_name = PathUtil::GetName(descriptor_uri);
371+
for (const auto& blob_dir : rewrite.table_relative_blob_dirs) {
372+
std::string candidate =
373+
PathUtil::JoinPath(PathUtil::JoinPath(rewrite.table_path, blob_dir), file_name);
374+
auto exists = fs->Exists(candidate);
375+
if (exists.ok() && exists.value()) {
376+
return candidate;
377+
}
378+
}
379+
return std::nullopt;
380+
}
381+
348382
/// Convert a StructArray with serialized BlobDescriptor bytes back to a StructArray
349383
/// with raw blob bytes. Only blob fields are resolved; other columns (including
350384
/// _VALUE_KIND) are kept as-is.
351385
Result<std::shared_ptr<arrow::StructArray>> ConvertDescriptorToRawBlob(
352386
const std::shared_ptr<arrow::StructArray>& desc_array,
353-
const std::set<std::string>& blob_fields) const {
387+
const std::set<std::string>& blob_fields,
388+
const BlobDescriptorPathRewrite& rewrite = {}) const {
354389
auto fs = std::make_shared<LocalFileSystem>();
355390
return TransformBlobFields(
356391
desc_array, blob_fields,
357392
[&](const std::string_view& descriptor_bytes,
358393
arrow::LargeBinaryBuilder* builder) -> Status {
359-
PAIMON_ASSIGN_OR_RAISE(auto blob, Blob::FromDescriptor(descriptor_bytes.data(),
360-
descriptor_bytes.size()));
394+
PAIMON_ASSIGN_OR_RAISE(
395+
auto descriptor,
396+
BlobDescriptor::Deserialize(descriptor_bytes.data(), descriptor_bytes.size()));
397+
std::string descriptor_uri = descriptor->Uri();
398+
auto rewritten_uri = TryRewriteDescriptorUri(descriptor_uri, rewrite, fs);
399+
if (rewritten_uri.has_value()) {
400+
descriptor_uri = rewritten_uri.value();
401+
}
402+
403+
PAIMON_ASSIGN_OR_RAISE(
404+
auto rewritten_descriptor,
405+
BlobDescriptor::Create(descriptor->Version(), descriptor_uri,
406+
descriptor->Offset(), descriptor->Length()));
407+
auto rewritten_descriptor_bytes = rewritten_descriptor->Serialize(pool_);
408+
PAIMON_ASSIGN_OR_RAISE(auto blob,
409+
Blob::FromDescriptor(rewritten_descriptor_bytes->data(),
410+
rewritten_descriptor_bytes->size()));
361411
PAIMON_ASSIGN_OR_RAISE(auto data, blob->ToData(fs, pool_));
362412
PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(data->data(), data->size()));
363413
return Status::OK();
@@ -2081,8 +2131,12 @@ TEST_P(BlobTableInteTest, TestBlobDescriptorFieldPartialExternalStorageNoAsDescr
20812131
auto read_concat = arrow::Concatenate(result.chunked_array->chunks()).ValueOrDie();
20822132
auto read_struct = std::dynamic_pointer_cast<arrow::StructArray>(read_concat);
20832133

2084-
// After read, b0 and b1 are both descriptor-stored; resolve all back to raw bytes
2085-
ASSERT_OK_AND_ASSIGN(auto resolved, ConvertDescriptorToRawBlob(read_struct, {"b0", "b1"}));
2134+
// After read, b0 and b1 are both descriptor-stored; resolve all back to raw bytes.
2135+
// Java-generated descriptors may contain absolute paths from the generation machine.
2136+
// Rewrite them to the portable blob directories inside the copied table path.
2137+
BlobDescriptorPathRewrite rewrite{table_path, {"raw_blob", "external_blob"}};
2138+
ASSERT_OK_AND_ASSIGN(auto resolved,
2139+
ConvertDescriptorToRawBlob(read_struct, {"b0", "b1"}, rewrite));
20862140
ASSERT_OK_AND_ASSIGN(auto expected_with_rk, PrependRowKindColumn(raw_array));
20872141
ASSERT_TRUE(resolved->Equals(expected_with_rk));
20882142

@@ -3008,4 +3062,40 @@ TEST_P(BlobTableInteTest, TestBlobViewWithFallbackPath) {
30083062
<< "expected:" << expected_with_rk->ToString();
30093063
}
30103064

3065+
/// Reads the `blob_desc_field_with_external_path` fixture table and checks that, once the
/// descriptor fields b0/b1 are resolved back to raw bytes, the rows match the expected data.
TEST_P(BlobTableInteTest, TestReadBlobDescriptorFieldFromJava) {
    std::string table_path =
        GetDataDir() + "/" + file_format +
        "/blob_desc_field_with_external_path.db/blob_desc_field_with_external_path";
    arrow::FieldVector fields = {
        arrow::field("f0", arrow::int32()), BlobUtils::ToArrowField("b0", true),
        BlobUtils::ToArrowField("b1", true), BlobUtils::ToArrowField("b2", true),
        BlobUtils::ToArrowField("b3", true)};
    auto schema = arrow::schema(fields);
    // b0: all non-null, b1: has nulls, b2: all non-null, b3: has nulls
    std::string raw_json = R"([
        [1, "img_0", null, "raw_2_0", "raw_3_0"],
        [2, "img_1", "vid_1", "raw_2_1", null ],
        [3, "img_2", null, "raw_2_2", "raw_3_2" ]
    ])";
    auto raw_array = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), raw_json).ValueOrDie());
    // Fail with a clear assertion instead of a null dereference if the cast does not hold.
    ASSERT_TRUE(raw_array);

    ASSERT_OK_AND_ASSIGN(auto plan, ScanTable(table_path));
    std::map<std::string, std::string> read_options = {{Options::BLOB_AS_DESCRIPTOR, "false"}};
    ASSERT_OK_AND_ASSIGN(auto result, ReadTable(table_path, schema->field_names(), plan,
                                                /*predicate=*/nullptr, read_options));
    ASSERT_TRUE(result.chunked_array);
    auto read_concat = arrow::Concatenate(result.chunked_array->chunks()).ValueOrDie();
    auto read_struct = std::dynamic_pointer_cast<arrow::StructArray>(read_concat);
    ASSERT_TRUE(read_struct);

    // After read, b0 and b1 are both descriptor-stored; resolve all back to raw bytes.
    // Java-generated descriptors may contain absolute paths from the generation machine.
    // Rewrite them to the portable blob directories inside the copied table path.
    BlobDescriptorPathRewrite rewrite{table_path, {"raw_blob", "external_blob"}};
    ASSERT_OK_AND_ASSIGN(auto resolved,
                         ConvertDescriptorToRawBlob(read_struct, {"b0", "b1"}, rewrite));
    ASSERT_OK_AND_ASSIGN(auto expected_with_rk, PrependRowKindColumn(raw_array));
    // Print both sides on mismatch so a fixture/format regression can be diagnosed from logs.
    ASSERT_TRUE(resolved->Equals(expected_with_rk))
        << "result:" << resolved->ToString() << "\n"
        << "expected:" << expected_with_rk->ToString();
}
3100+
30113101
} // namespace paimon::test
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
f0:int b0:blob b1:blob b2:blob b3:blob (all can be null)
2+
bucket count: -1
3+
target-file-size: 700
4+
row-tracking.enabled: true
5+
data-evolution.enabled: true
6+
blob-descriptor-field: b0,b1
7+
blob-external-storage-field: b1
8+
blob-external-storage-path: <table>/external_blob (absolute path at generation time)
9+
10+
b0: descriptor field, inline in main file, source .bin files in raw_blob/
11+
b1: descriptor field, repacked to external storage in external_blob/
12+
b2: regular blob, written to .blob files
13+
b3: regular blob, written to .blob files
14+
15+
Note: b0 is passed as descriptor via Blob.fromLocal(); b1/b2/b3 are raw bytes.
16+
Paimon auto-converts b1 to descriptor internally.
17+
18+
Msgs:
19+
snapshot-1
20+
write field: "f0", "b0", "b1", "b2", "b3"
21+
Add: 1, "img_0", null, "raw_2_0", "raw_3_0"
22+
Add: 2, "img_1", "vid_1", "raw_2_1", null
23+
Add: 3, "img_2", null, "raw_2_2", "raw_3_2"
24+
NoCompact
25+
26+
Expected data files after commit:
27+
[0] main orc file (f0, b0, b1), 3 rows
28+
[1] .blob file (b2), 3 rows
29+
[2] .blob file (b3), 3 rows
30+
31+
C++ read note:
32+
Descriptor URIs contain absolute paths from the Java generation machine.
33+
ConvertDescriptorToRawBlob uses BlobDescriptorPathRewrite{table_path, {"raw_blob", "external_blob"}}
34+
to redirect them to <table>/raw_blob/ and <table>/external_blob/ at read time.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
img_0

0 commit comments

Comments
 (0)