Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public record CommitTaskData(
Optional<String> partitionDataJson,
FileContent content,
Optional<String> referencedDataFile,
Optional<List<Long>> fileSplitOffsets)
Optional<List<Long>> fileSplitOffsets,
Optional<byte[]> serializedDeletionVector)
{
public CommitTaskData
{
Expand All @@ -41,5 +42,6 @@ public record CommitTaskData(
requireNonNull(content, "content is null");
requireNonNull(referencedDataFile, "referencedDataFile is null");
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
requireNonNull(serializedDeletionVector, "serializedDeletionVector is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public enum IcebergErrorCode
ICEBERG_WRITER_CLOSE_ERROR(14, EXTERNAL),
ICEBERG_MISSING_METADATA(15, EXTERNAL),
ICEBERG_WRITER_DATA_ERROR(16, EXTERNAL),
ICEBERG_UNSUPPORTED_VIEW_DIALECT(17, EXTERNAL)
ICEBERG_UNSUPPORTED_VIEW_DIALECT(17, EXTERNAL),
ICEBERG_DELETION_VECTOR_TOO_LARGE(18, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.delete.DeletionVector;
import io.trino.plugin.iceberg.delete.PositionDeleteWriter;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
Expand All @@ -26,13 +28,13 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.MergePage;
import io.trino.spi.type.VarcharType;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type;
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64Bitmap;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -42,6 +44,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Verify.verify;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
import static io.trino.spi.connector.MergePage.createDeleteAndInsertPages;
Expand All @@ -53,6 +56,7 @@
public class IcebergMergeSink
implements ConnectorMergeSink
{
private final int formatVersion;
private final LocationProvider locationProvider;
private final IcebergFileWriterFactory fileWriterFactory;
private final TrinoFileSystem fileSystem;
Expand All @@ -68,6 +72,7 @@ public class IcebergMergeSink
private long writtenBytes;

public IcebergMergeSink(
int formatVersion,
LocationProvider locationProvider,
IcebergFileWriterFactory fileWriterFactory,
TrinoFileSystem fileSystem,
Expand All @@ -80,6 +85,7 @@ public IcebergMergeSink(
ConnectorPageSink insertPageSink,
int columnCount)
{
this.formatVersion = formatVersion;
this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
Expand Down Expand Up @@ -117,7 +123,7 @@ public void storeMergedRows(Page page)
return new FileDeletion(partitionSpecId, partitionData);
});

deletion.rowsToDelete().addLong(rowPosition);
deletion.rowsToDelete().add(rowPosition);
}
});

Expand All @@ -136,14 +142,40 @@ public CompletableFuture<Collection<Slice>> finish()
List<Slice> fragments = new ArrayList<>(insertPageSink.finish().join());
writtenBytes = insertPageSink.getCompletedBytes();

fileDeletions.forEach((dataFilePath, deletion) -> {
PositionDeleteWriter writer = createPositionDeleteWriter(
dataFilePath.toStringUtf8(),
partitionsSpecs.get(deletion.partitionSpecId()),
deletion.partitionDataJson());

fragments.addAll(writePositionDeletes(writer, deletion.rowsToDelete()));
});
if (formatVersion < 2) {
// position deletes are only supported in Iceberg format v2 and above
verify(fileDeletions.isEmpty(), "Position deletes are not supported in Iceberg format version %s", formatVersion);
}
else if (formatVersion == 2) {
fileDeletions.forEach((dataFilePath, deletion) -> deletion.rowsToDelete().build().ifPresent(deletionVector -> {
PositionDeleteWriter writer = createPositionDeleteWriter(
dataFilePath.toStringUtf8(),
partitionsSpecs.get(deletion.partitionSpecId()),
deletion.partitionDataJson());
fragments.add(writePositionDeletes(writer, deletionVector));
}));
}
else if (formatVersion == 3) {
fileDeletions.forEach((dataFilePath, deletion) -> deletion.rowsToDelete().build().ifPresent(deletionVector -> {
PartitionSpec partitionSpec = partitionsSpecs.get(deletion.partitionSpecId());
Optional<PartitionData> partitionData = createPartitionData(partitionSpec, deletion.partitionDataJson());
CommitTaskData task = new CommitTaskData(
"", // path of the v2 delete file
fileFormat,
0, // size of the v2 delete file
new MetricsWrapper(new Metrics(deletionVector.cardinality())),
PartitionSpecParser.toJson(partitionSpec),
partitionData.map(PartitionData::toJson),
FileContent.POSITION_DELETES,
Optional.of(dataFilePath.toStringUtf8()),
Optional.empty(), // unused for v3
Optional.of(deletionVector.serialize().getBytes()));
fragments.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
}));
}
else {
throw new VerifyException("Unsupported Iceberg format version: " + formatVersion);
}

return completedFuture(fragments);
}
Expand All @@ -156,18 +188,10 @@ public void abort()

private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson)
{
Optional<PartitionData> partitionData = Optional.empty();
if (partitionSpec.isPartitioned()) {
Type[] columnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
.toArray(Type[]::new);
partitionData = Optional.of(PartitionData.fromJson(partitionDataJson, columnTypes));
}

return new PositionDeleteWriter(
dataFilePath,
partitionSpec,
partitionData,
createPartitionData(partitionSpec, partitionDataJson),
locationProvider,
fileWriterFactory,
fileSystem,
Expand All @@ -176,24 +200,36 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
storageProperties);
}

private Collection<Slice> writePositionDeletes(PositionDeleteWriter writer, ImmutableLongBitmapDataProvider rowsToDelete)
private Slice writePositionDeletes(PositionDeleteWriter writer, DeletionVector rowsToDelete)
{
try {
CommitTaskData task = writer.write(rowsToDelete);
writtenBytes += task.fileSizeInBytes();
return List.of(wrappedBuffer(jsonCodec.toJsonBytes(task)));
return wrappedBuffer(jsonCodec.toJsonBytes(task));
}
catch (Throwable t) {
closeAllSuppress(t, writer::abort);
throw t;
}
}

private Optional<PartitionData> createPartitionData(PartitionSpec partitionSpec, String partitionDataAsJson)
{
if (!partitionSpec.isPartitioned()) {
return Optional.empty();
}

Type[] columnTypes = partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
.toArray(Type[]::new);
return Optional.of(PartitionData.fromJson(partitionDataAsJson, columnTypes));
}

private static class FileDeletion
{
private final int partitionSpecId;
private final String partitionDataJson;
private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap();
private final DeletionVector.Builder rowsToDelete = DeletionVector.builder();

public FileDeletion(int partitionSpecId, String partitionDataJson)
{
Expand All @@ -211,7 +247,7 @@ public String partitionDataJson()
return partitionDataJson;
}

public LongBitmapDataProvider rowsToDelete()
public DeletionVector.Builder rowsToDelete()
{
return rowsToDelete;
}
Expand Down
Loading
Loading