Skip to content

Commit 8322da8

Browse files
committed
Add support for branching in Iceberg
1 parent 2ae6797 commit 8322da8

8 files changed

+568
-25
lines changed

Diff for: plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

+156-23
Large diffs are not rendered by default.

Diff for: plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class IcebergTableHandle
4949
private final int formatVersion;
5050
private final String tableLocation;
5151
private final Map<String, String> storageProperties;
52+
private final Optional<String> branch;
5253

5354
// Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector
5455
private final TupleDomain<IcebergColumnHandle> unenforcedPredicate;
@@ -92,7 +93,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
9293
@JsonProperty("projectedColumns") Set<IcebergColumnHandle> projectedColumns,
9394
@JsonProperty("nameMappingJson") Optional<String> nameMappingJson,
9495
@JsonProperty("tableLocation") String tableLocation,
95-
@JsonProperty("storageProperties") Map<String, String> storageProperties)
96+
@JsonProperty("storageProperties") Map<String, String> storageProperties,
97+
@JsonProperty("branch") Optional<String> branch)
9698
{
9799
return new IcebergTableHandle(
98100
catalog,
@@ -111,6 +113,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
111113
tableLocation,
112114
storageProperties,
113115
Optional.empty(),
116+
branch,
114117
false,
115118
Optional.empty(),
116119
ImmutableSet.of(),
@@ -134,6 +137,7 @@ public IcebergTableHandle(
134137
String tableLocation,
135138
Map<String, String> storageProperties,
136139
Optional<IcebergTablePartitioning> tablePartitioning,
140+
Optional<String> branch,
137141
boolean recordScannedFiles,
138142
Optional<DataSize> maxScannedFileSize,
139143
Set<IcebergColumnHandle> constraintColumns,
@@ -155,6 +159,7 @@ public IcebergTableHandle(
155159
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
156160
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
157161
this.tablePartitioning = requireNonNull(tablePartitioning, "tablePartitioning is null");
162+
this.branch = requireNonNull(branch, "branch is null");
158163
this.recordScannedFiles = recordScannedFiles;
159164
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
160165
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
@@ -261,6 +266,12 @@ public Optional<IcebergTablePartitioning> getTablePartitioning()
261266
return tablePartitioning;
262267
}
263268

269+
@JsonProperty
270+
public Optional<String> getBranch()
271+
{
272+
return branch;
273+
}
274+
264275
@JsonIgnore
265276
public boolean isRecordScannedFiles()
266277
{
@@ -314,6 +325,7 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte
314325
tableLocation,
315326
storageProperties,
316327
tablePartitioning,
328+
branch,
317329
recordScannedFiles,
318330
maxScannedFileSize,
319331
constraintColumns,
@@ -339,6 +351,7 @@ public IcebergTableHandle forAnalyze()
339351
tableLocation,
340352
storageProperties,
341353
tablePartitioning,
354+
branch,
342355
recordScannedFiles,
343356
maxScannedFileSize,
344357
constraintColumns,
@@ -364,6 +377,7 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc
364377
tableLocation,
365378
storageProperties,
366379
tablePartitioning,
380+
branch,
367381
recordScannedFiles,
368382
Optional.of(maxScannedFileSize),
369383
constraintColumns,
@@ -389,6 +403,7 @@ public IcebergTableHandle withTablePartitioning(Optional<IcebergTablePartitionin
389403
tableLocation,
390404
storageProperties,
391405
requiredTablePartitioning,
406+
branch,
392407
recordScannedFiles,
393408
maxScannedFileSize,
394409
constraintColumns,
@@ -422,6 +437,7 @@ public boolean equals(Object o)
422437
Objects.equals(nameMappingJson, that.nameMappingJson) &&
423438
Objects.equals(tableLocation, that.tableLocation) &&
424439
Objects.equals(storageProperties, that.storageProperties) &&
440+
Objects.equals(branch, that.branch) &&
425441
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
426442
Objects.equals(constraintColumns, that.constraintColumns) &&
427443
Objects.equals(forAnalyze, that.forAnalyze);
@@ -446,6 +462,7 @@ public int hashCode()
446462
nameMappingJson,
447463
tableLocation,
448464
storageProperties,
465+
branch,
449466
recordScannedFiles,
450467
maxScannedFileSize,
451468
constraintColumns,

Diff for: plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.Optional;
2526

2627
import static com.google.common.base.Preconditions.checkArgument;
2728
import static java.util.Objects.requireNonNull;
@@ -37,7 +38,8 @@ public record IcebergWritableTableHandle(
3738
IcebergFileFormat fileFormat,
3839
Map<String, String> storageProperties,
3940
RetryMode retryMode,
40-
Map<String, String> fileIoProperties)
41+
Map<String, String> fileIoProperties,
42+
Optional<String> branch)
4143
implements ConnectorInsertTableHandle, ConnectorOutputTableHandle
4244
{
4345
public IcebergWritableTableHandle
@@ -53,6 +55,7 @@ public record IcebergWritableTableHandle(
5355
requireNonNull(retryMode, "retryMode is null");
5456
checkArgument(partitionsSpecsAsJson.containsKey(partitionSpecId), "partitionSpecId missing from partitionSpecs");
5557
fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null"));
58+
requireNonNull(branch, "branch is null");
5659
}
5760

5861
@Override

Diff for: plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java

+1
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ private static IcebergTableHandle getIcebergTableHandle(PartitionSpec partitionS
265265
"dummy_table_location",
266266
ImmutableMap.of(),
267267
Optional.empty(),
268+
Optional.empty(),
268269
false,
269270
Optional.empty(),
270271
ImmutableSet.of(),

0 commit comments

Comments
 (0)