Skip to content

Commit 035fe7d

Browse files
committed
Collect Iceberg data file statistics during insertion
1 parent ee7f137 commit 035fe7d

20 files changed

+696
-9
lines changed

velox/dwio/common/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ velox_add_library(
4242
OnDemandUnitLoader.cpp
4343
InputStream.cpp
4444
IntDecoder.cpp
45+
IcebergStatistics.cpp
4546
MetadataFilter.cpp
4647
Options.cpp
4748
OutputStream.cpp
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/common/IcebergStatistics.h"
18+
19+
namespace facebook::velox::dwio::common {
20+
21+
folly::dynamic IcebergDataFileStatistics::toJson() const {
22+
folly::dynamic json = folly::dynamic::object;
23+
json["recordCount"] = numRecords;
24+
25+
auto mapToJson = [](const auto& map) {
26+
folly::dynamic result = folly::dynamic::object;
27+
for (const auto& pair : map) {
28+
result[folly::to<std::string>(pair.first)] = pair.second;
29+
}
30+
return result;
31+
};
32+
33+
json["columnSizes"] = mapToJson(columnsSizes);
34+
json["valueCounts"] = mapToJson(valueCounts);
35+
json["nullValueCounts"] = mapToJson(nullValueCounts);
36+
json["nanValueCounts"] = mapToJson(nanValueCounts);
37+
json["lowerBounds"] = mapToJson(lowerBounds);
38+
json["upperBounds"] = mapToJson(upperBounds);
39+
40+
return json;
41+
}
42+
43+
} // namespace facebook::velox::dwio::common
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <folly/json/dynamic.h>
20+
21+
namespace facebook::velox::dwio::common {
22+
23+
// Iceberg data_file struct fields.
24+
struct IcebergDataFileStatistics {
25+
int64_t numRecords;
26+
std::unordered_map<int32_t, int64_t> columnsSizes;
27+
std::unordered_map<int32_t, int64_t> valueCounts;
28+
std::unordered_map<int32_t, int64_t> nullValueCounts;
29+
std::unordered_map<int32_t, int64_t> nanValueCounts;
30+
std::unordered_map<int32_t, std::string> lowerBounds;
31+
std::unordered_map<int32_t, std::string> upperBounds;
32+
33+
IcebergDataFileStatistics() : numRecords(0) {}
34+
35+
folly::dynamic toJson() const;
36+
};
37+
38+
} // namespace facebook::velox::dwio::common

velox/dwio/common/Options.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,12 @@ struct WriterOptions {
706706
std::string sessionTimezoneName;
707707
bool adjustTimestampToTimezone{false};
708708

709+
// This option controls whether collect iceberg data file statistics
710+
// during write. It will be set to true only by IcebergDataSink.
711+
bool collectIcebergStats{false};
712+
std::shared_ptr<std::vector<std::pair<int32_t, bool>>> sourceColumnIndices{
713+
nullptr};
714+
709715
// WriterOption implementations can implement this function to specify how to
710716
// process format-specific session and connector configs.
711717
virtual void processConfigs(

velox/dwio/common/Statistics.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include <folly/Hash.h>
2020
#include <folly/container/F14Map.h>
21+
#include <folly/json/dynamic.h>
2122

2223
#include "velox/common/base/Exceptions.h"
2324
#include "velox/common/base/RuntimeMetrics.h"

velox/dwio/common/Writer.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include <optional>
2222
#include <string>
2323

24+
#include "velox/dwio/common/IcebergStatistics.h"
25+
#include "velox/dwio/common/Statistics.h"
2426
#include "velox/vector/ComplexVector.h"
2527

2628
namespace facebook::velox::dwio::common {
@@ -79,6 +81,11 @@ class Writer {
7981
/// Data can no longer be written.
8082
virtual void abort() = 0;
8183

84+
/// Return statistics based on each Iceberg data file
85+
std::shared_ptr<IcebergDataFileStatistics> dataFileStats() const {
86+
return dataFileStats_;
87+
};
88+
8289
protected:
8390
bool isRunning() const;
8491
bool isFinishing() const;
@@ -92,6 +99,10 @@ class Writer {
9299
static void checkStateTransition(State oldState, State newState);
93100

94101
State state_{State::kInit};
102+
bool collectIcebergStats_{false};
103+
std::shared_ptr<IcebergDataFileStatistics> dataFileStats_{nullptr};
104+
std::shared_ptr<std::vector<std::pair<int32_t, bool>>> sourceColumnIndices_{
105+
nullptr};
95106
};
96107

97108
FOLLY_ALWAYS_INLINE std::ostream& operator<<(

velox/dwio/parquet/common/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ velox_add_library(
1717
BloomFilter.cpp
1818
XxHasher.cpp
1919
LevelComparison.cpp
20-
LevelConversion.cpp)
20+
LevelConversion.cpp
21+
UnicodeUtil.cpp)
2122

2223
velox_link_libraries(
2324
velox_dwio_parquet_common
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/parquet/common/UnicodeUtil.h"
18+
19+
#include "velox/external/utf8proc/utf8proc.h"
20+
#include "velox/functions/lib/string/StringImpl.h"
21+
22+
namespace facebook::velox::parquet {
23+
24+
std::string UnicodeUtil::truncateStringMin(
25+
const char* input,
26+
int32_t inputLength,
27+
int32_t numCodePoints) {
28+
auto length = functions::stringImpl::cappedByteLength<false>(
29+
StringView(input, inputLength), numCodePoints);
30+
return std::string(input, length);
31+
}
32+
33+
std::string UnicodeUtil::truncateStringMax(
34+
const char* input,
35+
int32_t inputLength,
36+
int32_t numCodePoints) {
37+
auto truncatedLength = functions::stringImpl::cappedByteLength<false>(
38+
StringView(input, inputLength), numCodePoints);
39+
40+
if (truncatedLength == inputLength) {
41+
return std::string(input, inputLength);
42+
}
43+
44+
// Try to increment the last code point.
45+
for (auto i = numCodePoints - 1; i >= 0; --i) {
46+
const char* current = input;
47+
int32_t currentCodePoint = 0;
48+
49+
// Find the i-th code point position.
50+
while (current < input + truncatedLength && currentCodePoint < i) {
51+
int32_t charLength;
52+
utf8proc_codepoint(current, input + truncatedLength, charLength);
53+
current += charLength;
54+
currentCodePoint++;
55+
}
56+
57+
if (current >= input + truncatedLength)
58+
continue;
59+
60+
int32_t charLength;
61+
auto codePoint =
62+
utf8proc_codepoint(current, input + truncatedLength, charLength);
63+
auto nextCodePoint = codePoint + 1;
64+
65+
// Check if the incremented code point is valid.
66+
if (nextCodePoint != 0 && utf8proc_codepoint_valid(nextCodePoint)) {
67+
std::string result;
68+
result.reserve(truncatedLength + 4);
69+
result.assign(input, current - input);
70+
char buffer[4];
71+
auto bytesWritten = utf8proc_encode_char(
72+
nextCodePoint, reinterpret_cast<utf8proc_uint8_t*>(buffer));
73+
result.append(buffer, bytesWritten);
74+
return result;
75+
}
76+
}
77+
78+
// Return truncated string without intermediate string creation
79+
return std::string(input, truncatedLength);
80+
}
81+
82+
} // namespace facebook::velox::parquet
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cstdint>
20+
#include <optional>
21+
#include <string>
22+
23+
namespace facebook::velox::parquet {
24+
25+
class UnicodeUtil {
26+
public:
27+
static bool isCharHighSurrogate(char16_t ch) {
28+
return (ch & 0xFC00) == 0xD800;
29+
}
30+
31+
static std::string truncateStringMin(
32+
const char* input,
33+
int32_t inputLength,
34+
int32_t numCodePoints);
35+
36+
static std::string truncateStringMax(
37+
const char* input,
38+
int32_t inputLength,
39+
int32_t numCodePoints);
40+
41+
private:
42+
UnicodeUtil() = delete;
43+
};
44+
45+
} // namespace facebook::velox::parquet

velox/dwio/parquet/tests/common/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp)
15+
add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp
16+
UnicodeUtilTest.cpp)
1617

1718
add_test(velox_dwio_parquet_common_test velox_dwio_parquet_common_test)
1819
target_link_libraries(

0 commit comments

Comments
 (0)