Skip to content

Commit ae8ceb4

Browse files
Ray Cheng and Max-Cheng
authored and committed
DeltaLake support optimize output metrics in result
1 parent 9f34e75 commit ae8ceb4

File tree

3 files changed

+62
-12
lines changed

3 files changed

+62
-12
lines changed

docs/src/main/sphinx/connector/delta-lake.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,13 @@ EXECUTE <alter-table-execute>`.
714714

715715
```{include} optimize.fragment
716716
```
717+
```text
718+
metric_name | metric_value
719+
----------------------------+--------------
720+
rewritten_data_files_count | 2
721+
removed_delete_files_count | 0
722+
added_data_files_count | 1
723+
```
717724

718725
Use a `WHERE` clause with [metadata columns](delta-lake-special-columns) to filter
719726
which files are optimized.

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2943,13 +2943,12 @@ public Map<String, Long> finishTableExecute(ConnectorSession session, ConnectorT
29432943
DeltaLakeTableExecuteHandle executeHandle = (DeltaLakeTableExecuteHandle) tableExecuteHandle;
29442944
switch (executeHandle.procedureId()) {
29452945
case OPTIMIZE:
2946-
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
2947-
return ImmutableMap.of();
2946+
return finishOptimize(session, executeHandle, fragments, splitSourceInfo);
29482947
}
29492948
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'");
29502949
}
29512950

2952-
private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
2951+
private Map<String, Long> finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
29532952
{
29542953
DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.procedureHandle();
29552954
String tableLocation = executeHandle.tableLocation();
@@ -3002,6 +3001,20 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
30023001
}
30033002
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
30043003
}
3004+
// No files should be removed (this is VACUUM's job)
3005+
return new OptimizeResult(scannedDataFiles.size(), 0, dataFileInfos.size()).toMap();
3006+
}
3007+
3008+
private record OptimizeResult(long rewrittenDataFiles, long removedDeleteFiles, long addedDataFiles)
3009+
{
3010+
Map<String, Long> toMap()
3011+
{
3012+
return ImmutableMap.<String, Long>builder()
3013+
.put("rewritten_data_files_count", rewrittenDataFiles)
3014+
.put("removed_delete_files_count", removedDeleteFiles)
3015+
.put("added_data_files_count", addedDataFiles)
3016+
.buildOrThrow();
3017+
}
30053018
}
30063019

30073020
private long commitOptimizeOperation(

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -392,10 +392,10 @@ public void testOptimizeRewritesTable()
392392

393393
for (int i = 0; i < 3; i++) {
394394
Set<String> initialFiles = getActiveFiles(tableName);
395-
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
395+
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
396396
Set<String> filesAfterOptimize = getActiveFiles(tableName);
397397
assertThat(filesAfterOptimize)
398-
.hasSizeBetween(1, workerCount)
398+
.hasSize(1)
399399
.containsExactlyElementsOf(initialFiles);
400400
}
401401

@@ -1949,18 +1949,25 @@ public void testOptimize()
19491949
// Verify we have sufficiently many test rows with respect to worker count.
19501950
.hasSizeGreaterThan(workerCount);
19511951

1952-
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
1952+
computeActual(withSingleWriterPerTask(getSession()),"ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
19531953
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
19541954
.matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')");
19551955
Set<String> updatedFiles = getActiveFiles(tableName);
19561956
assertThat(updatedFiles)
1957-
.hasSizeBetween(1, workerCount)
1957+
.hasSize(1)
19581958
.doesNotContainAnyElementsOf(initialFiles);
19591959
// No files should be removed (this is VACUUM's job)
19601960
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles));
19611961

19621962
// optimize with low retention threshold, nothing should change
1963-
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')");
1963+
MaterializedResult lowThresholdResult = computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE (file_size_threshold => '33B')");
1964+
Map<String, Long> lowThresholdMetrics = lowThresholdResult.getMaterializedRows().stream()
1965+
.collect(ImmutableMap.toImmutableMap(
1966+
row -> (String) row.getField(0),
1967+
row -> (Long) row.getField(1)));
1968+
assertThat(lowThresholdMetrics.get("rewritten_data_files_count")).isEqualTo(0L);
1969+
assertThat(lowThresholdMetrics.get("added_data_files_count")).isEqualTo(0L);
1970+
assertThat(lowThresholdMetrics.get("removed_delete_files_count")).isEqualTo(0L);
19641971
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
19651972
.matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')");
19661973
assertThat(getActiveFiles(tableName)).isEqualTo(updatedFiles);
@@ -2005,14 +2012,22 @@ public void testOptimizeWithPartitionedTable()
20052012
Set<String> initialFiles = getActiveFiles(tableName);
20062013
assertThat(initialFiles).hasSize(9);
20072014

2008-
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
2015+
MaterializedResult optimizeResult = computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
2016+
// Verify optimize metrics
2017+
Map<String, Long> metrics = optimizeResult.getMaterializedRows().stream()
2018+
.collect(ImmutableMap.toImmutableMap(
2019+
row -> (String) row.getField(0),
2020+
row -> (Long) row.getField(1)));
2021+
assertThat(metrics.get("rewritten_data_files_count")).isEqualTo(3L);
2022+
assertThat(metrics.get("added_data_files_count")).isEqualTo(1L);
2023+
assertThat(metrics.get("removed_delete_files_count")).isEqualTo(0L);
20092024

20102025
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY value) FROM " + tableName))
20112026
.matches("VALUES (BIGINT '508', VARCHAR 'ONE Three four one one one tHrEe three two')");
20122027

20132028
Set<String> updatedFiles = getActiveFiles(tableName);
20142029
assertThat(updatedFiles)
2015-
.hasSizeBetween(7, initialFiles.size());
2030+
.hasSize(7);
20162031
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles));
20172032
}
20182033
finally {
@@ -2046,7 +2061,15 @@ public void testOptimizeWithEnforcedRepartitioning()
20462061
Set<String> initialFiles = getActiveFiles(tableName, currentSession);
20472062
assertThat(initialFiles).hasSize(10);
20482063

2049-
computeActual(currentSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
2064+
MaterializedResult optimizeResult = computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
2065+
2066+
Map<String, Long> metrics = optimizeResult.getMaterializedRows().stream()
2067+
.collect(ImmutableMap.toImmutableMap(
2068+
row -> (String) row.getField(0),
2069+
row -> (Long) row.getField(1)));
2070+
assertThat(metrics.get("rewritten_data_files_count")).isEqualTo(9L);
2071+
assertThat(metrics.get("added_data_files_count")).isEqualTo(2L);
2072+
assertThat(metrics.get("removed_delete_files_count")).isEqualTo(0L);
20502073

20512074
assertThat(query(currentSession, "SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY value) FROM " + tableName))
20522075
.matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')");
@@ -2144,7 +2167,7 @@ public void testOptimizeUsingForcedPartitioning()
21442167
Set<String> initialFiles = getActiveFiles(tableName);
21452168
assertThat(initialFiles).hasSize(10);
21462169

2147-
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
2170+
computeActual(withSingleWriterPerTask(getSession()),"ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
21482171

21492172
assertThat(query("SELECT " +
21502173
"sum(value1), " +
@@ -2878,6 +2901,13 @@ private Set<String> getActiveFiles(String tableName, Session session)
28782901
.collect(toImmutableSet());
28792902
}
28802903

2904+
private Session withSingleWriterPerTask(Session session)
2905+
{
2906+
return Session.builder(session)
2907+
.setSystemProperty("task_min_writer_count", "1")
2908+
.build();
2909+
}
2910+
28812911
private Set<String> getAllDataFilesFromTableDirectory(String tableName)
28822912
{
28832913
return getTableFiles(tableName).stream()

0 commit comments

Comments (0)