Skip to content

Commit 455c4f2

Browse files
Use return_stats option to collect column statistics (#108)
Use the DuckDB output of COPY statements with the return_stats option to collect column statistics for Parquet files.
1 parent 12ca789 commit 455c4f2

27 files changed

Lines changed: 1676 additions & 1155 deletions

pg_lake_copy/src/copy/copy.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ ProcessPgLakeCopyTo(CopyStmt *copyStmt, ParseState *pstate, Relation relation,
916916
*/
917917
ConvertCSVFileTo(tempCSVPath, tupleDesc, maximumLineLength,
918918
destinationPath, destinationFormat, destinationCompression,
919-
copyStmt->options, schema);
919+
copyStmt->options, schema, NIL);
920920

921921
if (IsCopyToStdout(copyStmt))
922922
{

pg_lake_engine/include/pg_lake/data_file/data_file_stats.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,36 @@
2222
#include "datatype/timestamp.h"
2323

2424
#include "pg_lake/parquet/leaf_field.h"
25+
#include "pg_lake/pgduck/client.h"
26+
27+
28+
/*
29+
* ColumnStatsMode describes the mode of column stats.
30+
* - When truncate mode (default) is used, the column stats are truncated
31+
* to the given length.
32+
* - When none mode is used, the column stats are not collected.
33+
*/
34+
typedef enum ColumnStatsMode
35+
{
36+
COLUMN_STATS_MODE_TRUNCATE = 0,
37+
COLUMN_STATS_MODE_NONE = 1,
38+
} ColumnStatsMode;
39+
40+
/*
41+
* ColumnStatsConfig describes the configuration for column stats.
42+
* - mode: the mode of column stats.
43+
* - truncateLen: the length to truncate the column stats in truncate mode.
44+
*/
45+
typedef struct ColumnStatsConfig
46+
{
47+
ColumnStatsMode mode;
48+
49+
/* used for truncate mode */
50+
size_t truncateLen;
51+
} ColumnStatsConfig;
52+
53+
54+
2555

2656
/*
2757
* DataFileColumnStats stores column statistics for a data file.
@@ -43,6 +73,8 @@ typedef struct DataFileColumnStats
4373
*/
4474
typedef struct DataFileStats
4575
{
76+
char *dataFilePath;
77+
4678
/* number of bytes in the file */
4779
int64 fileSize;
4880

@@ -61,3 +93,26 @@ typedef struct DataFileStats
6193
/* for a new data file with row IDs, the start of the range */
6294
int64 rowIdStart;
6395
} DataFileStats;
96+
97+
typedef struct StatsCollector
98+
{
99+
int64 totalRowCount;
100+
List *dataFileStats;
101+
} StatsCollector;
102+
103+
extern PGDLLEXPORT DataFileStats * DeepCopyDataFileStats(const DataFileStats * stats);
104+
extern PGDLLEXPORT StatsCollector * GetDataFileStatsListFromPGResult(PGresult *result,
105+
List *leafFields,
106+
DataFileSchema * schema);
107+
extern PGDLLEXPORT StatsCollector * ExecuteCopyToCommandOnPGDuckConnection(char *copyCommand,
108+
List *leafFields,
109+
DataFileSchema * schema,
110+
bool disablePreserveInsertionOrder,
111+
char *destinationPath,
112+
CopyDataFormat destinationFormat);
113+
extern PGDLLEXPORT bool ShouldSkipStatistics(LeafField * leafField);
114+
extern PGDLLEXPORT DataFileStats * CreateDataFileStatsForDataFile(char *dataFilePath,
115+
int64 rowCount, int64 deletedRowCount,
116+
List *leafFields);
117+
extern PGDLLEXPORT void ApplyColumnStatsModeForAllFileStats(Oid relationId, List *dataFileStats);
118+
extern PGDLLEXPORT List *GetRemoteParquetColumnStats(char *path, List *leafFields);

pg_lake_engine/include/pg_lake/parquet/field.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#pragma once
3535

3636
#include "nodes/pg_list.h"
37+
#include "pg_lake/pgduck/type.h"
3738

3839
/*
3940
* Reserved _row_id field ID used for Iceberg
@@ -154,3 +155,5 @@ typedef FieldStruct DataFileSchema;
154155
typedef FieldStructElement DataFileSchemaField;
155156

156157
extern PGDLLEXPORT DataFileSchema * DeepCopyDataFileSchema(const DataFileSchema * schema);
158+
extern PGDLLEXPORT Field * DeepCopyField(const Field * field);
159+
extern PGDLLEXPORT bool PGTypeRequiresConversionToIcebergString(Field * field, PGType pgType);

pg_lake_engine/include/pg_lake/parquet/leaf_field.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ typedef struct LeafField
5151

5252
extern PGDLLEXPORT int LeafFieldCompare(const ListCell *a, const ListCell *b);
5353
extern PGDLLEXPORT bool SchemaFieldsEquivalent(DataFileSchemaField * fieldA, DataFileSchemaField * fieldB);
54+
extern PGDLLEXPORT LeafField DeepCopyLeafField(const LeafField * leafField);
55+
extern PGDLLEXPORT LeafField * FindLeafField(List *leafFieldList, int fieldId);
5456
#if PG_VERSION_NUM < 170000
5557
extern PGDLLEXPORT int pg_cmp_s32(int32 a, int32 b);
5658
#endif

pg_lake_engine/include/pg_lake/pgduck/delete_data.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
#include "pg_lake/copy/copy_format.h"
2323
#include "pg_lake/parquet/field.h"
2424
#include "pg_lake/pgduck/read_data.h"
25+
#include "pg_lake/data_file/data_file_stats.h"
2526

26-
extern PGDLLEXPORT void PerformDeleteFromParquet(char *sourceDataFilePath,
27-
List *positionDeleteFiles,
28-
char *deletionFilePath,
29-
char *destinationPath,
30-
CopyDataCompression destinationCompression,
31-
DataFileSchema * schema,
32-
ReadDataStats * stats);
27+
extern PGDLLEXPORT StatsCollector * PerformDeleteFromParquet(char *sourceDataFilePath,
28+
List *positionDeleteFiles,
29+
char *deletionFilePath,
30+
char *destinationPath,
31+
CopyDataCompression destinationCompression,
32+
DataFileSchema * schema,
33+
ReadDataStats * stats,
34+
List *leafFields);

pg_lake_engine/include/pg_lake/pgduck/write_data.h

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "access/tupdesc.h"
2121
#include "pg_lake/copy/copy_format.h"
22+
#include "pg_lake/data_file/data_file_stats.h"
2223
#include "pg_lake/parquet/field.h"
2324
#include "nodes/pg_list.h"
2425

@@ -35,20 +36,22 @@ typedef enum ParquetVersion
3536
/* pg_lake_table.default_parquet_version */
3637
extern PGDLLEXPORT int DefaultParquetVersion;
3738

38-
extern PGDLLEXPORT void ConvertCSVFileTo(char *csvFilePath,
39-
TupleDesc tupleDesc,
40-
int maxLineSize,
41-
char *destinationPath,
42-
CopyDataFormat destinationFormat,
43-
CopyDataCompression destinationCompression,
44-
List *formatOptions,
45-
DataFileSchema * schema);
46-
extern PGDLLEXPORT int64 WriteQueryResultTo(char *query,
47-
char *destinationPath,
48-
CopyDataFormat destinationFormat,
49-
CopyDataCompression destinationCompression,
50-
List *formatOptions,
51-
bool queryHasRowId,
52-
DataFileSchema * schema,
53-
TupleDesc queryTupleDesc);
39+
extern PGDLLEXPORT StatsCollector * ConvertCSVFileTo(char *csvFilePath,
40+
TupleDesc tupleDesc,
41+
int maxLineSize,
42+
char *destinationPath,
43+
CopyDataFormat destinationFormat,
44+
CopyDataCompression destinationCompression,
45+
List *formatOptions,
46+
DataFileSchema * schema,
47+
List *leafFields);
48+
extern PGDLLEXPORT StatsCollector * WriteQueryResultTo(char *query,
49+
char *destinationPath,
50+
CopyDataFormat destinationFormat,
51+
CopyDataCompression destinationCompression,
52+
List *formatOptions,
53+
bool queryHasRowId,
54+
DataFileSchema * schema,
55+
TupleDesc queryTupleDesc,
56+
List *leafFields);
5457
extern PGDLLEXPORT void AppendFields(StringInfo map, DataFileSchema * schema);

pg_lake_engine/pg_lake_engine--3.0--3.1.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,11 @@ CREATE FUNCTION __lake__internal__nsp__.from_hex(text)
3333
LANGUAGE C
3434
IMMUTABLE PARALLEL SAFE STRICT
3535
AS 'MODULE_PATHNAME', $function$pg_lake_internal_dummy_function$function$;
36+
37+
-- Register the map types that are used for parsing DuckDB maps returned by COPY .. (return_stats);
38+
-- we prefer to create them in the extension script to avoid concurrent attempts to create
39+
-- the same map, which may throw errors
40+
WITH text_text_map_name AS
41+
(SELECT map_type.create('TEXT','TEXT') AS name)
42+
SELECT map_type.create('TEXT', name) AS text_map_of_text
43+
FROM text_text_map_name;

0 commit comments

Comments
 (0)