Skip to content

Commit a8ab083

Browse files
sfc-gh-mslotclaude
andcommitted
pg_lake: clamp string and binary values to per-column byte limits on Iceberg writes
Some downstream consumers of Iceberg tables impose per-column byte caps smaller than what PostgreSQL values in the source can carry. For example, on Snowflake the column-type byte ceilings are: - STRING / VARCHAR : 16 MiB default, up to 128 MiB when declared with an explicit larger length. - BINARY : 8 MiB default, up to 64 MiB. - OBJECT / ARRAY / VARIANT : 128 MiB. Without a guard, rows whose individual values exceed the target column's cap reach the consumer and surface as opaque "value too long" errors when the consumer ingests them. This adds an opt-in, GUC-driven clamp at write time so values that would exceed a configurable byte limit are normalized in place before they are written to the Iceberg table. Two new GUCs (PGC_USERSET, GUC_UNIT_BYTE, default 0 = disabled, no behavior change unless set): - pg_lake_engine.iceberg_max_string_bytes governs text, varchar, bpchar, jsonb, json, and the aggregate JSON-serialized size of array, composite, and map values that land in STRING / OBJECT / ARRAY columns on the consumer side. - pg_lake_engine.iceberg_max_binary_bytes governs bytea. Operators set the GUCs to the target column's actual ceiling, which may be the default (16 MiB / 8 MiB on Snowflake) or a higher value (up to 128 MiB / 64 MiB) when the destination column was declared with an explicit larger length. Per-leaf behavior, applied recursively through arrays, composites, maps, and domains via IcebergSizeClampDatum: - text/varchar/bpchar: truncate at a UTF-8 character boundary (pg_mbcliplen). - bytea: byte-truncate. - jsonb/json: replace with NULL, since truncating the serialized form would yield invalid JSON. For jsonb the byte limit applies to the text-serialized form (what the consumer ultimately receives), not the on-disk binary varlena, so the size check serializes via jsonb_out before deciding. For json (stored as text) the varlena length matches what the consumer sees, so VARSIZE_ANY_EXHDR is used directly. In addition, an aggregate-size check NULLs an entire array or composite when the sum of its leaf byte sizes exceeds iceberg_max_string_bytes — the limit on the JSON-serialized form that downstream consumers receive when an array or struct lands in a STRING / OBJECT / ARRAY column. Snowflake stores semi-structured data without per-record headers, so sum-of-leaves is a close-enough approximation of the serialized JSON length without paying for an extra serialization pass. Each recursion level reports its post- clamp byte size up to its parent so the roll-up costs no extra walk. Aggregate clamping covers all containers, not just those whose elements/fields are themselves clampable. For arrays of non-string element types (e.g. int[]), the array's varlena content size (VARSIZE_ANY_EXHDR) serves as a cheap upper-bound proxy for the JSON serialization length; for composite fields of non-clampable types, the field's typlen-based size is folded into the aggregate. The new size clamp slots into the same FDW write path as the existing out_of_range_values clamp. ClampAndCheckConstraints invokes IcebergSizeClampSlotInPlace after the temporal/numeric clamp and before ExecConstraints, so a NOT NULL column whose value is replaced with NULL fails the constraint (mirroring the existing numeric-NaN clamp behavior). PgLakeModifyState gains a needsSizeClamping flag computed once at init via TupleDescNeedsIcebergSizeClamping, so the per-row work is skipped entirely on tables whose columns cannot trigger size clamping; the slot helper is also short-circuited unless at least one GUC is non-zero. Tests: pg_lake_table/tests/pytests/test_iceberg_size_clamping.py covers UTF-8 boundary truncation, bytea byte truncation, jsonb/json → NULL, NOT NULL constraint violation after clamp-to-NULL, recursion into text[] and composite types, aggregate-size NULLing of array and composite values whose leaves individually fit (including int[] and all-non-clampable composite fields), the disabled-by-default no-op path, and the only-one-GUC-set path. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Marco Slot <marco.slot@snowflake.com>
1 parent 139aeca commit a8ab083

7 files changed

Lines changed: 1084 additions & 0 deletions

File tree

pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,27 @@ extern PGDLLEXPORT Datum IcebergErrorOrClampDatum(Datum value, Oid typeOid,
4141
int32 typmod,
4242
IcebergOutOfRangePolicy policy,
4343
bool *isNull);
44+
45+
/*
46+
* IcebergSizeClampDatum truncates or NULLs a Datum so that string and
47+
* binary values fit the byte limits expressed by
48+
* pg_lake_engine.iceberg_max_string_bytes and
49+
* pg_lake_engine.iceberg_max_binary_bytes (0 = no limit).
50+
*
51+
* Lossless types are truncated:
52+
* - text/varchar/bpchar -> trimmed at a UTF-8 character boundary to
53+
* iceberg_max_string_bytes.
54+
* - bytea -> byte-truncated to iceberg_max_binary_bytes.
55+
*
56+
* Structured-string types are replaced with NULL via *isNull = true,
57+
* since truncation would corrupt them:
58+
* - jsonb/json
59+
*
60+
* Recurses through arrays, composites, maps, and domains. Nested values
61+
* that would be NULLed are absorbed as NULL within the reconstructed
62+
* container.
63+
*
64+
* If both GUCs are 0, the value is returned unchanged regardless of type.
65+
*/
66+
extern PGDLLEXPORT Datum IcebergSizeClampDatum(Datum value, Oid typeOid,
67+
int32 typmod, bool *isNull);

pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,22 @@ extern PGDLLEXPORT bool TypeNeedsIcebergValidation(Oid typeOid, int32 typmod,
7272
#define TEMPORAL_DATE_MIN_YEAR (-4712)
7373
#define TEMPORAL_TIMESTAMP_MIN_YEAR 1
7474
#define TEMPORAL_MAX_YEAR 9999
75+
76+
/*
77+
* Downstream byte limits for values written to Iceberg tables, set via the
78+
* pg_lake_engine.iceberg_max_string_bytes and
79+
* pg_lake_engine.iceberg_max_binary_bytes GUCs. 0 means no limit. These
80+
* caps are imposed by some downstream consumers (e.g. Snowflake VARCHAR
81+
* 16 MiB / BINARY 8 MiB) and applied via IcebergSizeClampDatum.
82+
*/
83+
extern PGDLLEXPORT int IcebergMaxStringBytes;
84+
extern PGDLLEXPORT int IcebergMaxBinaryBytes;
85+
86+
/*
87+
* TypeNeedsIcebergSizeClamping returns true if a Datum of typeOid (or any
88+
* lossless string / structured-string / bytea component nested within it)
89+
* could potentially be size-clamped by IcebergSizeClampDatum. Recurses
90+
* through arrays, composites, maps, and domains. Independent of the
91+
* current GUC values.
92+
*/
93+
extern PGDLLEXPORT bool TypeNeedsIcebergSizeClamping(Oid typeOid);

pg_lake_engine/src/init.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "pg_extension_base/pg_extension_base_ids.h"
4545
#include "pg_lake/pgduck/cache_worker.h"
4646
#include "pg_lake/pgduck/client.h"
47+
#include "pg_lake/pgduck/iceberg_validation.h"
4748
#include "pg_lake/util/s3_writer_utils.h"
4849
#include "utils/guc.h"
4950

@@ -186,6 +187,33 @@ _PG_init(void)
186187
0,
187188
NULL, NULL, NULL);
188189

190+
DefineCustomIntVariable("pg_lake_engine.iceberg_max_string_bytes",
191+
gettext_noop("Maximum bytes for string values written to "
192+
"Iceberg tables. Values of text/varchar/bpchar "
193+
"exceeding this size are truncated at a UTF-8 "
194+
"character boundary; values of jsonb/json are "
195+
"replaced with NULL since truncation would "
196+
"corrupt the structure. 0 disables the limit. "
197+
"Intended for downstream consumers (e.g. "
198+
"Snowflake) that impose per-column byte caps."),
199+
NULL,
200+
&IcebergMaxStringBytes,
201+
0, 0, INT_MAX,
202+
PGC_USERSET,
203+
GUC_UNIT_BYTE,
204+
NULL, NULL, NULL);
205+
206+
DefineCustomIntVariable("pg_lake_engine.iceberg_max_binary_bytes",
207+
gettext_noop("Maximum bytes for bytea values written to "
208+
"Iceberg tables. Values exceeding this size are "
209+
"byte-truncated. 0 disables the limit."),
210+
NULL,
211+
&IcebergMaxBinaryBytes,
212+
0, 0, INT_MAX,
213+
PGC_USERSET,
214+
GUC_UNIT_BYTE,
215+
NULL, NULL, NULL);
216+
189217
DefineCustomStringVariable(
190218
"pg_lake.stage_location",
191219
gettext_noop("Base URL for @STAGE/ resolution in paths"),

0 commit comments

Comments
 (0)