Skip to content

Commit c1b9b98

Browse files
committed
pg_lake: clamp or error on oversize values per out_of_range_values
Some downstream consumers of Iceberg tables impose per-column byte caps smaller than what PostgreSQL values in the source can carry. For example, on Snowflake the column-type byte ceilings are: - STRING / VARCHAR : 16 MiB default, up to 128 MiB when declared with an explicit larger length. - BINARY : 8 MiB default, up to 64 MiB. - OBJECT / ARRAY / VARIANT : 128 MiB. This adds three opt-in GUCs (default 0 = disabled) that bound text / binary / nested-type values written to Iceberg: pg_lake_engine.iceberg_max_string_bytes pg_lake_engine.iceberg_max_binary_bytes pg_lake_engine.iceberg_max_nested_type_bytes The behavior on an oversize value follows the table's out_of_range_values option (same contract as temporal / numeric OOR): out_of_range_values = 'error' (default for Iceberg tables): Raise an error identifying the column, type, byte size, and exceeded GUC. Operators see the violation immediately rather than discovering silent data loss downstream. out_of_range_values = 'clamp': text / varchar / bpchar : truncated at a UTF-8 character boundary bytea : byte-truncated jsonb / json : NULL when the text-serialized form exceeds the limit (truncating would yield invalid JSON) array / composite / map : NULL the whole container when its measured byte size exceeds the nested-type cap Both the per-tuple FDW write path (IcebergSizeCheckOrClampSlotInPlace) and the SQL pushdown path (IcebergWrapQueryWithSizeClampChecks, around the SELECT used by INSERT..SELECT, COPY FROM, and snowflake_cdc snapshot via postgres_scan) honor the policy and produce identical clamped output under 'clamp'. duckdb_pglake adds two new check UDFs (iceberg_size_check_text / _blob) used by the SQL wrapper under 'error'. The existing iceberg_size_clamp_text / _blob and iceberg_byte_size UDFs cover the 'clamp' path; jsonb and nested 'error' cases use DuckDB's built-in error() inside CASE WHEN. Tests cover both modes via 13 cases in pg_lake_table/tests/pytests/test_iceberg_size_clamping.py. Signed-off-by: Marco Slot <marco.slot@snowflake.com>
1 parent 139aeca commit c1b9b98

11 files changed

Lines changed: 1868 additions & 0 deletions

File tree

duckdb_pglake/src/duckdb_pglake_extension.cpp

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,225 @@ PgErrorNestedListFun(DataChunk &args, ExpressionState &state, Vector &result)
389389
}
390390

391391

392+
/*
393+
* IcebergByteSize returns the in-memory byte footprint of any Datum.
394+
*
395+
* Walks the value at the vector level (no text serialization), summing
396+
* VARCHAR/BLOB string_t sizes, recursing through LIST/MAP children, and
397+
* adding fixed-width sizes for scalar types. Used PG-side as a cheap
398+
* proxy for "size that lands in the consumer's OBJECT/ARRAY/VARIANT
399+
* column", in lieu of casting the whole container to VARCHAR (which
400+
* would trigger DuckDB's text serialization).
401+
*
402+
* STRUCT field byte sums are computed by recursing into each field's
403+
* vector at the same row index; LIST/MAP recurse over `entry.length`
404+
* children starting at `entry.offset` in the child vector.
405+
*/
406+
static int64_t
407+
IcebergComputeByteSize(Vector &v, idx_t row, idx_t row_count)
408+
{
409+
if (v.GetVectorType() != VectorType::FLAT_VECTOR)
410+
v.Flatten(row_count);
411+
412+
if (FlatVector::IsNull(v, row))
413+
return 0;
414+
415+
auto &type = v.GetType();
416+
417+
switch (type.id())
418+
{
419+
case LogicalTypeId::VARCHAR:
420+
case LogicalTypeId::BLOB:
421+
{
422+
auto data = FlatVector::GetData<string_t>(v);
423+
return (int64_t) data[row].GetSize();
424+
}
425+
426+
case LogicalTypeId::LIST:
427+
case LogicalTypeId::MAP:
428+
{
429+
auto list_data = FlatVector::GetData<list_entry_t>(v);
430+
auto entry = list_data[row];
431+
auto &child = ListVector::GetEntry(v);
432+
idx_t child_size = ListVector::GetListSize(v);
433+
434+
int64_t total = 0;
435+
436+
for (idx_t i = 0; i < entry.length; i++)
437+
total += IcebergComputeByteSize(child, entry.offset + i,
438+
child_size);
439+
440+
return total;
441+
}
442+
443+
case LogicalTypeId::STRUCT:
444+
{
445+
auto &children = StructVector::GetEntries(v);
446+
int64_t total = 0;
447+
448+
for (auto &child : children)
449+
total += IcebergComputeByteSize(*child, row, row_count);
450+
451+
return total;
452+
}
453+
454+
default:
455+
return (int64_t) GetTypeIdSize(type.InternalType());
456+
}
457+
}
458+
459+
460+
static void
461+
IcebergByteSizeFun(DataChunk &args, ExpressionState &state, Vector &result)
462+
{
463+
auto &input = args.data[0];
464+
idx_t count = args.size();
465+
466+
if (input.GetVectorType() != VectorType::FLAT_VECTOR)
467+
input.Flatten(count);
468+
469+
auto out = FlatVector::GetData<int64_t>(result);
470+
auto &out_validity = FlatVector::Validity(result);
471+
472+
for (idx_t i = 0; i < count; i++)
473+
{
474+
if (FlatVector::IsNull(input, i))
475+
{
476+
out_validity.SetInvalid(i);
477+
continue;
478+
}
479+
out[i] = IcebergComputeByteSize(input, i, count);
480+
}
481+
}
482+
483+
484+
static unique_ptr<FunctionData>
485+
IcebergByteSizeBind(ClientContext &context, ScalarFunction &bound_function,
486+
vector<unique_ptr<Expression>> &arguments)
487+
{
488+
bound_function.return_type = LogicalType::BIGINT;
489+
return nullptr;
490+
}
491+
492+
493+
/*
494+
* IcebergSizeClampTextFun truncates a VARCHAR value at a UTF-8 character
495+
* boundary so its byte length does not exceed the second argument. If the
496+
* limit is <= 0 the value is returned unchanged so callers can encode
497+
* "disabled" as 0. Used to enforce Snowflake STRING/VARCHAR per-column
498+
* caps on the pushdown write path.
499+
*
500+
* Algorithm: if the input fits, return it. Otherwise, walk back from
501+
* `limit` to the nearest UTF-8 leading byte (continuation bytes have
502+
* the bit pattern 10xxxxxx). Worst case backs up at most 3 bytes,
503+
* since UTF-8 codepoints are at most 4 bytes long.
504+
*/
505+
static void
506+
IcebergSizeClampTextFun(DataChunk &args, ExpressionState &state, Vector &result)
507+
{
508+
BinaryExecutor::Execute<string_t, int32_t, string_t>(
509+
args.data[0], args.data[1], result, args.size(),
510+
[&](string_t input, int32_t limit) {
511+
auto data = input.GetData();
512+
auto size = (int64_t) input.GetSize();
513+
514+
if (limit <= 0 || size <= (int64_t) limit)
515+
return StringVector::AddString(result, data, size);
516+
517+
int64_t lim = (int64_t) limit;
518+
int64_t trim = lim;
519+
while (trim > 0 &&
520+
(((unsigned char) data[trim]) & 0xC0) == 0x80)
521+
{
522+
trim--;
523+
}
524+
525+
return StringVector::AddString(result, data, trim);
526+
});
527+
}
528+
529+
530+
/*
531+
* IcebergSizeClampBlobFun byte-truncates a BLOB value to the second
532+
* argument. If the limit is <= 0 the value is returned unchanged.
533+
* Used to enforce Snowflake BINARY per-column caps on the pushdown
534+
* write path.
535+
*/
536+
static void
537+
IcebergSizeClampBlobFun(DataChunk &args, ExpressionState &state, Vector &result)
538+
{
539+
BinaryExecutor::Execute<string_t, int32_t, string_t>(
540+
args.data[0], args.data[1], result, args.size(),
541+
[&](string_t input, int32_t limit) {
542+
auto data = input.GetData();
543+
auto size = (int64_t) input.GetSize();
544+
545+
if (limit <= 0 || size <= (int64_t) limit)
546+
return StringVector::AddStringOrBlob(result, data, size);
547+
548+
return StringVector::AddStringOrBlob(result, data, (idx_t) limit);
549+
});
550+
}
551+
552+
553+
/*
554+
* IcebergSizeCheckTextFun raises if a VARCHAR value's UTF-8 byte length
555+
* exceeds the second argument; otherwise passes through unchanged. The
556+
* third argument is the source column name, included in the error message
557+
* for actionable diagnostics. A non-positive limit disables the check.
558+
*
559+
* Counterpart to IcebergSizeClampTextFun, used on the pushdown write path
560+
* when the table's out_of_range_values policy is the default 'error'.
561+
*/
562+
static void
563+
IcebergSizeCheckTextFun(DataChunk &args, ExpressionState &state, Vector &result)
564+
{
565+
TernaryExecutor::Execute<string_t, int32_t, string_t, string_t>(
566+
args.data[0], args.data[1], args.data[2], result, args.size(),
567+
[&](string_t input, int32_t limit, string_t column_name) {
568+
auto data = input.GetData();
569+
auto size = (int64_t) input.GetSize();
570+
571+
if (limit > 0 && size > (int64_t) limit)
572+
throw InvalidInputException(
573+
"value of column \"%s\" (text, %lld bytes) exceeds "
574+
"iceberg_max_string_bytes (%d). "
575+
"Set out_of_range_values = 'clamp' on the table to "
576+
"truncate oversize values instead of erroring.",
577+
column_name.GetString().c_str(), (long long) size, limit);
578+
579+
return StringVector::AddString(result, data, size);
580+
});
581+
}
582+
583+
584+
/*
585+
* IcebergSizeCheckBlobFun raises if a BLOB value exceeds the second
586+
* argument; otherwise passes through unchanged. Counterpart to
587+
* IcebergSizeClampBlobFun.
588+
*/
589+
static void
590+
IcebergSizeCheckBlobFun(DataChunk &args, ExpressionState &state, Vector &result)
591+
{
592+
TernaryExecutor::Execute<string_t, int32_t, string_t, string_t>(
593+
args.data[0], args.data[1], args.data[2], result, args.size(),
594+
[&](string_t input, int32_t limit, string_t column_name) {
595+
auto data = input.GetData();
596+
auto size = (int64_t) input.GetSize();
597+
598+
if (limit > 0 && size > (int64_t) limit)
599+
throw InvalidInputException(
600+
"value of column \"%s\" (bytea, %lld bytes) exceeds "
601+
"iceberg_max_binary_bytes (%d). "
602+
"Set out_of_range_values = 'clamp' on the table to "
603+
"truncate oversize values instead of erroring.",
604+
column_name.GetString().c_str(), (long long) size, limit);
605+
606+
return StringVector::AddStringOrBlob(result, data, size);
607+
});
608+
}
609+
610+
392611

393612
static void LoadInternal(ExtensionLoader &loader) {
394613

@@ -438,6 +657,44 @@ static void LoadInternal(ExtensionLoader &loader) {
438657
loader.RegisterFunction(pg_error_nested);
439658
}
440659

660+
{
661+
ScalarFunction iceberg_size_clamp_text(
662+
"iceberg_size_clamp_text",
663+
{LogicalType::VARCHAR, LogicalType::INTEGER},
664+
LogicalType::VARCHAR,
665+
IcebergSizeClampTextFun);
666+
loader.RegisterFunction(iceberg_size_clamp_text);
667+
668+
ScalarFunction iceberg_size_clamp_blob(
669+
"iceberg_size_clamp_blob",
670+
{LogicalType::BLOB, LogicalType::INTEGER},
671+
LogicalType::BLOB,
672+
IcebergSizeClampBlobFun);
673+
loader.RegisterFunction(iceberg_size_clamp_blob);
674+
675+
ScalarFunction iceberg_size_check_text(
676+
"iceberg_size_check_text",
677+
{LogicalType::VARCHAR, LogicalType::INTEGER, LogicalType::VARCHAR},
678+
LogicalType::VARCHAR,
679+
IcebergSizeCheckTextFun);
680+
loader.RegisterFunction(iceberg_size_check_text);
681+
682+
ScalarFunction iceberg_size_check_blob(
683+
"iceberg_size_check_blob",
684+
{LogicalType::BLOB, LogicalType::INTEGER, LogicalType::VARCHAR},
685+
LogicalType::BLOB,
686+
IcebergSizeCheckBlobFun);
687+
loader.RegisterFunction(iceberg_size_check_blob);
688+
689+
ScalarFunction iceberg_byte_size(
690+
"iceberg_byte_size",
691+
{LogicalType::ANY},
692+
LogicalType::BIGINT,
693+
IcebergByteSizeFun,
694+
IcebergByteSizeBind);
695+
loader.RegisterFunction(iceberg_byte_size);
696+
}
697+
441698
PgLakeUtilityFunctions::RegisterFunctions(loader);
442699
PgLakeFileSystemFunctions::RegisterFunctions(loader);
443700

pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,40 @@ extern PGDLLEXPORT Datum IcebergErrorOrClampDatum(Datum value, Oid typeOid,
4141
int32 typmod,
4242
IcebergOutOfRangePolicy policy,
4343
bool *isNull);
44+
45+
/*
46+
* IcebergSizeClampDatum truncates, NULLs, or errors on a Datum so that
47+
* string and binary values fit the byte limits expressed by
48+
* pg_lake_engine.iceberg_max_string_bytes and
49+
* pg_lake_engine.iceberg_max_binary_bytes (0 = no limit).
50+
*
51+
* The behavior on an oversize value is selected by `policy`:
52+
* - ICEBERG_OOR_ERROR (default for Iceberg tables): raise an error
53+
* identifying the column and exceeded GUC.
54+
* - ICEBERG_OOR_CLAMP: silently fix up the value as below.
55+
* - ICEBERG_OOR_NONE: pass through unchanged.
56+
*
57+
* Under CLAMP, lossless types are truncated:
58+
* - text/varchar/bpchar -> trimmed at a UTF-8 character boundary to
59+
* iceberg_max_string_bytes.
60+
* - bytea -> byte-truncated to iceberg_max_binary_bytes.
61+
*
62+
* Structured-string types are replaced with NULL via *isNull = true,
63+
* since truncation would corrupt them:
64+
* - jsonb/json
65+
*
66+
* Recurses through arrays, composites, maps, and domains. Nested values
67+
* that would be NULLed are absorbed as NULL within the reconstructed
68+
* container.
69+
*
70+
* `columnName` is included in the error message under ERROR mode; pass
71+
* NULL or empty when the column context is unknown.
72+
*
73+
* If all three GUCs are 0, the value is returned unchanged regardless of
74+
* policy or type.
75+
*/
76+
extern PGDLLEXPORT Datum IcebergSizeClampDatum(Datum value, Oid typeOid,
77+
int32 typmod,
78+
IcebergOutOfRangePolicy policy,
79+
const char *columnName,
80+
bool *isNull);

pg_lake_engine/include/pg_lake/pgduck/iceberg_query_validation.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,27 @@ extern PGDLLEXPORT char *IcebergWrapQueryWithErrorOrClampChecks(char *query,
3737
IcebergOutOfRangePolicy policy,
3838
bool queryHasRowId);
3939

40+
/*
41+
* IcebergWrapQueryWithSizeClampChecks wraps a query so that values
42+
* exceeding the per-column byte caps (pg_lake_engine.iceberg_max_string_bytes,
43+
* iceberg_max_binary_bytes, iceberg_max_nested_type_bytes) are either
44+
* clamped or rejected, per `policy`:
45+
*
46+
* - ICEBERG_OOR_ERROR (default): raise error identifying the column.
47+
* - ICEBERG_OOR_CLAMP: text/varchar/bpchar truncated at a UTF-8
48+
* character boundary; bytea byte-truncated; jsonb/json NULLed when
49+
* the serialized form exceeds the limit; arrays/structs/maps NULLed
50+
* when their measured byte size exceeds the nested-type cap.
51+
* - ICEBERG_OOR_NONE: no-op.
52+
*
53+
* Returns the original query unchanged when all GUCs are zero or no
54+
* column carries a clampable type.
55+
*/
56+
extern PGDLLEXPORT char *IcebergWrapQueryWithSizeClampChecks(char *query,
57+
TupleDesc tupleDesc,
58+
IcebergOutOfRangePolicy policy,
59+
bool queryHasRowId);
60+
4061
/*
4162
* IcebergWrapQueryWithNativeTypeConversion wraps a query to rewrite
4263
* columns whose native DuckDB shape does not match Iceberg's. See the

pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,23 @@ extern PGDLLEXPORT bool TypeNeedsIcebergValidation(Oid typeOid, int32 typmod,
7272
#define TEMPORAL_DATE_MIN_YEAR (-4712)
7373
#define TEMPORAL_TIMESTAMP_MIN_YEAR 1
7474
#define TEMPORAL_MAX_YEAR 9999
75+
76+
/*
77+
* Downstream byte limits for values written to Iceberg tables, set via the
78+
* pg_lake_engine.iceberg_max_string_bytes and
79+
* pg_lake_engine.iceberg_max_binary_bytes GUCs. 0 means no limit. These
80+
* caps are imposed by some downstream consumers (e.g. Snowflake VARCHAR
81+
* 16 MiB / BINARY 8 MiB) and applied via IcebergSizeClampDatum.
82+
*/
83+
extern PGDLLEXPORT int IcebergMaxStringBytes;
84+
extern PGDLLEXPORT int IcebergMaxBinaryBytes;
85+
extern PGDLLEXPORT int IcebergMaxNestedTypeBytes;
86+
87+
/*
88+
* TypeNeedsIcebergSizeClamping returns true if a Datum of typeOid (or any
89+
* lossless string / structured-string / bytea component nested within it)
90+
* could potentially be size-clamped by IcebergSizeClampDatum. Recurses
91+
* through arrays, composites, maps, and domains. Independent of the
92+
* current GUC values.
93+
*/
94+
extern PGDLLEXPORT bool TypeNeedsIcebergSizeClamping(Oid typeOid);

0 commit comments

Comments
 (0)