Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions velox/dwio/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ velox_add_library(
OnDemandUnitLoader.cpp
InputStream.cpp
IntDecoder.cpp
IcebergStatistics.cpp
MetadataFilter.cpp
Options.cpp
OutputStream.cpp
Expand Down
49 changes: 49 additions & 0 deletions velox/dwio/common/IcebergStatistics.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/common/IcebergStatistics.h"

namespace facebook::velox::dwio::common {

folly::dynamic IcebergDataFileStatistics::toJson() const {
folly::dynamic json = folly::dynamic::object;
json["recordCount"] = numRecords;

auto mapToJson = [](const auto& map) {
folly::dynamic result = folly::dynamic::object;
for (const auto& pair : map) {
result[folly::to<std::string>(pair.first)] = pair.second;
}
return result;
};

json["columnSizes"] = mapToJson(columnsSizes);
json["valueCounts"] = mapToJson(valueCounts);
json["nullValueCounts"] = mapToJson(nullValueCounts);
json["nanValueCounts"] = mapToJson(nanValueCounts);
json["lowerBounds"] = mapToJson(lowerBounds);
json["upperBounds"] = mapToJson(upperBounds);

folly::dynamic arr = folly::dynamic::array;
for (const auto& offset : splitOffsets) {
arr.push_back(offset);
}
json["splitOffsets"] = arr;

return json;
}

} // namespace facebook::velox::dwio::common
42 changes: 42 additions & 0 deletions velox/dwio/common/IcebergStatistics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/json/dynamic.h>

namespace facebook::velox::dwio::common {

// Iceberg data_file struct fields.
struct IcebergDataFileStatistics {
int64_t numRecords;
std::unordered_map<int32_t, int64_t> columnsSizes;
std::unordered_map<int32_t, int64_t> valueCounts;
std::unordered_map<int32_t, int64_t> nullValueCounts;
std::unordered_map<int32_t, int64_t> nanValueCounts;
std::unordered_map<int32_t, std::string> lowerBounds;
std::unordered_map<int32_t, std::string> upperBounds;

// Split offsets for the data file. For example, all row
// group offsets in a Parquet file. Must be sorted ascending.
std::vector<uint64_t> splitOffsets;

IcebergDataFileStatistics() : numRecords(0) {}

folly::dynamic toJson() const;
};

} // namespace facebook::velox::dwio::common
16 changes: 16 additions & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,15 @@ class ReaderOptions : public io::ReaderOptions {
bool allowEmptyFile_{false};
};

/// Settings for collecting Iceberg column statistics. Holds the Iceberg
/// source column ID (field_id) and whether to skip bounds collection for this
/// field. For nested field, it contains child fields.
struct IcebergStatsSettings {
int32_t id;
bool skipBounds;
std::vector<IcebergStatsSettings> children;
};

struct WriterOptions {
TypePtr schema{nullptr};
velox::memory::MemoryPool* memoryPool{nullptr};
Expand All @@ -706,6 +715,13 @@ struct WriterOptions {
std::string sessionTimezoneName;
bool adjustTimestampToTimezone{false};

/// This option controls whether collect iceberg data file statistics
/// during write. It will be set to true only by IcebergDataSink.
bool collectIcebergDataFileStats{false};

std::shared_ptr<std::vector<IcebergStatsSettings>> icebergStatsSetting{
nullptr};

// WriterOption implementations can implement this function to specify how to
// process format-specific session and connector configs.
virtual void processConfigs(
Expand Down
12 changes: 12 additions & 0 deletions velox/dwio/common/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <optional>
#include <string>

#include "velox/dwio/common/IcebergStatistics.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Statistics.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::dwio::common {
Expand Down Expand Up @@ -79,6 +82,11 @@ class Writer {
/// Data can no longer be written.
virtual void abort() = 0;

/// Return statistics based on each Iceberg data file
std::shared_ptr<IcebergDataFileStatistics> dataFileStats() const {
return dataFileStats_;
};

protected:
bool isRunning() const;
bool isFinishing() const;
Expand All @@ -92,6 +100,10 @@ class Writer {
static void checkStateTransition(State oldState, State newState);

State state_{State::kInit};
bool collectIcebergStats_{false};
std::shared_ptr<IcebergDataFileStatistics> dataFileStats_{nullptr};
std::shared_ptr<std::vector<IcebergStatsSettings>> icebergStatsSetting_{
nullptr};
};

FOLLY_ALWAYS_INLINE std::ostream& operator<<(
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ velox_add_library(
BloomFilter.cpp
XxHasher.cpp
LevelComparison.cpp
LevelConversion.cpp)
LevelConversion.cpp
UnicodeUtil.cpp)

velox_link_libraries(
velox_dwio_parquet_common
Expand Down
81 changes: 81 additions & 0 deletions velox/dwio/parquet/common/UnicodeUtil.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/dwio/parquet/common/UnicodeUtil.h"

#include "velox/external/utf8proc/utf8proc.h"
#include "velox/functions/lib/string/StringImpl.h"

namespace facebook::velox::parquet {

std::string UnicodeUtil::truncateStringMin(
const char* input,
int32_t inputLength,
int32_t numCodePoints) {
auto length = functions::stringImpl::cappedByteLength<false>(
StringView(input, inputLength), numCodePoints);
return std::string(input, length);
}

std::string UnicodeUtil::truncateStringMax(
const char* input,
int32_t inputLength,
int32_t numCodePoints) {
auto truncatedLength = functions::stringImpl::cappedByteLength<false>(
StringView(input, inputLength), numCodePoints);

if (truncatedLength == inputLength) {
return std::string(input, inputLength);
}

// Try to increment the last code point.
for (auto i = numCodePoints - 1; i >= 0; --i) {
const char* current = input;
int32_t currentCodePoint = 0;

// Find the i-th code point position.
while (current < input + truncatedLength && currentCodePoint < i) {
int32_t charLength;
utf8proc_codepoint(current, input + truncatedLength, charLength);
current += charLength;
currentCodePoint++;
}

if (current >= input + truncatedLength)
continue;

int32_t charLength;
auto codePoint =
utf8proc_codepoint(current, input + truncatedLength, charLength);
auto nextCodePoint = codePoint + 1;

// Check if the incremented code point is valid.
if (nextCodePoint != 0 && utf8proc_codepoint_valid(nextCodePoint)) {
std::string result;
result.reserve(truncatedLength + 4);
result.assign(input, current - input);
char buffer[4];
auto bytesWritten = utf8proc_encode_char(
nextCodePoint, reinterpret_cast<utf8proc_uint8_t*>(buffer));
result.append(buffer, bytesWritten);
return result;
}
}

return std::string(input, truncatedLength);
}

} // namespace facebook::velox::parquet
41 changes: 41 additions & 0 deletions velox/dwio/parquet/common/UnicodeUtil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>
#include <optional>
#include <string>

namespace facebook::velox::parquet {

class UnicodeUtil {
public:
static std::string truncateStringMin(
const char* input,
int32_t inputLength,
int32_t numCodePoints);

static std::string truncateStringMax(
const char* input,
int32_t inputLength,
int32_t numCodePoints);

private:
UnicodeUtil() = delete;
};

} // namespace facebook::velox::parquet
3 changes: 2 additions & 1 deletion velox/dwio/parquet/tests/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp)
add_executable(velox_dwio_parquet_common_test LevelConversionTest.cpp
UnicodeUtilTest.cpp)

add_test(velox_dwio_parquet_common_test velox_dwio_parquet_common_test)
target_link_libraries(
Expand Down
Loading