Skip to content

Commit 937c534

Browse files
deep copy spec field
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent a905847 commit 937c534

7 files changed

Lines changed: 55 additions & 33 deletions

File tree

pg_lake_iceberg/include/pg_lake/iceberg/api/partitioning.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ typedef struct IcebergPartitionTransform
6363
ParsedIcebergPartitionTransform parsedTransform;
6464

6565
/* spec field info */
66-
IcebergPartitionSpecField specField;
66+
IcebergPartitionSpecField *specField;
6767

6868
/* source field of the column to which transform applies */
69-
DataFileSchemaField sourceField;
69+
DataFileSchemaField *sourceField;
7070

7171
/* Postgres column info to which transform applies */
7272
AttrNumber attnum;

pg_lake_iceberg/include/pg_lake/iceberg/metadata_spec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,4 @@ extern PGDLLEXPORT IcebergTableMetadata * ReadIcebergTableMetadata(const char *t
292292
extern PGDLLEXPORT char *WriteIcebergTableMetadataToJson(IcebergTableMetadata * metadata);
293293
extern PGDLLEXPORT void AppendIcebergTableSchemaForRestCatalog(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length);
294294
extern PGDLLEXPORT void AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length);
295+
/*
 * Returns a newly allocated copy of *field. The name and transform strings
 * and the source_ids array are duplicated, so the copy shares no memory
 * with the original.
 */
extern PGDLLEXPORT IcebergPartitionSpecField * DeepCopyIcebergPartitionSpecField(const IcebergPartitionSpecField * field);

pg_lake_iceberg/src/iceberg/partitioning/partition.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ FindPartitionTransformById(List *transforms, int32_t partitionFieldId, bool erro
225225
{
226226
IcebergPartitionTransform *transform = (IcebergPartitionTransform *) lfirst(cell);
227227

228-
if (transform->specField.field_id == partitionFieldId)
228+
if (transform->specField->field_id == partitionFieldId)
229229
return transform;
230230
}
231231

pg_lake_iceberg/src/iceberg/write_table_metadata.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,31 @@ AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField *
764764
appendStringInfoString(command, "]");
765765
}
766766

767+
/*
768+
* DeepCopyIcebergPartitionSpecField deep copies a IcebergPartitionSpecField.
769+
*/
770+
IcebergPartitionSpecField *
771+
DeepCopyIcebergPartitionSpecField(const IcebergPartitionSpecField * field)
772+
{
773+
IcebergPartitionSpecField *copiedField = palloc0(sizeof(IcebergPartitionSpecField));
774+
775+
copiedField->field_id = field->field_id;
776+
copiedField->name = pstrdup(field->name);
777+
copiedField->name_length = field->name_length;
778+
copiedField->transform = pstrdup(field->transform);
779+
copiedField->transform_length = field->transform_length;
780+
781+
copiedField->source_id = field->source_id;
782+
copiedField->source_ids_length = field->source_ids_length;
783+
if (field->source_ids_length > 0)
784+
{
785+
copiedField->source_ids = palloc0(field->source_ids_length * sizeof(int32));
786+
memcpy(copiedField->source_ids, field->source_ids, field->source_ids_length * sizeof(int32));
787+
}
788+
789+
return copiedField;
790+
}
791+
767792
static void
768793
AppendIcebergSortOrderFields(StringInfo command, IcebergSortOrderField * fields, size_t fields_length)
769794
{

pg_lake_table/src/fdw/data_file_pruning.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ GetColumnBoundConstraintsFromPartition(Oid relationId, ColumnToFieldIdMapping *
676676
continue;
677677

678678
/* skip if transform's sourceId does not match the entry's fieldId */
679-
if (partitionTransform->sourceField.id != entry->fieldId)
679+
if (partitionTransform->sourceField->id != entry->fieldId)
680680
continue;
681681

682682
Expr *boundsConstraint =

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms)
152152
* ErrorIfColumnEverUsedInIcebergPartitionSpec(). Still, let's be
153153
* defensive and also check source field ids.
154154
*/
155-
if (specField->source_id != transform->sourceField.id)
155+
if (specField->source_id != transform->sourceField->id)
156156
return false;
157157

158158
/*
@@ -162,7 +162,7 @@ PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms)
162162
* Iceberg does here:
163163
* https://github.com/apache/iceberg/blob/8b55ac834015ce664f879ecfe1e80a941a994420/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L239-L259
164164
*/
165-
if (strcasecmp(specField->name, transform->specField.name) != 0)
165+
if (strcasecmp(specField->name, transform->specField->name) != 0)
166166
{
167167
return false;
168168
}
@@ -251,7 +251,7 @@ GetPartitionTransformFromSpecField(Oid relationId, IcebergPartitionSpecField * s
251251
{
252252
IcebergPartitionTransform *transform = palloc0(sizeof(IcebergPartitionTransform));
253253

254-
transform->specField = *specField;
254+
transform->specField = DeepCopyIcebergPartitionSpecField(specField);
255255

256256
transform->attnum =
257257
GetAttributeForFieldId(relationId, specField->source_id);
@@ -260,23 +260,19 @@ GetPartitionTransformFromSpecField(Oid relationId, IcebergPartitionSpecField * s
260260

261261
if (IsInternalIcebergTable(relationId))
262262
{
263-
DataFileSchemaField *sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
264-
265-
transform->sourceField = *sourceField;
263+
transform->sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
266264
}
267265
else
268266
{
269267
Assert(IsExternalIcebergTable(relationId));
270268

271269
DataFileSchema *schema = GetDataFileSchemaForTable(relationId);
272270

273-
DataFileSchemaField *sourceField = GetDataFileSchemaFieldById(schema, specField->source_id);
274-
275-
transform->sourceField = *sourceField;
271+
transform->sourceField = GetDataFileSchemaFieldById(schema, specField->source_id);
276272
}
277273

278274
/* parse transform name */
279-
ParseTransformName(transform->specField.transform,
275+
ParseTransformName(transform->specField->transform,
280276
&transform->parsedTransform.type,
281277
&transform->parsedTransform.bucketCount,
282278
&transform->parsedTransform.truncateLen);
@@ -415,8 +411,8 @@ ApplyPartitionTransformToTuple(IcebergPartitionTransform * transform, TupleTable
415411
{
416412
PartitionField *field = palloc0(sizeof(PartitionField));
417413

418-
field->field_name = pstrdup(transform->specField.name);
419-
field->field_id = transform->specField.field_id;
414+
field->field_name = pstrdup(transform->specField->name);
415+
field->field_id = transform->specField->field_id;
420416

421417
bool isNull = false;
422418
Datum columnValue = slot_getattr(slot, transform->attnum, &isNull);
@@ -455,7 +451,7 @@ ApplyPartitionTransformToTuple(IcebergPartitionTransform * transform, TupleTable
455451
ereport(ERROR,
456452
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
457453
errmsg("applying transform %s is not yet support ",
458-
transform->specField.transform)));
454+
transform->specField->transform)));
459455
}
460456

461457
field->value_type = GetTransformResultAvroType(transform);
@@ -479,7 +475,7 @@ ApplyIdentityTransformToColumn(IcebergPartitionTransform * transform, Datum colu
479475
return NULL;
480476
}
481477

482-
return PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField.type,
478+
return PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField->type,
483479
transform->pgType, valueSize);
484480
}
485481

@@ -840,15 +836,15 @@ ApplyBucketTransformToColumn(IcebergPartitionTransform * transform, Datum column
840836
else if (transform->pgType.postgresTypeOid == UUIDOID)
841837
{
842838
size_t valueSize = 0;
843-
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField.type,
839+
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField->type,
844840
transform->pgType, &valueSize);
845841

846842
*bucketValue = (MurmurHash3_32_Bytes(value, valueSize) & INT32_MAX) % transform->parsedTransform.bucketCount;
847843
}
848844
else if (transform->pgType.postgresTypeOid == NUMERICOID)
849845
{
850846
size_t valueSize = 0;
851-
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField.type,
847+
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField->type,
852848
transform->pgType, &valueSize);
853849

854850
*bucketValue = (MurmurHash3_32_Bytes(value, valueSize) & INT32_MAX) % transform->parsedTransform.bucketCount;

pg_lake_table/src/fdw/partitioning/partition_by_parser.c

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -532,27 +532,27 @@ AnalyzeIcebergTablePartitionBy(Oid relationId, List *parsedTransforms)
532532
/* 2) Check scalar column */
533533
EnsureTransformSourceColumnScalar(parsedTransform, sourceField);
534534

535-
analyzedTransform->sourceField = *sourceField;
535+
analyzedTransform->sourceField = sourceField;
536536

537537
/* set transform name */
538-
analyzedTransform->specField.transform = GenerateTransformName(analyzedTransform);
539-
analyzedTransform->specField.transform_length = strlen(analyzedTransform->specField.transform);
538+
analyzedTransform->specField->transform = GenerateTransformName(analyzedTransform);
539+
analyzedTransform->specField->transform_length = strlen(analyzedTransform->specField->transform);
540540

541541
/* set partition field name */
542-
analyzedTransform->specField.name = GeneratePartitionFieldName(analyzedTransform, relationId);
543-
analyzedTransform->specField.name_length = strlen(analyzedTransform->specField.name);
542+
analyzedTransform->specField->name = GeneratePartitionFieldName(analyzedTransform, relationId);
543+
analyzedTransform->specField->name_length = strlen(analyzedTransform->specField->name);
544544

545545
/* set partition field id */
546-
analyzedTransform->specField.field_id = ++largestPartitionFieldId;
546+
analyzedTransform->specField->field_id = ++largestPartitionFieldId;
547547
/* set source field id */
548-
analyzedTransform->specField.source_id = sourceField->id;
549-
analyzedTransform->specField.source_ids_length = 1;
550-
analyzedTransform->specField.source_ids = palloc0(sizeof(int) * analyzedTransform->specField.source_ids_length);
551-
analyzedTransform->specField.source_ids[0] = analyzedTransform->specField.source_id;
548+
analyzedTransform->specField->source_id = sourceField->id;
549+
analyzedTransform->specField->source_ids_length = 1;
550+
analyzedTransform->specField->source_ids = palloc0(sizeof(int) * analyzedTransform->specField->source_ids_length);
551+
analyzedTransform->specField->source_ids[0] = analyzedTransform->specField->source_id;
552552

553553
/* 3) Check column type compatibility. */
554554
EnsureValidTypeForTransform(analyzedTransform->parsedTransform.type, analyzedTransform->pgType.postgresTypeOid);
555-
555+
556556
analyzedTransforms = lappend(analyzedTransforms, analyzedTransform);
557557
}
558558

@@ -832,11 +832,11 @@ EnsureNoDuplicateTransforms(List *transforms)
832832
IcebergPartitionTransform *other = list_nth(transforms, j);
833833

834834
if (transform->parsedTransform.type == other->parsedTransform.type &&
835-
transform->sourceField.id == other->sourceField.id)
835+
transform->sourceField->id == other->sourceField->id)
836836
ereport(ERROR,
837837
(errcode(ERRCODE_DUPLICATE_OBJECT),
838838
errmsg("\"%s\" transform on column \"%s\" appears multiple times in partition spec",
839-
transform->specField.transform, transform->parsedTransform.columnName)));
839+
transform->specField->transform, transform->parsedTransform.columnName)));
840840
}
841841
}
842842
}

0 commit comments

Comments
 (0)