Skip to content

Commit 3abfa15

Browse files
committed
Add execution metrics output for Delta Lake optimize procedure
1 parent ff1ef00 commit 3abfa15

File tree

4 files changed

+85
-6
lines changed

4 files changed

+85
-6
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2943,13 +2943,12 @@ public Map<String, Long> finishTableExecute(ConnectorSession session, ConnectorT
29432943
DeltaLakeTableExecuteHandle executeHandle = (DeltaLakeTableExecuteHandle) tableExecuteHandle;
29442944
switch (executeHandle.procedureId()) {
29452945
case OPTIMIZE:
2946-
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
2947-
return ImmutableMap.of();
2946+
return finishOptimize(session, executeHandle, fragments, splitSourceInfo).toMap();
29482947
}
29492948
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'");
29502949
}
29512950

2952-
private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
2951+
private OptimizeResult finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
29532952
{
29542953
DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.procedureHandle();
29552954
String tableLocation = executeHandle.tableLocation();
@@ -2959,6 +2958,12 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
29592958
.map(DeltaLakeScannedDataFile.class::cast)
29602959
.collect(toImmutableSet());
29612960

2961+
// unique ids of the deletion vectors attached to the scanned data files
2962+
Set<String> filesToDelete = scannedDataFiles.stream()
2963+
.map(scannedDataFile -> scannedDataFile.deletionVector().map(DeletionVectorEntry::uniqueId))
2964+
.flatMap(Optional::stream)
2965+
.collect(Collectors.toSet());
2966+
29622967
// files to be added
29632968
List<DataFileInfo> dataFileInfos = fragments.stream()
29642969
.map(Slice::getInput)
@@ -3002,6 +3007,19 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
30023007
}
30033008
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
30043009
}
3010+
return new OptimizeResult(scannedDataFiles.size(), filesToDelete.size(), dataFileInfos.size());
3011+
}
3012+
3013+
private record OptimizeResult(long rewrittenDataFiles, long removedDeleteFiles, long addedDataFiles)
3014+
{
3015+
Map<String, Long> toMap()
3016+
{
3017+
return ImmutableMap.<String, Long>builder()
3018+
.put("rewritten_data_files_count", rewrittenDataFiles)
3019+
.put("removed_delete_files_count", removedDeleteFiles)
3020+
.put("added_data_files_count", addedDataFiles)
3021+
.buildOrThrow();
3022+
}
30053023
}
30063024

30073025
private long commitOptimizeOperation(

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeScannedDataFile.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@
1414
package io.trino.plugin.deltalake;
1515

1616
import com.google.common.collect.ImmutableMap;
17+
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
1718

1819
import java.util.Map;
1920
import java.util.Optional;
2021

2122
import static java.util.Objects.requireNonNull;
2223

23-
public record DeltaLakeScannedDataFile(String path, Map<String, Optional<String>> partitionKeys)
24+
public record DeltaLakeScannedDataFile(String path, Map<String, Optional<String>> partitionKeys, Optional<DeletionVectorEntry> deletionVector)
2425
{
2526
public DeltaLakeScannedDataFile
2627
{
2728
requireNonNull(path, "path is null");
2829
partitionKeys = ImmutableMap.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
30+
requireNonNull(deletionVector, "deletionVector is null");
2931
}
3032
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitSource.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,13 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
146146
split.getStatisticsPredicate().overlaps(dynamicFilterPredicate))
147147
.collect(toImmutableList());
148148
if (recordScannedFiles) {
149-
filteredSplits.forEach(split -> scannedFilePaths.add(new DeltaLakeScannedDataFile(((DeltaLakeSplit) split).getPath(), ((DeltaLakeSplit) split).getPartitionKeys())));
149+
filteredSplits.forEach(split -> {
150+
DeltaLakeSplit deltaLakeSplit = (DeltaLakeSplit) split;
151+
scannedFilePaths.add(new DeltaLakeScannedDataFile(
152+
deltaLakeSplit.getPath(),
153+
deltaLakeSplit.getPartitionKeys(),
154+
deltaLakeSplit.getDeletionVector()));
155+
});
150156
}
151157
return new ConnectorSplitBatch(filteredSplits, noMoreSplits);
152158
},

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1999,7 +1999,8 @@ public void testOptimizeWithPartitionedTable()
19991999
Set<String> initialFiles = getActiveFiles(tableName);
20002000
assertThat(initialFiles).hasSize(9);
20012001

2002-
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
2002+
assertUpdate(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE",
2003+
"VALUES ('rewritten_data_files_count', 3), ('removed_delete_files_count', 0), ('added_data_files_count', 1)");
20032004

20042005
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY value) FROM " + tableName))
20052006
.matches("VALUES (BIGINT '508', VARCHAR 'ONE Three four one one one tHrEe three two')");
@@ -2014,6 +2015,58 @@ public void testOptimizeWithPartitionedTable()
20142015
}
20152016
}
20162017

2018+
@Test
2019+
public void testOptimizeWithDeletionVectors()
2020+
{
2021+
String tableName = "test_optimize_partitioned_table_" + randomNameSuffix();
2022+
String tableLocation = getLocationForTable(bucketName, tableName);
2023+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "CREATE TABLE " + tableName + " WITH (deletion_vectors_enabled = true, location = '" + tableLocation + "') AS SELECT * FROM tpch.tiny.nation");
2024+
try {
2025+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
2026+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
2027+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
2028+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
2029+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
2030+
Set<String> initFile = getActiveFiles(tableName);
2031+
assertThat(initFile).hasSize(6);
2032+
assertQuerySucceeds("DELETE FROM " + tableName + " WHERE nationkey < 5");
2033+
assertUpdate(withSingleWriterPerTask(getSession()),"ALTER TABLE " + tableName + " EXECUTE optimize",
2034+
"VALUES ('rewritten_data_files_count', 6), ('removed_delete_files_count', 6), ('added_data_files_count', 1)");
2035+
assertThat(getActiveFiles(tableName)).hasSize(1);
2036+
}
2037+
finally {
2038+
assertUpdate("DROP TABLE " + tableName);
2039+
}
2040+
}
2041+
2042+
@Test
2043+
public void testOptimizeWithPartitionedTableAndDeleteVector()
2044+
{
2045+
String tableName = "test_optimize_partitioned_table_" + randomNameSuffix();
2046+
String tableLocation = getLocationForTable(bucketName, tableName);
2047+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "CREATE TABLE " + tableName + " WITH (deletion_vectors_enabled = true, partitioned_by = ARRAY['regionkey'], location = '" + tableLocation + "') AS SELECT nationkey, regionkey FROM tpch.tiny.nation");
2048+
try {
2049+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT nationkey, regionkey FROM tpch.tiny.nation");
2050+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT nationkey, regionkey FROM tpch.tiny.nation");
2051+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT nationkey, regionkey FROM tpch.tiny.nation");
2052+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT nationkey, regionkey FROM tpch.tiny.nation");
2053+
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT nationkey, regionkey FROM tpch.tiny.nation");
2054+
2055+
Set<String> initialFiles = getActiveFiles(tableName);
2056+
assertThat(initialFiles).hasSize(30);
2057+
assertQuerySucceeds("DELETE FROM " + tableName + " WHERE nationkey < 5");
2058+
2059+
assertUpdate(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE",
2060+
"VALUES ('rewritten_data_files_count', 30), ('removed_delete_files_count', 18), ('added_data_files_count', 5)");
2061+
Set<String> updatedFiles = getActiveFiles(tableName);
2062+
assertThat(updatedFiles)
2063+
.hasSize(5);
2064+
}
2065+
finally {
2066+
assertUpdate("DROP TABLE " + tableName);
2067+
}
2068+
}
2069+
20172070
@Test
20182071
public void testOptimizeWithEnforcedRepartitioning()
20192072
{

0 commit comments

Comments
 (0)