Skip to content

Commit dd6013d

Browse files
committed
Improve logging by reducing noise and logging high-level events
1 parent 663413b commit dd6013d

8 files changed

Lines changed: 134 additions & 12 deletions

File tree

duckdb_pglake/src/fs/file_cache_manager.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ FileCacheManager::RemoveCacheFileInternal(FileSystem &file_system, string finalC
464464
fileExists = file_system.FileExists(filePath);
465465
if (fileExists)
466466
{
467-
PGDUCK_SERVER_LOG("removing %s from cache", filePath.c_str());
467+
PGDUCK_SERVER_DEBUG("removing %s from cache", filePath.c_str());
468468
file_system.RemoveFile(filePath);
469469
}
470470
}
@@ -621,8 +621,8 @@ FileCacheManager::ManageCache(ClientContext &context, int64_t maxCacheSize)
621621

622622
if (!cacheFile.isCandidate)
623623
{
624-
PGDUCK_SERVER_LOG("removing %s from cache (%" PRIu64 \
625-
" bytes)", cacheFile.cacheFilePath.c_str(), cacheFile.fileSize);
624+
PGDUCK_SERVER_DEBUG("removing %s from cache (%" PRIu64 \
625+
" bytes)", cacheFile.cacheFilePath.c_str(), cacheFile.fileSize);
626626

627627
/* for background tasks, we skip if lock cannot be acquired */
628628
bool waitForLock = false;

pg_lake_copy/src/copy/copy.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "pg_lake/copy/copy.h"
3838
#include "pg_lake/csv/csv_options.h"
3939
#include "pg_lake/csv/csv_writer.h"
40+
#include "pg_lake/data_file/data_file_stats.h"
4041
#include "pg_lake/describe/describe.h"
4142
#include "pg_lake/extensions/pg_lake_copy.h"
4243
#include "pg_lake/extensions/pg_parquet.h"
@@ -926,15 +927,32 @@ ProcessPgLakeCopyTo(CopyStmt *copyStmt, ParseState *pstate, Relation relation,
926927
/*
927928
* Copy the CSV file to the destination path in the desired format.
928929
*/
929-
ConvertCSVFileTo(tempCSVPath, tupleDesc, maximumLineLength,
930-
destinationPath, destinationFormat, destinationCompression,
931-
copyStmt->options, schema, NIL);
930+
StatsCollector *writeStats =
931+
ConvertCSVFileTo(tempCSVPath, tupleDesc, maximumLineLength,
932+
destinationPath, destinationFormat, destinationCompression,
933+
copyStmt->options, schema, NIL);
932934

933935
if (IsCopyToStdout(copyStmt))
934936
{
935937
/* send the temporary file in the target format to the client */
936938
CopyFileToOutput(destinationPath, tupleDesc->natts, true);
937939
}
940+
else if (IsSupportedURL(copyStmt->filename) && writeStats != NULL)
941+
{
942+
int64 totalBytes = 0;
943+
ListCell *cell = NULL;
944+
945+
foreach(cell, writeStats->dataFileStats)
946+
{
947+
DataFileStats *fileStats = lfirst(cell);
948+
949+
totalBytes += fileStats->fileSize;
950+
}
951+
952+
ereport(LOG,
953+
(errmsg("pg_lake: wrote " INT64_FORMAT " rows (" INT64_FORMAT " bytes) to %s",
954+
writeStats->totalRowCount, totalBytes, copyStmt->filename)));
955+
}
938956
}
939957

940958

pg_lake_table/include/pg_lake/fdw/writable_table.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@
3838

3939
#define DEFAULT_MIN_INPUT_FILES (5)
4040

41+
/*
42+
* CompactionStats aggregates file/byte/row totals across a single VACUUM
43+
* compaction run on one table.
44+
*/
45+
typedef struct CompactionStats
46+
{
47+
int64 filesRemoved;
48+
int64 bytesRemoved;
49+
int64 rowsRemoved;
50+
51+
int64 filesAdded;
52+
int64 bytesAdded;
53+
int64 rowsAdded;
54+
55+
/* position-deleted rows materialized away by this compaction */
56+
int64 positionDeletedRowsResolved;
57+
} CompactionStats;
58+
4159
/*
4260
* DataFileModificationType reflects a type of modification.
4361
*/
@@ -105,7 +123,8 @@ extern PGDLLEXPORT void ApplyDataFileModifications(Relation rel, List *modificat
105123
extern PGDLLEXPORT void RemoveAllDataFilesFromTable(Oid relationId);
106124
extern PGDLLEXPORT void RemoveAllDataFilesFromPgLakeCatalogFromTable(Oid relationId);
107125
extern PGDLLEXPORT bool CompactDataFiles(Oid relaitonId, TimestampTz compactionStartTime,
108-
bool forceMerge, bool isVerbose);
126+
bool forceMerge, bool isVerbose,
127+
CompactionStats * runStats);
109128
extern PGDLLEXPORT void CompactMetadata(Oid relationId, bool isVerbose);
110129
extern PGDLLEXPORT List *GetPositionDeleteFilesForDataFiles(Oid relationId, List *dataFiles,
111130
Snapshot snapshot, uint64 *rowCount);

pg_lake_table/src/ddl/create_table.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,6 +1026,10 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params)
10261026

10271027
ApplyDDLChanges(relationId, ddlOps);
10281028

1029+
ereport(LOG,
1030+
(errmsg("pg_lake: created iceberg table %s at %s",
1031+
GetQualifiedRelationName(relationId), location)));
1032+
10291033
/* signal that we already executed parent process utility */
10301034
return true;
10311035
}

pg_lake_table/src/ddl/drop_table.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,11 @@ DropTableAccessHook(ObjectAccessType access, Oid classId, Oid objectId,
291291
}
292292
else
293293
{
294+
char *qualifiedName = GetQualifiedRelationName(objectId);
295+
char *queryArguments = "";
296+
char *tableLocation = GetWritableTableLocation(objectId, &queryArguments);
297+
int64 tableSize = GetTableSizeFromCatalog(objectId);
298+
294299
/* Cleanup rowid sequence, if we have it */
295300
DropRowIdSequenceForRelation(objectId);
296301

@@ -306,6 +311,11 @@ DropTableAccessHook(ObjectAccessType access, Oid classId, Oid objectId,
306311

307312
if (catalogType == REST_CATALOG_READ_WRITE)
308313
RecordRestCatalogRequestInTx(objectId, REST_CATALOG_DROP_TABLE, NULL);
314+
315+
ereport(LOG,
316+
(errmsg("pg_lake: dropped iceberg table %s at %s "
317+
"(" INT64_FORMAT " bytes)",
318+
qualifiedName, tableLocation, tableSize)));
309319
}
310320
}
311321
else if (get_rel_type_id(objectId) != InvalidOid && subId != 0)

pg_lake_table/src/ddl/vacuum.c

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "storage/lmgr.h"
3535
#include "pg_lake/cleanup/deletion_queue.h"
3636
#include "pg_lake/cleanup/in_progress_files.h"
37+
#include "pg_lake/fdw/data_files_catalog.h"
3738
#include "pg_lake/ddl/utility_hook.h"
3839
#include "pg_lake/ddl/vacuum.h"
3940
#include "pg_lake/extensions/pg_lake_iceberg.h"
@@ -670,6 +671,7 @@ VacuumCompactDataFiles(Oid relationId, bool isFull, bool isVerbose)
670671
{
671672
volatile bool continueCompaction = false;
672673
int compactionCount = 0;
674+
CompactionStats runStats = {0};
673675

674676
/*
675677
* Compact files that are created before VACUUM starts. We do this because
@@ -710,7 +712,8 @@ VacuumCompactDataFiles(Oid relationId, bool isFull, bool isVerbose)
710712
INJECTION_POINT_COMPAT("compact-files-before-compact");
711713

712714
/* do compaction */
713-
continueCompaction = CompactDataFiles(relationId, compactionStartTime, isFull, isVerbose);
715+
continueCompaction = CompactDataFiles(relationId, compactionStartTime,
716+
isFull, isVerbose, &runStats);
714717

715718
VacuumConsumeTrackedIcebergMetadataChanges(isVerbose);
716719

@@ -749,6 +752,26 @@ VacuumCompactDataFiles(Oid relationId, bool isFull, bool isVerbose)
749752
compactionCount++;
750753
}
751754
while (continueCompaction && compactionCount < MaxCompactionsPerVacuum);
755+
756+
if (runStats.filesRemoved > 0 || runStats.filesAdded > 0)
757+
{
758+
char *positionDeleteSuffix = runStats.positionDeletedRowsResolved > 0
759+
? psprintf(", resolved " INT64_FORMAT " position-deleted rows",
760+
runStats.positionDeletedRowsResolved)
761+
: "";
762+
int64 tableSize = GetTableSizeFromCatalog(relationId);
763+
764+
ereport(LOG,
765+
(errmsg("pg_lake: compacted iceberg table %s: "
766+
"rewrote " INT64_FORMAT " files (" INT64_FORMAT " bytes, " INT64_FORMAT " rows) "
767+
"into " INT64_FORMAT " files (" INT64_FORMAT " bytes, " INT64_FORMAT " rows)%s; "
768+
"table is now " INT64_FORMAT " bytes",
769+
GetQualifiedRelationName(relationId),
770+
runStats.filesRemoved, runStats.bytesRemoved, runStats.rowsRemoved,
771+
runStats.filesAdded, runStats.bytesAdded, runStats.rowsAdded,
772+
positionDeleteSuffix,
773+
tableSize)));
774+
}
752775
}
753776

754777

@@ -909,6 +932,18 @@ VacuumRemoveDeletionQueueRecords(Oid relationId, bool isFull, bool isVerbose)
909932
while (!isFull /* when isFull, we'll remove all files */
910933
&& totalFilesRemoved < MaxFileRemovalsPerVacuum /* per-vacuum limit */
911934
&& hasRemainingFiles /* no more files to remove */ );
935+
936+
if (totalFilesRemoved > 0)
937+
{
938+
if (relationId != InvalidOid)
939+
ereport(LOG,
940+
(errmsg("pg_lake: expired %d files from iceberg table %s",
941+
totalFilesRemoved, GetQualifiedRelationName(relationId))));
942+
else
943+
ereport(LOG,
944+
(errmsg("pg_lake: expired %d files from dropped iceberg tables",
945+
totalFilesRemoved)));
946+
}
912947
}
913948

914949
/*
@@ -987,6 +1022,18 @@ VacuumRemoveInProgressFiles(Oid relationId, bool isFull, bool isVerbose)
9871022
while (!isFull /* when isFull, we'll remove all files */
9881023
&& totalFilesRemoved < MaxFileRemovalsPerVacuum /* per-vacuum limit */
9891024
&& hasRemainingFiles /* no more files to remove */ );
1025+
1026+
if (totalFilesRemoved > 0)
1027+
{
1028+
if (relationId != InvalidOid)
1029+
ereport(LOG,
1030+
(errmsg("pg_lake: cleaned up %d orphaned files from iceberg table %s",
1031+
totalFilesRemoved, GetQualifiedRelationName(relationId))));
1032+
else
1033+
ereport(LOG,
1034+
(errmsg("pg_lake: cleaned up %d orphaned files from dropped iceberg tables",
1035+
totalFilesRemoved)));
1036+
}
9901037
}
9911038

9921039

pg_lake_table/src/fdw/writable_table.c

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ static HTAB *GroupDataFilesByPartition(List *dataFiles, TimestampTz compactionSt
113113
static List *FilterCompactionCandidates(List *dataFiles, TimestampTz compactionStartTime, bool forceMerge,
114114
bool forceCompactDeletions);
115115
static List *TryCompactDataFiles(Oid relationId, TupleDesc tupleDescriptor, List *candidates,
116+
CompactionStats * runStats,
116117
PgLakeTableType tableType, List *options, bool forceMerge, bool isVerbose);
117118
#ifdef USE_ASSERT_CHECKING
118119
static void AssertAllFilesHaveSamePartition(List *dataFiles);
@@ -149,7 +150,7 @@ int TargetFileSizeMB = DEFAULT_TARGET_FILE_SIZE_MB;
149150
int VacuumCompactMinInputFiles = DEFAULT_MIN_INPUT_FILES;
150151

151152
/* pg_lake_table.write_log_level */
152-
int WriteLogLevel = LOG;
153+
int WriteLogLevel = DEBUG1;
153154

154155

155156
/*
@@ -710,7 +711,7 @@ RemoveAllDataFilesFromPgLakeCatalogFromTable(Oid relationId)
710711
*/
711712
bool
712713
CompactDataFiles(Oid relationId, TimestampTz compactionStartTime,
713-
bool forceMerge, bool isVerbose)
714+
bool forceMerge, bool isVerbose, CompactionStats * runStats)
714715
{
715716
/* prevent concurrent update/delete which might rewrite files too */
716717
LockTableForUpdate(relationId);
@@ -765,7 +766,7 @@ CompactDataFiles(Oid relationId, TimestampTz compactionStartTime,
765766
}
766767

767768
List *newFileOps = TryCompactDataFiles(relationId, tupleDescriptor, entry->dataFiles,
768-
tableType, options, forceMerge, isVerbose);
769+
runStats, tableType, options, forceMerge, isVerbose);
769770

770771
table_close(rel, NoLock);
771772
PopActiveSnapshot();
@@ -811,6 +812,7 @@ AssertAllFilesHaveSamePartition(List *dataFiles)
811812
*/
812813
static List *
813814
TryCompactDataFiles(Oid relationId, TupleDesc tupleDescriptor, List *candidates,
815+
CompactionStats * runStats,
814816
PgLakeTableType tableType, List *options, bool forceMerge, bool isVerbose)
815817
{
816818
#ifdef USE_ASSERT_CHECKING
@@ -867,6 +869,14 @@ TryCompactDataFiles(Oid relationId, TupleDesc tupleDescriptor, List *candidates,
867869

868870
fileSizeSum += dataFile->stats.fileSize;
869871

872+
if (runStats != NULL)
873+
{
874+
runStats->filesRemoved++;
875+
runStats->bytesRemoved += dataFile->stats.fileSize;
876+
runStats->rowsRemoved += dataFile->stats.rowCount - dataFile->stats.deletedRowCount;
877+
runStats->positionDeletedRowsResolved += dataFile->stats.deletedRowCount;
878+
}
879+
870880
filePathsToCompact = lappend(filePathsToCompact, dataFile->path);
871881
}
872882

@@ -953,6 +963,20 @@ TryCompactDataFiles(Oid relationId, TupleDesc tupleDescriptor, List *candidates,
953963

954964
metadataOperations = list_concat(metadataOperations, newFileOps);
955965

966+
if (runStats != NULL)
967+
{
968+
ListCell *newFileCell = NULL;
969+
970+
foreach(newFileCell, newFileOps)
971+
{
972+
TableMetadataOperation *addOp = lfirst(newFileCell);
973+
974+
runStats->filesAdded++;
975+
runStats->bytesAdded += addOp->dataFileStats.fileSize;
976+
runStats->rowsAdded += addOp->dataFileStats.rowCount;
977+
}
978+
}
979+
956980
if (hasRowIds)
957981
{
958982
/* get the row ID ranges from the compacted files */

pg_lake_table/src/init.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ _PG_init(void)
348348
"additions/removals.",
349349
NULL,
350350
&WriteLogLevel,
351-
LOG,
351+
DEBUG1,
352352
LogLevelOptions,
353353
PGC_SUSET,
354354
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,

0 commit comments

Comments
 (0)