Skip to content

Commit 319b5b7

Browse files
committed
Collect Iceberg data file statistics during insertion
1 parent 9d45734 commit 319b5b7

23 files changed

+840
-44
lines changed

velox/dwio/common/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ velox_add_library(
4242
OnDemandUnitLoader.cpp
4343
InputStream.cpp
4444
IntDecoder.cpp
45+
IcebergStatistics.cpp
4546
MetadataFilter.cpp
4647
Options.cpp
4748
OutputStream.cpp
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/common/IcebergStatistics.h"
18+
19+
namespace facebook::velox::dwio::common {
20+
21+
// Serializes the statistics into a JSON object. The record count is stored
// under "recordCount"; every per-column map becomes a nested JSON object whose
// keys are the map keys converted to strings.
folly::dynamic IcebergDataFileStatistics::toJson() const {
  // Turns one per-column map into a JSON object, stringifying each key.
  const auto toJsonObject = [](const auto& entries) {
    folly::dynamic object = folly::dynamic::object;
    for (const auto& [key, value] : entries) {
      object[folly::to<std::string>(key)] = value;
    }
    return object;
  };

  folly::dynamic stats = folly::dynamic::object;
  stats["recordCount"] = numRecords;
  // Note: the member is spelled 'columnsSizes' while the emitted key is
  // "columnSizes".
  stats["columnSizes"] = toJsonObject(columnsSizes);
  stats["valueCounts"] = toJsonObject(valueCounts);
  stats["nullValueCounts"] = toJsonObject(nullValueCounts);
  stats["nanValueCounts"] = toJsonObject(nanValueCounts);
  stats["lowerBounds"] = toJsonObject(lowerBounds);
  stats["upperBounds"] = toJsonObject(upperBounds);
  return stats;
}
42+
43+
} // namespace facebook::velox::dwio::common
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <folly/json/dynamic.h>
20+
21+
namespace facebook::velox::dwio::common {
22+
23+
// Iceberg data_file struct fields.
24+
struct IcebergDataFileStatistics {
25+
int64_t numRecords;
26+
std::unordered_map<int32_t, int64_t> columnsSizes;
27+
std::unordered_map<int32_t, int64_t> valueCounts;
28+
std::unordered_map<int32_t, int64_t> nullValueCounts;
29+
std::unordered_map<int32_t, int64_t> nanValueCounts;
30+
std::unordered_map<int32_t, std::string> lowerBounds;
31+
std::unordered_map<int32_t, std::string> upperBounds;
32+
33+
IcebergDataFileStatistics() : numRecords(0) {}
34+
35+
folly::dynamic toJson() const;
36+
};
37+
38+
} // namespace facebook::velox::dwio::common

velox/dwio/common/Options.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,15 @@ class ReaderOptions : public io::ReaderOptions {
684684
bool allowEmptyFile_{false};
685685
};
686686

687+
// Settings for collecting Iceberg column statistics for one field. Holds the
// Iceberg source column ID (field_id) and whether to skip bounds collection
// for this field. For a nested field, 'children' holds the settings of its
// child fields.
//
// The struct remains an aggregate, so brace initialization such as
// IcebergStatsSettings{id, skipBounds, {children...}} keeps working. The
// default member initializers only make sure a default-initialized instance
// never carries indeterminate values.
struct IcebergStatsSettings {
  // Iceberg field_id of the column these settings apply to. Zero when not
  // explicitly set.
  int32_t id{0};

  // When true, lower/upper bounds are not collected for this field.
  bool skipBounds{false};

  // Settings of the child fields; empty for a field without children.
  std::vector<IcebergStatsSettings> children;
};
695+
687696
struct WriterOptions {
688697
TypePtr schema{nullptr};
689698
velox::memory::MemoryPool* memoryPool{nullptr};
@@ -706,6 +715,13 @@ struct WriterOptions {
706715
std::string sessionTimezoneName;
707716
bool adjustTimestampToTimezone{false};
708717

718+
// This option controls whether Iceberg data file statistics are collected
719+
// during write. It is set to true only by IcebergDataSink.
720+
bool collectIcebergDataFileStats{false};
721+
722+
std::shared_ptr<std::vector<IcebergStatsSettings>> icebergStatsSetting{
723+
nullptr};
724+
709725
// WriterOption implementations can implement this function to specify how to
710726
// process format-specific session and connector configs.
711727
virtual void processConfigs(

velox/dwio/common/Writer.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include <optional>
2222
#include <string>
2323

24+
#include "velox/dwio/common/IcebergStatistics.h"
25+
#include "velox/dwio/common/Options.h"
26+
#include "velox/dwio/common/Statistics.h"
2427
#include "velox/vector/ComplexVector.h"
2528

2629
namespace facebook::velox::dwio::common {
@@ -79,6 +82,11 @@ class Writer {
7982
/// Data can no longer be written.
8083
virtual void abort() = 0;
8184

85+
/// Return statistics based on each Iceberg data file
86+
std::shared_ptr<IcebergDataFileStatistics> dataFileStats() const {
87+
return dataFileStats_;
88+
};
89+
8290
protected:
8391
bool isRunning() const;
8492
bool isFinishing() const;
@@ -92,6 +100,10 @@ class Writer {
92100
static void checkStateTransition(State oldState, State newState);
93101

94102
State state_{State::kInit};
103+
bool collectIcebergStats_{false};
104+
std::shared_ptr<IcebergDataFileStatistics> dataFileStats_{nullptr};
105+
std::shared_ptr<std::vector<IcebergStatsSettings>> icebergStatsSetting_{
106+
nullptr};
95107
};
96108

97109
FOLLY_ALWAYS_INLINE std::ostream& operator<<(

velox/dwio/parquet/common/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ velox_add_library(
1717
BloomFilter.cpp
1818
XxHasher.cpp
1919
LevelComparison.cpp
20-
LevelConversion.cpp)
20+
LevelConversion.cpp
21+
UnicodeUtil.cpp)
2122

2223
velox_link_libraries(
2324
velox_dwio_parquet_common
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/parquet/common/UnicodeUtil.h"
18+
19+
#include "velox/external/utf8proc/utf8proc.h"
20+
#include "velox/functions/lib/string/StringImpl.h"
21+
22+
namespace facebook::velox::parquet {
23+
24+
std::string UnicodeUtil::truncateStringMin(
25+
const char* input,
26+
int32_t inputLength,
27+
int32_t numCodePoints) {
28+
auto length = functions::stringImpl::cappedByteLength<false>(
29+
StringView(input, inputLength), numCodePoints);
30+
return std::string(input, length);
31+
}
32+
33+
std::string UnicodeUtil::truncateStringMax(
34+
const char* input,
35+
int32_t inputLength,
36+
int32_t numCodePoints) {
37+
auto truncatedLength = functions::stringImpl::cappedByteLength<false>(
38+
StringView(input, inputLength), numCodePoints);
39+
40+
if (truncatedLength == inputLength) {
41+
return std::string(input, inputLength);
42+
}
43+
44+
// Try to increment the last code point.
45+
for (auto i = numCodePoints - 1; i >= 0; --i) {
46+
const char* current = input;
47+
int32_t currentCodePoint = 0;
48+
49+
// Find the i-th code point position.
50+
while (current < input + truncatedLength && currentCodePoint < i) {
51+
int32_t charLength;
52+
utf8proc_codepoint(current, input + truncatedLength, charLength);
53+
current += charLength;
54+
currentCodePoint++;
55+
}
56+
57+
if (current >= input + truncatedLength)
58+
continue;
59+
60+
int32_t charLength;
61+
auto codePoint =
62+
utf8proc_codepoint(current, input + truncatedLength, charLength);
63+
auto nextCodePoint = codePoint + 1;
64+
65+
// Check if the incremented code point is valid.
66+
if (nextCodePoint != 0 && utf8proc_codepoint_valid(nextCodePoint)) {
67+
std::string result;
68+
result.reserve(truncatedLength + 4);
69+
result.assign(input, current - input);
70+
char buffer[4];
71+
auto bytesWritten = utf8proc_encode_char(
72+
nextCodePoint, reinterpret_cast<utf8proc_uint8_t*>(buffer));
73+
result.append(buffer, bytesWritten);
74+
return result;
75+
}
76+
}
77+
78+
return std::string(input, truncatedLength);
79+
}
80+
81+
} // namespace facebook::velox::parquet
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
#include <optional>
21+
#include <string>
22+
23+
namespace facebook::velox::parquet {
24+
25+
/// Static-only helpers that shorten UTF-8 strings to a limited number of code
/// points, producing values usable as lower ("min") and upper ("max") bounds.
class UnicodeUtil {
 public:
  /// Not instantiable: the class only groups static functions.
  UnicodeUtil() = delete;

  /// Returns the leading part of 'input' capped at 'numCodePoints' code
  /// points.
  /// @param input Pointer to the first byte of the string.
  /// @param inputLength Length of 'input' in bytes.
  /// @param numCodePoints Maximum number of code points to keep.
  static std::string truncateStringMin(
      const char* input,
      int32_t inputLength,
      int32_t numCodePoints);

  /// Returns 'input' unchanged when it already fits within 'numCodePoints'
  /// code points. Otherwise returns a string derived from the capped prefix
  /// in which the last incrementable code point has been replaced by its
  /// successor, so the result does not sort before 'input'.
  /// @param input Pointer to the first byte of the string.
  /// @param inputLength Length of 'input' in bytes.
  /// @param numCodePoints Maximum number of code points to keep.
  static std::string truncateStringMax(
      const char* input,
      int32_t inputLength,
      int32_t numCodePoints);
};
40+
41+
} // namespace facebook::velox::parquet

velox/dwio/parquet/tests/common/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp)
15+
add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp
16+
UnicodeUtilTest.cpp)
1617

1718
add_test(velox_dwio_parquet_common_test velox_dwio_parquet_common_test)
1819
target_link_libraries(

0 commit comments

Comments
 (0)