Skip to content

Commit 570130a

Browse files
committed
Collect Iceberg data file statistics during insertion
1 parent ee7f137 commit 570130a

20 files changed

+648
-9
lines changed

velox/dwio/common/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ velox_add_library(
4242
OnDemandUnitLoader.cpp
4343
InputStream.cpp
4444
IntDecoder.cpp
45+
IcebergStatistics.cpp
4546
MetadataFilter.cpp
4647
Options.cpp
4748
OutputStream.cpp
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/common/IcebergStatistics.h"
18+
19+
namespace facebook::velox::dwio::common {
20+
21+
folly::dynamic IcebergDataFileStatistics::toJson() const {
22+
folly::dynamic json = folly::dynamic::object;
23+
json["recordCount"] = numRecords;
24+
25+
auto mapToJson = [](const auto& map) {
26+
folly::dynamic result = folly::dynamic::object;
27+
for (const auto& pair : map) {
28+
result[folly::to<std::string>(pair.first)] = pair.second;
29+
}
30+
return result;
31+
};
32+
33+
json["columnSizes"] = mapToJson(columnsSizes);
34+
json["valueCounts"] = mapToJson(valueCounts);
35+
json["nullValueCounts"] = mapToJson(nullValueCounts);
36+
json["nanValueCounts"] = mapToJson(nanValueCounts);
37+
json["lowerBounds"] = mapToJson(lowerBounds);
38+
json["upperBounds"] = mapToJson(upperBounds);
39+
40+
return json;
41+
}
42+
43+
} // namespace facebook::velox::dwio::common
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <folly/json/dynamic.h>
20+
21+
namespace facebook::velox::dwio::common {
22+
23+
// Iceberg data_file struct fields
24+
struct IcebergDataFileStatistics {
25+
int64_t numRecords;
26+
std::unordered_map<int32_t, int64_t> columnsSizes;
27+
std::unordered_map<int32_t, int64_t> valueCounts;
28+
std::unordered_map<int32_t, int64_t> nullValueCounts;
29+
std::unordered_map<int32_t, int64_t> nanValueCounts;
30+
std::unordered_map<int32_t, std::string> lowerBounds;
31+
std::unordered_map<int32_t, std::string> upperBounds;
32+
33+
IcebergDataFileStatistics() : numRecords(0) {}
34+
35+
folly::dynamic toJson() const;
36+
};
37+
38+
} // namespace facebook::velox::dwio::common

velox/dwio/common/Options.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,7 @@ struct WriterOptions {
705705

706706
std::string sessionTimezoneName;
707707
bool adjustTimestampToTimezone{false};
708+
std::shared_ptr<std::vector<int32_t>> sourceColumnIndices{nullptr};
708709

709710
// WriterOption implementations can implement this function to specify how to
710711
// process format-specific session and connector configs.

velox/dwio/common/Statistics.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <folly/Hash.h>
2020
#include <folly/container/F14Map.h>
21+
#include <folly/json/dynamic.h>
2122

2223
#include "velox/common/base/Exceptions.h"
2324
#include "velox/common/base/RuntimeMetrics.h"

velox/dwio/common/Writer.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <optional>
2222
#include <string>
2323

24+
#include "velox/dwio/common/IcebergStatistics.h"
25+
#include "velox/dwio/common/Statistics.h"
2426
#include "velox/vector/ComplexVector.h"
2527

2628
namespace facebook::velox::dwio::common {
@@ -79,6 +81,11 @@ class Writer {
7981
/// Data can no longer be written.
8082
virtual void abort() = 0;
8183

84+
/// Return statistics based on each Iceberg data file
85+
std::shared_ptr<IcebergDataFileStatistics> dataFileStats() const {
86+
return icebergDataFileStats_;
87+
};
88+
8289
protected:
8390
bool isRunning() const;
8491
bool isFinishing() const;
@@ -92,6 +99,8 @@ class Writer {
9299
static void checkStateTransition(State oldState, State newState);
93100

94101
State state_{State::kInit};
102+
std::shared_ptr<IcebergDataFileStatistics> icebergDataFileStats_;
103+
std::shared_ptr<std::vector<int32_t>> sourceColumnIndices_;
95104
};
96105

97106
FOLLY_ALWAYS_INLINE std::ostream& operator<<(

velox/dwio/parquet/common/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ velox_add_library(
1717
BloomFilter.cpp
1818
XxHasher.cpp
1919
LevelComparison.cpp
20-
LevelConversion.cpp)
20+
LevelConversion.cpp
21+
UnicodeUtil.cpp)
2122

2223
velox_link_libraries(
2324
velox_dwio_parquet_common
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/parquet/common/UnicodeUtil.h"
18+
19+
namespace facebook::velox::parquet {
20+
21+
std::string UnicodeUtil::truncateString(
22+
const std::string& input,
23+
int32_t length) {
24+
VELOX_CHECK_GT(length, 0, "Truncate length should be positive");
25+
return input.substr(
26+
0, functions::stringImpl::cappedByteLength<false>(input, length));
27+
}
28+
29+
std::optional<std::string> UnicodeUtil::truncateStringMin(
30+
const std::optional<std::string>& input,
31+
int32_t length) {
32+
if (!input.has_value()) {
33+
return std::nullopt;
34+
}
35+
return truncateString(input.value(), length);
36+
}
37+
38+
std::optional<std::string> UnicodeUtil::truncateStringMax(
39+
const std::optional<std::string>& input,
40+
int32_t length) {
41+
if (!input.has_value()) {
42+
return std::nullopt;
43+
}
44+
45+
const std::string& inputStr = input.value();
46+
const auto truncated = truncateString(inputStr, length);
47+
if (truncated.length() == inputStr.length()) {
48+
return inputStr;
49+
}
50+
51+
// Try to increment the last code point.
52+
for (auto i = length - 1; i >= 0; --i) {
53+
// Find the byte offset for the i-th code point.
54+
const char* data = truncated.data();
55+
const char* end = data + truncated.size();
56+
const char* current = data;
57+
int32_t currentCodePoint = 0;
58+
59+
while (current < end && currentCodePoint < i) {
60+
int32_t charLength;
61+
utf8proc_codepoint(current, end, charLength);
62+
current += charLength;
63+
currentCodePoint++;
64+
}
65+
66+
// Get the code point at this position.
67+
int32_t charLength;
68+
auto codePoint = utf8proc_codepoint(current, end, charLength);
69+
auto nextCodePoint = codePoint + 1;
70+
71+
// Check if the incremented code point is valid.
72+
if (nextCodePoint != 0 && utf8proc_codepoint_valid(nextCodePoint)) {
73+
auto result = truncated.substr(0, current - data);
74+
// Append the incremented code point.
75+
char buffer[4]; // UTF-8 uses up to 4 bytes per code point.
76+
auto bytesWritten = utf8proc_encode_char(
77+
nextCodePoint, reinterpret_cast<utf8proc_uint8_t*>(buffer));
78+
result.append(buffer, bytesWritten);
79+
return result;
80+
}
81+
}
82+
return std::nullopt;
83+
}
84+
85+
} // namespace facebook::velox::parquet
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/common/base/Exceptions.h"
20+
#include "velox/expression/VectorFunction.h"
21+
#include "velox/external/utf8proc/utf8proc.h"
22+
#include "velox/functions/lib/string/StringImpl.h"
23+
24+
namespace facebook::velox::parquet {
25+
26+
class UnicodeUtil {
27+
private:
28+
UnicodeUtil() = delete;
29+
30+
public:
31+
static bool isCharHighSurrogate(char16_t ch) {
32+
return (ch & 0xFC00) == 0xD800;
33+
}
34+
35+
// Truncates a string to the specified number of Unicode code points.
36+
static std::string truncateString(const std::string& input, int32_t length);
37+
38+
static std::optional<std::string> truncateStringMin(
39+
const std::optional<std::string>& input,
40+
int32_t length);
41+
42+
static std::optional<std::string> truncateStringMax(
43+
const std::optional<std::string>& input,
44+
int32_t length);
45+
};
46+
47+
} // namespace facebook::velox::parquet

velox/dwio/parquet/tests/common/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp)
15+
add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp
16+
UnicodeUtilTest.cpp)
1617

1718
add_test(velox_dwio_parquet_common_test velox_dwio_parquet_common_test)
1819
target_link_libraries(

0 commit comments

Comments
 (0)