diff --git a/CMakeLists.txt b/CMakeLists.txt
index 98f2ada4e..35b9d64a1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,7 +2,9 @@ cmake_minimum_required(VERSION 3.5...3.29)
 
 # Set extension name here
 set(TARGET_NAME iceberg)
-project(${TARGET_NAME})
+
+set(EXTENSION_NAME ${TARGET_NAME}_extension)
+set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
 
 set(CMAKE_CXX_STANDARD 14 CACHE STRING "C++ standard")
 set(CMAKE_CXX_STANDARD_REQUIRED True)
@@ -10,7 +12,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
 
 set(EXTENSION_NAME ${TARGET_NAME}_extension)
 include_directories(src/include)
-
 set(EXTENSION_SOURCES
     src/iceberg_extension.cpp
     src/iceberg_functions.cpp
diff --git a/extension_config.cmake b/extension_config.cmake
index a65e0f795..66a164e2c 100644
--- a/extension_config.cmake
+++ b/extension_config.cmake
@@ -1,3 +1,9 @@
+# Extension from this repo
+duckdb_extension_load(iceberg
+    SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
+    LOAD_TESTS
+)
+
 # This file is included by DuckDB's build system. It specifies which extension to load
 if (NOT EMSCRIPTEN)
     duckdb_extension_load(avro
@@ -7,12 +13,6 @@ duckdb_extension_load(avro
     )
 endif()
 
-# Extension from this repo
-duckdb_extension_load(iceberg
-    SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
-    LOAD_TESTS
-)
-
 if (NOT EMSCRIPTEN)
     duckdb_extension_load(tpch)
     duckdb_extension_load(icu)
diff --git a/scripts/data_generators/generate_data.py b/scripts/data_generators/generate_data.py
index 87e5df5e5..fa9c3cd5e 100644
--- a/scripts/data_generators/generate_data.py
+++ b/scripts/data_generators/generate_data.py
@@ -31,6 +31,8 @@ print(f"Generating for '{target}'")
 
 for test in tests:
     print(f"Generating test '{test.table}'")
+    if test.table == "spark_written_upper_lower_bounds":
+        continue
     test.generate(con)
 
 if __name__ == "__main__":
diff --git a/scripts/data_generators/tests/spark_written_upper_lower_bounds/__init__.py b/scripts/data_generators/tests/spark_written_upper_lower_bounds/__init__.py
new file mode 100644
index 000000000..8f4f6e914
--- /dev/null
+++ b/scripts/data_generators/tests/spark_written_upper_lower_bounds/__init__.py
@@ -0,0 +1,52 @@
+from scripts.data_generators.tests.base import IcebergTest
+import pathlib
+from pyspark.sql import SparkSession
+import pyspark
+import pyspark.sql
+from pyspark import SparkContext
+import os
+
+SPARK_RUNTIME_PATH = os.path.join(os.path.dirname(__file__), '..', '..', 'iceberg-spark-runtime-3.5_2.12-1.9.0.jar')
+
+
+@IcebergTest.register()
+class Test(IcebergTest):
+    def __init__(self):
+        path = pathlib.PurePath(__file__)
+        super().__init__(path.parent.name)
+
+    def setup(self, con):
+        os.environ["PYSPARK_SUBMIT_ARGS"] = (
+            "--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.0,org.apache.iceberg:iceberg-aws-bundle:1.9.0 pyspark-shell"
+        )
+        os.environ["AWS_REGION"] = "us-east-1"
+        os.environ["AWS_ACCESS_KEY_ID"] = "admin"
+        os.environ["AWS_SECRET_ACCESS_KEY"] = "password"
+        if SparkContext._active_spark_context is not None:
+            SparkContext._active_spark_context.stop()
+
+        spark = (
+            SparkSession.builder.appName("DuckDB REST Integration test")
+            .master("local[1]")
+            .config(
+                "spark.sql.extensions",
+                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+            )
+            .config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
+            .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
+            .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
+            .config("spark.sql.catalog.demo.type", "rest")
+            .config("spark.sql.catalog.demo.uri", "http://127.0.0.1:8181")
.config("spark.sql.catalog.demo.warehouse", "s3://warehouse/wh/") + .config("spark.sql.catalog.demo.s3.endpoint", "http://127.0.0.1:9000") + .config("spark.sql.catalog.demo.s3.path-style-access", "true") + .config('spark.driver.memory', '10g') + .config('spark.sql.session.timeZone', 'UTC') + .config("spark.sql.catalogImplementation", "in-memory") + .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config('spark.jars', SPARK_RUNTIME_PATH) + .getOrCreate() + ) + spark.sql("USE demo") + spark.sql("CREATE NAMESPACE IF NOT EXISTS default") + con.con = spark + diff --git a/scripts/data_generators/tests/spark_written_upper_lower_bounds/q00.sql b/scripts/data_generators/tests/spark_written_upper_lower_bounds/q00.sql new file mode 100644 index 000000000..9aff1696b --- /dev/null +++ b/scripts/data_generators/tests/spark_written_upper_lower_bounds/q00.sql @@ -0,0 +1,21 @@ +CREATE OR REPLACE TABLE default.spark_written_upper_lower_bounds ( + int_type INTEGER, + bigint_type BIGINT, + varchar_type VARCHAR(100), + bool_type BOOLEAN, + float_type FLOAT, + double_type DOUBLE, + decimal_type_18_3 DECIMAL(18, 3), + date_type DATE, +-- time_type TIME, + timestamp_type TIMESTAMP, +-- timestamp_tz_type TIMESTAMPTZ, +-- uuid_type UUID, + binary_type BINARY +) +USING ICEBERG +TBLPROPERTIES ( + 'format-version' = '2', + 'write.update.mode' = 'copy-on-write', + 'write.target-file-size-bytes' = '2132' +); \ No newline at end of file diff --git a/scripts/data_generators/tests/spark_written_upper_lower_bounds/q01.sql b/scripts/data_generators/tests/spark_written_upper_lower_bounds/q01.sql new file mode 100644 index 000000000..bb1a83d74 --- /dev/null +++ b/scripts/data_generators/tests/spark_written_upper_lower_bounds/q01.sql @@ -0,0 +1,46 @@ +INSERT INTO default.spark_written_upper_lower_bounds VALUES +-- Lower bounds +( + -2147483648, -- int_type (Integer min) + -9223372036854775808, -- bigint_type (Long min) + '', -- varchar_type (empty string as lower bound) + false, -- bool_type + -3.4028235E38, -- float_type (Float min) + -1.7976931348623157E308, -- double_type (Double min) + -9999999999999.999, -- decimal(18,3) lower bound + DATE '0001-01-01', -- date_type (Spark's min date) +-- TIME '00:00:00', -- time_type + TIMESTAMP '0001-01-01 00:00:00', -- timestamp_type +-- TIMESTAMPTZ '0001-01-01 00:00:00+00',-- timestamp_tz_type +-- UUID '00000000-0000-0000-0000-000000000000', -- uuid_type + X'' -- binary_type (empty binary) +), +-- Upper bounds +( + 2147483647, -- int_type (Integer max) + 9223372036854775807, -- bigint_type (Long max) + RPAD('Z', 100, 'Z'), -- varchar_type (max-length string) + true, -- bool_type + 3.4028235E38, -- float_type (Float max) + 1.7976931348623157E308, -- double_type (Double max) + 9999999999999.999, -- decimal(18,3) upper bound + DATE '9999-12-31', -- date_type +-- TIME '23:59:59.999999', -- time_type (microsecond max) + TIMESTAMP '9999-12-31 23:59:59.999999', -- timestamp_type +-- TIMESTAMPTZ '9999-12-31 23:59:59.999999+00', -- timestamp_tz_type +-- UUID 'ffffffff-ffff-ffff-ffff-ffffffffffff', -- uuid_type + X'FFFFFFFF' -- binary_type (example max-ish binary) +), +-- NULL values +( + NULL, -- int_type (Integer max) + NULL, -- bigint_type (Long max) + NULL, -- varchar_type (max-length string) + NULL, -- bool_type + NULL, -- float_type (Float max) + NULL, -- double_type (Double max) + NULL, -- decimal(18,3) upper bound + NULL, -- date_type + NULL, -- timestamp_type + NULL -- binary_type (example max-ish binary) +); diff --git a/src/catalog_api.cpp 
index fd1029a67..3383bca02 100644
--- a/src/catalog_api.cpp
+++ b/src/catalog_api.cpp
@@ -1,6 +1,4 @@
 #include "catalog_api.hpp"
-#include "include/catalog_api.hpp"
-
 #include "catalog_utils.hpp"
 #include "iceberg_logging.hpp"
 #include "storage/irc_catalog.hpp"
@@ -14,9 +12,7 @@
 #include "duckdb/common/error_data.hpp"
 #include "duckdb/common/http_util.hpp"
 #include "duckdb/common/exception/http_exception.hpp"
-#include "include/storage/irc_authorization.hpp"
-#include "include/storage/irc_catalog.hpp"
-
+#include "storage/irc_authorization.hpp"
 #include "rest_catalog/objects/list.hpp"
 
 using namespace duckdb_yyjson;
diff --git a/src/iceberg_manifest.cpp b/src/iceberg_manifest.cpp
index f2d4e4cb6..ad029d90c 100644
--- a/src/iceberg_manifest.cpp
+++ b/src/iceberg_manifest.cpp
@@ -3,6 +3,7 @@
 
 #include "duckdb/storage/caching_file_system.hpp"
 #include "catalog_utils.hpp"
+#include "iceberg_value.hpp"
 #include "storage/iceberg_table_information.hpp"
 
 namespace duckdb {
@@ -51,8 +52,21 @@ Value IcebergManifestEntry::ToDataFileStruct(const LogicalType &type) const {
     for (auto &child : upper_bounds) {
         upper_bounds_values.push_back(Value::STRUCT({{"key", child.first}, {"value", child.second}}));
     }
+    children.push_back(Value::MAP(LogicalType::STRUCT(bounds_types), upper_bounds_values));
+
+    child_list_t<LogicalType> null_value_count_types;
+    null_value_count_types.emplace_back("key", LogicalType::INTEGER);
+    null_value_count_types.emplace_back("value", LogicalType::BIGINT);
+
+    vector<Value> null_value_counts_values;
+    // null_value_counts: map<121: int, 122: long> - 110
+    for (auto &child : null_value_counts) {
+        null_value_counts_values.push_back(Value::STRUCT({{"key", child.first}, {"value", child.second}}));
+    }
+
+    children.push_back(Value::MAP(LogicalType::STRUCT(null_value_count_types), null_value_counts_values));
+
     return Value::STRUCT(type, children);
 }
 
@@ -335,6 +349,47 @@ idx_t WriteToFile(IcebergTableInformation &table_info, const IcebergManifestFile
         yyjson_mut_obj_add_uint(doc, val_obj, "id", UPPER_BOUNDS_VALUE);
     }
 
+    // null_value_counts - 110
+    child_list_t<LogicalType> null_value_counts_fields;
+    null_value_counts_fields.emplace_back("key", LogicalType::INTEGER);
+    null_value_counts_fields.emplace_back("value", LogicalType::BIGINT);
+    {
+        // null_value_counts: map<121: int, 122: long>
+        children.emplace_back("null_value_counts", LogicalType::MAP(LogicalType::STRUCT(null_value_counts_fields)));
+
+        child_list_t<Value> null_values_counts_record_field_ids;
+        null_values_counts_record_field_ids.emplace_back("__duckdb_field_id", Value::INTEGER(NULL_VALUE_COUNTS));
+        null_values_counts_record_field_ids.emplace_back("key", Value::INTEGER(NULL_VALUE_COUNTS_KEY));
+        null_values_counts_record_field_ids.emplace_back("value", Value::INTEGER(NULL_VALUE_COUNTS_VALUE));
+
+        data_file_field_ids.emplace_back("null_value_counts", Value::STRUCT(null_values_counts_record_field_ids));
+        auto field_obj = yyjson_mut_arr_add_obj(doc, child_fields_arr);
+        yyjson_mut_obj_add_uint(doc, field_obj, "id", NULL_VALUE_COUNTS);
+        yyjson_mut_obj_add_strcpy(doc, field_obj, "name", "null_value_counts");
+        yyjson_mut_obj_add_bool(doc, field_obj, "required", false);
+
+        auto null_value_counts_type_struct = yyjson_mut_obj_add_obj(doc, field_obj, "type");
+        yyjson_mut_obj_add_strcpy(doc, null_value_counts_type_struct, "type", "array");
+        auto items_obj = yyjson_mut_obj_add_obj(doc, null_value_counts_type_struct, "items");
+        yyjson_mut_obj_add_strcpy(doc, items_obj, "type", "record");
"record"); + yyjson_mut_obj_add_strcpy( + doc, items_obj, "name", + StringUtil::Format("k%d_k%d", NULL_VALUE_COUNTS_KEY, NULL_VALUE_COUNTS_VALUE).c_str()); + auto record_fields_arr = yyjson_mut_obj_add_arr(doc, items_obj, "fields"); + + auto key_obj = yyjson_mut_arr_add_obj(doc, record_fields_arr); + yyjson_mut_obj_add_strcpy(doc, key_obj, "name", "key"); + yyjson_mut_obj_add_strcpy(doc, key_obj, "type", "int"); + yyjson_mut_obj_add_uint(doc, key_obj, "id", NULL_VALUE_COUNTS_KEY); + + auto val_obj = yyjson_mut_arr_add_obj(doc, record_fields_arr); + yyjson_mut_obj_add_strcpy(doc, val_obj, "name", "value"); + yyjson_mut_obj_add_strcpy(doc, val_obj, "type", "binary"); + yyjson_mut_obj_add_uint(doc, val_obj, "id", NULL_VALUE_COUNTS_VALUE); + } + { // data_file: struct(...) - 2 names.push_back("data_file"); diff --git a/src/iceberg_value.cpp b/src/iceberg_value.cpp index f85671b52..8821d1a77 100644 --- a/src/iceberg_value.cpp +++ b/src/iceberg_value.cpp @@ -1,7 +1,11 @@ #include "iceberg_value.hpp" +#include "utf8proc_wrapper.hpp" #include "duckdb/common/helper.hpp" #include "duckdb/common/types/uuid.hpp" #include "duckdb/common/bswap.hpp" +#include "duckdb/common/types/date.hpp" +#include "duckdb/common/types/timestamp.hpp" +#include "duckdb/common/types/value.hpp" namespace duckdb { @@ -244,4 +248,171 @@ DeserializeResult IcebergValue::DeserializeValue(const string_t &blob, const Log return DeserializeError(blob, type); } +std::string truncate_and_increment_utf8(const std::string &input) { + std::vector bytes(input.begin(), input.end()); + // Truncate to first 16 bytes + idx_t n = std::min(16, bytes.size()); + if (n == 0) { + return std::string(bytes.begin(), bytes.end()); + } + bytes.resize(n); + idx_t i = n - 1; + while (((bytes[i] & 0xC0) == 0x80) && i > 0) { + // skip continuation bytes + --i; + } + bytes[i]++; + // Convert back to string + return std::string(bytes.begin(), bytes.end()); +} + +std::vector HexStringToBytes(const std::string &hex) { + std::vector bytes; + D_ASSERT(hex.size() % 2 == 0); + bytes.reserve(hex.size() / 2); + + for (size_t i = 0; i < hex.size(); i += 2) { + uint8_t byte = std::stoi(hex.substr(i, 2), nullptr, 16); + bytes.push_back(byte); + } + return bytes; +} + +SerializeResult IcebergValue::SerializeValue(Value input_value, LogicalType &column_type) { + switch (column_type.id()) { + case LogicalTypeId::INTEGER: { + int32_t val = input_value.GetValue(); + auto serialized_const_data_ptr = const_data_ptr_cast(&val); + auto serialized_val = Value::BLOB(serialized_const_data_ptr, sizeof(int32_t)); + auto ret = SerializeResult(column_type, serialized_val); + return ret; + } + case LogicalTypeId::BIGINT: { + int64_t val = input_value.GetValue(); + auto serialized_const_data_ptr = const_data_ptr_cast(&val); + auto serialized_val = Value::BLOB(serialized_const_data_ptr, sizeof(int64_t)); + auto ret = SerializeResult(column_type, serialized_val); + return ret; + } + case LogicalTypeId::VARCHAR: { + string val = truncate_and_increment_utf8(input_value.GetValue()); + + const_data_ptr_t string_data = reinterpret_cast(&val); + auto serialized_val = Value::BLOB(string_data, val.size()); + auto ret = SerializeResult(column_type, serialized_val); + return ret; + } + case LogicalTypeId::FLOAT: { + float val = input_value.GetValue(); + auto serialized_const_data_ptr = const_data_ptr_cast(&val); + auto serialized_val = Value::BLOB(serialized_const_data_ptr, sizeof(float)); + auto ret = SerializeResult(column_type, serialized_val); + return ret; + } + case LogicalTypeId::DOUBLE: { 
+        // serialize the raw bytes of the double
+        double val = input_value.GetValue<double>();
+        auto serialized_const_data_ptr = const_data_ptr_cast(&val);
+        auto serialized_val = Value::BLOB(serialized_const_data_ptr, sizeof(double));
+        auto ret = SerializeResult(column_type, serialized_val);
+        return ret;
+    }
+    case LogicalTypeId::DATE: {
+        // serialize the date as days since epoch (int32)
+        date_t val = input_value.GetValue<date_t>();
+        int32_t epoch_days = Date::EpochDays(val);
+        auto serialized_const_data_ptr = const_data_ptr_cast(&epoch_days);
+        auto serialized_val = Value::BLOB(serialized_const_data_ptr, sizeof(int32_t));
+        auto ret = SerializeResult(column_type, serialized_val);
+        return ret;
+    }
+    case LogicalTypeId::TIMESTAMP: {
+        // serialize the timestamp as microseconds since epoch (int64)
+        timestamp_t val = input_value.GetValue<timestamp_t>();
+        int64_t micros_since_epoch = Timestamp::GetEpochMicroSeconds(val);
+        auto serialized_const_data_ptr = const_data_ptr_cast(&micros_since_epoch);
+        auto serialized_val = Value::BLOB(serialized_const_data_ptr, sizeof(int64_t));
+        auto ret = SerializeResult(column_type, serialized_val);
+        return ret;
+    }
+    case LogicalTypeId::DECIMAL: {
+        auto decimal_as_string = input_value.GetValue<string>();
+        auto dec_pos = decimal_as_string.find(".");
+        // remove the decimal point, if present
+        if (dec_pos != string::npos) {
+            decimal_as_string.erase(dec_pos, 1);
+        }
+        auto unscaled = Value(decimal_as_string).DefaultCastAs(LogicalType::HUGEINT);
+        auto unscaled_hugeint = unscaled.GetValue<hugeint_t>();
+        vector<uint8_t> big_endian_bytes;
+        bool needs_positive_padding = false;
+        bool needs_negative_padding = false;
+        auto huge_int_bytes = sizeof(hugeint_t);
+        bool first_val = false;
+        bool is_negative = unscaled_hugeint < 0;
+        for (idx_t i = 0; i < huge_int_bytes; i++) {
+            uint8_t get_8 =
+                static_cast<uint8_t>(static_cast<uint64_t>(unscaled_hugeint >> ((huge_int_bytes - i - 1) * 8)));
+            if (is_negative && (get_8 == 0xFF) && !first_val) {
+                // the number is negative, these are sign-extending bytes that are not important
+                continue;
+            }
+            if (!is_negative && (get_8 == 0x00) && !first_val) {
+                // the number is positive, these are sign-extending bytes that are not important
+                continue;
+            }
+            if (!first_val) {
+                // check if we need padding
+                if (is_negative && ((get_8 & 0x80) != 0x80)) {
+                    // negative padding is needed: the number is negative
+                    // but the most significant bit is not set
+                    needs_negative_padding = true;
+                } else if (!is_negative && ((get_8 & 0x80) == 0x80)) {
+                    // padding is needed: the number is positive but the most significant bit is set,
+                    // sign extend with one byte of 0x00
+                    needs_positive_padding = true;
+                }
+                first_val = true;
+            }
+            big_endian_bytes.push_back(get_8);
+        }
+        // the sign byte must go in front of the most significant byte
+        if (needs_negative_padding) {
+            big_endian_bytes.insert(big_endian_bytes.begin(), 0xFF);
+        }
+        if (needs_positive_padding) {
+            big_endian_bytes.insert(big_endian_bytes.begin(), 0x00);
+        }
+
+        // reverse the bytes, then rely on the little-endian memory layout of hugeint_t
+        // so that the final blob comes out in big-endian order
+        std::reverse(big_endian_bytes.begin(), big_endian_bytes.end());
+
+        hugeint_t result = 0;
+        int n = big_endian_bytes.size();
+        D_ASSERT(n <= 16);
+        for (int i = 0; i < n; i++) {
+            result |= static_cast<hugeint_t>(big_endian_bytes[i]) << (8 * (n - i - 1));
+        }
+
+        auto serialized_const_data_ptr = const_data_ptr_cast(&result);
+        auto ret_val = Value::BLOB(serialized_const_data_ptr, big_endian_bytes.size());
+        auto ret = SerializeResult(column_type, ret_val);
+        return ret;
+    }
+    case LogicalTypeId::BLOB:
+    case LogicalTypeId::BIT: {
+        // decode the hex string representation into raw bytes
+        auto val = input_value.GetValue<string>();
+        auto bytes = HexStringToBytes(val);
+        auto serialized_const_data_ptr = const_data_ptr_cast(bytes.data());
+        auto ret_val = Value::BLOB(serialized_const_data_ptr, bytes.size());
+        auto ret = SerializeResult(column_type, ret_val);
+        return ret;
+    }
+    // boolean does not yet return proper values, so we skip it
+    case LogicalTypeId::BOOLEAN:
+    default:
+        break;
+    }
+    // return no serialized value, and also no error
+    return SerializeResult();
+}
+
 } // namespace duckdb
diff --git a/src/include/iceberg_value.hpp b/src/include/iceberg_value.hpp
index 06b07d626..d21fe99ae 100644
--- a/src/include/iceberg_value.hpp
+++ b/src/include/iceberg_value.hpp
@@ -19,11 +19,11 @@ struct DeserializeResult {
         return !error.empty();
     }
     const string GetError() const {
-        D_ASSERT(!error.empty());
+        D_ASSERT(HasError());
         return error;
     }
     const Value &GetValue() const {
-        D_ASSERT(error.empty());
+        D_ASSERT(!HasError());
         return value;
     }
@@ -32,12 +32,49 @@ struct DeserializeResult {
     string error;
 };
 
+struct SerializeResult {
+public:
+    SerializeResult(LogicalType &column_type, Value serialized_value)
+        : original_type(column_type), value(serialized_value) {
+    }
+
+    SerializeResult() : original_type(LogicalType::INVALID), value(Value()) {
+    }
+
+    explicit SerializeResult(const string &error) : error(error) {
+    }
+
+public:
+    bool HasError() const {
+        return !error.empty();
+    }
+    string GetError() const {
+        D_ASSERT(HasError());
+        return error;
+    }
+    const Value &GetValue() const {
+        D_ASSERT(!HasError());
+        D_ASSERT(value.type() == LogicalType::BLOB);
+        return value;
+    }
+    // Some returned stats are known to be incorrect; for those we do not serialize a value.
+    bool HasValue() const {
+        return !value.IsNull();
+    }
+
+public:
+    string error;
+    LogicalType original_type;
+    Value value;
+};
+
 struct IcebergValue {
 public:
     IcebergValue() = delete;
 
 public:
     static DeserializeResult DeserializeValue(const string_t &blob, const LogicalType &target);
+    static SerializeResult SerializeValue(Value input_value, LogicalType &column_type);
 };
 
 } // namespace duckdb
diff --git a/src/include/storage/iceberg_insert.hpp b/src/include/storage/iceberg_insert.hpp
index 0ce4ebb21..ced000b8e 100644
--- a/src/include/storage/iceberg_insert.hpp
+++ b/src/include/storage/iceberg_insert.hpp
@@ -37,6 +37,39 @@ class IcebergInsertGlobalState : public GlobalSinkState {
     atomic<idx_t> insert_count;
 };
 
+struct IcebergColumnStats {
+    explicit IcebergColumnStats(LogicalType type_p) : type(std::move(type_p)) {
+    }
+
+    // Copy constructor and copy assignment
+    IcebergColumnStats(const IcebergColumnStats &other);
+    IcebergColumnStats &operator=(const IcebergColumnStats &other);
+    IcebergColumnStats(IcebergColumnStats &&other) noexcept = default;
+    IcebergColumnStats &operator=(IcebergColumnStats &&other) noexcept = default;
+
+    LogicalType type;
+    string min;
+    string max;
+    idx_t null_count = 0;
+    idx_t column_size_bytes = 0;
+    bool contains_nan = false;
+    bool has_null_count = false;
+    bool has_min = false;
+    bool has_max = false;
+    bool any_valid = true;
+    bool has_contains_nan = false;
+    bool has_column_size_bytes = false;
+
+public:
+    unique_ptr<BaseStatistics> ToStats() const;
+    void MergeStats(const IcebergColumnStats &new_stats);
+    IcebergColumnStats Copy() const;
+
+private:
+    unique_ptr<BaseStatistics> CreateNumericStats() const;
+    unique_ptr<BaseStatistics> CreateStringStats() const;
+};
+
 class IcebergInsert : public PhysicalOperator {
 public:
     //! INSERT INTO
@@ -78,6 +111,9 @@ class IcebergInsert : public PhysicalOperator {
     static PhysicalOperator &PlanInsert(ClientContext &context, PhysicalPlanGenerator &planner, ICTableEntry &table);
     static vector<IcebergManifestEntry> GetInsertManifestEntries(IcebergInsertGlobalState &global_state);
+    static IcebergColumnStats ParseColumnStatsNew(const LogicalType &type, const vector<Value> &col_stats);
+    static void AddWrittenFiles(IcebergInsertGlobalState &global_state, DataChunk &chunk,
+                                optional_ptr<ICTableEntry> table);
 
     bool IsSink() const override {
         return true;
diff --git a/src/metadata/iceberg_table_schema.cpp b/src/metadata/iceberg_table_schema.cpp
index 409e789de..ffc22fd4e 100644
--- a/src/metadata/iceberg_table_schema.cpp
+++ b/src/metadata/iceberg_table_schema.cpp
@@ -2,6 +2,7 @@
 
 #include "iceberg_metadata.hpp"
 #include "iceberg_utils.hpp"
+#include "duckdb/common/exception.hpp"
 #include "rest_catalog/objects/list.hpp"
 
 namespace duckdb {
diff --git a/src/storage/iceberg_insert.cpp b/src/storage/iceberg_insert.cpp
index fc6061ce4..9cb909c95 100644
--- a/src/storage/iceberg_insert.cpp
+++ b/src/storage/iceberg_insert.cpp
@@ -1,4 +1,4 @@
-#include "../include/storage/iceberg_insert.hpp"
+#include "storage/iceberg_insert.hpp"
 #include "storage/iceberg_insert.hpp"
 #include "storage/irc_catalog.hpp"
 #include "storage/irc_transaction.hpp"
@@ -7,6 +7,7 @@
 #include "metadata/iceberg_column_definition.hpp"
 
 #include "iceberg_multi_file_list.hpp"
+#include "iceberg_value.hpp"
 #include "utils/iceberg_type.hpp"
 #include "duckdb/common/sort/partition_state.hpp"
 #include "duckdb/catalog/catalog_entry/copy_function_catalog_entry.hpp"
@@ -100,44 +101,44 @@ static vector<string> ParseQuotedList(const string &input, char list_separator)
     return result;
 }
 
-struct IcebergColumnStats {
-    explicit IcebergColumnStats() = default;
-
-    string min;
-    string max;
-    idx_t null_count = 0;
-    idx_t column_size_bytes = 0;
-    bool contains_nan = false;
-    bool has_null_count = false;
-    bool has_min = false;
-    bool has_max = false;
-    bool any_valid = true;
-    bool has_contains_nan = false;
-};
-
-static IcebergColumnStats ParseColumnStats(const vector<Value> col_stats) {
-    IcebergColumnStats column_stats;
+static void AddToColDefMap(case_insensitive_map_t<optional_ptr<IcebergColumnDefinition>> &name_to_coldef,
+                           string col_name_prefix, optional_ptr<IcebergColumnDefinition> column_def) {
+    string column_name = column_def->name;
+    if (!col_name_prefix.empty()) {
+        column_name = col_name_prefix + "." + column_def->name;
+    }
+    if (column_def->IsIcebergPrimitiveType()) {
+        name_to_coldef.emplace(column_name, column_def.get());
+    } else {
+        for (auto &child : column_def->children) {
+            AddToColDefMap(name_to_coldef, column_name, child.get());
+        }
+    }
+}
+
+IcebergColumnStats IcebergInsert::ParseColumnStatsNew(const LogicalType &type, const vector<Value> &col_stats) {
+    IcebergColumnStats column_stats(type);
     for (idx_t stats_idx = 0; stats_idx < col_stats.size(); stats_idx++) {
         auto &stats_children = StructValue::GetChildren(col_stats[stats_idx]);
         auto &stats_name = StringValue::Get(stats_children[0]);
-        auto &stats_value = StringValue::Get(stats_children[1]);
         if (stats_name == "min") {
             D_ASSERT(!column_stats.has_min);
-            column_stats.min = stats_value;
+            column_stats.min = StringValue::Get(stats_children[1]);
             column_stats.has_min = true;
         } else if (stats_name == "max") {
             D_ASSERT(!column_stats.has_max);
-            column_stats.max = stats_value;
+            column_stats.max = StringValue::Get(stats_children[1]);
             column_stats.has_max = true;
         } else if (stats_name == "null_count") {
             D_ASSERT(!column_stats.has_null_count);
             column_stats.has_null_count = true;
-            column_stats.null_count = StringUtil::ToUnsigned(stats_value);
+            column_stats.null_count = StringUtil::ToUnsigned(StringValue::Get(stats_children[1]));
         } else if (stats_name == "column_size_bytes") {
-            column_stats.column_size_bytes = StringUtil::ToUnsigned(stats_value);
+            column_stats.has_column_size_bytes = true;
+            column_stats.column_size_bytes = StringUtil::ToUnsigned(StringValue::Get(stats_children[1]));
         } else if (stats_name == "has_nan") {
             column_stats.has_contains_nan = true;
-            column_stats.contains_nan = stats_value == "true";
+            column_stats.contains_nan = StringValue::Get(stats_children[1]) == "true";
         } else {
             throw NotImplementedException("Unsupported stats type \"%s\" in IcebergInsert::Sink()", stats_name);
         }
@@ -145,23 +146,8 @@ static IcebergColumnStats ParseColumnStats(const vector<Value> col_stats) {
     return column_stats;
 }
 
-static void AddToColDefMap(case_insensitive_map_t<optional_ptr<IcebergColumnDefinition>> &name_to_coldef,
-                           string col_name_prefix, optional_ptr<IcebergColumnDefinition> column_def) {
-    string column_name = column_def->name;
-    if (!col_name_prefix.empty()) {
-        column_name = col_name_prefix + "." + column_def->name;
-    }
-    if (column_def->IsIcebergPrimitiveType()) {
-        name_to_coldef.emplace(column_name, column_def.get());
-    } else {
-        for (auto &child : column_def->children) {
-            AddToColDefMap(name_to_coldef, column_name, child.get());
-        }
-    }
-}
-
-static void AddWrittenFiles(IcebergInsertGlobalState &global_state, DataChunk &chunk,
-                            optional_ptr<ICTableEntry> table) {
+void IcebergInsert::AddWrittenFiles(IcebergInsertGlobalState &global_state, DataChunk &chunk,
+                                    optional_ptr<ICTableEntry> table) {
     D_ASSERT(table);
     // grab lock for written files vector
     lock_guard<mutex> guard(global_state.lock);
@@ -169,6 +155,7 @@ static void AddWrittenFiles(IcebergInsertGlobalState &global_state, DataChunk &c
     auto partition_id = ic_table.table_info.table_metadata.default_spec_id;
     for (idx_t r = 0; r < chunk.size(); r++) {
         IcebergManifestEntry data_file;
+        auto &ic_table = table->Cast<ICTableEntry>();
         data_file.file_path = chunk.GetValue(0, r).GetValue<string>();
         data_file.record_count = static_cast<int64_t>(chunk.GetValue(1, r).GetValue<idx_t>());
         data_file.file_size_in_bytes = static_cast<int64_t>(chunk.GetValue(2, r).GetValue<idx_t>());
@@ -201,17 +188,42 @@ static void AddWrittenFiles(IcebergInsertGlobalState &global_state, DataChunk &c
             auto &col_name = StringValue::Get(struct_children[0]);
             auto &col_stats = MapValue::GetChildren(struct_children[1]);
             auto column_names = ParseQuotedList(col_name, '.');
-            auto stats = ParseColumnStats(col_stats);
             auto normalized_col_name = StringUtil::Join(column_names, ".");
-            auto ic_column_info = column_info.find(normalized_col_name);
-            D_ASSERT(ic_column_info != column_info.end());
-            if (ic_column_info->second->required && stats.has_null_count && stats.null_count > 0) {
+            auto ic_column_info_it = column_info.find(normalized_col_name);
+            D_ASSERT(ic_column_info_it != column_info.end());
+            auto column_info = ic_column_info_it->second;
+            auto stats = ParseColumnStatsNew(column_info->type, col_stats);
+            if (column_info->required && stats.has_null_count && stats.null_count > 0) {
                 throw ConstraintException("NOT NULL constraint failed: %s.%s", table->name, normalized_col_name);
             }
+            // Go through the stats and add upper and lower bounds.
+            // Serialize the values here, in case we read transaction-local updates later.
+            if (stats.has_min) {
+                auto serialized_value = IcebergValue::SerializeValue(stats.min, column_info->type);
+                if (serialized_value.HasError()) {
+                    throw InvalidConfigurationException(serialized_value.GetError());
+                } else if (serialized_value.HasValue()) {
+                    data_file.lower_bounds[column_info->id] = serialized_value.GetValue();
+                }
+            }
+            if (stats.has_max) {
+                auto serialized_value = IcebergValue::SerializeValue(stats.max, column_info->type);
+                if (serialized_value.HasError()) {
+                    throw InvalidConfigurationException(serialized_value.GetError());
+                } else if (serialized_value.HasValue()) {
+                    data_file.upper_bounds[column_info->id] = serialized_value.GetValue();
+                }
+            }
+            if (stats.has_column_size_bytes) {
+                data_file.column_sizes[column_info->id] = stats.column_size_bytes;
+            }
+            if (stats.has_null_count) {
+                data_file.null_value_counts[column_info->id] = stats.null_count;
+            }
 
-            //! TODO: convert 'stats' into 'data_file.lower_bounds', upper_bounds, value_counts, null_value_counts,
-            //! nan_value_counts ...
+            //! nan_value_counts won't work: we can only indicate whether NaNs exist,
+            //! and we are unsure what we should do here.
         }
 
         //! TODO: extract the partition info
diff --git a/test/python/test_pyiceberg_read.py b/test/python/test_pyiceberg_read.py
index 5c313dd5e..48b103589 100644
--- a/test/python/test_pyiceberg_read.py
+++ b/test/python/test_pyiceberg_read.py
@@ -127,3 +127,17 @@ def test_pyiceberg_read(self, rest_catalog):
             {'a': 100},
             {'a': 100},
         ]
+
+
+@pytest.mark.skipif(
+    os.getenv('ICEBERG_SERVER_AVAILABLE', None) is None, reason="Test data wasn't generated, run 'make data' first"
+)
+class TestPyIcebergRead:
+    def test_pyiceberg_read(self, rest_catalog):
+        tbl = rest_catalog.load_table("default.test_reading_upper_and_lower_bounds")
+        scan = tbl.scan(row_filter=pyice.expressions.GreaterThanOrEqual("a", 350))
+
+        # Collect the file paths Iceberg selects
+        matched_files = [task.file.file_path for task in scan.plan_files()]
+        # only 1 data file should match the filter
+        assert len(matched_files) == 1
diff --git a/test/sql/local/irc/insert/test_write_upper_and_lower_bounds.test b/test/sql/local/irc/insert/test_write_upper_and_lower_bounds.test
new file mode 100644
index 000000000..682604d54
--- /dev/null
+++ b/test/sql/local/irc/insert/test_write_upper_and_lower_bounds.test
@@ -0,0 +1,132 @@
+# name: test/sql/local/irc/insert/test_write_upper_and_lower_bounds.test
+# group: [insert]
+
+require-env ICEBERG_SERVER_AVAILABLE
+
+require avro
+
+require parquet
+
+require iceberg
+
+require httpfs
+
+require icu
+
+# Do not ignore 'HTTP' error messages!
+set ignore_error_messages
+
+statement ok
+set enable_logging=true
+
+statement ok
+set logging_level='debug'
+
+statement ok
+SET TimeZone = 'UTC';
+
+statement ok
+pragma threads=1;
+
+mode skip
+
+statement ok
+CREATE SECRET (
+    TYPE S3,
+    KEY_ID 'admin',
+    SECRET 'password',
+    ENDPOINT '127.0.0.1:9000',
+    URL_STYLE 'path',
+    USE_SSL 0
+);
+
+statement ok
+ATTACH '' AS my_datalake (
+    TYPE ICEBERG,
+    CLIENT_ID 'admin',
+    CLIENT_SECRET 'password',
+    ENDPOINT 'http://127.0.0.1:8181'
+);
+
+statement ok
+drop table if exists my_datalake.default.lower_upper_bounds_test;
+
+statement ok
+create table my_datalake.default.lower_upper_bounds_test (
+int_type int,
+long_type long,
+varchar_type varchar,
+bool_type bool,
+float_type float,
+double_type double,
+decimal_type_18_3 DECIMAL(18,3),
+date_type DATE,
+timestamp_type TIMESTAMP,
+binary_type BINARY
+);
+
+statement ok
+insert into my_datalake.default.lower_upper_bounds_test values
+( -2147483648,
+  -9223372036854775808,
+  '',
+  false,
+  -3.4028235E38,
+  -1.7976931348623157E308,
+  -9999999999999.999,
+  DATE '0001-01-01',
+  TIMESTAMP '0001-01-01 00:00:00',
+  ''::BLOB
+),
+(
+  2147483647,
+  9223372036854775807,
+  'ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ',
+  true,
+  3.4028235E38,
+  1.7976931348623157E308,
+  9999999999999.999,
+  DATE '9999-12-31',
+  TIMESTAMP '9999-12-31 23:59:59.999999',
+  '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'::BLOB
+),
+(
+  NULL,
+  NULL,
+  NULL,
+  NULL,
+  NULL,
+  NULL,
+  NULL,
+  NULL,
+  NULL,
+  NULL
+);
+
+statement ok
+set variable manifest_path_duckdb = (select manifest_path from iceberg_metadata(my_datalake.default.lower_upper_bounds_test) order by manifest_sequence_number desc limit 1);
+
+statement ok
+set variable manifest_path_spark = (select manifest_path from iceberg_metadata(my_datalake.default.spark_written_upper_lower_bounds) order by manifest_sequence_number desc limit 1);
+
+# field ids 1, 2, 3, 5, 6, 7, 8, 9 refer to the int, long, varchar, float, double, decimal, date, and timestamp columns
+# (bool, field id 4, and binary, field id 10, are excluded; see the FIXME below)
+query IIII nosort duckdb_vals
+from (select unnest(map_keys(lower_bounds)) lower_keys, unnest(map_values(lower_bounds)) lower_vals, unnest(map_keys(upper_bounds)) upper_keys, unnest(map_values(upper_bounds)) upper_vals from (select unnest(data_file) from read_avro(getvariable('manifest_path_duckdb')))) where lower_keys in (1, 2, 3, 5, 6, 7, 8, 9) order by all;
+
+query IIII nosort duckdb_vals
+from (select unnest(map_keys(lower_bounds)) lower_keys, unnest(map_values(lower_bounds)) lower_vals, unnest(map_keys(upper_bounds)) upper_keys, unnest(map_values(upper_bounds)) upper_vals from (select unnest(data_file) from read_avro(getvariable('manifest_path_spark')))) where lower_keys in (1, 2, 3, 5, 6, 7, 8, 9) order by all;
+
+# check the null counts as well
+
+query II nosort duckdb_null_value_counts
+select unnest(map_keys(null_value_counts)), unnest(map_values(null_value_counts)) from (select unnest(data_file) from read_avro(getvariable('manifest_path_duckdb'))) order by all;
+
+query II nosort duckdb_null_value_counts
+select unnest(map_keys(null_value_counts)), unnest(map_values(null_value_counts)) from (select unnest(data_file) from read_avro(getvariable('manifest_path_spark'))) order by all;
+
+# FIXME:
+# bool values need to be fixed (field id 4)
+# BLOB/BIT does not match spark because they max out blob upper/lower bounds at 4 bytes.
+
+
diff --git a/test/sql/local/irc/metadata/test_write_metadata.test b/test/sql/local/irc/metadata/test_write_metadata.test
new file mode 100644
index 000000000..52282e85f
--- /dev/null
+++ b/test/sql/local/irc/metadata/test_write_metadata.test
@@ -0,0 +1,62 @@
+# name: test/sql/local/irc/metadata/test_write_metadata.test
+# group: [metadata]
+
+require-env ICEBERG_SERVER_AVAILABLE
+
+require avro
+
+require parquet
+
+require iceberg
+
+require httpfs
+
+require icu
+
+# Do not ignore 'HTTP' error messages!
+set ignore_error_messages
+
+statement ok
+set enable_logging=true
+
+statement ok
+set logging_level='debug'
+
+statement ok
+SET TimeZone = 'UTC';
+
+statement ok
+pragma threads=1;
+
+statement ok
+CREATE SECRET (
+    TYPE S3,
+    KEY_ID 'admin',
+    SECRET 'password',
+    ENDPOINT '127.0.0.1:9000',
+    URL_STYLE 'path',
+    USE_SSL 0
+);
+
+statement ok
+ATTACH '' AS my_datalake (
+    TYPE ICEBERG,
+    CLIENT_ID 'admin',
+    CLIENT_SECRET 'password',
+    ENDPOINT 'http://127.0.0.1:8181'
+);
+
+statement ok
+drop table if exists my_datalake.default.test_reading_upper_and_lower_bounds;
+
+statement ok
+create table my_datalake.default.test_reading_upper_and_lower_bounds as
+select range a from range(100);
+
+statement ok
+insert into my_datalake.default.test_reading_upper_and_lower_bounds
+select range a from range(300,400);
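
Note on the bound encoding targeted by SerializeValue above: per the Iceberg spec's single-value binary serialization, int/long/float/double/date/timestamp bounds are fixed-width little-endian two's-complement bytes, while decimal bounds store the unscaled value as a minimal-length big-endian two's-complement byte array. The Python sketch below is a hedged, standalone illustration of that target encoding only; the helper names are invented for the example and do not exist in this repository or in pyiceberg.

# Hedged sketch: what the manifest expects for lower/upper bound bytes.
from decimal import Decimal


def encode_long(value: int, width: int = 8) -> bytes:
    # int/long/date/timestamp bounds: fixed-width little-endian two's complement
    return value.to_bytes(width, byteorder="little", signed=True)


def encode_decimal(value: Decimal, scale: int) -> bytes:
    # decimal bounds: unscaled value as minimal-length big-endian two's complement
    unscaled = int(value.scaleb(scale))
    length = 1
    while True:
        try:
            return unscaled.to_bytes(length, byteorder="big", signed=True)
        except OverflowError:
            length += 1


if __name__ == "__main__":
    # upper bound of the bigint column in the test table
    print(encode_long(9223372036854775807).hex())                 # ffffffffffffff7f
    # upper bound of the DECIMAL(18,3) column: unscaled 9999999999999999
    print(encode_decimal(Decimal("9999999999999.999"), 3).hex())  # 2386f26fc0ffff
    # +200 needs a leading 0x00 byte so the sign bit stays clear
    print(encode_decimal(Decimal("200"), 0).hex())                # 00c8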
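
A related sketch for string bounds: upper bounds are usually produced by truncating to a short prefix and then incrementing the last code point, so the shortened bound still sorts at or above every value it stands for, while lower bounds only need truncation. This is the intent behind truncate_and_increment_utf8 above; the code below is a simplified, code-point level illustration (closer in spirit to Spark's writer), with invented function names, not the byte-level implementation from this diff.

# Hedged sketch of the "truncate and increment" rule for string upper bounds.
def truncated_upper_bound(value, max_chars=16):
    if len(value) <= max_chars:
        return value
    prefix = value[:max_chars]
    # bump the last code point that can still be incremented,
    # so the shortened bound is >= every string it represents
    for i in range(len(prefix) - 1, -1, -1):
        cp = ord(prefix[i])
        if cp < 0x10FFFF:
            return prefix[:i] + chr(cp + 1)
    return None  # no valid truncated upper bound could be produced


def truncated_lower_bound(value, max_chars=16):
    # lower bounds only need truncation; a prefix always sorts <= the full string
    return value[:max_chars]


if __name__ == "__main__":
    print(truncated_upper_bound("Z" * 100))  # 15 Z's followed by '[' (the next code point)
    print(truncated_lower_bound("Z" * 100))  # 16 Z's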