diff --git a/duckdb_pglake/src/duckdb_pglake_extension.cpp b/duckdb_pglake/src/duckdb_pglake_extension.cpp index b68988d1..e26e7abb 100644 --- a/duckdb_pglake/src/duckdb_pglake_extension.cpp +++ b/duckdb_pglake/src/duckdb_pglake_extension.cpp @@ -389,6 +389,225 @@ PgErrorNestedListFun(DataChunk &args, ExpressionState &state, Vector &result) } +/* + * IcebergByteSize returns the in-memory byte footprint of any Datum. + * + * Walks the value at the vector level (no text serialization), summing + * VARCHAR/BLOB string_t sizes, recursing through LIST/MAP children, and + * adding fixed-width sizes for scalar types. Used PG-side as a cheap + * proxy for "size that lands in the consumer's OBJECT/ARRAY/VARIANT + * column", in lieu of casting the whole container to VARCHAR (which + * would trigger DuckDB's text serialization). + * + * STRUCT field byte sums are computed by recursing into each field's + * vector at the same row index; LIST/MAP recurse over `entry.length` + * children starting at `entry.offset` in the child vector. 
+ */
+static int64_t
+IcebergComputeByteSize(Vector &v, idx_t row, idx_t row_count)
+{
+    /* Flat layout lets data and validity be indexed directly by row. */
+    if (v.GetVectorType() != VectorType::FLAT_VECTOR)
+        v.Flatten(row_count);
+
+    /* A NULL contributes nothing to the total. */
+    if (FlatVector::IsNull(v, row))
+        return 0;
+
+    auto &type = v.GetType();
+
+    switch (type.id())
+    {
+        case LogicalTypeId::VARCHAR:
+        case LogicalTypeId::BLOB:
+        {
+            /* String payload length only; the string_t header is not counted. */
+            auto data = FlatVector::GetData<string_t>(v);
+            return (int64_t) data[row].GetSize();
+        }
+
+        case LogicalTypeId::LIST:
+        case LogicalTypeId::MAP:
+        {
+            /*
+             * The entry gives the [offset, offset + length) slice of the
+             * shared child vector that belongs to this row.
+             */
+            auto list_data = FlatVector::GetData<list_entry_t>(v);
+            auto entry = list_data[row];
+            auto &child = ListVector::GetEntry(v);
+            idx_t child_size = ListVector::GetListSize(v);
+
+            int64_t total = 0;
+
+            for (idx_t i = 0; i < entry.length; i++)
+                total += IcebergComputeByteSize(child, entry.offset + i,
+                                                child_size);
+
+            return total;
+        }
+
+        case LogicalTypeId::STRUCT:
+        {
+            /* Struct fields are parallel vectors sharing the parent's row index. */
+            auto &children = StructVector::GetEntries(v);
+            int64_t total = 0;
+
+            for (auto &child : children)
+                total += IcebergComputeByteSize(*child, row, row_count);
+
+            return total;
+        }
+
+        default:
+            /* Everything else is charged the width of its physical type. */
+            return (int64_t) GetTypeIdSize(type.InternalType());
+    }
+}
+
+
+/*
+ * IcebergByteSizeFun is the scalar-function entry point: for every row of
+ * the single input column it writes IcebergComputeByteSize() of that row
+ * into the BIGINT result, or marks the result row NULL when the input row
+ * is NULL.
+ */
+static void
+IcebergByteSizeFun(DataChunk &args, ExpressionState &state, Vector &result)
+{
+    auto &input = args.data[0];
+    idx_t count = args.size();
+
+    if (input.GetVectorType() != VectorType::FLAT_VECTOR)
+        input.Flatten(count);
+
+    auto out = FlatVector::GetData<int64_t>(result);
+    auto &out_validity = FlatVector::Validity(result);
+
+    for (idx_t i = 0; i < count; i++)
+    {
+        if (FlatVector::IsNull(input, i))
+        {
+            out_validity.SetInvalid(i);
+            continue;
+        }
+        out[i] = IcebergComputeByteSize(input, i, count);
+    }
+}
+
+
+/*
+ * IcebergByteSizeBind pins the return type to BIGINT and carries no bind
+ * data (returns nullptr).
+ */
+static unique_ptr<FunctionData>
+IcebergByteSizeBind(ClientContext &context, ScalarFunction &bound_function,
+                    vector<unique_ptr<Expression>> &arguments)
+{
+    bound_function.return_type = LogicalType::BIGINT;
+    return nullptr;
+}
+
+
+/*
+ * IcebergSizeClampTextFun truncates a VARCHAR value at a UTF-8 character
+ * boundary so its byte length does not exceed the second argument.
If the + * limit is <= 0 the value is returned unchanged so callers can encode + * "disabled" as 0. Used to enforce Snowflake STRING/VARCHAR per-column + * caps on the pushdown write path. + * + * Algorithm: if the input fits, return it. Otherwise, walk back from + * `limit` to the nearest UTF-8 leading byte (continuation bytes have + * the bit pattern 10xxxxxx). Worst case backs up at most 3 bytes, + * since UTF-8 codepoints are at most 4 bytes long. + */ +static void +IcebergSizeClampTextFun(DataChunk &args, ExpressionState &state, Vector &result) +{ + BinaryExecutor::Execute( + args.data[0], args.data[1], result, args.size(), + [&](string_t input, int32_t limit) { + auto data = input.GetData(); + auto size = (int64_t) input.GetSize(); + + if (limit <= 0 || size <= (int64_t) limit) + return StringVector::AddString(result, data, size); + + int64_t lim = (int64_t) limit; + int64_t trim = lim; + while (trim > 0 && + (((unsigned char) data[trim]) & 0xC0) == 0x80) + { + trim--; + } + + return StringVector::AddString(result, data, trim); + }); +} + + +/* + * IcebergSizeClampBlobFun byte-truncates a BLOB value to the second + * argument. If the limit is <= 0 the value is returned unchanged. + * Used to enforce Snowflake BINARY per-column caps on the pushdown + * write path. + */ +static void +IcebergSizeClampBlobFun(DataChunk &args, ExpressionState &state, Vector &result) +{ + BinaryExecutor::Execute( + args.data[0], args.data[1], result, args.size(), + [&](string_t input, int32_t limit) { + auto data = input.GetData(); + auto size = (int64_t) input.GetSize(); + + if (limit <= 0 || size <= (int64_t) limit) + return StringVector::AddStringOrBlob(result, data, size); + + return StringVector::AddStringOrBlob(result, data, (idx_t) limit); + }); +} + + +/* + * IcebergSizeCheckTextFun raises if a VARCHAR value's UTF-8 byte length + * exceeds the second argument; otherwise passes through unchanged. 
The + * third argument is the source column name, included in the error message + * for actionable diagnostics. A non-positive limit disables the check. + * + * Counterpart to IcebergSizeClampTextFun, used on the pushdown write path + * when the table's out_of_range_values policy is the default 'error'. + */ +static void +IcebergSizeCheckTextFun(DataChunk &args, ExpressionState &state, Vector &result) +{ + TernaryExecutor::Execute( + args.data[0], args.data[1], args.data[2], result, args.size(), + [&](string_t input, int32_t limit, string_t column_name) { + auto data = input.GetData(); + auto size = (int64_t) input.GetSize(); + + if (limit > 0 && size > (int64_t) limit) + throw InvalidInputException( + "value of column \"%s\" (text, %lld bytes) exceeds " + "iceberg_max_string_bytes (%d). " + "Set out_of_range_values = 'clamp' on the table to " + "truncate oversize values instead of erroring.", + column_name.GetString().c_str(), (long long) size, limit); + + return StringVector::AddString(result, data, size); + }); +} + + +/* + * IcebergSizeCheckBlobFun raises if a BLOB value exceeds the second + * argument; otherwise passes through unchanged. Counterpart to + * IcebergSizeClampBlobFun. + */ +static void +IcebergSizeCheckBlobFun(DataChunk &args, ExpressionState &state, Vector &result) +{ + TernaryExecutor::Execute( + args.data[0], args.data[1], args.data[2], result, args.size(), + [&](string_t input, int32_t limit, string_t column_name) { + auto data = input.GetData(); + auto size = (int64_t) input.GetSize(); + + if (limit > 0 && size > (int64_t) limit) + throw InvalidInputException( + "value of column \"%s\" (bytea, %lld bytes) exceeds " + "iceberg_max_binary_bytes (%d). 
" + "Set out_of_range_values = 'clamp' on the table to " + "truncate oversize values instead of erroring.", + column_name.GetString().c_str(), (long long) size, limit); + + return StringVector::AddStringOrBlob(result, data, size); + }); +} + + static void LoadInternal(ExtensionLoader &loader) { @@ -438,6 +657,44 @@ static void LoadInternal(ExtensionLoader &loader) { loader.RegisterFunction(pg_error_nested); } + { + ScalarFunction iceberg_size_clamp_text( + "iceberg_size_clamp_text", + {LogicalType::VARCHAR, LogicalType::INTEGER}, + LogicalType::VARCHAR, + IcebergSizeClampTextFun); + loader.RegisterFunction(iceberg_size_clamp_text); + + ScalarFunction iceberg_size_clamp_blob( + "iceberg_size_clamp_blob", + {LogicalType::BLOB, LogicalType::INTEGER}, + LogicalType::BLOB, + IcebergSizeClampBlobFun); + loader.RegisterFunction(iceberg_size_clamp_blob); + + ScalarFunction iceberg_size_check_text( + "iceberg_size_check_text", + {LogicalType::VARCHAR, LogicalType::INTEGER, LogicalType::VARCHAR}, + LogicalType::VARCHAR, + IcebergSizeCheckTextFun); + loader.RegisterFunction(iceberg_size_check_text); + + ScalarFunction iceberg_size_check_blob( + "iceberg_size_check_blob", + {LogicalType::BLOB, LogicalType::INTEGER, LogicalType::VARCHAR}, + LogicalType::BLOB, + IcebergSizeCheckBlobFun); + loader.RegisterFunction(iceberg_size_check_blob); + + ScalarFunction iceberg_byte_size( + "iceberg_byte_size", + {LogicalType::ANY}, + LogicalType::BIGINT, + IcebergByteSizeFun, + IcebergByteSizeBind); + loader.RegisterFunction(iceberg_byte_size); + } + PgLakeUtilityFunctions::RegisterFunctions(loader); PgLakeFileSystemFunctions::RegisterFunctions(loader); diff --git a/pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h b/pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h index fa013e3d..75b9aac3 100644 --- a/pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h +++ b/pg_lake_engine/include/pg_lake/pgduck/iceberg_datum_validation.h @@ -41,3 +41,40 @@ 
extern PGDLLEXPORT Datum IcebergErrorOrClampDatum(Datum value, Oid typeOid, int32 typmod, IcebergOutOfRangePolicy policy, bool *isNull); + +/* + * IcebergSizeClampDatum truncates, NULLs, or errors on a Datum so that + * string, binary, and container values fit the byte limits expressed by + * pg_lake_engine.iceberg_max_string_bytes, + * pg_lake_engine.iceberg_max_binary_bytes, and + * pg_lake_engine.iceberg_max_nested_type_bytes (0 = no limit). + * + * The behavior on an oversize value is selected by `policy`: + * - ICEBERG_OOR_ERROR (default for Iceberg tables): raise an error + * identifying the column and exceeded GUC. + * - ICEBERG_OOR_CLAMP: silently fix up the value as below. + * - ICEBERG_OOR_NONE: pass through unchanged. + * + * Under CLAMP, lossless types are truncated: + * - text/varchar/bpchar -> trimmed at a UTF-8 character boundary to + * iceberg_max_string_bytes. + * - bytea -> byte-truncated to iceberg_max_binary_bytes. + * + * Structured-string types are replaced with NULL via *isNull = true, + * since truncation would corrupt them: + * - jsonb/json + * + * Containers (arrays, composites, maps) are not recursed into: the whole + * value is checked against iceberg_max_nested_type_bytes and, when over + * the cap, replaced with NULL (CLAMP) or rejected (ERROR). Domains are + * unwrapped to their base type. + * + * `columnName` is included in the error message under ERROR mode; pass + * NULL or empty when the column context is unknown. + * + * If all three GUCs are 0, the value is returned unchanged regardless of + * policy or type. 
+ */ +extern PGDLLEXPORT Datum IcebergSizeClampDatum(Datum value, Oid typeOid, + int32 typmod, + IcebergOutOfRangePolicy policy, + const char *columnName, + bool *isNull); diff --git a/pg_lake_engine/include/pg_lake/pgduck/iceberg_query_validation.h b/pg_lake_engine/include/pg_lake/pgduck/iceberg_query_validation.h index e058d2ba..ad867e8c 100644 --- a/pg_lake_engine/include/pg_lake/pgduck/iceberg_query_validation.h +++ b/pg_lake_engine/include/pg_lake/pgduck/iceberg_query_validation.h @@ -37,6 +37,27 @@ extern PGDLLEXPORT char *IcebergWrapQueryWithErrorOrClampChecks(char *query, IcebergOutOfRangePolicy policy, bool queryHasRowId); +/* + * IcebergWrapQueryWithSizeClampChecks wraps a query so that values + * exceeding the per-column byte caps (pg_lake_engine.iceberg_max_string_bytes, + * iceberg_max_binary_bytes, iceberg_max_nested_type_bytes) are either + * clamped or rejected, per `policy`: + * + * - ICEBERG_OOR_ERROR (default): raise error identifying the column. + * - ICEBERG_OOR_CLAMP: text/varchar/bpchar truncated at a UTF-8 + * character boundary; bytea byte-truncated; jsonb/json NULLed when + * the serialized form exceeds the limit; arrays/structs/maps NULLed + * when their measured byte size exceeds the nested-type cap. + * - ICEBERG_OOR_NONE: no-op. + * + * Returns the original query unchanged when all GUCs are zero or no + * column carries a clampable type. + */ +extern PGDLLEXPORT char *IcebergWrapQueryWithSizeClampChecks(char *query, + TupleDesc tupleDesc, + IcebergOutOfRangePolicy policy, + bool queryHasRowId); + /* * IcebergWrapQueryWithNativeTypeConversion wraps a query to rewrite * columns whose native DuckDB shape does not match Iceberg's. 
See the diff --git a/pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h b/pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h index 68b8838f..1024bbdb 100644 --- a/pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h +++ b/pg_lake_engine/include/pg_lake/pgduck/iceberg_validation.h @@ -72,3 +72,23 @@ extern PGDLLEXPORT bool TypeNeedsIcebergValidation(Oid typeOid, int32 typmod, #define TEMPORAL_DATE_MIN_YEAR (-4712) #define TEMPORAL_TIMESTAMP_MIN_YEAR 1 #define TEMPORAL_MAX_YEAR 9999 + +/* + * Downstream byte limits for values written to Iceberg tables, set via the + * pg_lake_engine.iceberg_max_string_bytes, + * pg_lake_engine.iceberg_max_binary_bytes, and + * pg_lake_engine.iceberg_max_nested_type_bytes GUCs. 0 means no limit. These + * caps are imposed by some downstream consumers (e.g. Snowflake VARCHAR + * 16 MiB / BINARY 8 MiB) and applied via IcebergSizeClampDatum. + */ +extern PGDLLEXPORT int IcebergMaxStringBytes; +extern PGDLLEXPORT int IcebergMaxBinaryBytes; +extern PGDLLEXPORT int IcebergMaxNestedTypeBytes; + +/* + * TypeNeedsIcebergSizeClamping returns true if a Datum of typeOid (or any + * lossless string / structured-string / bytea component nested within it) + * could potentially be size-clamped by IcebergSizeClampDatum. Recurses + * through arrays, composites, maps, and domains. Independent of the + * current GUC values. 
+ */ +extern PGDLLEXPORT bool TypeNeedsIcebergSizeClamping(Oid typeOid); diff --git a/pg_lake_engine/src/init.c b/pg_lake_engine/src/init.c index 12f8abdf..0e35a47f 100644 --- a/pg_lake_engine/src/init.c +++ b/pg_lake_engine/src/init.c @@ -44,6 +44,7 @@ #include "pg_extension_base/pg_extension_base_ids.h" #include "pg_lake/pgduck/cache_worker.h" #include "pg_lake/pgduck/client.h" +#include "pg_lake/pgduck/iceberg_validation.h" #include "pg_lake/util/s3_writer_utils.h" #include "utils/guc.h" @@ -186,6 +187,50 @@ _PG_init(void) 0, NULL, NULL, NULL); + DefineCustomIntVariable("pg_lake_engine.iceberg_max_string_bytes", + gettext_noop("Maximum bytes for string values written to " + "Iceberg tables. Values of text/varchar/bpchar " + "exceeding this size are truncated at a UTF-8 " + "character boundary; values of jsonb/json are " + "replaced with NULL since truncation would " + "corrupt the structure. 0 disables the limit. " + "Intended for downstream consumers (e.g. " + "Snowflake) that impose per-column byte caps."), + NULL, + &IcebergMaxStringBytes, + 0, 0, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE, + NULL, NULL, NULL); + + DefineCustomIntVariable("pg_lake_engine.iceberg_max_binary_bytes", + gettext_noop("Maximum bytes for bytea values written to " + "Iceberg tables. Values exceeding this size are " + "byte-truncated. 0 disables the limit."), + NULL, + &IcebergMaxBinaryBytes, + 0, 0, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE, + NULL, NULL, NULL); + + DefineCustomIntVariable("pg_lake_engine.iceberg_max_nested_type_bytes", + gettext_noop("Maximum bytes for the JSON-serialized form of " + "array, composite, and map values written to " + "Iceberg tables. The whole container is replaced " + "with NULL when the sum of its leaf byte sizes " + "exceeds this size. 0 disables the limit. Distinct " + "from iceberg_max_string_bytes because downstream " + "consumers' OBJECT/ARRAY/VARIANT columns typically " + "have a much larger ceiling than STRING/VARCHAR " + "(e.g. 
on Snowflake: 128 MiB vs. 16 MiB)."), + NULL, + &IcebergMaxNestedTypeBytes, + 0, 0, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE, + NULL, NULL, NULL); + DefineCustomStringVariable( "pg_lake.stage_location", gettext_noop("Base URL for @STAGE/ resolution in paths"), diff --git a/pg_lake_engine/src/pgduck/iceberg_datum_validation.c b/pg_lake_engine/src/pgduck/iceberg_datum_validation.c index 455ec4d5..cc6ac5d1 100644 --- a/pg_lake_engine/src/pgduck/iceberg_datum_validation.c +++ b/pg_lake_engine/src/pgduck/iceberg_datum_validation.c @@ -40,15 +40,19 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/detoast.h" #include "catalog/pg_type.h" #include "funcapi.h" #include "datatype/timestamp.h" +#include "mb/pg_wchar.h" #include "pg_lake/pgduck/iceberg_datum_validation.h" #include "pg_lake/pgduck/map.h" #include "pg_lake/pgduck/numeric.h" #include "pg_lake/util/temporal_utils.h" #include "utils/array.h" +#include "utils/builtins.h" #include "utils/date.h" +#include "utils/jsonb.h" #include "utils/lsyscache.h" #include "utils/numeric.h" #include "utils/timestamp.h" @@ -69,6 +73,92 @@ static Datum IcebergErrorOrClampNestedDatum(Datum value, Oid typeOid, int32 typmod, IcebergOutOfRangePolicy policy, bool *isNull, bool *modified); +static Datum IcebergSizeClampStringScalar(Datum value, Oid typeOid, + IcebergOutOfRangePolicy policy, + const char *columnName, + bool *isNull, int32 *byteSize); +static Datum IcebergSizeClampBinaryScalar(Datum value, + IcebergOutOfRangePolicy policy, + const char *columnName, + int32 *byteSize); +static Datum IcebergSizeClampNestedDatum(Datum value, Oid typeOid, + int32 typmod, + IcebergOutOfRangePolicy policy, + const char *columnName, + bool *isNull, bool *modified, + int64 *byteSize); +static void RaiseSizeOverflow(const char *columnName, const char *typeLabel, + int64 size, int64 limit, const char *gucName); + + +/* + * RaiseSizeOverflow ereports a uniform "value too long" error for size-clamp + * violations under 
ICEBERG_OOR_ERROR. columnName may be NULL/empty when the + * caller doesn't have column context; the message degrades gracefully. + */ +static void +RaiseSizeOverflow(const char *columnName, const char *typeLabel, + int64 size, int64 limit, const char *gucName) +{ + if (columnName != NULL && columnName[0] != '\0') + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("value of column \"%s\" (%s, %lld bytes) exceeds %s (%lld)", + columnName, typeLabel, (long long) size, + gucName, (long long) limit), + errhint("Set out_of_range_values = 'clamp' on the table to " + "truncate oversize values instead of erroring."))); + else + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("%s value of %lld bytes exceeds %s (%lld)", + typeLabel, (long long) size, gucName, + (long long) limit), + errhint("Set out_of_range_values = 'clamp' on the table to " + "truncate oversize values instead of erroring."))); +} + + +/* + * SizeClampVarlenaFastPath returns true if `value`'s entire on-disk varlena + * fits comfortably under every active size-clamp limit, in which case no + * leaf clamp and no aggregate clamp can fire and the recursion can be + * skipped. *byteSize is set to the size as a conservative estimate for + * aggregate roll-up. + * + * The 2x bound covers jsonb's binary-vs-text serialization gap (and applies + * uniformly even when the value has no jsonb leaves; the slight + * conservatism is cheap to swallow). Excludes JSONBOID at the top level + * because IcebergSizeClampStringScalar has its own type-aware fast path + * that checks the binary form against half the per-leaf string limit. + * + * Caller must ensure typeOid is a varlena type (typlen == -1) before + * dereferencing the Datum as a varlena pointer. 
+ */
+static bool
+SizeClampVarlenaFastPath(Datum value, Oid typeOid, int64 *byteSize)
+{
+    const int   limits[] = {
+        IcebergMaxStringBytes,
+        IcebergMaxBinaryBytes,
+        IcebergMaxNestedTypeBytes
+    };
+    int64       tightest = INT64_MAX;
+    int64       payloadSize;
+    int         i;
+
+    /* jsonb has its own fast path; fixed-width types are not varlenas. */
+    if (typeOid == JSONBOID || get_typlen(typeOid) != -1)
+        return false;
+
+    /* Raw (uncompressed) payload size, read without detoasting. */
+    payloadSize = (int64) toast_raw_datum_size(value) - VARHDRSZ;
+
+    /* Smallest enabled limit; stays INT64_MAX when every limit is 0. */
+    for (i = 0; i < (int) lengthof(limits); i++)
+    {
+        if (limits[i] > 0 && limits[i] < tightest)
+            tightest = limits[i];
+    }
+
+    if (tightest == INT64_MAX)
+        return false;
+
+    /* Require 2x headroom under the tightest limit. */
+    if (payloadSize * 2 > tightest)
+        return false;
+
+    *byteSize = payloadSize;
+    return true;
+}
 
 
 /*
@@ -519,3 +609,340 @@ IcebergErrorOrClampDatum(Datum value, Oid typeOid, int32 typmod,
     return IcebergErrorOrClampNestedDatum(value, typeOid, typmod, policy,
                                           isNull, &modified);
 }
+
+
+/*
+ * IcebergSizeClampStringScalar handles a single text/varchar/bpchar/jsonb/json
+ * Datum. Text-y values are truncated at a UTF-8 character boundary so that
+ * the result fits within IcebergMaxStringBytes. jsonb/json values that
+ * exceed the limit are NULLed via *isNull, since truncation would corrupt
+ * the structure.
+ *
+ * *byteSize reports the byte length of the post-clamp value as the downstream
+ * consumer will see it (text-serialized form for jsonb; varlena content
+ * length for the others). Used by container recursion to compute aggregate
+ * sizes. Set to 0 when the result is NULL.
+ *
+ * Fast paths that avoid allocations for under-the-limit values:
+ * - text/varchar/bpchar/json: PG_DETOAST_DATUM_PACKED is a no-op for
+ * non-toasted (the common case); VARSIZE_ANY_EXHDR + branch returns the
+ * value unchanged, no copy.
+ * - jsonb: jsonb_out is unavoidable when we need an exact text length, but + * skip it entirely when neither leaf nor aggregate clamping is active, + * or when the binary varlena size is small enough that the text form is + * bounded under the limit (binSize <= maxBytes / 2 — text rendering of + * typical jsonb data adds overhead in only a small constant factor over + * the binary representation, and our worst case lives at 2x). + */ +static Datum +IcebergSizeClampStringScalar(Datum value, Oid typeOid, + IcebergOutOfRangePolicy policy, + const char *columnName, + bool *isNull, int32 *byteSize) +{ + int32 maxBytes = IcebergMaxStringBytes; + + *byteSize = 0; + + if (typeOid == JSONBOID) + { + /* + * Skip serialization entirely when neither leaf nor aggregate + * clamping needs the size. Aggregate clamping never fires for a + * top-level scalar, but the recursion uses byteSize to roll up into + * containers so we must compute it when aggregate is on. + */ + if (maxBytes == 0 && IcebergMaxNestedTypeBytes == 0) + return value; + + /* + * Cheap binary-size pre-check: if the on-disk binary form is + * comfortably under the per-leaf cap, the text form is also under (we + * use 2x as a generous bound). Avoids the jsonb_out cstring + * allocation for the common case of small jsonb values. 
+ */ + struct varlena *jb = (struct varlena *) PG_DETOAST_DATUM_PACKED(value); + int32 binSize = VARSIZE_ANY_EXHDR(jb); + + if (maxBytes > 0 && (int64) binSize * 2 <= (int64) maxBytes) + { + *byteSize = binSize; + return value; + } + + char *cstr = DatumGetCString(DirectFunctionCall1(jsonb_out, value)); + int32 textLen = strlen(cstr); + + pfree(cstr); + + if (maxBytes == 0 || textLen <= maxBytes) + { + *byteSize = textLen; + return value; + } + + if (policy == ICEBERG_OOR_ERROR) + RaiseSizeOverflow(columnName, "jsonb", textLen, maxBytes, + "iceberg_max_string_bytes"); + + *isNull = true; + return (Datum) 0; + } + + struct varlena *v = (struct varlena *) PG_DETOAST_DATUM_PACKED(value); + int32 srcLen = VARSIZE_ANY_EXHDR(v); + + if (maxBytes == 0 || srcLen <= maxBytes) + { + *byteSize = srcLen; + return value; + } + + if (typeOid == JSONOID) + { + if (policy == ICEBERG_OOR_ERROR) + RaiseSizeOverflow(columnName, "json", srcLen, maxBytes, + "iceberg_max_string_bytes"); + + *isNull = true; + return (Datum) 0; + } + + Assert(typeOid == TEXTOID || typeOid == VARCHAROID || + typeOid == BPCHAROID); + + if (policy == ICEBERG_OOR_ERROR) + { + const char *typeLabel = (typeOid == TEXTOID) ? "text" : + (typeOid == VARCHAROID) ? "varchar" : "bpchar"; + + RaiseSizeOverflow(columnName, typeLabel, srcLen, maxBytes, + "iceberg_max_string_bytes"); + } + + const char *src = VARDATA_ANY(v); + int32 trimmedLen = pg_mbcliplen(src, srcLen, maxBytes); + + struct varlena *result = (struct varlena *) palloc(VARHDRSZ + trimmedLen); + + SET_VARSIZE(result, VARHDRSZ + trimmedLen); + memcpy(VARDATA(result), src, trimmedLen); + + *byteSize = trimmedLen; + return PointerGetDatum(result); +} + + +/* + * IcebergSizeClampBinaryScalar byte-truncates a bytea Datum to + * IcebergMaxBinaryBytes when needed. *byteSize reports the post-clamp + * varlena content length (or pre-clamp length when no clamping is needed, + * for aggregate roll-up). 
+ */
+static Datum
+IcebergSizeClampBinaryScalar(Datum value,
+                             IcebergOutOfRangePolicy policy,
+                             const char *columnName,
+                             int32 *byteSize)
+{
+    int32       maxBytes = IcebergMaxBinaryBytes;
+
+    struct varlena *v = (struct varlena *) PG_DETOAST_DATUM_PACKED(value);
+    bool        vIsCopy = ((Pointer) v != DatumGetPointer(value));
+    int32       srcLen = VARSIZE_ANY_EXHDR(v);
+
+    if (maxBytes == 0 || srcLen <= maxBytes)
+    {
+        /*
+         * Pass-through: only the length was needed. Release a detoasted
+         * copy so large under-limit values do not pile up in the current
+         * memory context.
+         */
+        if (vIsCopy)
+            pfree(v);
+        *byteSize = srcLen;
+        return value;
+    }
+
+    /* Does not return under ERROR policy. */
+    if (policy == ICEBERG_OOR_ERROR)
+        RaiseSizeOverflow(columnName, "bytea", srcLen, maxBytes,
+                          "iceberg_max_binary_bytes");
+
+    /* Build a fresh 4-byte-header varlena holding the first maxBytes bytes. */
+    struct varlena *result = (struct varlena *) palloc(VARHDRSZ + maxBytes);
+
+    SET_VARSIZE(result, VARHDRSZ + maxBytes);
+    memcpy(VARDATA(result), VARDATA_ANY(v), maxBytes);
+
+    if (vIsCopy)
+        pfree(v);
+
+    *byteSize = maxBytes;
+    return PointerGetDatum(result);
+}
+
+
+/*
+ * IcebergSizeClampNestedDatum recursively size-clamps a Datum, deconstructing
+ * and reconstructing arrays, composites, maps (domain over array of
+ * composites), and domains. The recursion shape mirrors
+ * IcebergErrorOrClampNestedDatum.
+ *
+ * In addition to per-leaf clamping, an aggregate-size check NULLs the entire
+ * array or composite when the sum of its leaf byte sizes exceeds
+ * IcebergMaxNestedTypeBytes (the limit on the serialized form that downstream
+ * consumers receive when an array/struct lands in an OBJECT/ARRAY/VARIANT
+ * column; distinct from the per-leaf STRING limit since downstream caps
+ * usually differ). Snowflake stores semi-structured data without per-record
+ * headers, so sum-of-leaves is a close-enough approximation of the
+ * serialized JSON length without paying for an extra serialization pass.
+ *
+ * *isNull is set to true when the value is replaced by NULL: a leaf
+ * jsonb/json over the limit, or a container whose aggregate exceeds the
+ * limit. Inside containers, NULLed children are absorbed as NULL within
+ * the reconstructed container.
+ * + * *modified is set to true when the returned Datum differs from the input, + * allowing callers to skip reconstruction when nothing changed. + * + * *byteSize reports the post-clamp byte size of the value as the consumer + * will see it (text-serialized form for jsonb; sum of leaf bytes for + * containers). Used for aggregate roll-up by the caller. Set to 0 when + * the result is NULL. + */ +static Datum +IcebergSizeClampNestedDatum(Datum value, Oid typeOid, int32 typmod, + IcebergOutOfRangePolicy policy, + const char *columnName, + bool *isNull, bool *modified, int64 *byteSize) +{ + *modified = false; + *byteSize = 0; + + if (typeOid == TEXTOID || typeOid == VARCHAROID || + typeOid == BPCHAROID || typeOid == JSONBOID || + typeOid == JSONOID) + { + int32 leafSize = 0; + Datum result = IcebergSizeClampStringScalar(value, typeOid, + policy, columnName, + isNull, &leafSize); + + *modified = (result != value) || *isNull; + *byteSize = leafSize; + return result; + } + + if (typeOid == BYTEAOID) + { + int32 leafSize = 0; + Datum result = IcebergSizeClampBinaryScalar(value, policy, + columnName, &leafSize); + + *modified = (result != value); + *byteSize = leafSize; + return result; + } + + /* + * Container types (array / composite / map / domain over either): apply a + * single aggregate-size check against IcebergMaxNestedTypeBytes. + * + * We deliberately do NOT recurse into elements/fields to per-leaf-clamp + * inner strings or bytea. Inside an array/object on the consumer side + * the leaves don't have their own column cap — they're just JSON inside + * the parent OBJECT/ARRAY/VARIANT, which has the aggregate cap. And if + * iceberg_max_string_bytes <= iceberg_max_nested_type_bytes, no inner + * leaf can exceed the per-leaf limit while staying inside an under-cap + * container. + * + * So a container is either small enough to pass through verbatim, or big + * enough to NULL (CLAMP) / error (ERROR). No deconstruct/deform, no + * per-element walk. 
+ */ + Oid elemType = get_element_type(typeOid); + + if (OidIsValid(elemType)) + { + int64 aggregateSize = (int64) toast_raw_datum_size(value) - VARHDRSZ; + + if (IcebergMaxNestedTypeBytes > 0 && aggregateSize > IcebergMaxNestedTypeBytes) + { + if (policy == ICEBERG_OOR_ERROR) + RaiseSizeOverflow(columnName, "array", aggregateSize, + IcebergMaxNestedTypeBytes, + "iceberg_max_nested_type_bytes"); + + *isNull = true; + *modified = true; + return (Datum) 0; + } + + *byteSize = aggregateSize; + return value; + } + + char typtype = get_typtype(typeOid); + + if (typtype == TYPTYPE_DOMAIN) + { + int32 baseTypmod = typmod; + Oid baseType = getBaseTypeAndTypmod(typeOid, &baseTypmod); + + return IcebergSizeClampNestedDatum(value, baseType, baseTypmod, + policy, columnName, + isNull, modified, byteSize); + } + + if (typtype == TYPTYPE_COMPOSITE) + { + int64 aggregateSize = (int64) toast_raw_datum_size(value) - VARHDRSZ; + + if (IcebergMaxNestedTypeBytes > 0 && aggregateSize > IcebergMaxNestedTypeBytes) + { + if (policy == ICEBERG_OOR_ERROR) + RaiseSizeOverflow(columnName, "composite", aggregateSize, + IcebergMaxNestedTypeBytes, + "iceberg_max_nested_type_bytes"); + + *isNull = true; + *modified = true; + return (Datum) 0; + } + + *byteSize = aggregateSize; + return value; + } + + return value; +} + + +/* + * IcebergSizeClampDatum truncates or NULLs a Datum so that string and binary + * values fit the byte limits expressed by + * pg_lake_engine.iceberg_max_string_bytes and + * pg_lake_engine.iceberg_max_binary_bytes. + * + * See header comment for full semantics. When both GUCs are 0 the value + * passes through unchanged. 
+ */ +Datum +IcebergSizeClampDatum(Datum value, Oid typeOid, int32 typmod, + IcebergOutOfRangePolicy policy, + const char *columnName, bool *isNull) +{ + *isNull = false; + + if (policy == ICEBERG_OOR_NONE || + (IcebergMaxStringBytes == 0 && IcebergMaxBinaryBytes == 0 && + IcebergMaxNestedTypeBytes == 0)) + return value; + + bool modified = false; + int64 byteSize = 0; + + /* + * Fast path for varlena values (text/varchar/bpchar/bytea/json/array/ + * struct/map): if the entire on-disk varlena fits comfortably under every + * active limit, no leaf clamp and no aggregate clamp can fire. Skips the + * recursive deconstruct/deform that the slow path would do — the + * typical case where a column's type is clampable but the row's value + * happens to be small. Sound under both CLAMP and ERROR because no limit + * can be exceeded when the entire varlena fits comfortably. + */ + if (SizeClampVarlenaFastPath(value, typeOid, &byteSize)) + return value; + + return IcebergSizeClampNestedDatum(value, typeOid, typmod, policy, + columnName, isNull, &modified, + &byteSize); +} diff --git a/pg_lake_engine/src/pgduck/iceberg_query_validation.c b/pg_lake_engine/src/pgduck/iceberg_query_validation.c index 858e5d63..6fb2fb91 100644 --- a/pg_lake_engine/src/pgduck/iceberg_query_validation.c +++ b/pg_lake_engine/src/pgduck/iceberg_query_validation.c @@ -469,6 +469,251 @@ IcebergWrapQueryWithErrorOrClampChecks(char *query, TupleDesc tupleDesc, } +/* ================================================================ + * Query wrapping for downstream byte-cap size clamp. + * See IcebergWrapQueryWithSizeClampChecks below. 
+ * ================================================================ */ + +/* + * AppendIcebergSizeClampExpression emits DuckDB SQL that enforces the + * size limits on `expr` per the policy: + * + * ICEBERG_OOR_CLAMP — truncate / NULL the value: + * - text/varchar/bpchar : iceberg_size_clamp_text(expr, $maxStringBytes) + * - bytea : iceberg_size_clamp_blob(expr, $maxBinaryBytes) + * - jsonb / json : NULL when strlen(expr::VARCHAR) exceeds the + * string limit, since truncating the serialized + * form would yield invalid JSON. + * - array / composite / + * map : NULL when iceberg_byte_size(expr) exceeds + * iceberg_max_nested_type_bytes. + * + * ICEBERG_OOR_ERROR — raise on oversize, including the column name in + * the message. Uses iceberg_size_check_text / _blob for the leaf cases + * and DuckDB's built-in error() inside CASE for jsonb/json/containers. + * + * ICEBERG_OOR_NONE — pass through unchanged (function returns false). + * + * Returns true when a transformed expression was written to buf; false if + * the type needs no wrapping (caller emits the bare expression). + * + * `columnName` is the raw, unquoted source column name. Do not pass an + * already SQL-quoted identifier: the helper itself places the name inside a + * single-quoted SQL string literal for the error message. 
+ */ +static bool +AppendIcebergSizeClampExpression(StringInfo buf, const char *expr, + const char *columnName, + IcebergOutOfRangePolicy policy, + Oid typeOid, int32 typmod, int depth) +{ + if (policy == ICEBERG_OOR_NONE) + return false; + + if (typeOid == TEXTOID || typeOid == VARCHAROID || typeOid == BPCHAROID) + { + if (policy == ICEBERG_OOR_ERROR) + appendStringInfo(buf, + "iceberg_size_check_text(%s, %d, '%s')", + expr, IcebergMaxStringBytes, columnName); + else + appendStringInfo(buf, "iceberg_size_clamp_text(%s, %d)", + expr, IcebergMaxStringBytes); + return true; + } + + if (typeOid == BYTEAOID) + { + if (policy == ICEBERG_OOR_ERROR) + appendStringInfo(buf, + "iceberg_size_check_blob(%s, %d, '%s')", + expr, IcebergMaxBinaryBytes, columnName); + else + appendStringInfo(buf, "iceberg_size_clamp_blob(%s, %d)", + expr, IcebergMaxBinaryBytes); + return true; + } + + if (typeOid == JSONBOID || typeOid == JSONOID) + { + const char *typeLabel = (typeOid == JSONBOID) ? "jsonb" : "json"; + + if (policy == ICEBERG_OOR_ERROR) + appendStringInfo(buf, + "(CASE WHEN strlen(%s::VARCHAR) > %d " + "THEN error(printf(" + "'value of column \"%s\" (%s, %%d bytes) " + "exceeds iceberg_max_string_bytes (%d). " + "Set out_of_range_values = ''clamp'' on the " + "table to truncate oversize values instead " + "of erroring.', strlen(%s::VARCHAR))) " + "ELSE %s END)", + expr, IcebergMaxStringBytes, + columnName, typeLabel, + IcebergMaxStringBytes, + expr, expr); + else + appendStringInfo(buf, + "(CASE WHEN strlen(%s::VARCHAR) > %d " + "THEN NULL ELSE %s END)", + expr, IcebergMaxStringBytes, expr); + return true; + } + + /* + * Containers (array / composite / map): aggregate-only check. Skip + * entirely when the aggregate GUC is disabled. 
+ */ + if (IcebergMaxNestedTypeBytes <= 0) + return false; + + if (OidIsValid(get_element_type(typeOid)) || + IsMapTypeOid(typeOid) || + get_typtype(typeOid) == TYPTYPE_COMPOSITE) + { + const char *typeLabel = + OidIsValid(get_element_type(typeOid)) ? "array" : + IsMapTypeOid(typeOid) ? "map" : "composite"; + + if (policy == ICEBERG_OOR_ERROR) + appendStringInfo(buf, + "(CASE WHEN iceberg_byte_size(%s) > %d " + "THEN error(printf(" + "'value of column \"%s\" (%s, %%d bytes) " + "exceeds iceberg_max_nested_type_bytes (%d). " + "Set out_of_range_values = ''clamp'' on the " + "table to truncate oversize values instead " + "of erroring.', iceberg_byte_size(%s))) " + "ELSE %s END)", + expr, IcebergMaxNestedTypeBytes, + columnName, typeLabel, + IcebergMaxNestedTypeBytes, + expr, expr); + else + appendStringInfo(buf, + "(CASE WHEN iceberg_byte_size(%s) > %d " + "THEN NULL ELSE %s END)", + expr, IcebergMaxNestedTypeBytes, expr); + return true; + } + + /* domain: unwrap to base type and recurse */ + if (get_typtype(typeOid) == TYPTYPE_DOMAIN) + { + Oid baseType = getBaseTypeAndTypmod(typeOid, &typmod); + + return AppendIcebergSizeClampExpression(buf, expr, columnName, policy, + baseType, typmod, depth); + } + + return false; +} + + +/* + * IcebergWrapQueryWithSizeClampChecks wraps a query with an outer SELECT + * that enforces the per-column size limits on each clampable column. + * Behavior on oversize values is selected by `policy`: + * + * - ICEBERG_OOR_ERROR: raise an error identifying the column / type / + * exceeded GUC (default for Iceberg tables). + * - ICEBERG_OOR_CLAMP: silently truncate / NULL. + * - ICEBERG_OOR_NONE: no-op, original query returned. + * + * When all GUCs are 0 or no column carries a clampable type, the original + * query is returned unchanged so the wrapper is free for non-clamping + * callers. 
+ *
+ * Both the INSERT..SELECT pushdown path (via WriteQueryResultTo) and the
+ * snowflake_cdc snapshot path (via AddQueryResultToTable) flow through
+ * this wrapper, so they share the same policy-driven behavior as the
+ * per-tuple IcebergSizeCheckOrClampSlotInPlace path.
+ */
+char *
+IcebergWrapQueryWithSizeClampChecks(char *query, TupleDesc tupleDesc,
+									IcebergOutOfRangePolicy policy,
+									bool queryHasRowId)
+{
+	bool		allLimitsOff = IcebergMaxStringBytes == 0 &&
+		IcebergMaxBinaryBytes == 0 &&
+		IcebergMaxNestedTypeBytes == 0;
+
+	/* No descriptor, no policy, or no limit configured: nothing to wrap. */
+	if (tupleDesc == NULL || policy == ICEBERG_OOR_NONE || allLimitsOff)
+		return query;
+
+	/* Stop scanning at the first live column with a clampable type. */
+	bool		foundClampable = false;
+
+	for (int attno = 0; attno < tupleDesc->natts && !foundClampable; attno++)
+	{
+		Form_pg_attribute column = TupleDescAttr(tupleDesc, attno);
+
+		foundClampable = !column->attisdropped &&
+			TypeNeedsIcebergSizeClamping(column->atttypid);
+	}
+
+	if (!foundClampable)
+		return query;
+
+	/* Build the outer projection: one entry per non-dropped column. */
+	StringInfoData outer;
+	int			emitted = 0;
+
+	initStringInfo(&outer);
+	appendStringInfoString(&outer, "SELECT ");
+
+	for (int attno = 0; attno < tupleDesc->natts; attno++)
+	{
+		Form_pg_attribute column = TupleDescAttr(tupleDesc, attno);
+
+		if (column->attisdropped)
+			continue;
+
+		if (emitted++ > 0)
+			appendStringInfoString(&outer, ", ");
+
+		const char *ident =
+			duckdb_quote_identifier(NameStr(column->attname));
+
+		/* scratch buffer receives the wrapped expression, if there is one */
+		StringInfoData clampExpr;
+
+		initStringInfo(&clampExpr);
+
+		bool		wasWrapped =
+			AppendIcebergSizeClampExpression(&clampExpr, ident,
+											 NameStr(column->attname), policy,
+											 column->atttypid,
+											 column->atttypmod, 0);
+
+		if (wasWrapped)
+			appendStringInfo(&outer, "%s AS %s", clampExpr.data, ident);
+		else
+			appendStringInfoString(&outer, ident);
+
+		pfree(clampExpr.data);
+	}
+
+	/* _row_id is appended after the regular columns when requested */
+	if (queryHasRowId)
+	{
+		if (emitted > 0)
+			appendStringInfoString(&outer, ", ");
+		appendStringInfoString(&outer, "_row_id");
+	}
+
+	appendStringInfo(&outer, " FROM (%s) AS __iceberg_size_clamp", query);
+
+	return outer.data;
+}
+
+
 /*
================================================================ * Query wrapping for native-type -> Iceberg conversion. * See IcebergWrapQueryWithNativeTypeConversion below. diff --git a/pg_lake_engine/src/pgduck/iceberg_validation.c b/pg_lake_engine/src/pgduck/iceberg_validation.c index a3415c52..dbc5df84 100644 --- a/pg_lake_engine/src/pgduck/iceberg_validation.c +++ b/pg_lake_engine/src/pgduck/iceberg_validation.c @@ -39,6 +39,16 @@ static IcebergOutOfRangePolicy GetIcebergOutOfRangePolicyFromOptions(List *options); +/* + * Backing variables for pg_lake_engine.iceberg_max_string_bytes and + * pg_lake_engine.iceberg_max_binary_bytes. Registered in pg_lake_engine + * _PG_init(). 0 means no limit. + */ +int IcebergMaxStringBytes = 0; +int IcebergMaxBinaryBytes = 0; +int IcebergMaxNestedTypeBytes = 0; + + /* * GetIcebergOutOfRangePolicyFromOptions reads the "out_of_range_values" option * from a list of DefElem options (table options). @@ -170,3 +180,45 @@ TypeNeedsIcebergValidation(Oid typeOid, int32 typmod, bool isPushdown) return false; } + + +/* + * TypeNeedsIcebergSizeClamping returns true if a Datum of typeOid contains + * any leaf type that could be size-clamped: text/varchar/bpchar/bytea (which + * truncate) or jsonb/json (which become NULL). It also returns true for + * any array, composite, or map type, since their aggregate JSON-serialized + * size can exceed the downstream consumer's column cap regardless of + * element/field types (e.g. an int[] of millions of values). Recurses + * through domains. + * + * Independent of the current pg_lake_engine.iceberg_max_string_bytes and + * pg_lake_engine.iceberg_max_binary_bytes values: this is a static + * type-shape check used to gate the per-row clamp call cheaply. 
+ */ +bool +TypeNeedsIcebergSizeClamping(Oid typeOid) +{ + if (typeOid == TEXTOID || typeOid == VARCHAROID || + typeOid == BPCHAROID || typeOid == BYTEAOID || + typeOid == JSONBOID || typeOid == JSONOID) + return true; + + /* Any array type: aggregate size matters even for non-clampable elements. */ + if (OidIsValid(get_element_type(typeOid))) + return true; + + /* map check must precede the generic domain unwrap (maps are domains) */ + if (IsMapTypeOid(typeOid)) + return true; + + char typtype = get_typtype(typeOid); + + if (typtype == TYPTYPE_DOMAIN) + return TypeNeedsIcebergSizeClamping(getBaseType(typeOid)); + + /* Any composite type: aggregate size of fields matters. */ + if (typtype == TYPTYPE_COMPOSITE) + return true; + + return false; +} diff --git a/pg_lake_engine/src/pgduck/write_data.c b/pg_lake_engine/src/pgduck/write_data.c index 6e3fe5ff..8b94bbe2 100644 --- a/pg_lake_engine/src/pgduck/write_data.c +++ b/pg_lake_engine/src/pgduck/write_data.c @@ -134,6 +134,14 @@ WriteQueryResultTo(char *query, queryHasRowId); } + if (IcebergMaxStringBytes > 0 || IcebergMaxBinaryBytes > 0 || + IcebergMaxNestedTypeBytes > 0) + { + query = IcebergWrapQueryWithSizeClampChecks(query, queryTupleDesc, + outOfRangePolicy, + queryHasRowId); + } + if (wrapNativeTypes && destinationFormat == DATA_FORMAT_ICEBERG) { query = IcebergWrapQueryWithNativeTypeConversion(query, queryTupleDesc, diff --git a/pg_lake_table/src/fdw/pg_lake_table.c b/pg_lake_table/src/fdw/pg_lake_table.c index d4107894..79a00f07 100644 --- a/pg_lake_table/src/fdw/pg_lake_table.c +++ b/pg_lake_table/src/fdw/pg_lake_table.c @@ -243,6 +243,15 @@ typedef struct PgLakeModifyState IcebergOutOfRangePolicy outOfRangePolicy; bool needsOutOfRangeValidation; + /* + * needsSizeClamping is true if any column type contains a leaf type + * (text/varchar/bpchar/bytea/jsonb/json) that could potentially be + * size-clamped by IcebergSizeClampDatum at write time, when either + * pg_lake_engine.iceberg_max_string_bytes or + * 
pg_lake_engine.iceberg_max_binary_bytes is set. + */ + bool needsSizeClamping; + /* slot used for position deletes */ TupleTableSlot *deleteSlot; @@ -2591,6 +2600,28 @@ TupleDescNeedsIcebergValidation(TupleDesc tupleDesc) } +/* + * TupleDescNeedsIcebergSizeClamping returns true if any non-dropped column + * could potentially be size-clamped at write time. + */ +static bool +TupleDescNeedsIcebergSizeClamping(TupleDesc tupleDesc) +{ + for (int i = 0; i < tupleDesc->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupleDesc, i); + + if (attr->attisdropped) + continue; + + if (TypeNeedsIcebergSizeClamping(attr->atttypid)) + return true; + } + + return false; +} + + /* * IcebergErrorOrClampSlotInPlace clamps or rejects out-of-range temporal and numeric * values in the slot, modifying it in-place. Handles nested types (arrays, @@ -2635,6 +2666,52 @@ IcebergErrorOrClampSlotInPlace(TupleTableSlot *slot, TupleDesc tupleDesc, } +/* + * IcebergSizeCheckOrClampSlotInPlace enforces the size-clamping GUCs on + * string/binary/nested values in `slot`. Behavior is selected per-table + * via `out_of_range_values`: + * - ICEBERG_OOR_ERROR (default): raises on the first oversize value, + * identifying the column. + * - ICEBERG_OOR_CLAMP: truncates / NULLs the value in place. + * - ICEBERG_OOR_NONE: no-op. + * + * Handles nested types via IcebergSizeClampDatum's recursive walker. 
+ */ +static void +IcebergSizeCheckOrClampSlotInPlace(TupleTableSlot *slot, TupleDesc tupleDesc, + IcebergOutOfRangePolicy policy) +{ + int natts = tupleDesc->natts; + + if (policy == ICEBERG_OOR_NONE) + return; + + slot_getallattrs(slot); + + for (int i = 0; i < natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupleDesc, i); + + if (attr->attisdropped || slot->tts_isnull[i]) + continue; + + if (!TypeNeedsIcebergSizeClamping(attr->atttypid)) + continue; + + bool isNull = false; + Datum clamped = IcebergSizeClampDatum(slot->tts_values[i], + attr->atttypid, + attr->atttypmod, + policy, + NameStr(attr->attname), + &isNull); + + slot->tts_values[i] = clamped; + slot->tts_isnull[i] = isNull; + } +} + + /* * ClampAndCheckConstraints normalizes the slot for Iceberg write and then * runs PostgreSQL constraint checks (NOT NULL, CHECK, etc.). @@ -2655,6 +2732,12 @@ ClampAndCheckConstraints(PgLakeModifyState * fmstate, IcebergErrorOrClampSlotInPlace(slot, fmstate->tupleDesc, fmstate->outOfRangePolicy); + if (fmstate->needsSizeClamping && + (IcebergMaxStringBytes > 0 || IcebergMaxBinaryBytes > 0 || + IcebergMaxNestedTypeBytes > 0)) + IcebergSizeCheckOrClampSlotInPlace(slot, fmstate->tupleDesc, + fmstate->outOfRangePolicy); + Relation rel = resultRelInfo->ri_RelationDesc; if (rel->rd_att->constr) @@ -3501,6 +3584,7 @@ create_foreign_modify(Relation rel, fmstate->outOfRangePolicy = GetIcebergOutOfRangePolicyForTable(relationId); fmstate->needsOutOfRangeValidation = TupleDescNeedsIcebergValidation(fmstate->tupleDesc); + fmstate->needsSizeClamping = TupleDescNeedsIcebergSizeClamping(fmstate->tupleDesc); } if (operation == CMD_UPDATE || operation == CMD_DELETE) diff --git a/pg_lake_table/tests/pytests/test_iceberg_size_clamping.py b/pg_lake_table/tests/pytests/test_iceberg_size_clamping.py new file mode 100644 index 00000000..46e8781d --- /dev/null +++ b/pg_lake_table/tests/pytests/test_iceberg_size_clamping.py @@ -0,0 +1,672 @@ +import pytest +from utils_pytest import * + 
+
+def test_size_clamp_text_bytea_jsonb(s3, pg_conn, extension, with_default_location):
+    """text/varchar truncate, bytea byte-truncates, jsonb/json -> NULL.
+
+    pg_lake_engine.iceberg_max_string_bytes governs text/varchar/bpchar/jsonb/json
+    pg_lake_engine.iceberg_max_binary_bytes governs bytea.
+
+    UTF-8 boundary: 'é' is 2 bytes; 8 'é' = 16 bytes truncates to 10 bytes
+    (5 'é') under a 10-byte limit, since pg_mbcharcliplen never crosses a
+    character boundary.
+    """
+    schema = "test_size_clamp_basic"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (
+            id int,
+            t text,
+            vc varchar(50),
+            blob bytea,
+            jb jsonb,
+            js json
+        ) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        run_command("SET pg_lake_engine.iceberg_max_string_bytes = 10", pg_conn)
+        run_command("SET pg_lake_engine.iceberg_max_binary_bytes = 5", pg_conn)
+        run_command(
+            f"""
+            INSERT INTO {schema}.t VALUES
+                (1, repeat('x', 30), repeat('x', 30),
+                 repeat('y', 30)::bytea,
+                 ('{{"k":"' || repeat('z', 100) || '"}}')::jsonb,
+                 ('{{"k":"' || repeat('z', 100) || '"}}')::json),
+                (2, repeat('é', 8), repeat('é', 8),
+                 'short'::bytea,
+                 '{{"a":1}}'::jsonb,
+                 '{{"a":1}}'::json),
+                (3, 'fits', 'fits', NULL, NULL, NULL);
+        """,
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(
+            f"""SELECT id,
+                       t, octet_length(t) AS t_len,
+                       vc, octet_length(vc) AS vc_len,
+                       blob, octet_length(blob) AS blob_len,
+                       jb, js
+                FROM {schema}.t ORDER BY id""",
+            pg_conn,
+        )
+
+        # Row 1: ASCII over limit.
+        assert rows[0]["id"] == 1
+        assert rows[0]["t"] == "x" * 10
+        assert rows[0]["t_len"] == 10
+        assert rows[0]["vc"] == "x" * 10
+        assert rows[0]["vc_len"] == 10
+        assert bytes(rows[0]["blob"]) == b"y" * 5
+        assert rows[0]["blob_len"] == 5
+        assert rows[0]["jb"] is None
+        assert rows[0]["js"] is None
+
+        # Row 2: 8 'é' = 16 bytes -> 5 'é' = 10 bytes; small bytea/jsonb pass through.
+        assert rows[1]["id"] == 2
+        assert rows[1]["t"] == "é" * 5
+        assert rows[1]["t_len"] == 10
+        assert rows[1]["vc"] == "é" * 5
+        assert bytes(rows[1]["blob"]) == b"short"
+        assert rows[1]["jb"] == {"a": 1}
+        assert rows[1]["js"] == {"a": 1}
+
+        # Row 3: under-limit values pass through unchanged.
+        assert rows[2]["id"] == 3
+        assert rows[2]["t"] == "fits"
+        assert rows[2]["vc"] == "fits"
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        run_command("RESET pg_lake_engine.iceberg_max_binary_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_disabled_by_default(s3, pg_conn, extension, with_default_location):
+    """With all three size GUCs at the default (0), large values pass through unchanged."""
+    schema = "test_size_clamp_off"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, t text, blob bytea, jb jsonb)
+            USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        # Sanity: GUCs are 0 by default.
+        rows = run_query("SHOW pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        assert rows[0][0] == "0"
+        rows = run_query("SHOW pg_lake_engine.iceberg_max_binary_bytes", pg_conn)
+        assert rows[0][0] == "0"
+        rows = run_query("SHOW pg_lake_engine.iceberg_max_nested_type_bytes", pg_conn)
+        assert rows[0][0] == "0"
+
+        run_command(
+            f"""INSERT INTO {schema}.t VALUES
+                (1, repeat('x', 1000), repeat('y', 1000)::bytea,
+                 ('{{"k":"' || repeat('z', 200) || '"}}')::jsonb)""",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(
+            f"SELECT octet_length(t), octet_length(blob), jb IS NOT NULL "
+            f"FROM {schema}.t",
+            pg_conn,
+        )
+        assert rows[0][0] == 1000
+        assert rows[0][1] == 1000
+        assert rows[0][2] is True
+    finally:
+        pg_conn.rollback()
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_only_one_guc(s3, pg_conn, extension, with_default_location):
+    """Setting only the string GUC leaves bytea unchanged, and vice versa.
+
+    Both inserts use INSERT ... VALUES; the INSERT..SELECT form is not
+    exercised with a single GUC set here.
+    """
+    schema = "test_size_clamp_partial"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, t text, blob bytea) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        # Only string limit set: bytea must pass through.
+        run_command("SET pg_lake_engine.iceberg_max_string_bytes = 5", pg_conn)
+        run_command(
+            f"INSERT INTO {schema}.t VALUES "
+            f"(1, repeat('x', 100), repeat('y', 100)::bytea)",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        # Only binary limit set: text must pass through.
+        run_command("RESET pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        run_command("SET pg_lake_engine.iceberg_max_binary_bytes = 5", pg_conn)
+        run_command(
+            f"INSERT INTO {schema}.t VALUES "
+            f"(2, repeat('x', 100), repeat('y', 100)::bytea)",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(
+            f"SELECT id, octet_length(t), octet_length(blob) "
+            f"FROM {schema}.t ORDER BY id",
+            pg_conn,
+        )
+        assert rows[0] == [1, 5, 100], f"row 1: {rows[0]}"
+        assert rows[1] == [2, 100, 5], f"row 2: {rows[1]}"
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        run_command("RESET pg_lake_engine.iceberg_max_binary_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_not_null_jsonb(s3, pg_conn, extension, with_default_location):
+    """A jsonb column with NOT NULL fails the constraint after clamp-to-NULL.
+
+    Mirrors the existing numeric-NaN + NOT NULL behavior in
+    test_special_numeric.py: clamping happens before constraint checks, so a
+    value that becomes NULL through clamping is caught by NOT NULL.
+    """
+    schema = "test_size_clamp_not_null"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, jb jsonb NOT NULL) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        run_command("SET pg_lake_engine.iceberg_max_string_bytes = 10", pg_conn)
+
+        err = run_command(
+            f"INSERT INTO {schema}.t VALUES "
+            f"(1, ('{{\"k\":\"' || repeat('z', 100) || '\"}}')::jsonb)",
+            pg_conn,
+            raise_error=False,
+        )
+        assert "null value" in str(err) and "violates not-null" in str(err)
+        pg_conn.rollback()
+
+        # An under-limit value succeeds.
+        run_command(f"INSERT INTO {schema}.t VALUES (2, '{{\"a\":1}}'::jsonb)", pg_conn)
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id, jb FROM {schema}.t", pg_conn)
+        assert len(rows) == 1
+        assert rows[0]["id"] == 2
+        assert rows[0]["jb"] == {"a": 1}
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_aggregate_array(s3, pg_conn, extension, with_default_location):
+    """Array whose serialized size exceeds the aggregate limit gets NULLed.
+
+    The aggregate check is on the container's varlena size (per-tuple) or
+    the JSON-serialized text length (pushdown), so sizes include array
+    overhead, not just leaf bytes. Inner leaves are not separately clamped.
+
+    NOTE(review): the pushdown wrapper added in this patch compares
+    iceberg_byte_size(expr) against the limit rather than a serialized text
+    length — confirm the wording above. This test only uses INSERT ... VALUES.
+    """
+    schema = "test_size_clamp_agg_arr"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, arr text[]) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        run_command("SET pg_lake_engine.iceberg_max_nested_type_bytes = 100", pg_conn)
+        # 50 elements of 8 bytes each: well over the 100-byte aggregate.
+        big = "ARRAY[" + ",".join(f"repeat('x', 8)" for _ in range(50)) + "]"
+        run_command(
+            f"INSERT INTO {schema}.t VALUES (1, {big})",
+            pg_conn,
+        )
+        # Two-element array — under the aggregate limit, passes through.
+        run_command(
+            f"INSERT INTO {schema}.t VALUES "
+            f"(2, ARRAY[repeat('a', 6), repeat('b', 6)])",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id, arr FROM {schema}.t ORDER BY id", pg_conn)
+        assert rows[0]["id"] == 1
+        assert rows[0]["arr"] is None, f"row 1: expected NULL, got {rows[0]['arr']}"
+        assert rows[1]["id"] == 2
+        assert rows[1]["arr"] == ["a" * 6, "b" * 6]
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_nested_type_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_aggregate_composite(s3, pg_conn, extension, with_default_location):
+    """Composite fields that individually fit but sum over the limit -> NULL the row."""
+    schema = "test_size_clamp_agg_rec"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TYPE {schema}.kv AS (a text, b text, c text);
+        CREATE TABLE {schema}.t (id int, rec {schema}.kv) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        run_command("SET pg_lake_engine.iceberg_max_nested_type_bytes = 100", pg_conn)
+        # Each field is 50 bytes; the composite varlena is well over 100.
+        run_command(
+            f"INSERT INTO {schema}.t VALUES "
+            f"(1, ROW(repeat('a', 50), repeat('b', 50), repeat('c', 50))::{schema}.kv)",
+            pg_conn,
+        )
+        # Each field is 4 bytes; comfortably under the limit.
+        run_command(
+            f"INSERT INTO {schema}.t VALUES "
+            f"(2, ROW(repeat('a', 4), repeat('b', 4), repeat('c', 4))::{schema}.kv)",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id, rec FROM {schema}.t ORDER BY id", pg_conn)
+        assert rows[0]["id"] == 1
+        assert rows[0]["rec"] is None, f"row 1: expected NULL, got {rows[0]['rec']}"
+        assert rows[1]["id"] == 2
+        assert rows[1]["rec"] is not None
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_nested_type_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_aggregate_int_array(s3, pg_conn, extension, with_default_location):
+    """int[] aggregate over the limit gets NULLed even though int isn't a clampable leaf.
+
+    The 128 MB ARRAY/OBJECT cap on Snowflake applies to all arrays regardless
+    of element type. We can't truncate individual ints meaningfully, so the
+    whole array is replaced with NULL when its varlena content (a cheap
+    upper-bound proxy for the JSON serialization length) exceeds the limit.
+
+    Note: this test uses INSERT...VALUES so that the per-tuple FDW callback
+    runs the clamp. The INSERT...SELECT pushdown path for int[] is covered
+    by test_size_clamp_insert_select_array_aggregate below.
+    """
+    schema = "test_size_clamp_agg_int"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, arr int[]) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        run_command("SET pg_lake_engine.iceberg_max_nested_type_bytes = 100", pg_conn)
+        # 50 int4 elements: ~220 bytes varlena content, > 100 limit -> NULL.
+        # Inline literal forces VALUES path (pushdown only kicks in for INSERT...SELECT).
+        big_array = "ARRAY[" + ",".join(str(i) for i in range(1, 51)) + "]"
+        run_command(
+            f"INSERT INTO {schema}.t VALUES (1, {big_array})",
+            pg_conn,
+        )
+        # 5 int4 = ~40 bytes, well under -> passes.
+        run_command(
+            f"INSERT INTO {schema}.t VALUES (2, ARRAY[1, 2, 3, 4, 5])",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id, arr FROM {schema}.t ORDER BY id", pg_conn)
+        assert rows[0]["id"] == 1
+        assert rows[0]["arr"] is None, f"row 1: expected NULL, got {rows[0]['arr']}"
+        assert rows[1]["id"] == 2
+        assert rows[1]["arr"] == [1, 2, 3, 4, 5]
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_nested_type_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_insert_select(s3, pg_conn, extension, with_default_location):
+    """INSERT..SELECT pushdown clamps each row via the SQL wrapper.
+
+    The pushdown path delegates the whole INSERT..SELECT to the query engine
+    via WriteQueryResultTo, which wraps the inner SELECT with
+    iceberg_size_clamp_text / _blob calls and aggregate-NULL CASE
+    expressions before the data ever reaches the destination. Output is
+    byte-identical to the per-tuple ExecForeignInsert path.
+    """
+    schema = "test_size_clamp_insert_select"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, txt text, blob bytea, jb jsonb)
+            USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        # Sanity: pushdown is on by default; we want to exercise that path.
+        rows = run_query("SHOW pg_lake_table.enable_insert_select_pushdown", pg_conn)
+        assert rows[0][0] == "on"
+
+        run_command("SET pg_lake_engine.iceberg_max_string_bytes = 5", pg_conn)
+        run_command("SET pg_lake_engine.iceberg_max_binary_bytes = 3", pg_conn)
+        run_command(
+            f"INSERT INTO {schema}.t "
+            f"SELECT g, repeat('x', 30), repeat('y', 30)::bytea, "
+            f"('{{\"k\":\"' || repeat('z', 100) || '\"}}')::jsonb "
+            f"FROM generate_series(1, 3) g",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(
+            f"SELECT id, txt, blob, jb FROM {schema}.t ORDER BY id", pg_conn
+        )
+        assert len(rows) == 3
+        for i, row in enumerate(rows, start=1):
+            assert row["id"] == i
+            assert row["txt"] == "x" * 5, f"row {i}: txt = {row['txt']!r}"
+            assert bytes(row["blob"]) == b"y" * 3, f"row {i}: blob = {row['blob']!r}"
+            assert row["jb"] is None, f"row {i}: jb = {row['jb']!r}"
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        run_command("RESET pg_lake_engine.iceberg_max_binary_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_insert_select_array_aggregate(
+    s3, pg_conn, extension, with_default_location
+):
+    """INSERT..SELECT pushdown clamps array aggregates: int[] over the limit -> NULL."""
+    schema = "test_size_clamp_pd_arr"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, arr int[]) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        run_command("SET pg_lake_engine.iceberg_max_nested_type_bytes = 100", pg_conn)
+        run_command(
+            f"INSERT INTO {schema}.t SELECT 1, array_agg(g) "
+            f"FROM generate_series(1, 50) g",
+            pg_conn,
+        )
+        run_command(
+            f"INSERT INTO {schema}.t SELECT 2, ARRAY[1, 2, 3, 4, 5]",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id, arr FROM {schema}.t ORDER BY id", pg_conn)
+        # 50 int4 -> JSON serialization > 100 bytes -> aggregate NULL.
+        # NOTE(review): the pushdown wrapper in this patch measures
+        # iceberg_byte_size(expr), not a JSON serialization — confirm wording.
+        assert rows[0]["id"] == 1
+        assert rows[0]["arr"] is None, f"row 1: got {rows[0]['arr']}"
+        # 5 int4 -> sum well under 100 -> array preserved.
+        assert rows[1]["id"] == 2
+        assert rows[1]["arr"] == [1, 2, 3, 4, 5]
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_nested_type_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_clamp_aggregate_int_composite(
+    s3, pg_conn, extension, with_default_location
+):
+    """Composite of all non-clampable fields whose aggregate exceeds the limit gets NULLed."""
+    schema = "test_size_clamp_agg_int_rec"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TYPE {schema}.tup AS (a bigint, b bigint, c bigint);
+        CREATE TABLE {schema}.t (id int, rec {schema}.tup) USING iceberg WITH (out_of_range_values = 'clamp');
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        # 3 bigint = 24 bytes; limit 16 -> NULL.
+        run_command("SET pg_lake_engine.iceberg_max_nested_type_bytes = 16", pg_conn)
+        run_command(
+            f"INSERT INTO {schema}.t VALUES (1, ROW(1::bigint, 2::bigint, 3::bigint)::{schema}.tup)",
+            pg_conn,
+        )
+        # Limit raised to 100; same row passes.
+        run_command("SET pg_lake_engine.iceberg_max_nested_type_bytes = 100", pg_conn)
+        run_command(
+            f"INSERT INTO {schema}.t VALUES (2, ROW(1::bigint, 2::bigint, 3::bigint)::{schema}.tup)",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id, rec FROM {schema}.t ORDER BY id", pg_conn)
+        assert rows[0]["id"] == 1
+        assert rows[0]["rec"] is None, f"row 1: expected NULL, got {rows[0]['rec']}"
+        assert rows[1]["id"] == 2
+        assert rows[1]["rec"] is not None
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_nested_type_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+# =====================================================================
+# Default policy = error: each oversize type/path raises identifying
+# the column / type / exceeded GUC. Mirrors the clamp tests above but
+# with no out_of_range_values option (defaults to 'error').
+# =====================================================================
+
+
+def test_size_check_error_default_per_tuple(
+    s3, pg_conn, extension, with_default_location
+):
+    """Without out_of_range_values, oversize text/bytea/jsonb on the
+    per-tuple FDW path raises with column name + type + GUC."""
+    schema = "test_size_check_error"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, t text, b bytea, j jsonb) USING iceberg;
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        run_command("SET pg_lake_engine.iceberg_max_string_bytes = 10", pg_conn)
+        run_command("SET pg_lake_engine.iceberg_max_binary_bytes = 5", pg_conn)
+        # Commit the SETs so they survive each pytest.raises rollback;
+        # plain SET inside a transaction is rolled back with the failed
+        # INSERT, which would silently disable clamping for the next case.
+        pg_conn.commit()
+
+        with pytest.raises(
+            Exception, match=r'column "t".*text.*iceberg_max_string_bytes \(10\)'
+        ):
+            run_command(
+                f"INSERT INTO {schema}.t VALUES (1, repeat('x', 30), 'short'::bytea, '{{\"k\":1}}'::jsonb)",
+                pg_conn,
+            )
+        pg_conn.rollback()
+
+        with pytest.raises(
+            Exception, match=r'column "b".*bytea.*iceberg_max_binary_bytes \(5\)'
+        ):
+            run_command(
+                f"INSERT INTO {schema}.t VALUES (2, 'ok', repeat('y', 30)::bytea, '{{\"k\":1}}'::jsonb)",
+                pg_conn,
+            )
+        pg_conn.rollback()
+
+        with pytest.raises(
+            Exception, match=r'column "j".*jsonb.*iceberg_max_string_bytes \(10\)'
+        ):
+            run_command(
+                f"INSERT INTO {schema}.t VALUES (3, 'ok', 'ok'::bytea, ('{{\"k\":\"' || repeat('z', 100) || '\"}}')::jsonb)",
+                pg_conn,
+            )
+        pg_conn.rollback()
+
+        # Under-limit row passes cleanly.
+        run_command(
+            f"INSERT INTO {schema}.t VALUES (4, 'ok', 'ok'::bytea, '{{\"k\":1}}'::jsonb)",
+            pg_conn,
+        )
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id FROM {schema}.t", pg_conn)
+        assert [r["id"] for r in rows] == [4]
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        run_command("RESET pg_lake_engine.iceberg_max_binary_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_check_error_default_aggregate_nested(
+    s3, pg_conn, extension, with_default_location
+):
+    """Without out_of_range_values, an oversize array raises on the
+    per-tuple FDW path. Covers the aggregate-cap branch.
+
+    NOTE(review): the failing statement below is an INSERT ... SELECT, not
+    INSERT ... VALUES — confirm which path (per-tuple or pushdown) it
+    actually exercises.
+    """
+    schema = "test_size_check_nested_err"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, arr text[]) USING iceberg;
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        # NOTE(review): unlike test_size_check_error_default_per_tuple, this
+        # SET is not committed before the failing INSERT, so the rollback
+        # below presumably reverts it and the follow-up insert runs with the
+        # limit disabled. That insert is under the limit either way.
+        run_command("SET pg_lake_engine.iceberg_max_nested_type_bytes = 100", pg_conn)
+
+        with pytest.raises(
+            Exception,
+            match=r'column "arr".*array.*iceberg_max_nested_type_bytes \(100\)',
+        ):
+            run_command(
+                f"INSERT INTO {schema}.t SELECT 1, "
+                f"ARRAY(SELECT repeat('a', 50) FROM generate_series(1, 50))::text[]",
+                pg_conn,
+            )
+        pg_conn.rollback()
+
+        run_command(
+            f"INSERT INTO {schema}.t VALUES (2, ARRAY['hi','there']::text[])", pg_conn
+        )
+        pg_conn.commit()
+        rows = run_query(f"SELECT id, arr FROM {schema}.t", pg_conn)
+        assert len(rows) == 1
+        assert rows[0]["id"] == 2
+        assert rows[0]["arr"] == ["hi", "there"]
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_nested_type_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()
+
+
+def test_size_check_error_default_insert_select(
+    s3, pg_conn, extension, with_default_location
+):
+    """Without out_of_range_values, oversize values on the SQL pushdown
+    (INSERT..SELECT) path raise via the iceberg_size_check_text DuckDB
+    UDF. The error surfaces as a DuckDB InvalidInputException carrying
+    the column name and GUC."""
+    schema = "test_size_check_pushdown"
+
+    run_command(
+        f"""
+        CREATE SCHEMA {schema};
+        CREATE TABLE {schema}.t (id int, txt text) USING iceberg;
+    """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    try:
+        # NOTE(review): this SET is not committed before the failing INSERT,
+        # so the rollback below presumably reverts it; the follow-up insert
+        # of 'ok' is under the limit either way.
+        run_command("SET pg_lake_engine.iceberg_max_string_bytes = 5", pg_conn)
+
+        with pytest.raises(
+            Exception, match=r'column "txt".*iceberg_max_string_bytes \(5\)'
+        ):
+            run_command(
+                f"INSERT INTO {schema}.t SELECT 1, repeat('x', 30)",
+                pg_conn,
+            )
+        pg_conn.rollback()
+
+        run_command(f"INSERT INTO {schema}.t SELECT 2, 'ok'", pg_conn)
+        pg_conn.commit()
+
+        rows = run_query(f"SELECT id, txt FROM {schema}.t", pg_conn)
+        assert len(rows) == 1
+        assert rows[0]["id"] == 2
+        assert rows[0]["txt"] == "ok"
+    finally:
+        pg_conn.rollback()
+        run_command("RESET pg_lake_engine.iceberg_max_string_bytes", pg_conn)
+        run_command(f"DROP SCHEMA {schema} CASCADE", pg_conn)
+        pg_conn.commit()