Skip to content

Commit debe2e9

Browse files
committed
Enable INSERT..SELECT and COPY FROM pushdown for partitioned Iceberg tables
Partitioned writes previously went through the row-by-row PartitionedDestReceiver, which routes each row to a per-partition CSV file before converting to Parquet. This change enables DuckDB's PARTITION_BY in COPY TO for tables using pushdownable transforms (identity, year, month, day, hour), letting DuckDB split the data in a single pass. Bucket and truncate transforms continue to use the existing path. Key changes: - Add partition_pushdown.c with transform-to-DuckDB-SQL conversion, query wrapping with synthetic partition columns, and Hive-style path parsing for partition values - Extend WriteQueryResultTo with partitionByExprs parameter that wraps the query with synthetic columns AFTER validation/interval wrappers and adds PARTITION_BY to the COPY command - Modify AddQueryResultToTable to detect pushdownable partitions, pass expressions through, and parse per-file partition values from DuckDB output paths - Lift blanket partition blocks in IsPushdownableInsertSelectQuery and IsCopyFromPushdownable to allow pushdown when all transforms are supported - Disable FILE_SIZE_BYTES when PARTITION_BY is used (DuckDB limitation) Signed-off-by: Marco Slot <marco.slot@snowflake.com>
1 parent d0b054b commit debe2e9

15 files changed

Lines changed: 2465 additions & 28 deletions

File tree

pg_lake_copy/src/copy/copy.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
#include "pg_lake/extensions/pg_lake_copy.h"
4242
#include "pg_lake/extensions/pg_parquet.h"
4343
#include "pg_lake/extensions/postgis.h"
44+
#include "pg_lake/fdw/partition_pushdown.h"
45+
#include "pg_lake/fdw/partition_transform.h"
4446
#include "pg_lake/fdw/writable_table.h"
4547
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
4648
#include "pg_lake/partitioning/partition_by_parser.h"
@@ -651,9 +653,14 @@ IsCopyFromPushdownable(Relation relation, List *columnNameList,
651653
if (!RelationColumnsSuitableForPushdown(relation, sourceFormat))
652654
return false;
653655

656+
/*
657+
* Partitioned tables can be pushed down if all transforms are supported
658+
* by DuckDB PARTITION_BY (identity, year, month, day, hour).
659+
*/
654660
const char *partitionBy = GetIcebergTablePartitionByOption(relationId);
655661

656-
if (partitionBy != NULL)
662+
if (partitionBy != NULL &&
663+
GetPartitionByExpressionsForRelation(relationId) == NIL)
657664
return false;
658665

659666
return true;

pg_lake_engine/include/pg_lake/data_file/data_file_stats.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ typedef struct DataFileStats
9292

9393
/* for a new data file with row IDs, the start of the range */
9494
int64 rowIdStart;
95+
96+
/*
97+
* partition key values from COPY TO return_stats (NULL if not
98+
* partitioned)
99+
*/
100+
char *partitionKeysText;
95101
} DataFileStats;
96102

97103
typedef struct StatsCollector

pg_lake_engine/include/pg_lake/pgduck/map.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#include "pg_lake/pgduck/type.h"
2121

22-
Oid GetOrCreatePGMapType(const char *name);
22+
extern PGDLLEXPORT Oid GetOrCreatePGMapType(const char *name);
2323
char *GetDuckDBMapDefinitionForPGType(Oid postgresTypeId,
2424
CopyDataFormat format);
2525

pg_lake_engine/include/pg_lake/pgduck/write_data.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ extern PGDLLEXPORT StatsCollector * WriteQueryResultTo(char *query,
5656
TupleDesc queryTupleDesc,
5757
List *leafFields,
5858
IcebergOutOfRangePolicy outOfRangePolicy,
59-
bool wrapNativeIntervals);
59+
bool wrapNativeIntervals,
60+
List *partitionByExprs);
6061
extern PGDLLEXPORT void AppendFields(StringInfo map, DataFileSchema * schema);
6162
extern PGDLLEXPORT char *TupleDescToColumnMapForWrite(TupleDesc tupleDesc, CopyDataFormat destinationFormat);
6263
extern PGDLLEXPORT char *TupleDescToProjectionListForWrite(TupleDesc tupleDesc,
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
-- Upgrade script for pg_lake_engine from 3.3 to 3.4
2+
3+
-- Pre-create MAP(TEXT,TEXT) type for partition_keys parsing in partitioned writes.
4+
-- This avoids runtime type creation during DML operations.
5+
SELECT map_type.create('text'::regtype, 'text'::regtype);

pg_lake_engine/src/data_file/data_file_stats.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ GetDataFileStatsListFromPGResult(PGresult *result, List *leafFields, DataFileSch
192192
{
193193
fileStats->dataFilePath = pstrdup(resultValue);
194194
}
195+
else if (strcmp(resultColName, "partition_keys") == 0)
196+
{
197+
if (!PQgetisnull(result, resultRowIndex, resultColIndex))
198+
fileStats->partitionKeysText = pstrdup(resultValue);
199+
}
195200
}
196201

197202
statsList = lappend(statsList, fileStats);

pg_lake_engine/src/pgduck/write_data.c

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ ConvertCSVFileTo(char *csvFilePath, TupleDesc csvTupleDesc, int maxLineSize,
104104
csvTupleDesc,
105105
leafFields,
106106
ICEBERG_OOR_NONE,
107-
false /* wrapNativeIntervals */ );
107+
false /* wrapNativeIntervals */ ,
108+
NIL /* partitionByExprs */ );
108109
}
109110

110111

@@ -124,7 +125,8 @@ WriteQueryResultTo(char *query,
124125
TupleDesc queryTupleDesc,
125126
List *leafFields,
126127
IcebergOutOfRangePolicy outOfRangePolicy,
127-
bool wrapNativeIntervals)
128+
bool wrapNativeIntervals,
129+
List *partitionByExprs)
128130
{
129131
if (outOfRangePolicy != ICEBERG_OOR_NONE)
130132
{
@@ -139,6 +141,34 @@ WriteQueryResultTo(char *query,
139141
queryHasRowId);
140142
}
141143

144+
/*
145+
* If partition expressions are given, wrap the (already validated) query
146+
* with synthetic partition columns. This must happen AFTER the validation
147+
* and interval wrappers, because those reconstruct the SELECT list from
148+
* queryTupleDesc and would drop any extra columns added earlier.
149+
*/
150+
if (partitionByExprs != NIL)
151+
{
152+
StringInfoData wrapped;
153+
154+
initStringInfo(&wrapped);
155+
appendStringInfoString(&wrapped, "SELECT *");
156+
157+
int partIndex = 0;
158+
ListCell *exprCell = NULL;
159+
160+
foreach(exprCell, partitionByExprs)
161+
{
162+
char *expr = strVal(lfirst(exprCell));
163+
164+
appendStringInfo(&wrapped, ", %s AS __part_%d", expr, partIndex);
165+
partIndex++;
166+
}
167+
168+
appendStringInfo(&wrapped, " FROM (%s) __partitioned_source", query);
169+
query = wrapped.data;
170+
}
171+
142172
StringInfoData command;
143173

144174
initStringInfo(&command);
@@ -364,6 +394,24 @@ WriteQueryResultTo(char *query,
364394
elog(ERROR, "unexpected format: %s", formatName);
365395
}
366396

397+
/* add PARTITION_BY if partitioning expressions were specified */
398+
if (partitionByExprs != NIL)
399+
{
400+
appendStringInfoString(&command, ", PARTITION_BY (");
401+
402+
int numExprs = list_length(partitionByExprs);
403+
404+
for (int i = 0; i < numExprs; i++)
405+
{
406+
if (i > 0)
407+
appendStringInfoString(&command, ", ");
408+
409+
appendStringInfo(&command, "__part_%d", i);
410+
}
411+
412+
appendStringInfoString(&command, ")");
413+
}
414+
367415
/* end WITH options */
368416
appendStringInfoString(&command, ")");
369417

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 Snowflake Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#pragma once
19+
20+
#include "postgres.h"
21+
22+
#include "pg_lake/iceberg/api/partitioning.h"
23+
#include "pg_lake/iceberg/manifest_spec.h"
24+
25+
extern PGDLLEXPORT List *GetPartitionByExpressionsForRelation(Oid relationId);
26+
extern Partition * ParsePartitionValuesFromPartitionKeys(char *partitionKeysText,
27+
List *transforms);

pg_lake_table/include/pg_lake/fdw/partition_transform.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ extern Partition * ComputePartitionTupleForTuple(List *transforms, TupleTableSlo
2727
extern void *ApplyBucketTransformToColumn(IcebergPartitionTransform * transform,
2828
Datum columnValue, bool isNull,
2929
size_t *bucketSize);
30-
extern List *CurrentPartitionTransformList(Oid relationId);
30+
extern PGDLLEXPORT List *CurrentPartitionTransformList(Oid relationId);
3131
extern IcebergPartitionSpec * GetPartitionSpecIfAlreadyExist(Oid relationId, List *partitionTransforms);
3232
extern List *AllPartitionTransformList(Oid relationId);
3333
extern List *GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields);

0 commit comments

Comments
 (0)