Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2943,13 +2943,12 @@ public Map<String, Long> finishTableExecute(ConnectorSession session, ConnectorT
DeltaLakeTableExecuteHandle executeHandle = (DeltaLakeTableExecuteHandle) tableExecuteHandle;
switch (executeHandle.procedureId()) {
case OPTIMIZE:
finishOptimize(session, executeHandle, fragments, splitSourceInfo);
return ImmutableMap.of();
return finishOptimize(session, executeHandle, fragments, splitSourceInfo).toMap();
}
throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'");
}

private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
private OptimizeResult finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
{
DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.procedureHandle();
String tableLocation = executeHandle.tableLocation();
Expand All @@ -2959,6 +2958,12 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
.map(DeltaLakeScannedDataFile.class::cast)
.collect(toImmutableSet());

// delete vector
Set<String> filesToDelete = scannedDataFiles.stream()
.map(scannedDataFile -> scannedDataFile.deletionVector().map(DeletionVectorEntry::uniqueId))
.flatMap(Optional::stream)
.collect(Collectors.toSet());

// files to be added
List<DataFileInfo> dataFileInfos = fragments.stream()
.map(Slice::getInput)
Expand Down Expand Up @@ -3002,6 +3007,19 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
}
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Failed to write Delta Lake transaction log entry", e);
}
return new OptimizeResult(scannedDataFiles.size(), filesToDelete.size(), dataFileInfos.size());
}

private record OptimizeResult(long rewrittenDataFiles, long removedDeleteFiles, long addedDataFiles)
Copy link
Copy Markdown
Member

@ebyhr ebyhr Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this record in this connector. Please remove.

Iceberg connector uses a record class because there are 2 return in finishOptimize method.

{
Map<String, Long> toMap()
{
return ImmutableMap.<String, Long>builder()
.put("rewritten_data_files_count", rewrittenDataFiles)
.put("removed_delete_files_count", removedDeleteFiles)
.put("added_data_files_count", addedDataFiles)
.buildOrThrow();
}
}

private long commitOptimizeOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;

import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public record DeltaLakeScannedDataFile(String path, Map<String, Optional<String>> partitionKeys)
/**
 * A data file recorded during a table scan so that table procedures
 * (e.g. OPTIMIZE) can later account for it when rewriting the table.
 *
 * @param path location of the scanned data file
 * @param partitionKeys partition column name mapped to its (optional) value for this file
 * @param deletionVector deletion vector attached to this file, if any
 */
public record DeltaLakeScannedDataFile(String path, Map<String, Optional<String>> partitionKeys, Optional<DeletionVectorEntry> deletionVector)
{
    public DeltaLakeScannedDataFile
    {
        requireNonNull(path, "path is null");
        // Defensive copy keeps the record immutable even if the caller's map is mutable
        partitionKeys = ImmutableMap.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
        requireNonNull(deletionVector, "deletionVector is null");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,13 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
split.getStatisticsPredicate().overlaps(dynamicFilterPredicate))
.collect(toImmutableList());
if (recordScannedFiles) {
filteredSplits.forEach(split -> scannedFilePaths.add(new DeltaLakeScannedDataFile(((DeltaLakeSplit) split).getPath(), ((DeltaLakeSplit) split).getPartitionKeys())));
filteredSplits.forEach(split -> {
DeltaLakeSplit deltaLakeSplit = (DeltaLakeSplit) split;
scannedFilePaths.add(new DeltaLakeScannedDataFile(
deltaLakeSplit.getPath(),
deltaLakeSplit.getPartitionKeys(),
deltaLakeSplit.getDeletionVector()));
});
}
return new ConnectorSplitBatch(filteredSplits, noMoreSplits);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,17 +385,14 @@ public void testOptimizeRewritesTable()
String tableLocation = getLocationForTable(bucketName, tableName);
assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar) WITH (location = '" + tableLocation + "')");
try {
// DistributedQueryRunner sets node-scheduler.include-coordinator by default, so include coordinator
int workerCount = getQueryRunner().getNodeCount();

assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one')", 1);

for (int i = 0; i < 3; i++) {
Set<String> initialFiles = getActiveFiles(tableName);
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rather do a preparatory commit that makes use of withSingleWriterPerTask to explain that we're aiming for stable results.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this seems unrelated to the change — is it required by the current (PR's) change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenjian2664 @findinpath yep, this is an unrelated change: to avoid flaky test results, I’ll split these commits into separate ones.

Set<String> filesAfterOptimize = getActiveFiles(tableName);
assertThat(filesAfterOptimize)
.hasSizeBetween(1, workerCount)
.hasSize(1)
.containsExactlyElementsOf(initialFiles);
}

Expand Down Expand Up @@ -444,18 +441,15 @@ public void testOptimizeRewritesPartitionedTable()
String tableLocation = getLocationForTable(bucketName, tableName);
assertUpdate("CREATE TABLE " + tableName + " (key integer, value varchar) WITH (location = '" + tableLocation + "', partitioned_by = ARRAY['key'])");
try {
// DistributedQueryRunner sets node-scheduler.include-coordinator by default, so include coordinator
int workerCount = getQueryRunner().getNodeCount();

assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one')", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'two')", 1);

for (int i = 0; i < 3; i++) {
Set<String> initialFiles = getActiveFiles(tableName);
computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
Set<String> filesAfterOptimize = getActiveFiles(tableName);
assertThat(filesAfterOptimize)
.hasSizeBetween(1, workerCount)
.hasSize(2)
.containsExactlyInAnyOrderElementsOf(initialFiles);
}
assertQuery("SELECT * FROM " + tableName, "VALUES(1, 'one'), (2, 'two')");
Expand Down Expand Up @@ -1949,12 +1943,12 @@ public void testOptimize()
// Verify we have sufficiently many test rows with respect to worker count.
.hasSizeGreaterThan(workerCount);

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName))
.matches("VALUES (BIGINT '65', VARCHAR 'eleven zwölf trzynaście quatorze пʼятнадцять')");
Set<String> updatedFiles = getActiveFiles(tableName);
assertThat(updatedFiles)
.hasSizeBetween(1, workerCount)
.hasSize(1)
.doesNotContainAnyElementsOf(initialFiles);
// No files should be removed (this is VACUUM's job)
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles));
Expand Down Expand Up @@ -2005,21 +1999,74 @@ public void testOptimizeWithPartitionedTable()
Set<String> initialFiles = getActiveFiles(tableName);
assertThat(initialFiles).hasSize(9);

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
assertUpdate(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE",
"VALUES ('rewritten_data_files_count', 3), ('removed_delete_files_count', 0), ('added_data_files_count', 1)");

assertThat(query("SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY value) FROM " + tableName))
.matches("VALUES (BIGINT '508', VARCHAR 'ONE Three four one one one tHrEe three two')");

Set<String> updatedFiles = getActiveFiles(tableName);
assertThat(updatedFiles)
.hasSizeBetween(7, initialFiles.size());
.hasSize(7);
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles));
}
finally {
assertUpdate("DROP TABLE " + tableName);
}
}

@Test
public void testOptimizeWithDeletionVectors()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseDeltaLakeConnectorSmokeTest is used for running tests on several storages. Please consider reverting changes in this class, and updating TestDeltaLakeConnectorTest instead.

{
String tableName = "test_optimize_partitioned_table_" + randomNameSuffix();
String tableLocation = getLocationForTable(bucketName, tableName);
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "CREATE TABLE " + tableName + " WITH (deletion_vectors_enabled = true, location = '" + tableLocation + "') AS SELECT * FROM tpch.tiny.nation");
try {
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation");
Set<String> initFile = getActiveFiles(tableName);
assertThat(initFile).hasSize(6);
assertQuerySucceeds("DELETE FROM " + tableName + " WHERE nationkey < 5");
assertUpdate(withSingleWriterPerTask(getSession()),"ALTER TABLE " + tableName + " EXECUTE optimize",
"VALUES ('rewritten_data_files_count', 6), ('removed_delete_files_count', 6), ('added_data_files_count', 1)");
assertThat(getActiveFiles(tableName)).hasSize(1);
}
finally {
assertUpdate("DROP TABLE " + tableName);
}
}

@Test
public void testOptimizeWithPartitionedTableAndDeleteVector()
{
    String tableName = "test_optimize_partitioned_table_" + randomNameSuffix();
    String tableLocation = getLocationForTable(bucketName, tableName);
    assertQuerySucceeds(withSingleWriterPerTask(getSession()), "CREATE TABLE " + tableName + " WITH (deletion_vectors_enabled = true, partitioned_by = ARRAY['regionkey'], location = '" + tableLocation + "') AS SELECT nationkey, regionkey FROM tpch.tiny.nation");
    try {
        // CTAS plus five inserts: 6 files in each of the 5 regionkey partitions
        for (int i = 0; i < 5; i++) {
            assertQuerySucceeds(withSingleWriterPerTask(getSession()), "INSERT INTO " + tableName + " SELECT nationkey, regionkey FROM tpch.tiny.nation");
        }

        Set<String> initialFiles = getActiveFiles(tableName);
        assertThat(initialFiles).hasSize(30);

        assertQuerySucceeds("DELETE FROM " + tableName + " WHERE nationkey < 5");

        assertUpdate(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE",
                "VALUES ('rewritten_data_files_count', 30), ('removed_delete_files_count', 18), ('added_data_files_count', 5)");
        // One compacted file remains per partition
        assertThat(getActiveFiles(tableName)).hasSize(5);
    }
    finally {
        assertUpdate("DROP TABLE " + tableName);
    }
}

@Test
public void testOptimizeWithEnforcedRepartitioning()
{
Expand All @@ -2046,7 +2093,7 @@ public void testOptimizeWithEnforcedRepartitioning()
Set<String> initialFiles = getActiveFiles(tableName, currentSession);
assertThat(initialFiles).hasSize(10);

computeActual(currentSession, "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
computeActual(withSingleWriterPerTask(currentSession), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertThat(query(currentSession, "SELECT sum(key), listagg(value, ' ') WITHIN GROUP (ORDER BY value) FROM " + tableName))
.matches("VALUES (BIGINT '55', VARCHAR 'one one one one one one one three two two')");
Expand All @@ -2060,6 +2107,13 @@ public void testOptimizeWithEnforcedRepartitioning()
}
}

// Limits each task to a single writer so OPTIMIZE produces a deterministic
// number of output files, keeping the hasSize(...) assertions stable.
private Session withSingleWriterPerTask(Session session)
{
    return Session.builder(session)
        .setSystemProperty("task_min_writer_count", "1")
        .build();
}

private void fillWithInserts(String tableName, String values, int toCreate)
{
for (int i = 0; i < toCreate; i++) {
Expand Down Expand Up @@ -2144,7 +2198,7 @@ public void testOptimizeUsingForcedPartitioning()
Set<String> initialFiles = getActiveFiles(tableName);
assertThat(initialFiles).hasSize(10);

computeActual("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
computeActual(withSingleWriterPerTask(getSession()), "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

assertThat(query("SELECT " +
"sum(value1), " +
Expand Down