Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions duckdb_pglake/src/duckdb_pglake_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,225 @@ PgErrorNestedListFun(DataChunk &args, ExpressionState &state, Vector &result)
}


/*
 * IcebergComputeByteSize measures a single row of a vector directly on the
 * vector data, without serializing the value to text.
 *
 * The size is accumulated as follows:
 *   - VARCHAR / BLOB: the length reported by the row's string_t.
 *   - LIST / MAP: the sum over the `entry.length` child rows that start at
 *     `entry.offset` in the list's child vector.
 *   - STRUCT: the sum over every field vector at the same row index.
 *   - anything else: GetTypeIdSize() of the type's internal type.
 *
 * A NULL at any nesting level contributes 0.
 *
 * `row` is the row to measure; `row_count` is the number of rows in `v` and
 * is only used to flatten `v` when it is not already a flat vector. The
 * flattening happens in place, so the vector passed in may be modified.
 */
static int64_t
IcebergComputeByteSize(Vector &v, idx_t row, idx_t row_count)
{
	/* the FlatVector accessors used below need a flat layout */
	if (v.GetVectorType() != VectorType::FLAT_VECTOR)
		v.Flatten(row_count);

	if (FlatVector::IsNull(v, row))
		return 0;

	auto &type = v.GetType();
	const auto type_id = type.id();

	/* variable-length scalars: count the payload bytes */
	if (type_id == LogicalTypeId::VARCHAR || type_id == LogicalTypeId::BLOB)
	{
		auto strings = FlatVector::GetData<string_t>(v);

		return static_cast<int64_t>(strings[row].GetSize());
	}

	/* lists and maps: add up the elements this row points at */
	if (type_id == LogicalTypeId::LIST || type_id == LogicalTypeId::MAP)
	{
		auto entries = FlatVector::GetData<list_entry_t>(v);
		auto span = entries[row];
		auto &elements = ListVector::GetEntry(v);
		idx_t element_count = ListVector::GetListSize(v);

		int64_t sum = 0;

		for (idx_t k = 0; k < span.length; k++)
			sum += IcebergComputeByteSize(elements, span.offset + k,
										  element_count);

		return sum;
	}

	/* structs: every field lives in its own vector at the same row */
	if (type_id == LogicalTypeId::STRUCT)
	{
		auto &fields = StructVector::GetEntries(v);
		int64_t sum = 0;

		for (auto &field : fields)
			sum += IcebergComputeByteSize(*field, row, row_count);

		return sum;
	}

	/* everything else is sized by its internal type */
	return static_cast<int64_t>(GetTypeIdSize(type.InternalType()));
}


/*
 * IcebergByteSizeFun is the scalar-function callback registered as
 * iceberg_byte_size. For every row of the first argument it stores the
 * value computed by IcebergComputeByteSize in the int64 result vector;
 * rows whose input is NULL are marked NULL in the result instead.
 *
 * The input vector is flattened in place when it is not already flat.
 */
static void
IcebergByteSizeFun(DataChunk &args, ExpressionState &state, Vector &result)
{
	Vector &source = args.data[0];
	const idx_t row_count = args.size();

	if (source.GetVectorType() != VectorType::FLAT_VECTOR)
		source.Flatten(row_count);

	auto sizes = FlatVector::GetData<int64_t>(result);
	auto &result_validity = FlatVector::Validity(result);

	for (idx_t row = 0; row < row_count; row++)
	{
		if (FlatVector::IsNull(source, row))
			result_validity.SetInvalid(row);
		else
			sizes[row] = IcebergComputeByteSize(source, row, row_count);
	}
}


/*
 * IcebergByteSizeBind is the bind callback for iceberg_byte_size. It fixes
 * the return type of the bound function to BIGINT and carries no bind
 * data, hence the null return. `context` and `arguments` are not used.
 */
static unique_ptr<FunctionData>
IcebergByteSizeBind(ClientContext &context, ScalarFunction &bound_function,
					vector<unique_ptr<Expression>> &arguments)
{
	/* the function always yields a byte count */
	bound_function.return_type = LogicalType(LogicalTypeId::BIGINT);

	/* no bind data needed */
	return nullptr;
}


/*
 * IcebergSizeClampTextFun shortens a VARCHAR value so that it occupies at
 * most `limit` bytes (the second argument) without splitting a UTF-8
 * sequence.
 *
 * A limit of zero or less means "disabled": the value is copied to the
 * result unchanged. The same happens when the value already fits. This is
 * used on the pushdown write path to enforce Snowflake STRING/VARCHAR
 * per-column caps.
 *
 * To find the cut point we start at byte offset `limit` and step backwards
 * while that byte is a UTF-8 continuation byte (bit pattern 10xxxxxx). The
 * first non-continuation byte starts a character that does not fit, so only
 * the bytes before it are kept. For valid UTF-8 this backs up at most 3
 * bytes, because a code point is encoded in at most 4 bytes.
 */
static void
IcebergSizeClampTextFun(DataChunk &args, ExpressionState &state, Vector &result)
{
	auto clamp_value = [&](string_t input, int32_t limit) {
		auto bytes = input.GetData();
		int64_t length = static_cast<int64_t>(input.GetSize());

		/* number of leading bytes to keep; the whole value by default */
		int64_t keep = length;

		if (limit > 0 && length > static_cast<int64_t>(limit))
		{
			/* bytes[limit] is in range because limit < length here */
			keep = static_cast<int64_t>(limit);

			while (keep > 0 &&
				   (static_cast<unsigned char>(bytes[keep]) & 0xC0) == 0x80)
				keep--;
		}

		return StringVector::AddString(result, bytes, keep);
	};

	BinaryExecutor::Execute<string_t, int32_t, string_t>(
		args.data[0], args.data[1], result, args.size(), clamp_value);
}


/*
 * IcebergSizeClampBlobFun cuts a BLOB value down to at most `limit` bytes
 * (the second argument). Unlike the text variant there is no character
 * boundary to respect, so the value is simply byte-truncated.
 *
 * A limit of zero or less disables clamping and the value is copied to the
 * result unchanged, as is a value that already fits. This is used on the
 * pushdown write path to enforce Snowflake BINARY per-column caps.
 */
static void
IcebergSizeClampBlobFun(DataChunk &args, ExpressionState &state, Vector &result)
{
	auto clamp_value = [&](string_t input, int32_t limit) {
		auto bytes = input.GetData();
		int64_t length = static_cast<int64_t>(input.GetSize());

		/* keep everything unless an active limit is exceeded */
		int64_t keep = length;

		if (limit > 0 && length > static_cast<int64_t>(limit))
			keep = static_cast<int64_t>(limit);

		return StringVector::AddStringOrBlob(result, bytes, keep);
	};

	BinaryExecutor::Execute<string_t, int32_t, string_t>(
		args.data[0], args.data[1], result, args.size(), clamp_value);
}


/*
 * IcebergSizeCheckTextFun is the erroring counterpart of
 * IcebergSizeClampTextFun, used on the pushdown write path when the
 * table's out_of_range_values policy is the default 'error'.
 *
 * Arguments: the VARCHAR value, the byte limit, and the source column
 * name. A value whose byte length is within the limit, or any value when
 * the limit is zero or less, is copied to the result unchanged. Otherwise
 * an InvalidInputException is thrown that names the column, the actual
 * size, and the limit.
 */
static void
IcebergSizeCheckTextFun(DataChunk &args, ExpressionState &state, Vector &result)
{
	auto check_value = [&](string_t input, int32_t limit, string_t column_name) {
		auto bytes = input.GetData();
		int64_t length = static_cast<int64_t>(input.GetSize());

		const bool within_limit =
			limit <= 0 || length <= static_cast<int64_t>(limit);

		if (within_limit)
			return StringVector::AddString(result, bytes, length);

		throw InvalidInputException(
			"value of column \"%s\" (text, %lld bytes) exceeds "
			"iceberg_max_string_bytes (%d). "
			"Set out_of_range_values = 'clamp' on the table to "
			"truncate oversize values instead of erroring.",
			column_name.GetString().c_str(), (long long) length, limit);
	};

	TernaryExecutor::Execute<string_t, int32_t, string_t, string_t>(
		args.data[0], args.data[1], args.data[2], result, args.size(),
		check_value);
}


/*
 * IcebergSizeCheckBlobFun is the erroring counterpart of
 * IcebergSizeClampBlobFun.
 *
 * Arguments: the BLOB value, the byte limit, and the source column name.
 * A value within the limit, or any value when the limit is zero or less,
 * is copied to the result unchanged. Otherwise an InvalidInputException is
 * thrown that names the column, the actual size, and the limit.
 */
static void
IcebergSizeCheckBlobFun(DataChunk &args, ExpressionState &state, Vector &result)
{
	auto check_value = [&](string_t input, int32_t limit, string_t column_name) {
		auto bytes = input.GetData();
		int64_t length = static_cast<int64_t>(input.GetSize());

		const bool within_limit =
			limit <= 0 || length <= static_cast<int64_t>(limit);

		if (within_limit)
			return StringVector::AddStringOrBlob(result, bytes, length);

		throw InvalidInputException(
			"value of column \"%s\" (bytea, %lld bytes) exceeds "
			"iceberg_max_binary_bytes (%d). "
			"Set out_of_range_values = 'clamp' on the table to "
			"truncate oversize values instead of erroring.",
			column_name.GetString().c_str(), (long long) length, limit);
	};

	TernaryExecutor::Execute<string_t, int32_t, string_t, string_t>(
		args.data[0], args.data[1], args.data[2], result, args.size(),
		check_value);
}



static void LoadInternal(ExtensionLoader &loader) {

Expand Down Expand Up @@ -438,6 +657,44 @@ static void LoadInternal(ExtensionLoader &loader) {
loader.RegisterFunction(pg_error_nested);
}

{
/*
 * Register the Iceberg size-limit scalar functions:
 *
 *   iceberg_size_clamp_text(VARCHAR, INTEGER)          -> VARCHAR
 *   iceberg_size_clamp_blob(BLOB, INTEGER)             -> BLOB
 *   iceberg_size_check_text(VARCHAR, INTEGER, VARCHAR) -> VARCHAR
 *   iceberg_size_check_blob(BLOB, INTEGER, VARCHAR)    -> BLOB
 *   iceberg_byte_size(ANY)                             -> BIGINT
 *
 * The INTEGER argument is the byte limit; the trailing VARCHAR of the
 * check variants is the column name used in the error message.
 *
 * NOTE(review): the iceberg_size_check_* callbacks throw
 * InvalidInputException on oversize values; confirm whether these
 * registrations need to be flagged as able to raise runtime errors so
 * the optimizer does not reorder or elide them.
 *
 * NOTE(review): no special NULL handling is requested here, so a NULL
 * limit or column name presumably yields a NULL result rather than a
 * pass-through; confirm callers always pass non-NULL values.
 */

/* truncate oversize text at a UTF-8 character boundary */
ScalarFunction iceberg_size_clamp_text(
"iceberg_size_clamp_text",
{LogicalType::VARCHAR, LogicalType::INTEGER},
LogicalType::VARCHAR,
IcebergSizeClampTextFun);
loader.RegisterFunction(iceberg_size_clamp_text);

/* byte-truncate oversize binary values */
ScalarFunction iceberg_size_clamp_blob(
"iceberg_size_clamp_blob",
{LogicalType::BLOB, LogicalType::INTEGER},
LogicalType::BLOB,
IcebergSizeClampBlobFun);
loader.RegisterFunction(iceberg_size_clamp_blob);

/* raise on oversize text, otherwise pass through */
ScalarFunction iceberg_size_check_text(
"iceberg_size_check_text",
{LogicalType::VARCHAR, LogicalType::INTEGER, LogicalType::VARCHAR},
LogicalType::VARCHAR,
IcebergSizeCheckTextFun);
loader.RegisterFunction(iceberg_size_check_text);

/* raise on oversize binary values, otherwise pass through */
ScalarFunction iceberg_size_check_blob(
"iceberg_size_check_blob",
{LogicalType::BLOB, LogicalType::INTEGER, LogicalType::VARCHAR},
LogicalType::BLOB,
IcebergSizeCheckBlobFun);
loader.RegisterFunction(iceberg_size_check_blob);

/* accepts any type; the bind callback pins the return type to BIGINT */
ScalarFunction iceberg_byte_size(
"iceberg_byte_size",
{LogicalType::ANY},
LogicalType::BIGINT,
IcebergByteSizeFun,
IcebergByteSizeBind);
loader.RegisterFunction(iceberg_byte_size);
}

PgLakeUtilityFunctions::RegisterFunctions(loader);
PgLakeFileSystemFunctions::RegisterFunctions(loader);

Expand Down
37 changes: 37 additions & 0 deletions pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,40 @@ extern PGDLLEXPORT Datum IcebergErrorOrClampDatum(Datum value, Oid typeOid,
int32 typmod,
IcebergOutOfRangePolicy policy,
bool *isNull);

/*
 * IcebergSizeClampDatum truncates, NULLs, or errors on a Datum so that
 * string and binary values fit the byte limits expressed by
 * pg_lake_engine.iceberg_max_string_bytes and
 * pg_lake_engine.iceberg_max_binary_bytes (0 = no limit).
 *
 * The behavior on an oversize value is selected by `policy`:
 *   - ICEBERG_OOR_ERROR (default for Iceberg tables): raise an error
 *     identifying the column and the exceeded GUC.
 *   - ICEBERG_OOR_CLAMP: silently fix up the value as described below.
 *   - ICEBERG_OOR_NONE: pass through unchanged.
 *
 * Under CLAMP, types that remain well-formed when shortened are truncated:
 *   - text/varchar/bpchar -> trimmed at a UTF-8 character boundary to
 *     iceberg_max_string_bytes.
 *   - bytea -> byte-truncated to iceberg_max_binary_bytes.
 *
 * Structured-string types are replaced with NULL via *isNull = true,
 * since truncation would corrupt them:
 *   - jsonb/json
 *
 * Recurses through arrays, composites, maps, and domains. Nested values
 * that would be NULLed are absorbed as NULL within the reconstructed
 * container.
 *
 * `columnName` is included in the error message under ERROR mode; pass
 * NULL or empty when the column context is unknown.
 *
 * If all three GUCs are 0, the value is returned unchanged regardless of
 * policy or type.
 *
 * NOTE(review): only two GUCs are named at the top of this comment; the
 * third is presumably pg_lake_engine.iceberg_max_nested_type_bytes.
 * Confirm, and document how (or whether) it applies to this function.
 */
extern PGDLLEXPORT Datum IcebergSizeClampDatum(Datum value, Oid typeOid,
int32 typmod,
IcebergOutOfRangePolicy policy,
const char *columnName,
bool *isNull);
21 changes: 21 additions & 0 deletions pg_lake_engine/include/pg_lake/pgduck/iceberg_query_validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,27 @@ extern PGDLLEXPORT char *IcebergWrapQueryWithErrorOrClampChecks(char *query,
IcebergOutOfRangePolicy policy,
bool queryHasRowId);

/*
 * IcebergWrapQueryWithSizeClampChecks wraps a query so that values
 * exceeding the per-column byte caps (pg_lake_engine.iceberg_max_string_bytes,
 * pg_lake_engine.iceberg_max_binary_bytes and
 * pg_lake_engine.iceberg_max_nested_type_bytes) are either clamped or
 * rejected, per `policy`:
 *
 *   - ICEBERG_OOR_ERROR (default): raise an error identifying the column.
 *   - ICEBERG_OOR_CLAMP: text/varchar/bpchar truncated at a UTF-8
 *     character boundary; bytea byte-truncated; jsonb/json NULLed when
 *     the serialized form exceeds the limit; arrays/structs/maps NULLed
 *     when their measured byte size exceeds the nested-type cap.
 *   - ICEBERG_OOR_NONE: no-op.
 *
 * Returns the original query unchanged when all GUCs are zero or no
 * column carries a clampable type.
 *
 * NOTE(review): `tupleDesc` presumably describes the columns produced by
 * `query`, and `queryHasRowId` presumably signals an extra row-id column
 * in the query output; neither is documented here, so confirm against
 * the implementation.
 */
extern PGDLLEXPORT char *IcebergWrapQueryWithSizeClampChecks(char *query,
TupleDesc tupleDesc,
IcebergOutOfRangePolicy policy,
bool queryHasRowId);

/*
* IcebergWrapQueryWithNativeTypeConversion wraps a query to rewrite
* columns whose native DuckDB shape does not match Iceberg's. See the
Expand Down
20 changes: 20 additions & 0 deletions pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,23 @@ extern PGDLLEXPORT bool TypeNeedsIcebergValidation(Oid typeOid, int32 typmod,
#define TEMPORAL_DATE_MIN_YEAR (-4712)
#define TEMPORAL_TIMESTAMP_MIN_YEAR 1
#define TEMPORAL_MAX_YEAR 9999

/*
 * Downstream byte limits for values written to Iceberg tables, set via the
 * pg_lake_engine.iceberg_max_string_bytes and
 * pg_lake_engine.iceberg_max_binary_bytes GUCs. 0 means no limit. These
 * caps are imposed by some downstream consumers (e.g. Snowflake VARCHAR
 * 16 MiB / BINARY 8 MiB) and applied via IcebergSizeClampDatum.
 *
 * NOTE(review): three variables are declared but only two GUCs are named
 * above. IcebergMaxNestedTypeBytes presumably backs
 * pg_lake_engine.iceberg_max_nested_type_bytes (the cap for
 * arrays/structs/maps); confirm and document it here.
 */
extern PGDLLEXPORT int IcebergMaxStringBytes;
extern PGDLLEXPORT int IcebergMaxBinaryBytes;
extern PGDLLEXPORT int IcebergMaxNestedTypeBytes;

/*
 * TypeNeedsIcebergSizeClamping returns true if a Datum of type `typeOid`
 * (or any truncatable string / structured-string / bytea component nested
 * within it) could potentially be size-clamped by IcebergSizeClampDatum.
 * Recurses through arrays, composites, maps, and domains.
 *
 * The answer depends only on the type, not on the current GUC values, so
 * a true result does not mean any limit is currently enabled.
 */
extern PGDLLEXPORT bool TypeNeedsIcebergSizeClamping(Oid typeOid);
Loading
Loading