Skip to content

Commit a905847

Browse files
separate structs for parse and analyze
Signed-off-by: Aykut Bozkurt <aykut.bozkurt@snowflake.com>
1 parent 48d8555 commit a905847

8 files changed

Lines changed: 147 additions & 129 deletions

File tree

pg_lake_engine/src/data_file/data_file_stats.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ ExtractMinMaxForColumn(Datum map, char *colName, List **names, List **mins, List
256256

257257
if (minText != NULL && maxText != NULL)
258258
{
259-
*names = lappend(*names, pstrdup(colName));
259+
*names = lappend(*names, colName);
260260
*mins = lappend(*mins, minText);
261261
*maxs = lappend(*maxs, maxText);
262262
}

pg_lake_iceberg/include/pg_lake/iceberg/api/partitioning.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ typedef enum IcebergPartitionTransformType
3939
PARTITION_TRANSFORM_VOID
4040
} IcebergPartitionTransformType;
4141

42-
typedef struct IcebergPartitionTransform
42+
/* Represents a parsed partition transform from table's partition_by string option. */
43+
typedef struct ParsedIcebergPartitionTransform
4344
{
4445
IcebergPartitionTransformType type;
4546

@@ -52,13 +53,22 @@ typedef struct IcebergPartitionTransform
5253
size_t truncateLen;
5354
};
5455

55-
IcebergPartitionSpecField *specField;
56+
const char *columnName;
57+
} ParsedIcebergPartitionTransform;
58+
59+
/* Represents an analyzed partition transform with all necessary info. */
60+
typedef struct IcebergPartitionTransform
61+
{
62+
/* parsed transform info */
63+
ParsedIcebergPartitionTransform parsedTransform;
64+
65+
/* spec field info */
66+
IcebergPartitionSpecField specField;
5667

5768
/* source field of the column to which transform applies */
58-
DataFileSchemaField *sourceField;
69+
DataFileSchemaField sourceField;
5970

6071
/* Postgres column info to which transform applies */
61-
const char *columnName;
6272
AttrNumber attnum;
6373
PGType pgType;
6474

pg_lake_iceberg/src/iceberg/partitioning/partition.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ FindPartitionTransformById(List *transforms, int32_t partitionFieldId, bool erro
225225
{
226226
IcebergPartitionTransform *transform = (IcebergPartitionTransform *) lfirst(cell);
227227

228-
if (transform->specField->field_id == partitionFieldId)
228+
if (transform->specField.field_id == partitionFieldId)
229229
return transform;
230230
}
231231

pg_lake_iceberg/src/iceberg/partitioning/spec_generation.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ BuildPartitionSpecFromPartitionTransforms(Oid relationId, List *partitionTransfo
6060
{
6161
IcebergPartitionTransform *transform = lfirst(transformCell);
6262

63-
spec->fields[fieldIndex] = *(transform->specField);
63+
spec->fields[fieldIndex] = transform->specField;
6464
fieldIndex++;
6565
}
6666

pg_lake_table/src/fdw/data_file_pruning.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ GetColumnBoundConstraintsFromPartition(Oid relationId, ColumnToFieldIdMapping *
676676
continue;
677677

678678
/* skip if transform's sourceId does not match the entry's fieldId */
679-
if (partitionTransform->sourceField->id != entry->fieldId)
679+
if (partitionTransform->sourceField.id != entry->fieldId)
680680
continue;
681681

682682
Expr *boundsConstraint =
@@ -699,7 +699,7 @@ static Expr *
699699
PartitionFieldBoundConstraint(PartitionField * partitionField, IcebergPartitionTransform * partitionTransform,
700700
ColumnToFieldIdMapping * entry)
701701
{
702-
IcebergPartitionTransformType type = partitionTransform->type;
702+
IcebergPartitionTransformType type = partitionTransform->parsedTransform.type;
703703

704704
if (type != PARTITION_TRANSFORM_IDENTITY &&
705705
partitionField->value == NULL)
@@ -751,7 +751,7 @@ IdentityPartitionFieldBoundConstraint(PartitionField * partitionField,
751751
{
752752
bool isNull = false;
753753
Datum partitionDatum =
754-
PartitionValueToDatum(partitionTransform->type, partitionField->value, partitionField->value_length,
754+
PartitionValueToDatum(partitionTransform->parsedTransform.type, partitionField->value, partitionField->value_length,
755755
partitionTransform->resultPgType, &isNull);
756756

757757
OpExpr *columnBoundEquality = copyObject(entry->equalityOperatorExpression);
@@ -780,7 +780,7 @@ TruncatePartitionFieldBoundConstraint(PartitionField * partitionField,
780780
if (pgType.postgresTypeOid == INT4OID || pgType.postgresTypeOid == INT2OID)
781781
{
782782
int32 partitionValue = *(int32_t *) partitionField->value;
783-
int truncateLen = partitionTransform->truncateLen;
783+
int truncateLen = partitionTransform->parsedTransform.truncateLen;
784784

785785
int32 upperBound;
786786

@@ -798,7 +798,7 @@ TruncatePartitionFieldBoundConstraint(PartitionField * partitionField,
798798
else if (pgType.postgresTypeOid == INT8OID)
799799
{
800800
int64 partitionValue = *(int64_t *) partitionField->value;
801-
int truncateLen = partitionTransform->truncateLen;
801+
int truncateLen = partitionTransform->parsedTransform.truncateLen;
802802

803803
int64 upperBound;
804804

@@ -825,7 +825,7 @@ TruncatePartitionFieldBoundConstraint(PartitionField * partitionField,
825825
return NULL;
826826
}
827827

828-
int truncateLen = partitionTransform->truncateLen;
828+
int truncateLen = partitionTransform->parsedTransform.truncateLen;
829829
char *truncatedUpperBound = TruncateUpperBoundForText(pstrdup(partitionValue), truncateLen);
830830

831831
if (truncatedUpperBound == NULL)
@@ -848,7 +848,7 @@ TruncatePartitionFieldBoundConstraint(PartitionField * partitionField,
848848
memcpy(VARDATA_ANY(partitionValue), partitionField->value, partitionField->value_length);
849849

850850
bytea *partitionValueCopy = (bytea *) pg_detoast_datum_copy((struct varlena *) partitionValue);
851-
int truncateLen = partitionTransform->truncateLen;
851+
int truncateLen = partitionTransform->parsedTransform.truncateLen;
852852

853853
/* increment the last byte of the upper bound, which does not overflow */
854854
partitionValueCopy = TruncateUpperBoundForBytea(partitionValueCopy, truncateLen);
@@ -1813,7 +1813,7 @@ ExtendClausesForBucketPartitioning(Partition * partition, List *partitionTransfo
18131813
if (partitionTransform == NULL)
18141814
continue;
18151815

1816-
if (partitionTransform->type != PARTITION_TRANSFORM_BUCKET)
1816+
if (partitionTransform->parsedTransform.type != PARTITION_TRANSFORM_BUCKET)
18171817
{
18181818
/* only extend restrict info for bucket transform */
18191819
continue;

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms)
152152
* ErrorIfColumnEverUsedInIcebergPartitionSpec(). Still, let's be
153153
* defensive and also check source field ids.
154154
*/
155-
if (specField->source_id != transform->sourceField->id)
155+
if (specField->source_id != transform->sourceField.id)
156156
return false;
157157

158158
/*
@@ -162,7 +162,7 @@ PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms)
162162
* Iceberg does here:
163163
* https://github.com/apache/iceberg/blob/8b55ac834015ce664f879ecfe1e80a941a994420/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L239-L259
164164
*/
165-
if (strcasecmp(specField->name, transform->specField->name) != 0)
165+
if (strcasecmp(specField->name, transform->specField.name) != 0)
166166
{
167167
return false;
168168
}
@@ -251,31 +251,35 @@ GetPartitionTransformFromSpecField(Oid relationId, IcebergPartitionSpecField * s
251251
{
252252
IcebergPartitionTransform *transform = palloc0(sizeof(IcebergPartitionTransform));
253253

254-
transform->specField = specField;
254+
transform->specField = *specField;
255255

256256
transform->attnum =
257257
GetAttributeForFieldId(relationId, specField->source_id);
258-
transform->columnName = get_attname(relationId, transform->attnum, false);
258+
transform->parsedTransform.columnName = get_attname(relationId, transform->attnum, false);
259259
transform->pgType = GetAttributePGType(relationId, transform->attnum);
260260

261261
if (IsInternalIcebergTable(relationId))
262262
{
263-
transform->sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
263+
DataFileSchemaField *sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
264+
265+
transform->sourceField = *sourceField;
264266
}
265267
else
266268
{
267269
Assert(IsExternalIcebergTable(relationId));
268270

269271
DataFileSchema *schema = GetDataFileSchemaForTable(relationId);
270272

271-
transform->sourceField = GetDataFileSchemaFieldById(schema, specField->source_id);
273+
DataFileSchemaField *sourceField = GetDataFileSchemaFieldById(schema, specField->source_id);
274+
275+
transform->sourceField = *sourceField;
272276
}
273277

274278
/* parse transform name */
275-
ParseTransformName(transform->specField->transform,
276-
&transform->type,
277-
&transform->bucketCount,
278-
&transform->truncateLen);
279+
ParseTransformName(transform->specField.transform,
280+
&transform->parsedTransform.type,
281+
&transform->parsedTransform.bucketCount,
282+
&transform->parsedTransform.truncateLen);
279283

280284
/* set transform's postgres type */
281285
transform->resultPgType = GetTransformResultPGType(transform);
@@ -411,13 +415,13 @@ ApplyPartitionTransformToTuple(IcebergPartitionTransform * transform, TupleTable
411415
{
412416
PartitionField *field = palloc0(sizeof(PartitionField));
413417

414-
field->field_name = pstrdup(transform->specField->name);
415-
field->field_id = transform->specField->field_id;
418+
field->field_name = pstrdup(transform->specField.name);
419+
field->field_id = transform->specField.field_id;
416420

417421
bool isNull = false;
418422
Datum columnValue = slot_getattr(slot, transform->attnum, &isNull);
419423

420-
switch (transform->type)
424+
switch (transform->parsedTransform.type)
421425
{
422426
case PARTITION_TRANSFORM_IDENTITY:
423427
field->value = ApplyIdentityTransformToColumn(transform, columnValue, isNull,
@@ -451,7 +455,7 @@ ApplyPartitionTransformToTuple(IcebergPartitionTransform * transform, TupleTable
451455
ereport(ERROR,
452456
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
453457
errmsg("applying transform %s is not yet support ",
454-
transform->specField->transform)));
458+
transform->specField.transform)));
455459
}
456460

457461
field->value_type = GetTransformResultAvroType(transform);
@@ -475,7 +479,7 @@ ApplyIdentityTransformToColumn(IcebergPartitionTransform * transform, Datum colu
475479
return NULL;
476480
}
477481

478-
return PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField->type,
482+
return PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField.type,
479483
transform->pgType, valueSize);
480484
}
481485

@@ -496,7 +500,7 @@ ApplyTruncateTransformToColumn(IcebergPartitionTransform * transform, Datum colu
496500

497501
PGType sourceType = transform->pgType;
498502
PGType resultType = transform->resultPgType;
499-
int64_t truncateLen = (int64_t) transform->truncateLen;
503+
int64_t truncateLen = (int64_t) transform->parsedTransform.truncateLen;
500504
Datum truncatedColumnValue = 0;
501505

502506
if (sourceType.postgresTypeOid == INT2OID)
@@ -765,7 +769,7 @@ ApplyBucketTransformToColumn(IcebergPartitionTransform * transform, Datum column
765769
{
766770
int64_t value = (int64_t) DatumGetInt16(columnValue);
767771

768-
*bucketValue = (MurmurHash3_32_Long(value) & INT32_MAX) % transform->bucketCount;
772+
*bucketValue = (MurmurHash3_32_Long(value) & INT32_MAX) % transform->parsedTransform.bucketCount;
769773
}
770774
else if (transform->pgType.postgresTypeOid == INT4OID)
771775
{
@@ -775,27 +779,27 @@ ApplyBucketTransformToColumn(IcebergPartitionTransform * transform, Datum column
775779
*/
776780
int64_t value = (int64_t) DatumGetInt32(columnValue);
777781

778-
*bucketValue = (MurmurHash3_32_Long(value) & INT32_MAX) % transform->bucketCount;
782+
*bucketValue = (MurmurHash3_32_Long(value) & INT32_MAX) % transform->parsedTransform.bucketCount;
779783
}
780784
else if (transform->pgType.postgresTypeOid == INT8OID)
781785
{
782786
int64_t value = DatumGetInt64(columnValue);
783787

784-
*bucketValue = (MurmurHash3_32_Long(value) & INT32_MAX) % transform->bucketCount;
788+
*bucketValue = (MurmurHash3_32_Long(value) & INT32_MAX) % transform->parsedTransform.bucketCount;
785789
}
786790
else if (transform->pgType.postgresTypeOid == TEXTOID ||
787791
transform->pgType.postgresTypeOid == VARCHAROID ||
788792
transform->pgType.postgresTypeOid == BPCHAROID)
789793
{
790794
const char *value = TextDatumGetCString(columnValue);
791795

792-
*bucketValue = (MurmurHash3_32_Bytes(value, strlen(value)) & INT32_MAX) % transform->bucketCount;
796+
*bucketValue = (MurmurHash3_32_Bytes(value, strlen(value)) & INT32_MAX) % transform->parsedTransform.bucketCount;
793797
}
794798
else if (transform->pgType.postgresTypeOid == BYTEAOID)
795799
{
796800
bytea *value = DatumGetByteaP(columnValue);
797801

798-
*bucketValue = (MurmurHash3_32_Bytes(VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value)) & INT32_MAX) % transform->bucketCount;
802+
*bucketValue = (MurmurHash3_32_Bytes(VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value)) & INT32_MAX) % transform->parsedTransform.bucketCount;
799803
}
800804
else if (transform->pgType.postgresTypeOid == DATEOID)
801805
{
@@ -807,47 +811,47 @@ ApplyBucketTransformToColumn(IcebergPartitionTransform * transform, Datum column
807811
* spec normally hashes int bytes for date type but spark hashes long
808812
* bytes of date. We follow spark here.
809813
*/
810-
*bucketValue = (MurmurHash3_32_Long(daysFromEpoch) & INT32_MAX) % transform->bucketCount;
814+
*bucketValue = (MurmurHash3_32_Long(daysFromEpoch) & INT32_MAX) % transform->parsedTransform.bucketCount;
811815
}
812816
else if (transform->pgType.postgresTypeOid == TIMESTAMPOID)
813817
{
814818
Timestamp value = DatumGetTimestamp(columnValue);
815819

816820
int64_t microsecsFromEpoch = AdjustTimestampFromPostgresToUnix(value);
817821

818-
*bucketValue = (MurmurHash3_32_Long(microsecsFromEpoch) & INT32_MAX) % transform->bucketCount;
822+
*bucketValue = (MurmurHash3_32_Long(microsecsFromEpoch) & INT32_MAX) % transform->parsedTransform.bucketCount;
819823
}
820824
else if (transform->pgType.postgresTypeOid == TIMESTAMPTZOID)
821825
{
822826
TimestampTz value = DatumGetTimestampTz(columnValue);
823827

824828
int64_t microsecsFromEpoch = AdjustTimestampFromPostgresToUnix(value);
825829

826-
*bucketValue = (MurmurHash3_32_Long(microsecsFromEpoch) & INT32_MAX) % transform->bucketCount;
830+
*bucketValue = (MurmurHash3_32_Long(microsecsFromEpoch) & INT32_MAX) % transform->parsedTransform.bucketCount;
827831
}
828832
else if (transform->pgType.postgresTypeOid == TIMEOID)
829833
{
830834
TimeADT value = DatumGetTimeADT(columnValue);
831835

832836
int64_t microsecsFromMidnight = value;
833837

834-
*bucketValue = (MurmurHash3_32_Long(microsecsFromMidnight) & INT32_MAX) % transform->bucketCount;
838+
*bucketValue = (MurmurHash3_32_Long(microsecsFromMidnight) & INT32_MAX) % transform->parsedTransform.bucketCount;
835839
}
836840
else if (transform->pgType.postgresTypeOid == UUIDOID)
837841
{
838842
size_t valueSize = 0;
839-
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField->type,
843+
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField.type,
840844
transform->pgType, &valueSize);
841845

842-
*bucketValue = (MurmurHash3_32_Bytes(value, valueSize) & INT32_MAX) % transform->bucketCount;
846+
*bucketValue = (MurmurHash3_32_Bytes(value, valueSize) & INT32_MAX) % transform->parsedTransform.bucketCount;
843847
}
844848
else if (transform->pgType.postgresTypeOid == NUMERICOID)
845849
{
846850
size_t valueSize = 0;
847-
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField->type,
851+
unsigned char *value = PGIcebergBinarySerializePartitionFieldValue(columnValue, transform->sourceField.type,
848852
transform->pgType, &valueSize);
849853

850-
*bucketValue = (MurmurHash3_32_Bytes(value, valueSize) & INT32_MAX) % transform->bucketCount;
854+
*bucketValue = (MurmurHash3_32_Bytes(value, valueSize) & INT32_MAX) % transform->parsedTransform.bucketCount;
851855
}
852856
else
853857
{
@@ -975,7 +979,7 @@ SerializePartitionValueToPGText(void *value, size_t valueLength, IcebergPartitio
975979
/* First, deserialize back */
976980
bool isNull = false;
977981
Datum partitionDatum =
978-
PartitionValueToDatum(transform->type, value, valueLength,
982+
PartitionValueToDatum(transform->parsedTransform.type, value, valueLength,
979983
transform->resultPgType, &isNull);
980984

981985
if (isNull)

0 commit comments

Comments
 (0)