Skip to content

Commit 85b0fa7

Browse files
committed
Address files system table review comments
1 parent a872fd0 commit 85b0fa7

4 files changed

Lines changed: 80 additions & 17 deletions

File tree

src/paimon/common/utils/binary_row_partition_computer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,13 @@ Result<arrow::Type::type> BinaryRowPartitionComputer::GetTypeFromArrowSchema(
136136

137137
Result<std::string> BinaryRowPartitionComputer::PartToSimpleString(
138138
const std::shared_ptr<arrow::Schema>& partition_type, const BinaryRow& partition,
139-
const std::string& delimiter, int32_t max_length) {
139+
const std::string& delimiter, int32_t max_length, bool legacy_partition_name_enabled) {
140140
std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> partition_converters;
141141
partition_converters.reserve(partition_type->num_fields());
142142
for (const auto& field : partition_type->fields()) {
143143
PAIMON_ASSIGN_OR_RAISE(DataConverterUtils::BinaryRowFieldToStrConverter converter,
144144
DataConverterUtils::CreateBinaryRowFieldToStringConverter(
145-
field->type()->id(), /*legacy_partition_name_enabled=*/true));
145+
field->type()->id(), legacy_partition_name_enabled));
146146
partition_converters.emplace_back(converter);
147147
}
148148
std::vector<std::string> partition_vec;

src/paimon/common/utils/binary_row_partition_computer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ class BinaryRowPartitionComputer {
6060

6161
static Result<std::string> PartToSimpleString(
6262
const std::shared_ptr<arrow::Schema>& partition_type, const BinaryRow& partition,
63-
const std::string& delimiter, int32_t max_length);
63+
const std::string& delimiter, int32_t max_length,
64+
bool legacy_partition_name_enabled = true);
6465

6566
private:
6667
BinaryRowPartitionComputer(const std::vector<std::string>& partition_keys,

src/paimon/core/table/system/metadata_system_tables.cpp

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@
3939
#include "paimon/common/types/data_field.h"
4040
#include "paimon/common/utils/binary_row_partition_computer.h"
4141
#include "paimon/common/utils/date_time_utils.h"
42+
#include "paimon/common/utils/field_type_utils.h"
4243
#include "paimon/common/utils/internal_row_utils.h"
4344
#include "paimon/common/utils/object_utils.h"
4445
#include "paimon/common/utils/path_util.h"
4546
#include "paimon/common/utils/rapidjson_util.h"
47+
#include "paimon/core/casting/cast_executor_factory.h"
4648
#include "paimon/core/core_options.h"
4749
#include "paimon/core/io/data_file_meta.h"
4850
#include "paimon/core/manifest/file_entry.h"
@@ -290,8 +292,9 @@ Result<std::optional<std::string>> OptionalPartitionString(
290292
}
291293
PAIMON_ASSIGN_OR_RAISE(std::string value,
292294
BinaryRowPartitionComputer::PartToSimpleString(
293-
partition_schema, row, ",", kMaxPartitionStatsLength));
294-
return std::optional<std::string>(value);
295+
partition_schema, row, ",", kMaxPartitionStatsLength,
296+
/*legacy_partition_name_enabled=*/false));
297+
return std::optional<std::string>(fmt::format("{{{}}}", value));
295298
}
296299

297300
Result<VariantType> OptionalPartitionStringValue(
@@ -311,6 +314,21 @@ Result<std::string> FilePath(const std::shared_ptr<FileStorePathFactory>& path_f
311314
return PathUtil::JoinPath(bucket_path, file.file_name);
312315
}
313316

317+
/// Renders a single non-null field value as a string.
///
/// The field's Arrow type id is mapped to a FieldType and the cast executor factory is asked
/// for an executor from that type to FieldType::STRING. If the factory returns one, the value
/// is turned into a Literal and cast to arrow::utf8(); otherwise the generic
/// DataDefine::VariantValueToString rendering is used.
///
/// @param field Field whose type drives the conversion.
/// @param value Value read from a row for `field`.
/// @return The string form of `value`, or the first error raised by a conversion step.
Result<std::string> FieldValueString(const DataField& field, const VariantType& value) {
    const auto arrow_type_id = field.Type()->id();
    PAIMON_ASSIGN_OR_RAISE(FieldType source_type,
                           FieldTypeUtils::ConvertToFieldType(arrow_type_id));

    std::shared_ptr<CastExecutor> to_string_executor =
        CastExecutorFactory::GetCastExecutorFactory()->GetCastExecutor(source_type,
                                                                       FieldType::STRING);
    if (to_string_executor != nullptr) {
        // A dedicated cast exists: go through Literal so the cast rules decide the text form.
        PAIMON_ASSIGN_OR_RAISE(Literal source_literal,
                               DataDefine::VariantValueToLiteral(value, arrow_type_id));
        PAIMON_ASSIGN_OR_RAISE(Literal utf8_literal,
                               to_string_executor->Cast(source_literal, arrow::utf8()));
        return utf8_literal.GetValue<std::string>();
    }

    // No cast executor for this type: fall back to the plain variant rendering.
    return DataDefine::VariantValueToString(value);
}
331+
314332
Result<std::vector<std::string>> RowValueStrings(const std::vector<DataField>& fields,
315333
const InternalRow& row) {
316334
std::shared_ptr<arrow::Schema> schema = DataField::ConvertDataFieldsToArrowSchema(fields);
@@ -323,7 +341,7 @@ Result<std::vector<std::string>> RowValueStrings(const std::vector<DataField>& f
323341
std::string value = "null";
324342
if (!row.IsNullAt(i)) {
325343
VariantType field_value = getters[i](row);
326-
value = DataDefine::VariantValueToString(field_value);
344+
PAIMON_ASSIGN_OR_RAISE(value, FieldValueString(fields[i], field_value));
327345
}
328346
values.push_back(std::move(value));
329347
}
@@ -406,14 +424,15 @@ Result<std::vector<DataField>> ProjectWriteFields(const std::shared_ptr<TableSch
406424
std::vector<DataField> fields;
407425
fields.reserve(file.write_cols->size() + data_schema->PartitionKeys().size());
408426
for (const auto& write_col : file.write_cols.value()) {
409-
if (write_col == SpecialFields::RowId().Name() ||
410-
write_col == SpecialFields::SequenceNumber().Name()) {
427+
if (SpecialFields::IsSpecialFieldName(write_col)) {
411428
continue;
412429
}
413430
PAIMON_ASSIGN_OR_RAISE(DataField field, data_schema->GetField(write_col));
414431
fields.push_back(std::move(field));
415432
}
416433

434+
// Partial writes may omit partition columns from write_cols. Keep them in the stats source
435+
// fields so SimpleStatsEvolution can map partition stats consistently.
417436
for (const auto& partition_key : data_schema->PartitionKeys()) {
418437
if (!ObjectUtils::Contains(file.write_cols.value(), partition_key)) {
419438
PAIMON_ASSIGN_OR_RAISE(DataField field, data_schema->GetField(partition_key));
@@ -423,6 +442,17 @@ Result<std::vector<DataField>> ProjectWriteFields(const std::shared_ptr<TableSch
423442
return fields;
424443
}
425444

445+
/// Selects the fields used as key fields for the files system table.
///
/// @param data_schema Schema to read the key fields from.
/// @return The trimmed primary key fields when there are any, otherwise all fields of the
///         schema; an error if the trimmed primary key fields cannot be resolved.
Result<std::vector<DataField>> KeyFieldsForFilesTable(
    const std::shared_ptr<TableSchema>& data_schema) {
    PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> trimmed_key_fields,
                           data_schema->TrimmedPrimaryKeyFields());
    if (!trimmed_key_fields.empty()) {
        return trimmed_key_fields;
    }
    // Java FilesTable falls back to logicalRowType when logicalTrimmedPrimaryKeysType is empty.
    return data_schema->Fields();
}
455+
426456
Result<std::shared_ptr<InternalArray>> WriteColsValue(
427457
const std::optional<std::vector<std::string>>& write_cols,
428458
const std::shared_ptr<MemoryPool>& pool) {
@@ -816,9 +846,11 @@ Result<std::vector<GenericRow>> FilesSystemTable::BuildRows() const {
816846
CreatePathFactory(context_, core_options, pool));
817847
PAIMON_ASSIGN_OR_RAISE(std::vector<ManifestEntry> entries,
818848
ReadLatestDataFiles(context_, path_factory, core_options, pool));
849+
std::shared_ptr<arrow::Schema> arrow_schema =
850+
DataField::ConvertDataFieldsToArrowSchema(context_.table_schema->Fields());
819851
PAIMON_ASSIGN_OR_RAISE(
820-
std::vector<DataField> partition_fields,
821-
context_.table_schema->GetFields(context_.table_schema->PartitionKeys()));
852+
std::shared_ptr<arrow::Schema> partition_schema,
853+
FieldMapping::GetPartitionSchema(arrow_schema, context_.table_schema->PartitionKeys()));
822854
const std::vector<DataField>& value_stats_fields = context_.table_schema->Fields();
823855

824856
std::vector<GenericRow> rows;
@@ -834,10 +866,7 @@ Result<std::vector<GenericRow>> FilesSystemTable::BuildRows() const {
834866
PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> data_stats_fields,
835867
ProjectWriteFields(data_schema, *file));
836868
PAIMON_ASSIGN_OR_RAISE(std::vector<DataField> key_fields,
837-
data_schema->TrimmedPrimaryKeyFields());
838-
if (key_fields.empty()) {
839-
key_fields = data_schema->Fields();
840-
}
869+
KeyFieldsForFilesTable(data_schema));
841870
auto stats_evolution = std::make_shared<SimpleStatsEvolution>(
842871
data_stats_fields, value_stats_fields,
843872
data_schema->Id() != context_.table_schema->Id() || file->write_cols.has_value(), pool);
@@ -849,9 +878,9 @@ Result<std::vector<GenericRow>> FilesSystemTable::BuildRows() const {
849878
if (context_.table_schema->PartitionKeys().empty()) {
850879
row.SetField(0, NullType());
851880
} else {
852-
PAIMON_ASSIGN_OR_RAISE(std::string partition,
853-
RowValuesString(partition_fields, entry.Partition(), "{", "}"));
854-
row.SetField(0, StringValue(partition));
881+
PAIMON_ASSIGN_OR_RAISE(VariantType partition, OptionalPartitionStringValue(
882+
entry.Partition(), partition_schema));
883+
row.SetField(0, partition);
855884
}
856885
row.SetField(1, entry.Bucket());
857886
PAIMON_ASSIGN_OR_RAISE(std::string file_path, FilePath(path_factory, entry, *file));

test/inte/read_inte_test.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,39 @@ TEST(SystemTableReadInteTest, TestReadFilesSystemTableForPartitionedTable) {
838838
ASSERT_EQ(max_value_stats_array->GetString(0), "{dt=20260527, pk=a, v=1}");
839839
}
840840

841+
// Writes one row into a table partitioned by a date32 column, then reads the `$files` system
// table and checks that the partition column shows the formatted date wrapped in braces.
TEST(SystemTableReadInteTest, TestReadFilesSystemTableForDatePartition) {
    auto dir = UniqueTestDirectory::Create();
    ASSERT_TRUE(dir);

    const arrow::FieldVector table_fields = {
        arrow::field("dt", arrow::date32()),
        arrow::field("v", arrow::int32()),
    };
    auto table_schema = arrow::schema(table_fields);
    std::map<std::string, std::string> options = {{Options::FILE_SYSTEM, "local"},
                                                  {Options::FILE_FORMAT, "orc"},
                                                  {Options::MANIFEST_FORMAT, "orc"},
                                                  {Options::BUCKET, "1"}};
    ASSERT_OK_AND_ASSIGN(auto helper,
                         TestHelper::Create(dir->Str(), table_schema, /*partition_keys=*/{"dt"},
                                            /*primary_keys=*/{}, options,
                                            /*is_streaming_mode=*/true));

    // 10440 days after 1970-01-01 is 1998-08-02, matching the partition map below.
    ASSERT_OK_AND_ASSIGN(
        std::unique_ptr<RecordBatch> record_batch,
        TestHelper::MakeRecordBatch(arrow::struct_(table_fields), R"([[10440, 1]])",
                                    /*partition_map=*/{{"dt", "1998-08-02"}}, /*bucket=*/0, {}));
    ASSERT_OK(helper->WriteAndCommit(std::move(record_batch), /*commit_identifier=*/0,
                                     /*expected_commit_messages=*/std::nullopt));

    std::string table_path = PathUtil::JoinPath(dir->Str(), "foo.db/bar");
    ASSERT_OK_AND_ASSIGN(auto files_result, ReadSystemTable(table_path + "$files", options));
    auto files_struct = SingleStructChunk(files_result);
    ASSERT_EQ(files_struct->length(), 1);

    // Column 0 of the files table is the partition string.
    auto partition_column =
        std::dynamic_pointer_cast<arrow::StringArray>(files_struct->field(0));
    ASSERT_TRUE(partition_column);
    ASSERT_EQ(partition_column->GetString(0), "{1998-08-02}");
}
873+
841874
TEST(SystemTableReadInteTest, TestReadFilesSystemTableWithSchemaEvolutionStats) {
842875
std::map<std::string, std::string> options = {{Options::FILE_SYSTEM, "local"}};
843876
std::string table_path = paimon::test::GetDataDir() +

0 commit comments

Comments
 (0)