Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pg_lake_copy/src/copy/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include "pg_lake/extensions/pg_lake_copy.h"
#include "pg_lake/extensions/pg_parquet.h"
#include "pg_lake/extensions/postgis.h"
#include "pg_lake/fdw/partition_pushdown.h"
#include "pg_lake/fdw/partition_transform.h"
#include "pg_lake/fdw/writable_table.h"
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
#include "pg_lake/partitioning/partition_by_parser.h"
Expand Down Expand Up @@ -651,9 +653,17 @@ IsCopyFromPushdownable(Relation relation, List *columnNameList,
if (!RelationColumnsSuitableForPushdown(relation, sourceFormat))
return false;

/*
* Partitioned tables can be pushed down if all transforms are supported
* by DuckDB PARTITION_BY (identity, year, month, day, hour).
*/
const char *partitionBy = GetIcebergTablePartitionByOption(relationId);

if (partitionBy != NULL)
if (partitionBy != NULL && !EnablePartitionedWritePushdown)
return false;

if (partitionBy != NULL &&
GetPartitionByExpressionsForRelation(relationId) == NIL)
return false;

return true;
Expand Down
6 changes: 6 additions & 0 deletions pg_lake_engine/include/pg_lake/data_file/data_file_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ typedef struct DataFileStats

/* for a new data file with row IDs, the start of the range */
int64 rowIdStart;

/*
* partition key values from COPY TO return_stats (NULL if not
* partitioned)
*/
char *partitionKeysText;
} DataFileStats;

typedef struct StatsCollector
Expand Down
2 changes: 1 addition & 1 deletion pg_lake_engine/include/pg_lake/pgduck/map.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include "pg_lake/pgduck/type.h"

Oid GetOrCreatePGMapType(const char *name);
extern PGDLLEXPORT Oid GetOrCreatePGMapType(const char *name);
char *GetDuckDBMapDefinitionForPGType(Oid postgresTypeId,
CopyDataFormat format);

Expand Down
10 changes: 9 additions & 1 deletion pg_lake_engine/include/pg_lake/pgduck/write_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
#include "pg_lake/pgduck/iceberg_validation.h"
#include "nodes/pg_list.h"

/*
 * Prefix for the synthetic partition columns added to DuckDB COPY TO queries.
 *
 * Each PARTITION_BY expression is aliased as <prefix><index>, where the index
 * is zero-based and follows the order of the expression list
 * (__pglake_part_0, __pglake_part_1, ...). The PARTITION_BY clause of the
 * generated COPY command refers to the columns by these same aliases, and the
 * partition_keys MAP returned by DuckDB uses them as its keys.
 */
#define PARTITION_COLUMN_PREFIX "__pglake_part_"

/* pg_lake_table.target_row_group_size_mb */
#define DEFAULT_TARGET_ROW_GROUP_SIZE_MB 512
extern PGDLLEXPORT int TargetRowGroupSizeMB;
Expand Down Expand Up @@ -56,7 +63,8 @@ extern PGDLLEXPORT StatsCollector * WriteQueryResultTo(char *query,
TupleDesc queryTupleDesc,
List *leafFields,
IcebergOutOfRangePolicy outOfRangePolicy,
bool wrapNativeIntervals);
bool wrapNativeIntervals,
List *partitionByExprs);
extern PGDLLEXPORT void AppendFields(StringInfo map, DataFileSchema * schema);
extern PGDLLEXPORT char *TupleDescToColumnMapForWrite(TupleDesc tupleDesc, CopyDataFormat destinationFormat);
extern PGDLLEXPORT char *TupleDescToProjectionListForWrite(TupleDesc tupleDesc,
Expand Down
5 changes: 5 additions & 0 deletions pg_lake_engine/src/data_file/data_file_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ GetDataFileStatsListFromPGResult(PGresult *result, List *leafFields, DataFileSch
{
fileStats->dataFilePath = pstrdup(resultValue);
}
else if (strcmp(resultColName, "partition_keys") == 0)
{
if (!PQgetisnull(result, resultRowIndex, resultColIndex))
fileStats->partitionKeysText = pstrdup(resultValue);
}
}

statsList = lappend(statsList, fileStats);
Expand Down
61 changes: 59 additions & 2 deletions pg_lake_engine/src/pgduck/write_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ ConvertCSVFileTo(char *csvFilePath, TupleDesc csvTupleDesc, int maxLineSize,
csvTupleDesc,
leafFields,
ICEBERG_OOR_NONE,
false /* wrapNativeIntervals */ );
false /* wrapNativeIntervals */ ,
NIL /* partitionByExprs */ );
}


Expand All @@ -124,7 +125,8 @@ WriteQueryResultTo(char *query,
TupleDesc queryTupleDesc,
List *leafFields,
IcebergOutOfRangePolicy outOfRangePolicy,
bool wrapNativeIntervals)
bool wrapNativeIntervals,
List *partitionByExprs)
{
if (outOfRangePolicy != ICEBERG_OOR_NONE)
{
Expand All @@ -139,6 +141,34 @@ WriteQueryResultTo(char *query,
queryHasRowId);
}

/*
* If partition expressions are given, wrap the (already validated) query
* with synthetic partition columns. This must happen AFTER the validation
* and interval wrappers, because those reconstruct the SELECT list from
* queryTupleDesc and would drop any extra columns added earlier.
*/
if (partitionByExprs != NIL)
{
StringInfoData wrapped;

initStringInfo(&wrapped);
appendStringInfoString(&wrapped, "SELECT *");

int partIndex = 0;
ListCell *exprCell = NULL;

foreach(exprCell, partitionByExprs)
{
char *expr = strVal(lfirst(exprCell));

appendStringInfo(&wrapped, ", %s AS " PARTITION_COLUMN_PREFIX "%d", expr, partIndex);
partIndex++;
}

appendStringInfo(&wrapped, " FROM (%s) __partitioned_source", query);
query = wrapped.data;
}

StringInfoData command;

initStringInfo(&command);
Expand Down Expand Up @@ -364,6 +394,33 @@ WriteQueryResultTo(char *query,
elog(ERROR, "unexpected format: %s", formatName);
}

/* add PARTITION_BY if partitioning expressions were specified */
if (partitionByExprs != NIL)
{
appendStringInfoString(&command, ", PARTITION_BY (");

int numExprs = list_length(partitionByExprs);

for (int i = 0; i < numExprs; i++)
{
if (i > 0)
appendStringInfoString(&command, ", ");

appendStringInfo(&command, PARTITION_COLUMN_PREFIX "%d", i);
}

appendStringInfoString(&command, ")");

/*
* Disable Hive-style directory naming (col=val/) since we extract
* partition values from the partition_keys MAP, not from file paths.
* This avoids issues with long text values causing HTTP 400 errors
* and percent-encoding in directory names.
*/
appendStringInfoString(&command, ", WRITE_PARTITION_COLUMNS false"
", HIVE_FILE_PATTERN false");
}

/* end WITH options */
appendStringInfoString(&command, ")");

Expand Down
27 changes: 27 additions & 0 deletions pg_lake_table/include/pg_lake/fdw/partition_pushdown.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2025 Snowflake Inc.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "postgres.h"

#include "pg_lake/iceberg/api/partitioning.h"
#include "pg_lake/iceberg/manifest_spec.h"

/*
 * Presumably returns the DuckDB PARTITION_BY expressions for the given
 * relation as a List.
 *
 * NOTE(review): the COPY FROM pushdown check treats a NIL result for a
 * partitioned table as "cannot push down"; confirm against the
 * implementation which cases yield NIL and what node type the elements have.
 */
extern PGDLLEXPORT List *GetPartitionByExpressionsForRelation(Oid relationId);

/*
 * Presumably builds a Partition from the partition keys text reported for a
 * written data file, interpreted using the given list of partition
 * transforms.
 *
 * NOTE(review): partitionKeysText looks like the value stored in
 * DataFileStats.partitionKeysText (the partition_keys result column); confirm
 * the expected text format and NULL handling against the implementation.
 * NOTE(review): unlike the declaration above, this one is not marked
 * PGDLLEXPORT; confirm that no other extension module needs to call it.
 */
extern Partition * ParsePartitionValuesFromPartitionKeys(char *partitionKeysText,
List *transforms);
2 changes: 1 addition & 1 deletion pg_lake_table/include/pg_lake/fdw/partition_transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extern Partition * ComputePartitionTupleForTuple(List *transforms, TupleTableSlo
extern void *ApplyBucketTransformToColumn(IcebergPartitionTransform * transform,
Datum columnValue, bool isNull,
size_t *bucketSize);
extern List *CurrentPartitionTransformList(Oid relationId);
extern PGDLLEXPORT List *CurrentPartitionTransformList(Oid relationId);
extern IcebergPartitionSpec * GetPartitionSpecIfAlreadyExist(Oid relationId, List *partitionTransforms);
extern List *AllPartitionTransformList(Oid relationId);
extern List *GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields);
Expand Down
3 changes: 3 additions & 0 deletions pg_lake_table/include/pg_lake/planner/insert_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
/* pg_lake_table.enable_insert_select_pushdown setting */
extern bool EnableInsertSelectPushdown;

/*
 * pg_lake_table.enable_partitioned_write_pushdown setting.
 *
 * When false, the COPY FROM pushdown check (IsCopyFromPushdownable) rejects
 * pushdown for tables that have a partition_by option.
 */
extern PGDLLEXPORT bool EnablePartitionedWritePushdown;

bool IsPushdownableInsertSelectQuery(Query *query);
bool IsInsertSelectQuery(Query *query);
Oid GetInsertRelidFromInsertSelect(Query *query);
Expand Down
Loading
Loading