Skip to content

Commit ab93e6b

Browse files
committed
Use partition_keys MAP instead of path parsing for partitioned pushdown writes
Replace ParsePartitionValuesFromPath with ParsePartitionValuesFromPartitionKeys, which reads partition values directly from DuckDB's partition_keys MAP(VARCHAR, VARCHAR) column in COPY TO return_stats. This eliminates URL-decoding of Hive-style paths and correctly handles values containing special characters such as forward slashes.

Additional fixes from PR review:
- Add UTC timezone conversion for timestamptz in year/month/day/hour transforms (DuckDB's temporal functions use the session timezone; the Iceberg spec requires UTC)
- Gate hour transform pushdown to TIMESTAMP/TIMESTAMPTZ only (TIME/TIMETZ fall back to row-by-row processing)
- Whitelist identity partition types to those with compatible DuckDB-to-PG text representations (excludes bytea, whose BLOB-to-VARCHAR format PG cannot parse)
- Pre-create the MAP(TEXT,TEXT) type in the extension SQL to avoid runtime type creation
- Export GetOrCreatePGMapType with PGDLLEXPORT for cross-library visibility

Signed-off-by: Marco Slot <marco.slot@snowflake.com>
1 parent 986731f commit ab93e6b

8 files changed

Lines changed: 518 additions & 107 deletions

File tree

pg_lake_engine/include/pg_lake/data_file/data_file_stats.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ typedef struct DataFileStats
9292

9393
/* for a new data file with row IDs, the start of the range */
9494
int64 rowIdStart;
95+
96+
/* partition key values from COPY TO return_stats (NULL if not partitioned) */
97+
char *partitionKeysText;
9598
} DataFileStats;
9699

97100
typedef struct StatsCollector

pg_lake_engine/include/pg_lake/pgduck/map.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#include "pg_lake/pgduck/type.h"
2121

22-
Oid GetOrCreatePGMapType(const char *name);
22+
extern PGDLLEXPORT Oid GetOrCreatePGMapType(const char *name);
2323
char *GetDuckDBMapDefinitionForPGType(Oid postgresTypeId,
2424
CopyDataFormat format);
2525

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
-- Upgrade script for pg_lake_engine from 3.3 to 3.4
2+
3+
-- Pre-create MAP(TEXT,TEXT) type for partition_keys parsing in partitioned writes.
4+
-- This avoids runtime type creation during DML operations.
5+
SELECT map_type.create('text'::regtype, 'text'::regtype);

pg_lake_engine/src/data_file/data_file_stats.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ GetDataFileStatsListFromPGResult(PGresult *result, List *leafFields, DataFileSch
192192
{
193193
fileStats->dataFilePath = pstrdup(resultValue);
194194
}
195+
else if (strcmp(resultColName, "partition_keys") == 0)
196+
{
197+
if (!PQgetisnull(result, resultRowIndex, resultColIndex))
198+
fileStats->partitionKeysText = pstrdup(resultValue);
199+
}
195200
}
196201

197202
statsList = lappend(statsList, fileStats);

pg_lake_table/include/pg_lake/fdw/partition_pushdown.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@
2424

2525
extern PGDLLEXPORT bool AllPartitionTransformsPushdownable(List *transforms);
2626
extern List *GetPartitionByExpressions(List *transforms);
27-
extern Partition * ParsePartitionValuesFromPath(char *filePath, List *transforms);
27+
extern Partition * ParsePartitionValuesFromPartitionKeys(char *partitionKeysText,
28+
List *transforms);

pg_lake_table/src/fdw/partition_pushdown.c

Lines changed: 149 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
*/
2525
#include "postgres.h"
2626

27+
#include "executor/executor.h"
28+
#include "utils/array.h"
2729
#include "utils/builtins.h"
2830
#include "utils/lsyscache.h"
2931

3032
#include "pg_lake/fdw/partition_pushdown.h"
3133
#include "pg_lake/fdw/partition_transform.h"
3234
#include "pg_lake/iceberg/api/partitioning.h"
3335
#include "pg_lake/iceberg/manifest_spec.h"
36+
#include "pg_lake/pgduck/map.h"
3437

3538

3639
static char *PartitionTransformToDuckDBExpression(IcebergPartitionTransform * transform);
@@ -85,34 +88,90 @@ PartitionTransformToDuckDBExpression(IcebergPartitionTransform * transform)
8588
case PARTITION_TRANSFORM_IDENTITY:
8689
{
8790
/*
88-
* Identity partitions use the column value directly for
89-
* non-temporal types. For date/timestamp types, we produce
90-
* epoch integers to avoid DuckDB text formatting issues (e.g.
91-
* BC dates formatted as "4713-01-01 (BC)").
91+
* Only push down identity partitions for types whose
92+
* DuckDB VARCHAR representation can be parsed by PG's
93+
* type input function. Types like bytea are excluded
94+
* because DuckDB's BLOB-to-VARCHAR cast uses a format
95+
* PG cannot parse (same issue as column_statistics,
96+
* which skips bytea via ShouldSkipStatistics).
9297
*
93-
* ParsePartitionValuesFromPath uses
94-
* DeserializePartitionValueFromEpochInteger to convert epoch
95-
* integers back to Iceberg binary.
98+
* For date/timestamp types, we produce epoch integers
99+
* to avoid DuckDB text formatting issues (e.g. BC
100+
* dates formatted as "4713-01-01 (BC)").
101+
*
102+
* ParsePartitionValuesFromPartitionKeys uses
103+
* DeserializePartitionValueFromEpochInteger to convert
104+
* epoch integers back to Iceberg binary.
96105
*/
97106
if (typeOid == DATEOID)
98107
return psprintf("datediff('day', date '1970-01-01', %s::date)", col);
99108
else if (typeOid == TIMESTAMPOID || typeOid == TIMESTAMPTZOID)
100109
return psprintf("epoch_us(%s)", col);
101-
else
110+
else if (typeOid == INT2OID || typeOid == INT4OID ||
111+
typeOid == INT8OID || typeOid == FLOAT4OID ||
112+
typeOid == FLOAT8OID || typeOid == NUMERICOID ||
113+
typeOid == BOOLOID || typeOid == TEXTOID ||
114+
typeOid == VARCHAROID || typeOid == BPCHAROID ||
115+
typeOid == UUIDOID || typeOid == TIMEOID ||
116+
typeOid == TIMETZOID)
102117
return psprintf("%s", col);
118+
else
119+
return NULL;
103120
}
104121

105122
case PARTITION_TRANSFORM_YEAR:
106-
return psprintf("(year(%s) - 1970)", col);
123+
{
124+
/*
125+
* Iceberg spec requires UTC for timestamptz. PG stores
126+
* timestamptz internally in UTC, so the non-pushdown path
127+
* works correctly. In DuckDB, year() uses session timezone,
128+
* so we must convert to UTC first.
129+
*/
130+
if (typeOid == TIMESTAMPTZOID)
131+
return psprintf("(year(timezone('UTC', %s)) - 1970)", col);
132+
else
133+
return psprintf("(year(%s) - 1970)", col);
134+
}
107135

108136
case PARTITION_TRANSFORM_MONTH:
109-
return psprintf("((year(%s) - 1970) * 12 + month(%s) - 1)", col, col);
137+
{
138+
if (typeOid == TIMESTAMPTZOID)
139+
return psprintf("((year(timezone('UTC', %s)) - 1970) * 12 + "
140+
"month(timezone('UTC', %s)) - 1)", col, col);
141+
else
142+
return psprintf("((year(%s) - 1970) * 12 + month(%s) - 1)",
143+
col, col);
144+
}
110145

111146
case PARTITION_TRANSFORM_DAY:
112-
return psprintf("datediff('day', date '1970-01-01', %s::date)", col);
147+
{
148+
/*
149+
* Iceberg spec requires UTC for day transforms. For
150+
* timestamptz, convert to UTC before computing the day.
151+
*/
152+
if (typeOid == TIMESTAMPTZOID)
153+
return psprintf("datediff('day', date '1970-01-01', "
154+
"timezone('UTC', %s)::date)", col);
155+
else
156+
return psprintf("datediff('day', date '1970-01-01', %s::date)", col);
157+
}
113158

114159
case PARTITION_TRANSFORM_HOUR:
115-
return psprintf("datediff('hour', timestamp '1970-01-01', %s::timestamp)", col);
160+
{
161+
/*
162+
* Only TIMESTAMP and TIMESTAMPTZ are pushdownable for hour
163+
* transforms. TIME/TIMETZ fall back to row-by-row processing.
164+
* Iceberg spec requires UTC for timestamptz.
165+
*/
166+
if (typeOid == TIMESTAMPTZOID)
167+
return psprintf("datediff('hour', timestamp '1970-01-01', "
168+
"timezone('UTC', %s)::timestamp)", col);
169+
else if (typeOid == TIMESTAMPOID)
170+
return psprintf("datediff('hour', timestamp '1970-01-01', "
171+
"%s::timestamp)", col);
172+
else
173+
return NULL;
174+
}
116175

117176
case PARTITION_TRANSFORM_BUCKET:
118177
case PARTITION_TRANSFORM_TRUNCATE:
@@ -150,59 +209,6 @@ GetPartitionByExpressions(List *transforms)
150209
}
151210

152211

153-
/*
154-
* HexDigitToInt converts a hex character ('0'-'9', 'A'-'F', 'a'-'f') to its
155-
* integer value (0-15). Returns -1 for invalid characters.
156-
*/
157-
static int
158-
HexDigitToInt(char c)
159-
{
160-
if (c >= '0' && c <= '9')
161-
return c - '0';
162-
if (c >= 'A' && c <= 'F')
163-
return c - 'A' + 10;
164-
if (c >= 'a' && c <= 'f')
165-
return c - 'a' + 10;
166-
return -1;
167-
}
168-
169-
170-
/*
171-
* UrlDecodePartitionValue decodes percent-encoded characters in a Hive-style
172-
* partition value (e.g. "1e%2B20" -> "1e+20").
173-
*
174-
* DuckDB percent-encodes special characters when writing partition directory
175-
* names. We must decode them before parsing the value.
176-
*/
177-
static char *
178-
UrlDecodePartitionValue(const char *encoded)
179-
{
180-
int len = strlen(encoded);
181-
char *decoded = palloc(len + 1);
182-
int j = 0;
183-
184-
for (int i = 0; i < len; i++)
185-
{
186-
if (encoded[i] == '%' && i + 2 < len)
187-
{
188-
int hi = HexDigitToInt(encoded[i + 1]);
189-
int lo = HexDigitToInt(encoded[i + 2]);
190-
191-
if (hi >= 0 && lo >= 0)
192-
{
193-
decoded[j++] = (char) (hi * 16 + lo);
194-
i += 2;
195-
continue;
196-
}
197-
}
198-
decoded[j++] = encoded[i];
199-
}
200-
201-
decoded[j] = '\0';
202-
return decoded;
203-
}
204-
205-
206212
/*
207213
* NormalizeDuckDBTextToPGText converts a DuckDB text representation of a value
208214
* to PostgreSQL's canonical text format by roundtripping through PG's type I/O.
@@ -230,62 +236,100 @@ NormalizeDuckDBTextToPGText(const char *duckdbText, Oid resultTypeOid,
230236

231237

232238
/*
233-
* ParsePartitionValuesFromPath extracts partition values from the Hive-style
234-
* directory path produced by DuckDB COPY TO with PARTITION_BY.
239+
* ParsePartitionValuesFromPartitionKeys extracts partition values from the
240+
* partition_keys MAP(VARCHAR, VARCHAR) returned by DuckDB's COPY TO with
241+
* return_stats.
235242
*
236-
* A path like:
237-
* s3://bucket/data/abc123/__part_0=54/__part_1=us-east/data_0.parquet
243+
* The partition_keys map has entries like:
244+
* {__part_0=54, __part_1=us-east}
238245
*
239-
* is parsed to extract __part_0=54 and __part_1=us-east, which are then
240-
* converted to the proper Iceberg binary format using the partition transforms.
246+
* Each value is converted to the proper Iceberg binary format using the
247+
* partition transforms.
241248
*/
242249
Partition *
243-
ParsePartitionValuesFromPath(char *filePath, List *transforms)
250+
ParsePartitionValuesFromPartitionKeys(char *partitionKeysText, List *transforms)
244251
{
245252
int numTransforms = list_length(transforms);
246253
Partition *partition = palloc0(sizeof(Partition));
247254

248255
partition->fields = palloc0(sizeof(PartitionField) * numTransforms);
249256
partition->fields_length = numTransforms;
250257

251-
for (int partIndex = 0; partIndex < numTransforms; partIndex++)
258+
/* parse the MAP(TEXT,TEXT) text into a datum */
259+
Oid mapTypeOid = GetOrCreatePGMapType("MAP(TEXT,TEXT)");
260+
Oid typoinput;
261+
Oid typioparam;
262+
263+
getTypeInputInfo(mapTypeOid, &typoinput, &typioparam);
264+
Datum mapDatum = OidInputFunctionCall(typoinput, partitionKeysText,
265+
typioparam, -1);
266+
267+
/*
268+
* Build an array of value texts indexed by partition index. We iterate the
269+
* map entries and match __part_N keys to their indices.
270+
*/
271+
char **valueTexts = palloc0(sizeof(char *) * numTransforms);
272+
bool *valueIsNull = palloc0(sizeof(bool) * numTransforms);
273+
274+
ArrayType *elementsArray = DatumGetArrayTypeP(mapDatum);
275+
ArrayIterator arrayIterator = array_create_iterator(elementsArray, 0, NULL);
276+
Datum elemDatum;
277+
bool isNull = false;
278+
279+
while (array_iterate(arrayIterator, &elemDatum, &isNull))
252280
{
253-
IcebergPartitionTransform *transform = list_nth(transforms, partIndex);
281+
if (isNull)
282+
continue;
283+
284+
HeapTupleHeader tupleHeader = DatumGetHeapTupleHeader(elemDatum);
285+
bool keyIsNull = false;
286+
bool valIsNull = false;
287+
288+
Datum keyDatum = GetAttributeByNum(tupleHeader, 1, &keyIsNull);
289+
Datum valDatum = GetAttributeByNum(tupleHeader, 2, &valIsNull);
254290

255-
/* build the search key: "__part_N=" */
256-
char *searchKey = psprintf("__part_%d=", partIndex);
257-
int searchKeyLen = strlen(searchKey);
291+
if (keyIsNull)
292+
continue;
258293

259-
/* find this key in the path */
260-
char *found = strstr(filePath, searchKey);
294+
char *key = TextDatumGetCString(keyDatum);
261295

262-
if (found == NULL)
296+
/* parse __part_N to get the partition index */
297+
if (strncmp(key, "__part_", 7) != 0)
298+
continue;
299+
300+
int partIndex = pg_strtoint32(key + 7);
301+
302+
if (partIndex < 0 || partIndex >= numTransforms)
263303
{
264304
ereport(ERROR,
265305
(errcode(ERRCODE_INTERNAL_ERROR),
266-
errmsg("could not find partition key %s in path %s",
267-
searchKey, filePath)));
306+
errmsg("unexpected partition key %s (expected 0..%d)",
307+
key, numTransforms - 1)));
268308
}
269309

270-
/* extract the value (from after '=' up to the next '/' or end) */
271-
char *valueStart = found + searchKeyLen;
272-
char *valueEnd = strchr(valueStart, '/');
273-
int valueLen = (valueEnd != NULL) ?
274-
(valueEnd - valueStart) : strlen(valueStart);
275-
276-
char *valueText = pnstrdup(valueStart, valueLen);
310+
if (valIsNull)
311+
{
312+
valueIsNull[partIndex] = true;
313+
}
314+
else
315+
{
316+
valueTexts[partIndex] = TextDatumGetCString(valDatum);
317+
}
318+
}
277319

278-
/* URL-decode (DuckDB percent-encodes special chars in Hive paths) */
279-
valueText = UrlDecodePartitionValue(valueText);
320+
array_free_iterator(arrayIterator);
280321

281-
/* populate the partition field */
322+
/* convert each partition value to Iceberg binary format */
323+
for (int partIndex = 0; partIndex < numTransforms; partIndex++)
324+
{
325+
IcebergPartitionTransform *transform = list_nth(transforms, partIndex);
282326
PartitionField *field = &partition->fields[partIndex];
283327

284328
field->field_id = transform->partitionFieldId;
285329
field->field_name = pstrdup(transform->partitionFieldName);
286330
field->value_type = GetTransformResultAvroType(transform);
287331

288-
if (strcmp(valueText, "NULL") == 0)
332+
if (valueIsNull[partIndex] || valueTexts[partIndex] == NULL)
289333
{
290334
/* NULL partition value */
291335
field->value = NULL;
@@ -297,25 +341,28 @@ ParsePartitionValuesFromPath(char *filePath, List *transforms)
297341
transform->pgType.postgresTypeOid == TIMESTAMPTZOID))
298342
{
299343
/*
300-
* Identity temporal types use epoch integers in the path (days
301-
* for date, microseconds for timestamp).
344+
* Identity temporal types use epoch integers (days for date,
345+
* microseconds for timestamp).
302346
*/
303347
field->value = DeserializePartitionValueFromEpochInteger(
304-
transform, valueText, &field->value_length);
348+
transform, valueTexts[partIndex],
349+
&field->value_length);
305350
}
306351
else
307352
{
308353
/*
309-
* Normalize DuckDB text to PG canonical format (e.g. "1.0" -> "1"
310-
* for numeric) so the roundtrip assertion in
354+
* Normalize DuckDB text to PG canonical format (e.g. "1.0" ->
355+
* "1" for numeric) so the roundtrip assertion in
311356
* DeserializePartitionValueFromPGText passes.
312357
*/
313-
valueText = NormalizeDuckDBTextToPGText(valueText,
314-
transform->resultPgType.postgresTypeOid,
315-
transform->resultPgType.postgresTypeMod);
358+
char *normalizedText =
359+
NormalizeDuckDBTextToPGText(valueTexts[partIndex],
360+
transform->resultPgType.postgresTypeOid,
361+
transform->resultPgType.postgresTypeMod);
316362

317363
field->value = DeserializePartitionValueFromPGText(
318-
transform, valueText, &field->value_length);
364+
transform, normalizedText,
365+
&field->value_length);
319366
}
320367
}
321368

0 commit comments

Comments
 (0)