2424 */
2525#include "postgres.h"
2626
27+ #include "executor/executor.h"
28+ #include "utils/array.h"
2729#include "utils/builtins.h"
2830#include "utils/lsyscache.h"
2931
3032#include "pg_lake/fdw/partition_pushdown.h"
3133#include "pg_lake/fdw/partition_transform.h"
3234#include "pg_lake/iceberg/api/partitioning.h"
3335#include "pg_lake/iceberg/manifest_spec.h"
36+ #include "pg_lake/pgduck/map.h"
3437
3538
3639static char * PartitionTransformToDuckDBExpression (IcebergPartitionTransform * transform );
@@ -85,34 +88,90 @@ PartitionTransformToDuckDBExpression(IcebergPartitionTransform * transform)
8588 case PARTITION_TRANSFORM_IDENTITY :
8689 {
8790 /*
88- * Identity partitions use the column value directly for
89- * non-temporal types. For date/timestamp types, we produce
90- * epoch integers to avoid DuckDB text formatting issues (e.g.
91- * BC dates formatted as "4713-01-01 (BC)").
91+ * Only push down identity partitions for types whose DuckDB
92+ * VARCHAR representation can be parsed by PG's type input
93+ * function. Types like bytea are excluded because DuckDB's
94+ * BLOB-to-VARCHAR cast uses a format PG cannot parse (same
95+ * issue as column_statistics, which skips bytea via
96+ * ShouldSkipStatistics).
9297 *
93- * ParsePartitionValuesFromPath uses
98+ * For date/timestamp types, we produce epoch integers to
99+ * avoid DuckDB text formatting issues (e.g. BC dates
100+ * formatted as "4713-01-01 (BC)").
101+ *
102+ * ParsePartitionValuesFromPartitionKeys uses
94103 * DeserializePartitionValueFromEpochInteger to convert epoch
95104 * integers back to Iceberg binary.
96105 */
97106 if (typeOid == DATEOID )
98107 return psprintf ("datediff('day', date '1970-01-01', %s::date)" , col );
99108 else if (typeOid == TIMESTAMPOID || typeOid == TIMESTAMPTZOID )
100109 return psprintf ("epoch_us(%s)" , col );
101- else
110+ else if (typeOid == INT2OID || typeOid == INT4OID ||
111+ typeOid == INT8OID || typeOid == FLOAT4OID ||
112+ typeOid == FLOAT8OID || typeOid == NUMERICOID ||
113+ typeOid == BOOLOID || typeOid == TEXTOID ||
114+ typeOid == VARCHAROID || typeOid == BPCHAROID ||
115+ typeOid == UUIDOID || typeOid == TIMEOID ||
116+ typeOid == TIMETZOID )
102117 return psprintf ("%s" , col );
118+ else
119+ return NULL ;
103120 }
104121
105122 case PARTITION_TRANSFORM_YEAR :
106- return psprintf ("(year(%s) - 1970)" , col );
123+ {
124+ /*
125+ * Iceberg spec requires UTC for timestamptz. PG stores
126+ * timestamptz internally in UTC, so the non-pushdown path
127+ * works correctly. In DuckDB, year() uses session timezone,
128+ * so we must convert to UTC first.
129+ */
130+ if (typeOid == TIMESTAMPTZOID )
131+ return psprintf ("(year(timezone('UTC', %s)) - 1970)" , col );
132+ else
133+ return psprintf ("(year(%s) - 1970)" , col );
134+ }
107135
108136 case PARTITION_TRANSFORM_MONTH :
109- return psprintf ("((year(%s) - 1970) * 12 + month(%s) - 1)" , col , col );
137+ {
138+ if (typeOid == TIMESTAMPTZOID )
139+ return psprintf ("((year(timezone('UTC', %s)) - 1970) * 12 + "
140+ "month(timezone('UTC', %s)) - 1)" , col , col );
141+ else
142+ return psprintf ("((year(%s) - 1970) * 12 + month(%s) - 1)" ,
143+ col , col );
144+ }
110145
111146 case PARTITION_TRANSFORM_DAY :
112- return psprintf ("datediff('day', date '1970-01-01', %s::date)" , col );
147+ {
148+ /*
149+ * Iceberg spec requires UTC for day transforms. For
150+ * timestamptz, convert to UTC before computing the day.
151+ */
152+ if (typeOid == TIMESTAMPTZOID )
153+ return psprintf ("datediff('day', date '1970-01-01', "
154+ "timezone('UTC', %s)::date)" , col );
155+ else
156+ return psprintf ("datediff('day', date '1970-01-01', %s::date)" , col );
157+ }
113158
114159 case PARTITION_TRANSFORM_HOUR :
115- return psprintf ("datediff('hour', timestamp '1970-01-01', %s::timestamp)" , col );
160+ {
161+ /*
162+ * Only TIMESTAMP and TIMESTAMPTZ are pushdownable for hour
163+ * transforms. TIME/TIMETZ fall back to row-by-row processing.
164+ * Iceberg spec requires UTC for timestamptz.
165+ */
166+ if (typeOid == TIMESTAMPTZOID )
167+ return psprintf ("datediff('hour', timestamp '1970-01-01', "
168+ "timezone('UTC', %s)::timestamp)" , col );
169+ else if (typeOid == TIMESTAMPOID )
170+ return psprintf ("datediff('hour', timestamp '1970-01-01', "
171+ "%s::timestamp)" , col );
172+ else
173+ return NULL ;
174+ }
116175
117176 case PARTITION_TRANSFORM_BUCKET :
118177 case PARTITION_TRANSFORM_TRUNCATE :
@@ -150,59 +209,6 @@ GetPartitionByExpressions(List *transforms)
150209}
151210
152211
153- /*
154- * HexDigitToInt converts a hex character ('0'-'9', 'A'-'F', 'a'-'f') to its
155- * integer value (0-15). Returns -1 for invalid characters.
156- */
157- static int
158- HexDigitToInt (char c )
159- {
160- if (c >= '0' && c <= '9' )
161- return c - '0' ;
162- if (c >= 'A' && c <= 'F' )
163- return c - 'A' + 10 ;
164- if (c >= 'a' && c <= 'f' )
165- return c - 'a' + 10 ;
166- return -1 ;
167- }
168-
169-
170- /*
171- * UrlDecodePartitionValue decodes percent-encoded characters in a Hive-style
172- * partition value (e.g. "1e%2B20" -> "1e+20").
173- *
174- * DuckDB percent-encodes special characters when writing partition directory
175- * names. We must decode them before parsing the value.
176- */
177- static char *
178- UrlDecodePartitionValue (const char * encoded )
179- {
180- int len = strlen (encoded );
181- char * decoded = palloc (len + 1 );
182- int j = 0 ;
183-
184- for (int i = 0 ; i < len ; i ++ )
185- {
186- if (encoded [i ] == '%' && i + 2 < len )
187- {
188- int hi = HexDigitToInt (encoded [i + 1 ]);
189- int lo = HexDigitToInt (encoded [i + 2 ]);
190-
191- if (hi >= 0 && lo >= 0 )
192- {
193- decoded [j ++ ] = (char ) (hi * 16 + lo );
194- i += 2 ;
195- continue ;
196- }
197- }
198- decoded [j ++ ] = encoded [i ];
199- }
200-
201- decoded [j ] = '\0' ;
202- return decoded ;
203- }
204-
205-
206212/*
207213 * NormalizeDuckDBTextToPGText converts a DuckDB text representation of a value
208214 * to PostgreSQL's canonical text format by roundtripping through PG's type I/O.
@@ -230,62 +236,100 @@ NormalizeDuckDBTextToPGText(const char *duckdbText, Oid resultTypeOid,
230236
231237
232238/*
233- * ParsePartitionValuesFromPath extracts partition values from the Hive-style
234- * directory path produced by DuckDB COPY TO with PARTITION_BY.
239+ * ParsePartitionValuesFromPartitionKeys extracts partition values from the
240+ * partition_keys MAP(VARCHAR, VARCHAR) returned by DuckDB's COPY TO with
241+ * return_stats.
235242 *
236- * A path like:
237- * s3://bucket/data/abc123/ __part_0=54/ __part_1=us-east/data_0.parquet
243+ * The partition_keys map has entries like:
244+ * { __part_0=54, __part_1=us-east}
238245 *
239- * is parsed to extract __part_0=54 and __part_1=us-east, which are then
240- * converted to the proper Iceberg binary format using the partition transforms.
246+ * Each value is converted to the proper Iceberg binary format using the
247+ * partition transforms.
241248 */
242249Partition *
243- ParsePartitionValuesFromPath (char * filePath , List * transforms )
250+ ParsePartitionValuesFromPartitionKeys (char * partitionKeysText , List * transforms )
244251{
245252 int numTransforms = list_length (transforms );
246253 Partition * partition = palloc0 (sizeof (Partition ));
247254
248255 partition -> fields = palloc0 (sizeof (PartitionField ) * numTransforms );
249256 partition -> fields_length = numTransforms ;
250257
251- for (int partIndex = 0 ; partIndex < numTransforms ; partIndex ++ )
258+ /* parse the MAP(TEXT,TEXT) text into a datum */
259+ Oid mapTypeOid = GetOrCreatePGMapType ("MAP(TEXT,TEXT)" );
260+ Oid typoinput ;
261+ Oid typioparam ;
262+
263+ getTypeInputInfo (mapTypeOid , & typoinput , & typioparam );
264+ Datum mapDatum = OidInputFunctionCall (typoinput , partitionKeysText ,
265+ typioparam , -1 );
266+
267+ /*
268+ * Build an array of value texts indexed by partition index. We iterate
269+ * the map entries and match __part_N keys to their indices.
270+ */
271+ char * * valueTexts = palloc0 (sizeof (char * ) * numTransforms );
272+ bool * valueIsNull = palloc0 (sizeof (bool ) * numTransforms );
273+
274+ ArrayType * elementsArray = DatumGetArrayTypeP (mapDatum );
275+ ArrayIterator arrayIterator = array_create_iterator (elementsArray , 0 , NULL );
276+ Datum elemDatum ;
277+ bool isNull = false;
278+
279+ while (array_iterate (arrayIterator , & elemDatum , & isNull ))
252280 {
253- IcebergPartitionTransform * transform = list_nth (transforms , partIndex );
281+ if (isNull )
282+ continue ;
283+
284+ HeapTupleHeader tupleHeader = DatumGetHeapTupleHeader (elemDatum );
285+ bool keyIsNull = false;
286+ bool valIsNull = false;
287+
288+ Datum keyDatum = GetAttributeByNum (tupleHeader , 1 , & keyIsNull );
289+ Datum valDatum = GetAttributeByNum (tupleHeader , 2 , & valIsNull );
254290
255- /* build the search key: "__part_N=" */
256- char * searchKey = psprintf ("__part_%d=" , partIndex );
257- int searchKeyLen = strlen (searchKey );
291+ if (keyIsNull )
292+ continue ;
258293
259- /* find this key in the path */
260- char * found = strstr (filePath , searchKey );
294+ char * key = TextDatumGetCString (keyDatum );
261295
262- if (found == NULL )
296+ /* parse __part_N to get the partition index */
297+ if (strncmp (key , "__part_" , 7 ) != 0 )
298+ continue ;
299+
300+ int partIndex = pg_strtoint32 (key + 7 );
301+
302+ if (partIndex < 0 || partIndex >= numTransforms )
263303 {
264304 ereport (ERROR ,
265305 (errcode (ERRCODE_INTERNAL_ERROR ),
266- errmsg ("could not find partition key %s in path %s " ,
267- searchKey , filePath )));
306+ errmsg ("unexpected partition key %s (expected 0..%d) " ,
307+ key , numTransforms - 1 )));
268308 }
269309
270- /* extract the value (from after '=' up to the next '/' or end) */
271- char * valueStart = found + searchKeyLen ;
272- char * valueEnd = strchr (valueStart , '/' );
273- int valueLen = (valueEnd != NULL ) ?
274- (valueEnd - valueStart ) : strlen (valueStart );
275-
276- char * valueText = pnstrdup (valueStart , valueLen );
310+ if (valIsNull )
311+ {
312+ valueIsNull [partIndex ] = true;
313+ }
314+ else
315+ {
316+ valueTexts [partIndex ] = TextDatumGetCString (valDatum );
317+ }
318+ }
277319
278- /* URL-decode (DuckDB percent-encodes special chars in Hive paths) */
279- valueText = UrlDecodePartitionValue (valueText );
320+ array_free_iterator (arrayIterator );
280321
281- /* populate the partition field */
322+ /* convert each partition value to Iceberg binary format */
323+ for (int partIndex = 0 ; partIndex < numTransforms ; partIndex ++ )
324+ {
325+ IcebergPartitionTransform * transform = list_nth (transforms , partIndex );
282326 PartitionField * field = & partition -> fields [partIndex ];
283327
284328 field -> field_id = transform -> partitionFieldId ;
285329 field -> field_name = pstrdup (transform -> partitionFieldName );
286330 field -> value_type = GetTransformResultAvroType (transform );
287331
288- if (strcmp ( valueText , "NULL" ) == 0 )
332+ if (valueIsNull [ partIndex ] || valueTexts [ partIndex ] == NULL )
289333 {
290334 /* NULL partition value */
291335 field -> value = NULL ;
@@ -297,11 +341,12 @@ ParsePartitionValuesFromPath(char *filePath, List *transforms)
297341 transform -> pgType .postgresTypeOid == TIMESTAMPTZOID ))
298342 {
299343 /*
300- * Identity temporal types use epoch integers in the path (days
301- * for date, microseconds for timestamp).
344+ * Identity temporal types use epoch integers (days for date,
345+ * microseconds for timestamp).
302346 */
303347 field -> value = DeserializePartitionValueFromEpochInteger (
304- transform , valueText , & field -> value_length );
348+ transform , valueTexts [partIndex ],
349+ & field -> value_length );
305350 }
306351 else
307352 {
@@ -310,12 +355,14 @@ ParsePartitionValuesFromPath(char *filePath, List *transforms)
310355 * for numeric) so the roundtrip assertion in
311356 * DeserializePartitionValueFromPGText passes.
312357 */
313- valueText = NormalizeDuckDBTextToPGText (valueText ,
314- transform -> resultPgType .postgresTypeOid ,
315- transform -> resultPgType .postgresTypeMod );
358+ char * normalizedText =
359+ NormalizeDuckDBTextToPGText (valueTexts [partIndex ],
360+ transform -> resultPgType .postgresTypeOid ,
361+ transform -> resultPgType .postgresTypeMod );
316362
317363 field -> value = DeserializePartitionValueFromPGText (
318- transform , valueText , & field -> value_length );
364+ transform , normalizedText ,
365+ & field -> value_length );
319366 }
320367 }
321368
0 commit comments