Commit 8756635

sfc-gh-mslot and claude committed
pg_lake: clamp string and binary values to per-column byte limits on Iceberg writes
Some downstream consumers of Iceberg tables impose per-column byte caps smaller than what PostgreSQL values in the source can carry. For example, on Snowflake the column-type byte ceilings are:

- STRING / VARCHAR: 16 MiB default, up to 128 MiB when declared with an explicit larger length.
- BINARY: 8 MiB default, up to 64 MiB.
- OBJECT / ARRAY / VARIANT: 128 MiB.

Without a guard, rows whose individual values exceed the target column's cap reach the consumer and surface as opaque "value too long" errors when the consumer ingests them. This adds an opt-in, GUC-driven clamp that fires consistently across both the per-tuple FDW path and the DuckDB-pushdown paths (INSERT..SELECT, snapshot loads via postgres_scan, COPY FROM, etc.).

Three new GUCs (PGC_USERSET, GUC_UNIT_BYTE, default 0 = disabled, no behavior change unless set):

- pg_lake_engine.iceberg_max_string_bytes governs text, varchar, bpchar, jsonb, json (per-leaf clamp). text/varchar/bpchar are truncated at a UTF-8 character boundary; jsonb/json are replaced with NULL since truncating the serialized form would yield invalid JSON.
- pg_lake_engine.iceberg_max_binary_bytes governs bytea (byte-truncate).
- pg_lake_engine.iceberg_max_aggregate_bytes governs the on-disk size of array, composite, and map values that land in OBJECT/ARRAY/VARIANT columns. The whole container is replaced with NULL when its varlena exceeds the limit; we deliberately do NOT recurse into elements/fields to per-leaf-clamp inner strings, because (a) inner leaves don't have their own column cap on the consumer side (they're just JSON inside the parent OBJECT/ARRAY/VARIANT) and (b) when iceberg_max_string_bytes <= iceberg_max_aggregate_bytes, no inner leaf can exceed the per-leaf limit while staying inside an under-cap container. Distinct from the string GUC because consumers usually cap semi-structured types more loosely than STRING (e.g. on Snowflake: 128 MiB vs. 16 MiB).

Operators set the GUCs to the target column's actual ceiling.
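The per-type policy described above can be summarized as a small decision function. The following is a minimal, standalone C++ sketch rather than code from this commit: `ClampLimits`, `ValueKind`, `ClampAction`, and `DecideClamp` are hypothetical names, and sizes are passed in as plain byte counts instead of being read from a Datum.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-ins for the three GUCs; 0 means "disabled".
struct ClampLimits {
    std::size_t max_string_bytes;
    std::size_t max_binary_bytes;
    std::size_t max_aggregate_bytes;
};

// Hypothetical type classes, grouped the way the commit message groups them.
enum class ValueKind { Text, Json, Bytea, Container, Other };

enum class ClampAction { Keep, TruncateAtCharBoundary, TruncateBytes, ReplaceWithNull };

// A limit of 0 never triggers, which encodes "disabled".
static bool Exceeds(std::size_t size, std::size_t limit)
{
    return limit != 0 && size > limit;
}

ClampAction DecideClamp(ValueKind kind, std::size_t size, const ClampLimits &limits)
{
    switch (kind) {
    case ValueKind::Text:
        // text/varchar/bpchar stay valid after truncation
        return Exceeds(size, limits.max_string_bytes)
                   ? ClampAction::TruncateAtCharBoundary
                   : ClampAction::Keep;
    case ValueKind::Json:
        // truncated JSON would be invalid, so the value becomes NULL
        return Exceeds(size, limits.max_string_bytes)
                   ? ClampAction::ReplaceWithNull
                   : ClampAction::Keep;
    case ValueKind::Bytea:
        return Exceeds(size, limits.max_binary_bytes)
                   ? ClampAction::TruncateBytes
                   : ClampAction::Keep;
    case ValueKind::Container:
        // arrays, composites, and maps are NULLed as a whole
        return Exceeds(size, limits.max_aggregate_bytes)
                   ? ClampAction::ReplaceWithNull
                   : ClampAction::Keep;
    case ValueKind::Other:
        break;
    }
    return ClampAction::Keep;
}
```

With limits of 16/8/128 bytes, a 20-byte text value maps to `TruncateAtCharBoundary` while a 20-byte json value maps to `ReplaceWithNull`; with all limits at 0 every value maps to `Keep`.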
Two complementary code paths cover all writers:

1. **Per-tuple FDW path** (INSERT .. VALUES, single-row UPDATE/DELETE pipelines): IcebergSizeClampSlotInPlace runs in-process after the existing temporal/numeric clamp and before ExecConstraints. PgLakeModifyState gains a needsSizeClamping flag computed once at init via TupleDescNeedsIcebergSizeClamping, so the per-row work is skipped entirely on tables whose columns cannot trigger size clamping. Hot-path optimizations: IcebergSizeClampDatum short-circuits varlena values whose total on-disk size is comfortably under every active limit (toast_raw_datum_size against the smallest active GUC, halved to cover jsonb's binary-vs-text gap), avoiding even the leaf scalar dispatch on small values. IcebergSizeClampStringScalar additionally pre-checks jsonb binary size against half the string limit before falling back to jsonb_out, since text serialization of typical jsonb data is bounded by a small constant factor over the binary representation. For arrays/composites/maps, no deconstruct/deform happens: a single toast_raw_datum_size compared against iceberg_max_aggregate_bytes decides pass-through vs. NULL.

2. **DuckDB-pushdown path** (INSERT..SELECT, snapshot/initial-copy via postgres_scan, COPY FROM, compaction): a new IcebergWrapQueryWithSizeClampChecks rewriter wraps the inner SELECT with an outer projection that calls two new DuckDB scalar UDFs registered in duckdb_pglake (iceberg_size_clamp_text and iceberg_size_clamp_blob) for lossless leaf truncation, expresses jsonb/json NULL-on-overflow inline via strlen(::VARCHAR), and applies aggregate-NULL inline as `CASE WHEN strlen(<container>::VARCHAR) > <aggregate_max> THEN NULL ELSE <container> END`. No list_transform / struct_pack inside containers, mirroring the per-tuple side.
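To make the shape of the pushdown rewrite concrete, here is a minimal C++ sketch of how such an outer projection could be assembled as a string. It is an illustration under assumptions, not the IcebergWrapQueryWithSizeClampChecks implementation: `Column`, `ColumnKind`, `WrapWithSizeClamp`, and the `clamp_input` alias are hypothetical, and identifier quoting and row-id handling are left out.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical column classification, following the commit message.
enum class ColumnKind { Text, Blob, Json, Container, Other };

struct Column {
    std::string name;
    ColumnKind kind;
};

// NULL-on-overflow expression, as quoted in the commit message.
static std::string NullIfLonger(const std::string &col, long long limit)
{
    return "CASE WHEN strlen(" + col + "::VARCHAR) > " + std::to_string(limit) +
           " THEN NULL ELSE " + col + " END";
}

// Wraps inner_query in an outer SELECT that clamps each column according to
// its kind. A limit of 0 disables the corresponding clamp. Returns the inner
// query unchanged when no column ends up being clamped.
std::string WrapWithSizeClamp(const std::string &inner_query,
                              const std::vector<Column> &columns,
                              long long max_string, long long max_binary,
                              long long max_aggregate)
{
    std::string projection;
    bool any_clamped = false;

    for (const Column &col : columns) {
        std::string expr = col.name;

        if (col.kind == ColumnKind::Text && max_string > 0)
            expr = "iceberg_size_clamp_text(" + col.name + ", " +
                   std::to_string(max_string) + ")";
        else if (col.kind == ColumnKind::Blob && max_binary > 0)
            expr = "iceberg_size_clamp_blob(" + col.name + ", " +
                   std::to_string(max_binary) + ")";
        else if (col.kind == ColumnKind::Json && max_string > 0)
            expr = NullIfLonger(col.name, max_string);
        else if (col.kind == ColumnKind::Container && max_aggregate > 0)
            expr = NullIfLonger(col.name, max_aggregate);

        bool clamped = (expr != col.name);
        any_clamped = any_clamped || clamped;

        if (!projection.empty())
            projection += ", ";
        projection += clamped ? expr + " AS " + col.name : col.name;
    }

    if (!any_clamped)
        return inner_query;

    return "SELECT " + projection + " FROM (" + inner_query + ") AS clamp_input";
}
```

Because untouched columns are projected by name and every limit is checked against 0 first, a table without clampable columns (or a session with all limits disabled) gets its query back unmodified.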
The wrapper is hooked into pg_lake_engine/src/pgduck/write_data.c alongside the existing out_of_range_values clamp wrapper, so all pushdown writers (INSERT..SELECT, snapshot via postgres_scan, COPY FROM, vacuum/compaction) flow through it without per-callsite plumbing.

Per-leaf truncation output is byte-identical between the two paths. The aggregate threshold uses different proxies (varlena size on the PG side, JSON-serialized text length on the SQL side), but both are within a small constant factor of the consumer-visible size.

Tests: pg_lake_table/tests/pytests/test_iceberg_size_clamping.py covers UTF-8 boundary truncation, bytea byte truncation, jsonb/json → NULL, NOT NULL constraint violation after clamp-to-NULL, container NULLing on aggregate overflow (text[], composite-of-text, int[], all-bigint composite), the disabled-by-default no-op path, the only-one-GUC-set path, INSERT..SELECT pushdown clamping across text/bytea/jsonb, and INSERT..SELECT pushdown aggregate NULLing of int[].

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Marco Slot <marco.slot@snowflake.com>
1 parent 139aeca commit 8756635

11 files changed

Lines changed: 1457 additions & 0 deletions

duckdb_pglake/src/duckdb_pglake_extension.cpp

Lines changed: 192 additions & 0 deletions
@@ -389,6 +389,174 @@ PgErrorNestedListFun(DataChunk &args, ExpressionState &state, Vector &result)
 }
 
 
+/*
+ * IcebergComputeByteSize returns the in-memory byte footprint of a value.
+ *
+ * Walks the value at the vector level (no text serialization), summing
+ * VARCHAR/BLOB string_t sizes, recursing through LIST/MAP children, and
+ * adding fixed-width sizes for scalar types. Used PG-side as a cheap
+ * proxy for "size that lands in the consumer's OBJECT/ARRAY/VARIANT
+ * column", in lieu of casting the whole container to VARCHAR (which
+ * would trigger DuckDB's text serialization).
+ *
+ * STRUCT field byte sums are computed by recursing into each field's
+ * vector at the same row index; LIST/MAP recurse over `entry.length`
+ * children starting at `entry.offset` in the child vector.
+ */
+static int64_t
+IcebergComputeByteSize(Vector &v, idx_t row, idx_t row_count)
+{
+    if (v.GetVectorType() != VectorType::FLAT_VECTOR)
+        v.Flatten(row_count);
+
+    if (FlatVector::IsNull(v, row))
+        return 0;
+
+    auto &type = v.GetType();
+
+    switch (type.id())
+    {
+        case LogicalTypeId::VARCHAR:
+        case LogicalTypeId::BLOB:
+            {
+                auto data = FlatVector::GetData<string_t>(v);
+                return (int64_t) data[row].GetSize();
+            }
+
+        case LogicalTypeId::LIST:
+        case LogicalTypeId::MAP:
+            {
+                auto list_data = FlatVector::GetData<list_entry_t>(v);
+                auto entry = list_data[row];
+                auto &child = ListVector::GetEntry(v);
+                idx_t child_size = ListVector::GetListSize(v);
+
+                int64_t total = 0;
+
+                for (idx_t i = 0; i < entry.length; i++)
+                    total += IcebergComputeByteSize(child, entry.offset + i,
+                                                    child_size);
+
+                return total;
+            }
+
+        case LogicalTypeId::STRUCT:
+            {
+                auto &children = StructVector::GetEntries(v);
+                int64_t total = 0;
+
+                for (auto &child : children)
+                    total += IcebergComputeByteSize(*child, row, row_count);
+
+                return total;
+            }
+
+        default:
+            return (int64_t) GetTypeIdSize(type.InternalType());
+    }
+}
+
+
+static void
+IcebergByteSizeFun(DataChunk &args, ExpressionState &state, Vector &result)
+{
+    auto &input = args.data[0];
+    idx_t count = args.size();
+
+    if (input.GetVectorType() != VectorType::FLAT_VECTOR)
+        input.Flatten(count);
+
+    auto out = FlatVector::GetData<int64_t>(result);
+    auto &out_validity = FlatVector::Validity(result);
+
+    for (idx_t i = 0; i < count; i++)
+    {
+        if (FlatVector::IsNull(input, i))
+        {
+            out_validity.SetInvalid(i);
+            continue;
+        }
+        out[i] = IcebergComputeByteSize(input, i, count);
+    }
+}
+
+
+static unique_ptr<FunctionData>
+IcebergByteSizeBind(ClientContext &context, ScalarFunction &bound_function,
+                    vector<unique_ptr<Expression>> &arguments)
+{
+    bound_function.return_type = LogicalType::BIGINT;
+    return nullptr;
+}
+
+
+/*
+ * IcebergSizeClampTextFun truncates a VARCHAR value at a UTF-8 character
+ * boundary so its byte length does not exceed the second argument. If the
+ * limit is <= 0 the value is returned unchanged so callers can encode
+ * "disabled" as 0. Used to enforce Snowflake STRING/VARCHAR per-column
+ * caps on the pushdown write path.
+ *
+ * Algorithm: if the input fits, return it. Otherwise, walk back from
+ * `limit` to the nearest UTF-8 leading byte (continuation bytes have
+ * the bit pattern 10xxxxxx). Worst case backs up at most 3 bytes,
+ * since UTF-8 codepoints are at most 4 bytes long.
+ */
+static void
+IcebergSizeClampTextFun(DataChunk &args, ExpressionState &state, Vector &result)
+{
+    BinaryExecutor::Execute<string_t, int32_t, string_t>(
+        args.data[0], args.data[1], result, args.size(),
+        [&](string_t input, int32_t limit) {
+            if (limit <= 0)
+                return input;
+
+            auto data = input.GetData();
+            auto size = (int64_t) input.GetSize();
+            int64_t lim = (int64_t) limit;
+
+            if (size <= lim)
+                return input;
+
+            int64_t trim = lim;
+            while (trim > 0 &&
+                   (((unsigned char) data[trim]) & 0xC0) == 0x80)
+            {
+                trim--;
+            }
+
+            return StringVector::AddString(result, data, trim);
+        });
+}
+
+
+/*
+ * IcebergSizeClampBlobFun byte-truncates a BLOB value to the second
+ * argument. If the limit is <= 0 the value is returned unchanged.
+ * Used to enforce Snowflake BINARY per-column caps on the pushdown
+ * write path.
+ */
+static void
+IcebergSizeClampBlobFun(DataChunk &args, ExpressionState &state, Vector &result)
+{
+    BinaryExecutor::Execute<string_t, int32_t, string_t>(
+        args.data[0], args.data[1], result, args.size(),
+        [&](string_t input, int32_t limit) {
+            if (limit <= 0)
+                return input;
+
+            auto size = (int64_t) input.GetSize();
+            int64_t lim = (int64_t) limit;
+
+            if (size <= lim)
+                return input;
+
+            return StringVector::AddStringOrBlob(result, input.GetData(),
+                                                 (idx_t) lim);
+        });
+}
+
+
 
 static void LoadInternal(ExtensionLoader &loader) {
 
@@ -438,6 +606,30 @@ static void LoadInternal(ExtensionLoader &loader) {
         loader.RegisterFunction(pg_error_nested);
     }
 
+    {
+        ScalarFunction iceberg_size_clamp_text(
+            "iceberg_size_clamp_text",
+            {LogicalType::VARCHAR, LogicalType::INTEGER},
+            LogicalType::VARCHAR,
+            IcebergSizeClampTextFun);
+        loader.RegisterFunction(iceberg_size_clamp_text);
+
+        ScalarFunction iceberg_size_clamp_blob(
+            "iceberg_size_clamp_blob",
+            {LogicalType::BLOB, LogicalType::INTEGER},
+            LogicalType::BLOB,
+            IcebergSizeClampBlobFun);
+        loader.RegisterFunction(iceberg_size_clamp_blob);
+
+        ScalarFunction iceberg_byte_size(
+            "iceberg_byte_size",
+            {LogicalType::ANY},
+            LogicalType::BIGINT,
+            IcebergByteSizeFun,
+            IcebergByteSizeBind);
+        loader.RegisterFunction(iceberg_byte_size);
+    }
+
     PgLakeUtilityFunctions::RegisterFunctions(loader);
     PgLakeFileSystemFunctions::RegisterFunctions(loader);
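The boundary walk in IcebergSizeClampTextFun can be tried outside DuckDB. The sketch below restates the same algorithm over `std::string` in place of `string_t`; `TruncateUtf8` is a hypothetical name, and the input is assumed to be valid UTF-8.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Truncates input to at most limit bytes without splitting a UTF-8 character.
// A limit <= 0 means "disabled" and returns the input unchanged.
std::string TruncateUtf8(const std::string &input, long long limit)
{
    if (limit <= 0 || input.size() <= static_cast<std::size_t>(limit))
        return input;

    // input[cut] is the first byte that would be dropped. If it is a
    // continuation byte (10xxxxxx), the cut falls inside a character, so
    // back up until the first dropped byte is a leading byte.
    std::size_t cut = static_cast<std::size_t>(limit);
    while (cut > 0 && (static_cast<unsigned char>(input[cut]) & 0xC0) == 0x80)
        cut--;

    return input.substr(0, cut);
}
```

For the 6-byte string `"h\xC3\xA9llo"` ("héllo"), a limit of 2 yields `"h"` because byte 2 is the continuation byte of "é", while a limit of 3 keeps `"h\xC3\xA9"`. A single 4-byte character with a limit of 3 truncates to the empty string.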

pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h

Lines changed: 24 additions & 0 deletions
@@ -41,3 +41,27 @@ extern PGDLLEXPORT Datum IcebergErrorOrClampDatum(Datum value, Oid typeOid,
                                                   int32 typmod,
                                                   IcebergOutOfRangePolicy policy,
                                                   bool *isNull);
+
+/*
+ * IcebergSizeClampDatum truncates or NULLs a Datum so that it fits the
+ * byte limits expressed by pg_lake_engine.iceberg_max_string_bytes,
+ * pg_lake_engine.iceberg_max_binary_bytes, and
+ * pg_lake_engine.iceberg_max_aggregate_bytes (0 = no limit).
+ *
+ * Types that stay valid after truncation are truncated:
+ *   - text/varchar/bpchar -> trimmed at a UTF-8 character boundary to
+ *     iceberg_max_string_bytes.
+ *   - bytea -> byte-truncated to iceberg_max_binary_bytes.
+ *
+ * Structured-string types are replaced with NULL via *isNull = true,
+ * since truncation would corrupt them:
+ *   - jsonb/json
+ *
+ * Arrays, composites, and maps are not recursed into: the whole
+ * container is replaced with NULL when its size exceeds
+ * iceberg_max_aggregate_bytes.
+ *
+ * If all three GUCs are 0, the value is returned unchanged regardless of type.
+ */
+extern PGDLLEXPORT Datum IcebergSizeClampDatum(Datum value, Oid typeOid,
+                                               int32 typmod, bool *isNull);
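The commit message describes a short-circuit in IcebergSizeClampDatum: a value whose raw size is comfortably under every active limit skips all further work. A minimal C++ sketch of that check follows. `SmallestActiveLimit` and `CanSkipClamp` are hypothetical names, the raw size is assumed to come from something like toast_raw_datum_size, and the exact comparison (`<=` against half the smallest active limit) is an assumption, since only the halving is stated in the commit message.

```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>

// Returns the smallest limit that is enabled (> 0), or 0 if none is.
int64_t SmallestActiveLimit(std::initializer_list<int64_t> limits)
{
    int64_t smallest = 0;

    for (int64_t limit : limits)
        if (limit > 0 && (smallest == 0 || limit < smallest))
            smallest = limit;

    return smallest;
}

// True when a value of raw_size bytes cannot be affected by any clamp.
// The threshold is halved to leave headroom for jsonb, whose text form
// can be larger than its binary form.
bool CanSkipClamp(int64_t raw_size, int64_t max_string, int64_t max_binary,
                  int64_t max_aggregate)
{
    int64_t smallest = SmallestActiveLimit({max_string, max_binary, max_aggregate});

    if (smallest == 0)
        return true; // all limits disabled

    return raw_size <= smallest / 2;
}
```

Comparing against a single precomputed threshold keeps the common case (small values) to one size lookup and one comparison, before any per-type dispatch happens.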

pg_lake_engine/include/pg_lake/pgduck/iceberg_query_validation.h

Lines changed: 15 additions & 0 deletions
@@ -37,6 +37,21 @@ extern PGDLLEXPORT char *IcebergWrapQueryWithErrorOrClampChecks(char *query,
                                                                 IcebergOutOfRangePolicy policy,
                                                                 bool queryHasRowId);
 
+/*
+ * IcebergWrapQueryWithSizeClampChecks wraps a query with size-clamp
+ * expressions for columns whose values may exceed downstream byte caps:
+ * text/varchar/bpchar truncated at a UTF-8 character boundary, bytea
+ * byte-truncated, jsonb/json NULLed when too long, and arrays/structs/
+ * maps NULLed when their leaf-byte sum exceeds
+ * pg_lake_engine.iceberg_max_aggregate_bytes.
+ *
+ * Returns the original query unchanged when all three GUCs are zero or no
+ * column carries a clampable type.
+ */
+extern PGDLLEXPORT char *IcebergWrapQueryWithSizeClampChecks(char *query,
+                                                             TupleDesc tupleDesc,
+                                                             bool queryHasRowId);
+
 /*
  * IcebergWrapQueryWithNativeTypeConversion wraps a query to rewrite
  * columns whose native DuckDB shape does not match Iceberg's. See the
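The leaf-byte sum mentioned in the IcebergWrapQueryWithSizeClampChecks comment follows the recursion in IcebergComputeByteSize: NULLs count as 0, strings and blobs count their byte length, fixed-width scalars count their width, and containers sum their children. The sketch below shows that recursion over a toy value tree instead of DuckDB vectors; `Value`, its helper constructors, `LeafByteSum`, and `ExceedsAggregateLimit` are all hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a nested value (not a DuckDB type).
struct Value {
    enum class Kind { Null, Bytes, Fixed, Container };

    Kind kind;
    std::string bytes;           // payload for Kind::Bytes
    int64_t fixed_width;         // width in bytes for Kind::Fixed
    std::vector<Value> children; // elements or fields for Kind::Container
};

Value Null() { return Value{Value::Kind::Null, "", 0, {}}; }
Value Text(std::string s) { return Value{Value::Kind::Bytes, std::move(s), 0, {}}; }
Value Fixed(int64_t width) { return Value{Value::Kind::Fixed, "", width, {}}; }
Value List(std::vector<Value> items)
{
    return Value{Value::Kind::Container, "", 0, std::move(items)};
}

// Sums the byte sizes of all leaves without serializing the value to text.
int64_t LeafByteSum(const Value &v)
{
    switch (v.kind) {
    case Value::Kind::Null:
        return 0;
    case Value::Kind::Bytes:
        return static_cast<int64_t>(v.bytes.size());
    case Value::Kind::Fixed:
        return v.fixed_width;
    case Value::Kind::Container: {
        int64_t total = 0;
        for (const Value &child : v.children)
            total += LeafByteSum(child);
        return total;
    }
    }
    return 0;
}

// A limit of 0 disables the check.
bool ExceedsAggregateLimit(const Value &v, int64_t limit)
{
    return limit > 0 && LeafByteSum(v) > limit;
}
```

For a list holding `"abc"`, an 8-byte integer, a NULL, and a nested list with `"de"`, the sum is 3 + 8 + 0 + 2 = 13 bytes, so the container would be NULLed under a 12-byte limit and kept under a 13-byte one.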

pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h

Lines changed: 20 additions & 0 deletions
@@ -72,3 +72,23 @@ extern PGDLLEXPORT bool TypeNeedsIcebergValidation(Oid typeOid, int32 typmod,
 #define TEMPORAL_DATE_MIN_YEAR (-4712)
 #define TEMPORAL_TIMESTAMP_MIN_YEAR 1
 #define TEMPORAL_MAX_YEAR 9999
+
+/*
+ * Downstream byte limits for values written to Iceberg tables, set via the
+ * pg_lake_engine.iceberg_max_string_bytes, iceberg_max_binary_bytes, and
+ * iceberg_max_aggregate_bytes GUCs. 0 means no limit. These caps are
+ * imposed by some downstream consumers (e.g. Snowflake VARCHAR 16 MiB /
+ * BINARY 8 MiB) and applied via IcebergSizeClampDatum.
+ */
+extern PGDLLEXPORT int IcebergMaxStringBytes;
+extern PGDLLEXPORT int IcebergMaxBinaryBytes;
+extern PGDLLEXPORT int IcebergMaxAggregateBytes;
+
+/*
+ * TypeNeedsIcebergSizeClamping returns true if a Datum of typeOid (or any
+ * lossless string / structured-string / bytea component nested within it)
+ * could potentially be size-clamped by IcebergSizeClampDatum. Recurses
+ * through arrays, composites, maps, and domains. Independent of the
+ * current GUC values.
+ */
+extern PGDLLEXPORT bool TypeNeedsIcebergSizeClamping(Oid typeOid);

pg_lake_engine/src/init.c

Lines changed: 45 additions & 0 deletions
@@ -44,6 +44,7 @@
 #include "pg_extension_base/pg_extension_base_ids.h"
 #include "pg_lake/pgduck/cache_worker.h"
 #include "pg_lake/pgduck/client.h"
+#include "pg_lake/pgduck/iceberg_validation.h"
 #include "pg_lake/util/s3_writer_utils.h"
 #include "utils/guc.h"
 
@@ -186,6 +187,50 @@ _PG_init(void)
                             0,
                             NULL, NULL, NULL);
 
+    DefineCustomIntVariable("pg_lake_engine.iceberg_max_string_bytes",
+                            gettext_noop("Maximum bytes for string values written to "
+                                         "Iceberg tables. Values of text/varchar/bpchar "
+                                         "exceeding this size are truncated at a UTF-8 "
+                                         "character boundary; values of jsonb/json are "
+                                         "replaced with NULL since truncation would "
+                                         "corrupt the structure. 0 disables the limit. "
+                                         "Intended for downstream consumers (e.g. "
+                                         "Snowflake) that impose per-column byte caps."),
+                            NULL,
+                            &IcebergMaxStringBytes,
+                            0, 0, INT_MAX,
+                            PGC_USERSET,
+                            GUC_UNIT_BYTE,
+                            NULL, NULL, NULL);
+
+    DefineCustomIntVariable("pg_lake_engine.iceberg_max_binary_bytes",
+                            gettext_noop("Maximum bytes for bytea values written to "
+                                         "Iceberg tables. Values exceeding this size are "
+                                         "byte-truncated. 0 disables the limit."),
+                            NULL,
+                            &IcebergMaxBinaryBytes,
+                            0, 0, INT_MAX,
+                            PGC_USERSET,
+                            GUC_UNIT_BYTE,
+                            NULL, NULL, NULL);
+
+    DefineCustomIntVariable("pg_lake_engine.iceberg_max_aggregate_bytes",
+                            gettext_noop("Maximum bytes for the JSON-serialized form of "
+                                         "array, composite, and map values written to "
+                                         "Iceberg tables. The whole container is replaced "
+                                         "with NULL when the sum of its leaf byte sizes "
+                                         "exceeds this size. 0 disables the limit. Distinct "
+                                         "from iceberg_max_string_bytes because downstream "
+                                         "consumers' OBJECT/ARRAY/VARIANT columns typically "
+                                         "have a much larger ceiling than STRING/VARCHAR "
+                                         "(e.g. on Snowflake: 128 MiB vs. 16 MiB)."),
+                            NULL,
+                            &IcebergMaxAggregateBytes,
+                            0, 0, INT_MAX,
+                            PGC_USERSET,
+                            GUC_UNIT_BYTE,
+                            NULL, NULL, NULL);
+
     DefineCustomStringVariable(
         "pg_lake.stage_location",
         gettext_noop("Base URL for @STAGE/ resolution in paths"),
