Skip to content

Commit a4c0a79

Browse files
Support iceberg schema type change
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent 62cb274 commit a4c0a79

16 files changed

Lines changed: 2456 additions & 75 deletions

File tree

pg_lake_engine/src/parquet/field.c

Lines changed: 44 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
#include "pg_lake/util/string_utils.h"
2626

2727
static FieldStructElement * DeepCopyFieldStructElement(FieldStructElement * structElementField);
28+
static bool FieldTypesEqual(const Field * fieldA, const Field * fieldB);
2829

2930
/*
3031
* DeepCopyField deep copies a Field.
@@ -149,11 +150,49 @@ pg_cmp_s32(int32 a, int32 b)
149150

150151

151152

153+
/*
154+
* FieldTypesEqual recursively compares two Field structs for type equality.
155+
*/
156+
static bool
157+
FieldTypesEqual(const Field * fieldA, const Field * fieldB)
158+
{
159+
if (fieldA->type != fieldB->type)
160+
return false;
161+
162+
switch (fieldA->type)
163+
{
164+
case FIELD_TYPE_SCALAR:
165+
return strcmp(fieldA->field.scalar.typeName,
166+
fieldB->field.scalar.typeName) == 0;
167+
case FIELD_TYPE_LIST:
168+
return FieldTypesEqual(fieldA->field.list.element,
169+
fieldB->field.list.element);
170+
case FIELD_TYPE_MAP:
171+
return FieldTypesEqual(fieldA->field.map.key,
172+
fieldB->field.map.key) &&
173+
FieldTypesEqual(fieldA->field.map.value,
174+
fieldB->field.map.value);
175+
case FIELD_TYPE_STRUCT:
176+
{
177+
if (fieldA->field.structType.nfields != fieldB->field.structType.nfields)
178+
return false;
179+
for (size_t i = 0; i < fieldA->field.structType.nfields; i++)
180+
{
181+
if (!FieldTypesEqual(fieldA->field.structType.fields[i].type,
182+
fieldB->field.structType.fields[i].type))
183+
return false;
184+
}
185+
return true;
186+
}
187+
default:
188+
return false;
189+
}
190+
}
191+
192+
152193
/*
153194
* SchemaFieldsEquivalent compares two DataFileSchemaField structs for equivalence.
154195
* It returns true if they are equivalent, false otherwise.
155-
* Note that we do not compare the field->type here, as we do not allow changing
156-
* the type of any field in the schema, including nested types.
157196
*/
158197
bool
159198
SchemaFieldsEquivalent(DataFileSchemaField * fieldA, DataFileSchemaField * fieldB)
@@ -176,11 +215,9 @@ SchemaFieldsEquivalent(DataFileSchemaField * fieldA, DataFileSchemaField * field
176215
if (!PgStrcasecmpNullable(fieldA->initialDefault, fieldB->initialDefault))
177216
return false;
178217

179-
/*
180-
* We don't allow changing any of the types of the fields in the schema,
181-
* including the fields of nested types. So we don't need to compare
182-
* anything about the field->type here.
183-
*/
218+
if (!FieldTypesEqual(fieldA->type, fieldB->type))
219+
return false;
220+
184221
return true;
185222
}
186223

pg_lake_iceberg/src/iceberg/iceberg_type_binary_serde.c

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,10 +371,29 @@ PGIcebergBinaryDeserialize(unsigned char *binaryValue, size_t binaryLen, Field *
371371
}
372372
else if (pgType.postgresTypeOid == INT8OID)
373373
{
374-
binaryValue = FromLittleEndian64(binaryValue);
375-
int64 longValue = *((int64 *) binaryValue);
374+
/*
375+
* Per the Iceberg spec, column metrics are serialized using the type
376+
* at the time the data file was written. After an int -> long type
377+
* promotion, existing data files retain 4-byte int bounds while the
378+
* current schema expects 8-byte long. Widen on read.
379+
*
380+
* See:
381+
* https://iceberg.apache.org/spec/#binary-single-value-serialization
382+
*/
383+
if (binaryLen == sizeof(int32))
384+
{
385+
binaryValue = FromLittleEndian32(binaryValue);
386+
int32 intValue = *((int32 *) binaryValue);
376387

377-
datum = Int64GetDatum(longValue);
388+
datum = Int64GetDatum((int64) intValue);
389+
}
390+
else
391+
{
392+
binaryValue = FromLittleEndian64(binaryValue);
393+
int64 longValue = *((int64 *) binaryValue);
394+
395+
datum = Int64GetDatum(longValue);
396+
}
378397
}
379398
else if (pgType.postgresTypeOid == FLOAT4OID)
380399
{
@@ -385,10 +404,25 @@ PGIcebergBinaryDeserialize(unsigned char *binaryValue, size_t binaryLen, Field *
385404
}
386405
else if (pgType.postgresTypeOid == FLOAT8OID)
387406
{
388-
binaryValue = FromLittleEndian64(binaryValue);
389-
float8 doubleValue = *((float8 *) binaryValue);
407+
/*
408+
* Same as int -> long above: after a float -> double type promotion,
409+
* existing data files retain 4-byte float bounds while the current
410+
* schema expects 8-byte double. Widen on read.
411+
*/
412+
if (binaryLen == sizeof(float4))
413+
{
414+
binaryValue = FromLittleEndian32(binaryValue);
415+
float4 floatValue = *((float4 *) binaryValue);
390416

391-
datum = Float8GetDatum(doubleValue);
417+
datum = Float8GetDatum((float8) floatValue);
418+
}
419+
else
420+
{
421+
binaryValue = FromLittleEndian64(binaryValue);
422+
float8 doubleValue = *((float8 *) binaryValue);
423+
424+
datum = Float8GetDatum(doubleValue);
425+
}
392426
}
393427
else if (pgType.postgresTypeOid == DATEOID)
394428
{

pg_lake_table/include/pg_lake/ddl/ddl_changes.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include "nodes/pg_list.h"
2222
#include "nodes/parsenodes.h"
2323

24+
#include "pg_lake/pgduck/type.h"
25+
2426
/*
2527
* DDLOperationType describes a type of ddl operation, which
2628
* cause catalog modification and possibly iceberg metadata modification.
@@ -37,6 +39,7 @@ typedef enum DDLOperationType
3739
DDL_COLUMN_DROP_NOT_NULL = 8,
3840
DDL_TABLE_SET_PARTITION_BY = 9,
3941
DDL_TABLE_DROP_PARTITION_BY = 10,
42+
DDL_COLUMN_ALTER_TYPE = 11,
4043
} DDLOperationType;
4144

4245
/*
@@ -55,6 +58,9 @@ typedef struct IcebergDDLOperation
5558
/* applicable for set/drop default */
5659
const char *writeDefault;
5760

61+
/* applicable for alter column type */
62+
PGType newPgType;
63+
5864
/* applicable for create iceberg table */
5965
bool hasCustomLocation;
6066

pg_lake_table/include/pg_lake/fdw/schema_operations/field_id_mapping_catalog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ extern PGDLLEXPORT List *GetRegisteredFieldForAttributes(Oid relationId, List *a
3333
extern PGDLLEXPORT DataFileSchemaField * GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo);
3434
extern PGDLLEXPORT AttrNumber GetAttributeForFieldId(Oid relationId, int fieldId);
3535
extern PGDLLEXPORT void UpdateRegisteredFieldWriteDefaultForAttribute(Oid relationId, AttrNumber attNum, const char *writeDefault);
36+
extern PGDLLEXPORT void UpdateRegisteredFieldTypeForAttribute(Oid relationId, AttrNumber attNum, PGType newPgType);
3637
extern PGDLLEXPORT int GetLargestRegisteredFieldId(Oid relationId);
3738
extern PGDLLEXPORT void RegisterIcebergColumnMapping(Oid relationId, Field * field,
3839
AttrNumber attNo, int parentFieldId, PGType pgType,

0 commit comments

Comments
 (0)