Skip to content

Commit 525f874

Browse files
Support iceberg schema type change
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent a85d4fe commit 525f874

13 files changed

Lines changed: 2056 additions & 48 deletions

File tree

pg_lake_engine/src/parquet/field.c

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "pg_lake/util/string_utils.h"
2626

2727
static FieldStructElement * DeepCopyFieldStructElement(FieldStructElement * structElementField);
28+
static bool FieldTypesEqual(const Field * fieldA, const Field * fieldB);
2829

2930
/*
3031
* DeepCopyField deep copies a Field.
@@ -149,11 +150,49 @@ pg_cmp_s32(int32 a, int32 b)
149150

150151

151152

153+
/*
154+
* FieldTypesEqual recursively compares two Field structs for type equality.
155+
*/
156+
static bool
157+
FieldTypesEqual(const Field * fieldA, const Field * fieldB)
158+
{
159+
if (fieldA->type != fieldB->type)
160+
return false;
161+
162+
switch (fieldA->type)
163+
{
164+
case FIELD_TYPE_SCALAR:
165+
return strcmp(fieldA->field.scalar.typeName,
166+
fieldB->field.scalar.typeName) == 0;
167+
case FIELD_TYPE_LIST:
168+
return FieldTypesEqual(fieldA->field.list.element,
169+
fieldB->field.list.element);
170+
case FIELD_TYPE_MAP:
171+
return FieldTypesEqual(fieldA->field.map.key,
172+
fieldB->field.map.key) &&
173+
FieldTypesEqual(fieldA->field.map.value,
174+
fieldB->field.map.value);
175+
case FIELD_TYPE_STRUCT:
176+
{
177+
if (fieldA->field.structType.nfields != fieldB->field.structType.nfields)
178+
return false;
179+
for (size_t i = 0; i < fieldA->field.structType.nfields; i++)
180+
{
181+
if (!FieldTypesEqual(fieldA->field.structType.fields[i].type,
182+
fieldB->field.structType.fields[i].type))
183+
return false;
184+
}
185+
return true;
186+
}
187+
default:
188+
return false;
189+
}
190+
}
191+
192+
152193
/*
153194
* SchemaFieldsEquivalent compares two DataFileSchemaField structs for equivalence.
154195
* It returns true if they are equivalent, false otherwise.
155-
* Note that we do not compare the field->type here, as we do not allow changing
156-
* the type of any field in the schema, including nested types.
157196
*/
158197
bool
159198
SchemaFieldsEquivalent(DataFileSchemaField * fieldA, DataFileSchemaField * fieldB)
@@ -176,11 +215,9 @@ SchemaFieldsEquivalent(DataFileSchemaField * fieldA, DataFileSchemaField * field
176215
if (!PgStrcasecmpNullable(fieldA->initialDefault, fieldB->initialDefault))
177216
return false;
178217

179-
/*
180-
* We don't allow changing any of the types of the fields in the schema,
181-
* including the fields of nested types. So we don't need to compare
182-
* anything about the field->type here.
183-
*/
218+
if (!FieldTypesEqual(fieldA->type, fieldB->type))
219+
return false;
220+
184221
return true;
185222
}
186223

pg_lake_iceberg/src/iceberg/iceberg_type_binary_serde.c

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,10 +371,29 @@ PGIcebergBinaryDeserialize(unsigned char *binaryValue, size_t binaryLen, Field *
371371
}
372372
else if (pgType.postgresTypeOid == INT8OID)
373373
{
374-
binaryValue = FromLittleEndian64(binaryValue);
375-
int64 longValue = *((int64 *) binaryValue);
374+
/*
375+
* Per the Iceberg spec, column metrics are serialized using the type
376+
* at the time the data file was written. After an int -> long type
377+
* promotion, existing data files retain 4-byte int bounds while the
378+
* current schema expects 8-byte long. Widen on read.
379+
*
380+
* See:
381+
* https://iceberg.apache.org/spec/#binary-single-value-serialization
382+
*/
383+
if (binaryLen == sizeof(int32))
384+
{
385+
binaryValue = FromLittleEndian32(binaryValue);
386+
int32 intValue = *((int32 *) binaryValue);
376387

377-
datum = Int64GetDatum(longValue);
388+
datum = Int64GetDatum((int64) intValue);
389+
}
390+
else
391+
{
392+
binaryValue = FromLittleEndian64(binaryValue);
393+
int64 longValue = *((int64 *) binaryValue);
394+
395+
datum = Int64GetDatum(longValue);
396+
}
378397
}
379398
else if (pgType.postgresTypeOid == FLOAT4OID)
380399
{
@@ -385,10 +404,25 @@ PGIcebergBinaryDeserialize(unsigned char *binaryValue, size_t binaryLen, Field *
385404
}
386405
else if (pgType.postgresTypeOid == FLOAT8OID)
387406
{
388-
binaryValue = FromLittleEndian64(binaryValue);
389-
float8 doubleValue = *((float8 *) binaryValue);
407+
/*
408+
* Same as int -> long above: after a float -> double type promotion,
409+
* existing data files retain 4-byte float bounds while the current
410+
* schema expects 8-byte double. Widen on read.
411+
*/
412+
if (binaryLen == sizeof(float4))
413+
{
414+
binaryValue = FromLittleEndian32(binaryValue);
415+
float4 floatValue = *((float4 *) binaryValue);
390416

391-
datum = Float8GetDatum(doubleValue);
417+
datum = Float8GetDatum((float8) floatValue);
418+
}
419+
else
420+
{
421+
binaryValue = FromLittleEndian64(binaryValue);
422+
float8 doubleValue = *((float8 *) binaryValue);
423+
424+
datum = Float8GetDatum(doubleValue);
425+
}
392426
}
393427
else if (pgType.postgresTypeOid == DATEOID)
394428
{

pg_lake_table/include/pg_lake/ddl/ddl_changes.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include "nodes/pg_list.h"
2222
#include "nodes/parsenodes.h"
2323

24+
#include "pg_lake/pgduck/type.h"
25+
2426
/*
2527
* DDLOperationType describes a type of ddl operation, which
2628
* cause catalog modification and possibly iceberg metadata modification.
@@ -37,6 +39,7 @@ typedef enum DDLOperationType
3739
DDL_COLUMN_DROP_NOT_NULL = 8,
3840
DDL_TABLE_SET_PARTITION_BY = 9,
3941
DDL_TABLE_DROP_PARTITION_BY = 10,
42+
DDL_COLUMN_ALTER_TYPE = 11,
4043
} DDLOperationType;
4144

4245
/*
@@ -55,6 +58,9 @@ typedef struct IcebergDDLOperation
5558
/* applicable for set/drop default */
5659
const char *writeDefault;
5760

61+
/* applicable for alter column type */
62+
PGType newPgType;
63+
5864
/* applicable for create iceberg table */
5965
bool hasCustomLocation;
6066

pg_lake_table/include/pg_lake/fdw/schema_operations/field_id_mapping_catalog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ extern PGDLLEXPORT List *GetRegisteredFieldForAttributes(Oid relationId, List *a
3333
extern PGDLLEXPORT DataFileSchemaField * GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo);
3434
extern PGDLLEXPORT AttrNumber GetAttributeForFieldId(Oid relationId, int fieldId);
3535
extern PGDLLEXPORT void UpdateRegisteredFieldWriteDefaultForAttribute(Oid relationId, AttrNumber attNum, const char *writeDefault);
36+
extern PGDLLEXPORT void UpdateRegisteredFieldTypeForAttribute(Oid relationId, AttrNumber attNum, PGType newPgType);
3637
extern PGDLLEXPORT int GetLargestRegisteredFieldId(Oid relationId);
3738
extern PGDLLEXPORT void RegisterIcebergColumnMapping(Oid relationId, Field * field,
3839
AttrNumber attNo, int parentFieldId, PGType pgType,

pg_lake_table/src/ddl/alter_table.c

Lines changed: 170 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
#include "pg_lake/partitioning/partition_spec_catalog.h"
5959
#include "pg_lake/rest_catalog/rest_catalog.h"
6060
#include "pg_lake/object_store_catalog/object_store_catalog.h"
61+
#include "pg_lake/pgduck/numeric.h"
6162
#include "pg_lake/util/rel_utils.h"
6263

6364

@@ -119,6 +120,7 @@ typedef struct PgLakeDDL
119120
static bool Allowed(Node *arg, Oid relationId);
120121
static bool Disallowed(Node *arg, Oid relationId);
121122
static bool DisallowedAddColumnWithUnsupportedConstraints(Node *arg, Oid relationId);
123+
static bool AllowedAlterColumnTypeForIceberg(Node *arg, Oid relationId);
122124
static bool DisallowedForWritableRestRenameTable(Node *arg, Oid relationId);
123125
static bool DisallowedForWritableRestSetSchema(Node *arg, Oid relationId);
124126

@@ -155,7 +157,7 @@ static const PgLakeDDL PgLakeDDLs[] = {
155157
#if PG_VERSION_NUM >= 170000
156158
ALTER_TABLE_DDL(AT_SetExpression, Disallowed, Disallowed),
157159
#endif
158-
ALTER_TABLE_DDL(AT_AlterColumnType, Disallowed, Disallowed),
160+
ALTER_TABLE_DDL(AT_AlterColumnType, AllowedAlterColumnTypeForIceberg, Disallowed),
159161

160162
/* allowed for writable tables, not allowed for iceberg tables */
161163
ALTER_TABLE_DDL(AT_AddIdentity, Disallowed, Allowed),
@@ -409,6 +411,32 @@ CreateDDLOperationsForAlterTable(AlterTableStmt *alterStmt)
409411

410412
ddlOperations = lappend(ddlOperations, ddlOperation);
411413
}
414+
else if (subcommand->subtype == AT_AlterColumnType)
415+
{
416+
char *columnName = subcommand->name;
417+
AttrNumber attrNo = get_attnum(relationId, columnName);
418+
419+
IcebergDDLOperation *ddlOperation = palloc0(sizeof(IcebergDDLOperation));
420+
421+
ddlOperation->type = DDL_COLUMN_ALTER_TYPE;
422+
ddlOperation->attrNumber = attrNo;
423+
424+
/*
425+
* At this point, PgLakeCommonParentProcessUtility has already
426+
* been called so the column type in pg_attribute is updated. We
427+
* read the new type from the relation.
428+
*/
429+
Oid newTypeOid = InvalidOid;
430+
int32 newTypMod = -1;
431+
Oid newCollId = InvalidOid;
432+
433+
get_atttypetypmodcoll(relationId, attrNo,
434+
&newTypeOid, &newTypMod, &newCollId);
435+
436+
ddlOperation->newPgType = MakePGType(newTypeOid, newTypMod);
437+
438+
ddlOperations = lappend(ddlOperations, ddlOperation);
439+
}
412440
else if (subcommand->subtype == AT_DropNotNull)
413441
{
414442
IcebergDDLOperation *ddlOperation = palloc0(sizeof(IcebergDDLOperation));
@@ -841,10 +869,40 @@ ErrorIfUnsupportedAlterWritablePgLakeTableStmt(AlterTableStmt *alterStmt,
841869

842870
const char *cmdTypeStr = PgLakeUnsupportedAlterTableToString(cmd);
843871

844-
ereport(ERROR,
845-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
846-
errmsg("ALTER TABLE %s command not supported for "
847-
"%s tables", cmdTypeStr, tableTypeStr)));
872+
if (cmd->subtype == AT_AlterColumnType &&
873+
tableType == PG_LAKE_ICEBERG_TABLE_TYPE)
874+
{
875+
ColumnDef *def = (ColumnDef *) cmd->def;
876+
877+
if (def->raw_default != NULL)
878+
{
879+
ereport(ERROR,
880+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
881+
errmsg("ALTER TABLE %s command not supported for "
882+
"%s tables", cmdTypeStr, tableTypeStr),
883+
errdetail("USING requires rewriting data files, "
884+
"but Iceberg schema evolution is a "
885+
"metadata-only operation.")));
886+
}
887+
else
888+
{
889+
ereport(ERROR,
890+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
891+
errmsg("ALTER TABLE %s command not supported for "
892+
"%s tables", cmdTypeStr, tableTypeStr),
893+
errdetail("Allowed type promotions for Iceberg tables "
894+
"are: int -> bigint, float -> double, and "
895+
"decimal(P,S) -> decimal(P',S) where P' > P "
896+
"(wider precision, same scale).")));
897+
}
898+
}
899+
else
900+
{
901+
ereport(ERROR,
902+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
903+
errmsg("ALTER TABLE %s command not supported for "
904+
"%s tables", cmdTypeStr, tableTypeStr)));
905+
}
848906
}
849907
}
850908

@@ -1043,6 +1101,113 @@ Disallowed(Node *arg, Oid relationId)
10431101
}
10441102

10451103

1104+
/*
1105+
* AllowedAlterColumnTypeForIceberg validates whether a column type change
1106+
* is allowed for an Iceberg table per the Iceberg spec v2 type promotion rules.
1107+
*
1108+
* The Iceberg spec allows the following type promotions:
1109+
* - int -> long
1110+
* - float -> double
1111+
* - decimal(P, S) -> decimal(P', S) where P' > P (wider precision, same scale)
1112+
*
1113+
* See: https://iceberg.apache.org/spec/#schema-evolution
1114+
*/
1115+
static bool
1116+
AllowedAlterColumnTypeForIceberg(Node *arg, Oid relationId)
1117+
{
1118+
AlterTableCmd *cmd = (AlterTableCmd *) arg;
1119+
1120+
Assert(cmd->subtype == AT_AlterColumnType);
1121+
1122+
char *columnName = cmd->name;
1123+
ColumnDef *def = (ColumnDef *) cmd->def;
1124+
TypeName *newTypeName = def->typeName;
1125+
1126+
/*
1127+
* Reject USING clause. USING requires rewriting data files, but Iceberg
1128+
* schema evolution is metadata-only so data files are never rewritten.
1129+
*/
1130+
if (def->raw_default != NULL)
1131+
return false;
1132+
1133+
/* resolve the new type OID and typmod */
1134+
int32 newTypMod = 0;
1135+
Oid newTypeOid = InvalidOid;
1136+
1137+
typenameTypeIdAndMod(NULL, newTypeName, &newTypeOid, &newTypMod);
1138+
1139+
/* get the current column type from the relation */
1140+
AttrNumber attrNum = get_attnum(relationId, columnName);
1141+
1142+
if (attrNum == InvalidAttrNumber)
1143+
return false;
1144+
1145+
Oid currentTypeOid = InvalidOid;
1146+
int32 currentTypMod = -1;
1147+
Oid currentCollId = InvalidOid;
1148+
1149+
get_atttypetypmodcoll(relationId, attrNum,
1150+
&currentTypeOid, &currentTypMod, &currentCollId);
1151+
1152+
/* same type is always allowed (no-op from Iceberg perspective) */
1153+
if (currentTypeOid == newTypeOid && currentTypMod == newTypMod)
1154+
return true;
1155+
1156+
/*
1157+
* Iceberg type promotion: int -> long
1158+
*
1159+
* PostgreSQL int2 and int4 both map to Iceberg "int", and int8 maps to
1160+
* Iceberg "long". So we allow int2/int4 -> int8.
1161+
*/
1162+
if ((currentTypeOid == INT4OID || currentTypeOid == INT2OID) &&
1163+
newTypeOid == INT8OID)
1164+
return true;
1165+
1166+
/*
1167+
* Iceberg type promotion: float -> double
1168+
*
1169+
* PostgreSQL float4 maps to Iceberg "float" and float8 maps to Iceberg
1170+
* "double". So we allow float4 -> float8.
1171+
*/
1172+
if (currentTypeOid == FLOAT4OID && newTypeOid == FLOAT8OID)
1173+
return true;
1174+
1175+
/*
1176+
* Iceberg type promotion: decimal(P, S) -> decimal(P', S) where P' > P
1177+
*
1178+
* Both must be numeric, the new precision must be greater, and the scale
1179+
* must remain the same.
1180+
*/
1181+
if (currentTypeOid == NUMERICOID && newTypeOid == NUMERICOID)
1182+
{
1183+
int currentPrecision = -1;
1184+
int currentScale = -1;
1185+
int newPrecision = -1;
1186+
int newScale = -1;
1187+
1188+
GetDuckdbAdjustedPrecisionAndScaleFromNumericTypeMod(currentTypMod,
1189+
&currentPrecision,
1190+
&currentScale);
1191+
GetDuckdbAdjustedPrecisionAndScaleFromNumericTypeMod(newTypMod,
1192+
&newPrecision,
1193+
&newScale);
1194+
1195+
/* new precision must be wider and scale must stay the same */
1196+
if (newPrecision > currentPrecision && newScale == currentScale)
1197+
{
1198+
/* also validate the new type is supported for Iceberg tables */
1199+
ErrorIfTypeUnsupportedNumericForIcebergTables(newTypMod, columnName);
1200+
return true;
1201+
}
1202+
1203+
return false;
1204+
}
1205+
1206+
/* all other type changes are disallowed */
1207+
return false;
1208+
}
1209+
1210+
10461211
/*
10471212
* We have not yet implemented table renames in REST catalog.
10481213
*/

0 commit comments

Comments
 (0)