Skip to content

Commit e5bd8ba

Browse files
committed
Enable INSERT..SELECT and COPY FROM pushdown for partitioned Iceberg tables
Partitioned writes previously went through the row-by-row PartitionedDestReceiver, which routes each row to a per-partition CSV file before converting to Parquet. This change enables DuckDB's PARTITION_BY in COPY TO for tables using pushdownable transforms (identity, year, month, day, hour), letting DuckDB split the data in a single pass. Bucket and truncate transforms continue to use the existing path. Key changes: - Add partition_pushdown.c with transform-to-DuckDB-SQL conversion, query wrapping with synthetic partition columns, and Hive-style path parsing for partition values - Extend WriteQueryResultTo with partitionByExprs parameter that wraps the query with synthetic columns AFTER validation/interval wrappers and adds PARTITION_BY to the COPY command - Modify AddQueryResultToTable to detect pushdownable partitions, pass expressions through, and parse per-file partition values from DuckDB output paths - Lift blanket partition blocks in IsPushdownableInsertSelectQuery and IsCopyFromPushdownable to allow pushdown when all transforms are supported - Disable FILE_SIZE_BYTES when PARTITION_BY is used (DuckDB limitation) Signed-off-by: Marco Slot <marco.slot@snowflake.com>
1 parent d0b054b commit e5bd8ba

10 files changed

Lines changed: 1666 additions & 28 deletions

File tree

pg_lake_copy/src/copy/copy.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
#include "pg_lake/extensions/pg_lake_copy.h"
4242
#include "pg_lake/extensions/pg_parquet.h"
4343
#include "pg_lake/extensions/postgis.h"
44+
#include "pg_lake/fdw/partition_pushdown.h"
45+
#include "pg_lake/fdw/partition_transform.h"
4446
#include "pg_lake/fdw/writable_table.h"
4547
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
4648
#include "pg_lake/partitioning/partition_by_parser.h"
@@ -651,10 +653,19 @@ IsCopyFromPushdownable(Relation relation, List *columnNameList,
651653
if (!RelationColumnsSuitableForPushdown(relation, sourceFormat))
652654
return false;
653655

656+
/*
657+
* Partitioned tables can be pushed down if all transforms are supported
658+
* by DuckDB PARTITION_BY (identity, year, month, day, hour).
659+
*/
654660
const char *partitionBy = GetIcebergTablePartitionByOption(relationId);
655661

656662
if (partitionBy != NULL)
657-
return false;
663+
{
664+
List *transforms = CurrentPartitionTransformList(relationId);
665+
666+
if (!AllPartitionTransformsPushdownable(transforms))
667+
return false;
668+
}
658669

659670
return true;
660671
}

pg_lake_engine/include/pg_lake/pgduck/write_data.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ extern PGDLLEXPORT StatsCollector * WriteQueryResultTo(char *query,
5656
TupleDesc queryTupleDesc,
5757
List *leafFields,
5858
IcebergOutOfRangePolicy outOfRangePolicy,
59-
bool wrapNativeIntervals);
59+
bool wrapNativeIntervals,
60+
List *partitionByExprs);
6061
extern PGDLLEXPORT void AppendFields(StringInfo map, DataFileSchema * schema);
6162
extern PGDLLEXPORT char *TupleDescToColumnMapForWrite(TupleDesc tupleDesc, CopyDataFormat destinationFormat);
6263
extern PGDLLEXPORT char *TupleDescToProjectionListForWrite(TupleDesc tupleDesc,

pg_lake_engine/src/pgduck/write_data.c

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ ConvertCSVFileTo(char *csvFilePath, TupleDesc csvTupleDesc, int maxLineSize,
104104
csvTupleDesc,
105105
leafFields,
106106
ICEBERG_OOR_NONE,
107-
false /* wrapNativeIntervals */ );
107+
false /* wrapNativeIntervals */ ,
108+
NIL /* partitionByExprs */ );
108109
}
109110

110111

@@ -124,7 +125,8 @@ WriteQueryResultTo(char *query,
124125
TupleDesc queryTupleDesc,
125126
List *leafFields,
126127
IcebergOutOfRangePolicy outOfRangePolicy,
127-
bool wrapNativeIntervals)
128+
bool wrapNativeIntervals,
129+
List *partitionByExprs)
128130
{
129131
if (outOfRangePolicy != ICEBERG_OOR_NONE)
130132
{
@@ -139,6 +141,34 @@ WriteQueryResultTo(char *query,
139141
queryHasRowId);
140142
}
141143

144+
/*
145+
* If partition expressions are given, wrap the (already validated) query
146+
* with synthetic partition columns. This must happen AFTER the validation
147+
* and interval wrappers, because those reconstruct the SELECT list from
148+
* queryTupleDesc and would drop any extra columns added earlier.
149+
*/
150+
if (partitionByExprs != NIL)
151+
{
152+
StringInfoData wrapped;
153+
154+
initStringInfo(&wrapped);
155+
appendStringInfoString(&wrapped, "SELECT *");
156+
157+
int partIndex = 0;
158+
ListCell *exprCell = NULL;
159+
160+
foreach(exprCell, partitionByExprs)
161+
{
162+
char *expr = strVal(lfirst(exprCell));
163+
164+
appendStringInfo(&wrapped, ", %s AS __part_%d", expr, partIndex);
165+
partIndex++;
166+
}
167+
168+
appendStringInfo(&wrapped, " FROM (%s) __partitioned_source", query);
169+
query = wrapped.data;
170+
}
171+
142172
StringInfoData command;
143173

144174
initStringInfo(&command);
@@ -364,6 +394,24 @@ WriteQueryResultTo(char *query,
364394
elog(ERROR, "unexpected format: %s", formatName);
365395
}
366396

397+
/* add PARTITION_BY if partitioning expressions were specified */
398+
if (partitionByExprs != NIL)
399+
{
400+
appendStringInfoString(&command, ", PARTITION_BY (");
401+
402+
int numExprs = list_length(partitionByExprs);
403+
404+
for (int i = 0; i < numExprs; i++)
405+
{
406+
if (i > 0)
407+
appendStringInfoString(&command, ", ");
408+
409+
appendStringInfo(&command, "__part_%d", i);
410+
}
411+
412+
appendStringInfoString(&command, ")");
413+
}
414+
367415
/* end WITH options */
368416
appendStringInfoString(&command, ")");
369417

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 Snowflake Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#pragma once
19+
20+
#include "postgres.h"
21+
22+
#include "pg_lake/iceberg/api/partitioning.h"
23+
#include "pg_lake/iceberg/manifest_spec.h"
24+
25+
extern PGDLLEXPORT bool AllPartitionTransformsPushdownable(List *transforms);
26+
extern List *GetPartitionByExpressions(List *transforms);
27+
extern Partition * ParsePartitionValuesFromPath(char *filePath, List *transforms);

pg_lake_table/include/pg_lake/fdw/partition_transform.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ extern Partition * ComputePartitionTupleForTuple(List *transforms, TupleTableSlo
2727
extern void *ApplyBucketTransformToColumn(IcebergPartitionTransform * transform,
2828
Datum columnValue, bool isNull,
2929
size_t *bucketSize);
30-
extern List *CurrentPartitionTransformList(Oid relationId);
30+
extern PGDLLEXPORT List *CurrentPartitionTransformList(Oid relationId);
3131
extern IcebergPartitionSpec * GetPartitionSpecIfAlreadyExist(Oid relationId, List *partitionTransforms);
3232
extern List *AllPartitionTransformList(Oid relationId);
3333
extern List *GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields);
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Copyright 2025 Snowflake Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/*
19+
* partition_pushdown.c
20+
*
21+
* Utilities for pushing down partitioned Iceberg writes to DuckDB using
22+
* the PARTITION_BY clause in COPY TO. Supports identity and temporal
23+
* (year, month, day, hour) partition transforms.
24+
*/
25+
#include "postgres.h"
26+
27+
#include "utils/builtins.h"
28+
29+
#include "pg_lake/fdw/partition_pushdown.h"
30+
#include "pg_lake/fdw/partition_transform.h"
31+
#include "pg_lake/iceberg/api/partitioning.h"
32+
#include "pg_lake/iceberg/manifest_spec.h"
33+
34+
/* prefix for synthetic partition columns in COPY TO queries */
35+
#define PARTITION_COLUMN_PREFIX "__part_"
36+
37+
38+
static char *PartitionTransformToDuckDBExpression(IcebergPartitionTransform * transform);
39+
40+
41+
/*
42+
* AllPartitionTransformsPushdownable returns true if every transform in the
43+
* list can be expressed as a DuckDB SQL expression for PARTITION_BY.
44+
*/
45+
bool
46+
AllPartitionTransformsPushdownable(List *transforms)
47+
{
48+
if (transforms == NIL)
49+
return false;
50+
51+
ListCell *cell = NULL;
52+
53+
foreach(cell, transforms)
54+
{
55+
IcebergPartitionTransform *transform = lfirst(cell);
56+
char *expr = PartitionTransformToDuckDBExpression(transform);
57+
58+
if (expr == NULL)
59+
return false;
60+
}
61+
62+
return true;
63+
}
64+
65+
66+
/*
67+
* PartitionTransformToDuckDBExpression returns a DuckDB SQL expression that
68+
* computes the Iceberg partition value for the given transform.
69+
*
70+
* Returns NULL for transforms that cannot be pushed down (bucket, truncate, void).
71+
*
72+
* The expressions produce Iceberg-compatible partition values:
73+
* - year: integer years since 1970
74+
* - month: integer months since Jan 1970
75+
* - day: integer days since 1970-01-01
76+
* - hour: integer hours since 1970-01-01T00:00:00
77+
* - identity: the column value (with type-specific casts for date/timestamp)
78+
*/
79+
static char *
80+
PartitionTransformToDuckDBExpression(IcebergPartitionTransform * transform)
81+
{
82+
const char *col = quote_identifier(transform->columnName);
83+
Oid typeOid = transform->pgType.postgresTypeOid;
84+
85+
switch (transform->type)
86+
{
87+
case PARTITION_TRANSFORM_IDENTITY:
88+
{
89+
/*
90+
* Identity partitions use the column value directly. DuckDB
91+
* writes the value in its default text format into the
92+
* Hive-style path (e.g. __part_0=2025-01-15), and
93+
* DeserializePartitionValueFromPGText parses it back using
94+
* the transform's result PG type.
95+
*/
96+
return psprintf("%s", col);
97+
}
98+
99+
case PARTITION_TRANSFORM_YEAR:
100+
return psprintf("(year(%s) - 1970)", col);
101+
102+
case PARTITION_TRANSFORM_MONTH:
103+
return psprintf("((year(%s) - 1970) * 12 + month(%s) - 1)", col, col);
104+
105+
case PARTITION_TRANSFORM_DAY:
106+
return psprintf("datediff('day', date '1970-01-01', %s::date)", col);
107+
108+
case PARTITION_TRANSFORM_HOUR:
109+
return psprintf("datediff('hour', timestamp '1970-01-01', %s::timestamp)", col);
110+
111+
case PARTITION_TRANSFORM_BUCKET:
112+
case PARTITION_TRANSFORM_TRUNCATE:
113+
case PARTITION_TRANSFORM_VOID:
114+
return NULL;
115+
}
116+
117+
return NULL;
118+
}
119+
120+
121+
/*
122+
* GetPartitionByExpressions returns a list of String values with the DuckDB
123+
* SQL expressions for each partition transform. These are passed to
124+
* WriteQueryResultTo which wraps the query with synthetic partition columns
125+
* after validation wrapping.
126+
*/
127+
List *
128+
GetPartitionByExpressions(List *transforms)
129+
{
130+
List *exprs = NIL;
131+
ListCell *cell = NULL;
132+
133+
foreach(cell, transforms)
134+
{
135+
IcebergPartitionTransform *transform = lfirst(cell);
136+
char *expr = PartitionTransformToDuckDBExpression(transform);
137+
138+
Assert(expr != NULL);
139+
140+
exprs = lappend(exprs, makeString(expr));
141+
}
142+
143+
return exprs;
144+
}
145+
146+
147+
/*
148+
* ParsePartitionValuesFromPath extracts partition values from the Hive-style
149+
* directory path produced by DuckDB COPY TO with PARTITION_BY.
150+
*
151+
* A path like:
152+
* s3://bucket/data/abc123/__part_0=54/__part_1=us-east/data_0.parquet
153+
*
154+
* is parsed to extract __part_0=54 and __part_1=us-east, which are then
155+
* converted to the proper Iceberg binary format using the partition transforms.
156+
*/
157+
Partition *
158+
ParsePartitionValuesFromPath(char *filePath, List *transforms)
159+
{
160+
int numTransforms = list_length(transforms);
161+
Partition *partition = palloc0(sizeof(Partition));
162+
163+
partition->fields = palloc0(sizeof(PartitionField) * numTransforms);
164+
partition->fields_length = numTransforms;
165+
166+
for (int partIndex = 0; partIndex < numTransforms; partIndex++)
167+
{
168+
IcebergPartitionTransform *transform = list_nth(transforms, partIndex);
169+
170+
/* build the search key: "__part_N=" */
171+
char *searchKey = psprintf(PARTITION_COLUMN_PREFIX "%d=", partIndex);
172+
int searchKeyLen = strlen(searchKey);
173+
174+
/* find this key in the path */
175+
char *found = strstr(filePath, searchKey);
176+
177+
if (found == NULL)
178+
{
179+
ereport(ERROR,
180+
(errcode(ERRCODE_INTERNAL_ERROR),
181+
errmsg("could not find partition key %s in path %s",
182+
searchKey, filePath)));
183+
}
184+
185+
/* extract the value (from after '=' up to the next '/' or end) */
186+
char *valueStart = found + searchKeyLen;
187+
char *valueEnd = strchr(valueStart, '/');
188+
int valueLen = (valueEnd != NULL) ?
189+
(valueEnd - valueStart) : strlen(valueStart);
190+
191+
char *valueText = pnstrdup(valueStart, valueLen);
192+
193+
/* populate the partition field */
194+
PartitionField *field = &partition->fields[partIndex];
195+
196+
field->field_id = transform->partitionFieldId;
197+
field->field_name = pstrdup(transform->partitionFieldName);
198+
field->value_type = GetTransformResultAvroType(transform);
199+
200+
if (strcmp(valueText, "NULL") == 0)
201+
{
202+
/* NULL partition value */
203+
field->value = NULL;
204+
field->value_length = 0;
205+
}
206+
else
207+
{
208+
/*
209+
* Convert the text value to Iceberg binary format. The text
210+
* representation matches the transform's result PG type.
211+
*/
212+
field->value = DeserializePartitionValueFromPGText(
213+
transform, valueText, &field->value_length);
214+
}
215+
}
216+
217+
return partition;
218+
}

0 commit comments

Comments
 (0)