Skip to content

Commit e3110ee

Browse files
Core: Enhance remove snapshots efficiency by executing them in bulk
1 parent af99da6 commit e3110ee

File tree

7 files changed

+43
-32
lines changed

7 files changed

+43
-32
lines changed

.palantir/revapi.yml

+5
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,11 @@ acceptedBreaks:
11781178
new: "class org.apache.iceberg.Metrics"
11791179
justification: "Java serialization across versions is not guaranteed"
11801180
org.apache.iceberg:iceberg-core:
1181+
- code: "java.class.removed"
1182+
old: "class org.apache.iceberg.MetadataUpdate.RemoveSnapshot"
1183+
justification: "Changing the RemoveSnapshot class to receive a set of snapshot\
1184+
\ IDs instead of a single snapshot ID. This makes removing snapshots\
1185+
\ more efficient."
11811186
- code: "java.method.removed"
11821187
old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\
11831188
\ org.apache.hadoop.conf.Configuration)"

core/src/main/java/org/apache/iceberg/MetadataUpdate.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -328,20 +328,20 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
328328
}
329329
}
330330

331-
class RemoveSnapshot implements MetadataUpdate {
332-
private final long snapshotId;
331+
class RemoveSnapshots implements MetadataUpdate {
332+
private final Set<Long> snapshotIds;
333333

334-
public RemoveSnapshot(long snapshotId) {
335-
this.snapshotId = snapshotId;
334+
public RemoveSnapshots(Set<Long> snapshotIds) {
335+
this.snapshotIds = snapshotIds;
336336
}
337337

338-
public long snapshotId() {
339-
return snapshotId;
338+
public Set<Long> snapshotIds() {
339+
return snapshotIds;
340340
}
341341

342342
@Override
343343
public void applyTo(TableMetadata.Builder metadataBuilder) {
344-
metadataBuilder.removeSnapshots(ImmutableSet.of(snapshotId));
344+
metadataBuilder.removeSnapshots(snapshotIds);
345345
}
346346
}
347347

core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private MetadataUpdateParser() {}
9797
// AddSnapshot
9898
private static final String SNAPSHOT = "snapshot";
9999

100-
// RemoveSnapshot
100+
// RemoveSnapshots
101101
private static final String SNAPSHOT_IDS = "snapshot-ids";
102102

103103
// SetSnapshotRef
@@ -150,7 +150,7 @@ private MetadataUpdateParser() {}
150150
.put(MetadataUpdate.SetPartitionStatistics.class, SET_PARTITION_STATISTICS)
151151
.put(MetadataUpdate.RemovePartitionStatistics.class, REMOVE_PARTITION_STATISTICS)
152152
.put(MetadataUpdate.AddSnapshot.class, ADD_SNAPSHOT)
153-
.put(MetadataUpdate.RemoveSnapshot.class, REMOVE_SNAPSHOTS)
153+
.put(MetadataUpdate.RemoveSnapshots.class, REMOVE_SNAPSHOTS)
154154
.put(MetadataUpdate.RemoveSnapshotRef.class, REMOVE_SNAPSHOT_REF)
155155
.put(MetadataUpdate.SetSnapshotRef.class, SET_SNAPSHOT_REF)
156156
.put(MetadataUpdate.SetProperties.class, SET_PROPERTIES)
@@ -229,7 +229,7 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
229229
writeAddSnapshot((MetadataUpdate.AddSnapshot) metadataUpdate, generator);
230230
break;
231231
case REMOVE_SNAPSHOTS:
232-
writeRemoveSnapshots((MetadataUpdate.RemoveSnapshot) metadataUpdate, generator);
232+
writeRemoveSnapshots((MetadataUpdate.RemoveSnapshots) metadataUpdate, generator);
233233
break;
234234
case REMOVE_SNAPSHOT_REF:
235235
writeRemoveSnapshotRef((MetadataUpdate.RemoveSnapshotRef) metadataUpdate, generator);
@@ -419,9 +419,9 @@ private static void writeAddSnapshot(MetadataUpdate.AddSnapshot update, JsonGene
419419

420420
// Writes the full set of snapshot ids to remove, in line with the spec's set-based removal;
421421
// the update class carries a set of ids rather than a single value.
422-
private static void writeRemoveSnapshots(MetadataUpdate.RemoveSnapshot update, JsonGenerator gen)
422+
private static void writeRemoveSnapshots(MetadataUpdate.RemoveSnapshots update, JsonGenerator gen)
423423
throws IOException {
424-
JsonUtil.writeLongArray(SNAPSHOT_IDS, ImmutableSet.of(update.snapshotId()), gen);
424+
JsonUtil.writeLongArray(SNAPSHOT_IDS, update.snapshotIds(), gen);
425425
}
426426

427427
private static void writeSetSnapshotRef(MetadataUpdate.SetSnapshotRef update, JsonGenerator gen)
@@ -557,11 +557,10 @@ private static MetadataUpdate readAddSnapshot(JsonNode node) {
557557
private static MetadataUpdate readRemoveSnapshots(JsonNode node) {
558558
Set<Long> snapshotIds = JsonUtil.getLongSetOrNull(SNAPSHOT_IDS, node);
559559
Preconditions.checkArgument(
560-
snapshotIds != null && snapshotIds.size() == 1,
560+
snapshotIds != null,
561561
"Invalid set of snapshot ids to remove. Expected one value but received: %s",
562562
snapshotIds);
563-
Long snapshotId = Iterables.getOnlyElement(snapshotIds);
564-
return new MetadataUpdate.RemoveSnapshot(snapshotId);
563+
return new MetadataUpdate.RemoveSnapshots(snapshotIds);
565564
}
566565

567566
private static MetadataUpdate readSetSnapshotRef(JsonNode node) {

core/src/main/java/org/apache/iceberg/TableMetadata.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -1436,12 +1436,14 @@ public Builder removeSnapshots(Collection<Long> idsToRemove) {
14361436
private Builder rewriteSnapshotsInternal(Collection<Long> idsToRemove, boolean suppress) {
14371437
List<Snapshot> retainedSnapshots =
14381438
Lists.newArrayListWithExpectedSize(snapshots.size() - idsToRemove.size());
1439+
Set<Long> removeSnapshotIds = Sets.newHashSet();
1440+
14391441
for (Snapshot snapshot : snapshots) {
14401442
long snapshotId = snapshot.snapshotId();
14411443
if (idsToRemove.contains(snapshotId)) {
14421444
snapshotsById.remove(snapshotId);
14431445
if (!suppress) {
1444-
changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId));
1446+
removeSnapshotIds.add(snapshotId);
14451447
}
14461448
removeStatistics(snapshotId);
14471449
removePartitionStatistics(snapshotId);
@@ -1450,6 +1452,7 @@ private Builder rewriteSnapshotsInternal(Collection<Long> idsToRemove, boolean s
14501452
}
14511453
}
14521454

1455+
changes.add(new MetadataUpdate.RemoveSnapshots(removeSnapshotIds));
14531456
this.snapshots = retainedSnapshots;
14541457

14551458
// remove any refs that are no longer valid
@@ -1865,7 +1868,7 @@ private static List<HistoryEntry> updateSnapshotLog(
18651868
Set<Long> intermediateSnapshotIds = intermediateSnapshotIdSet(changes, currentSnapshotId);
18661869
boolean hasIntermediateSnapshots = !intermediateSnapshotIds.isEmpty();
18671870
boolean hasRemovedSnapshots =
1868-
changes.stream().anyMatch(MetadataUpdate.RemoveSnapshot.class::isInstance);
1871+
changes.stream().anyMatch(MetadataUpdate.RemoveSnapshots.class::isInstance);
18691872

18701873
if (!hasIntermediateSnapshots && !hasRemovedSnapshots) {
18711874
return snapshotLog;

core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
3737
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
3838
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
39+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3940
import org.apache.iceberg.types.Types;
4041
import org.apache.iceberg.util.Pair;
4142
import org.apache.iceberg.view.ImmutableViewVersion;
@@ -419,18 +420,18 @@ public void testAddSnapshotFromJson() throws IOException {
419420
@Test
420421
public void testRemoveSnapshotsFromJson() {
421422
String action = MetadataUpdateParser.REMOVE_SNAPSHOTS;
422-
long snapshotId = 2L;
423-
String json = String.format("{\"action\":\"%s\",\"snapshot-ids\":[2]}", action);
424-
MetadataUpdate expected = new MetadataUpdate.RemoveSnapshot(snapshotId);
423+
Set<Long> snapshotIds = Sets.newHashSet(2L, 3L);
424+
String json = String.format("{\"action\":\"%s\",\"snapshot-ids\":[2,3]}", action);
425+
MetadataUpdate expected = new MetadataUpdate.RemoveSnapshots(snapshotIds);
425426
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
426427
}
427428

428429
@Test
429430
public void testRemoveSnapshotsToJson() {
430431
String action = MetadataUpdateParser.REMOVE_SNAPSHOTS;
431-
long snapshotId = 2L;
432-
String expected = String.format("{\"action\":\"%s\",\"snapshot-ids\":[2]}", action);
433-
MetadataUpdate update = new MetadataUpdate.RemoveSnapshot(snapshotId);
432+
Set<Long> snapshotIds = Sets.newHashSet(2L, 3L);
433+
String expected = String.format("{\"action\":\"%s\",\"snapshot-ids\":[2,3]}", action);
434+
MetadataUpdate update = new MetadataUpdate.RemoveSnapshots(snapshotIds);
434435
String actual = MetadataUpdateParser.toJson(update);
435436
assertThat(actual)
436437
.as("Remove snapshots should serialize to the correct JSON value")
@@ -1019,8 +1020,8 @@ public void assertEquals(
10191020
break;
10201021
case MetadataUpdateParser.REMOVE_SNAPSHOTS:
10211022
assertEqualsRemoveSnapshots(
1022-
(MetadataUpdate.RemoveSnapshot) expectedUpdate,
1023-
(MetadataUpdate.RemoveSnapshot) actualUpdate);
1023+
(MetadataUpdate.RemoveSnapshots) expectedUpdate,
1024+
(MetadataUpdate.RemoveSnapshots) actualUpdate);
10241025
break;
10251026
case MetadataUpdateParser.REMOVE_SNAPSHOT_REF:
10261027
assertEqualsRemoveSnapshotRef(
@@ -1225,10 +1226,10 @@ private static void assertEqualsAddSnapshot(
12251226
}
12261227

12271228
private static void assertEqualsRemoveSnapshots(
1228-
MetadataUpdate.RemoveSnapshot expected, MetadataUpdate.RemoveSnapshot actual) {
1229-
assertThat(actual.snapshotId())
1229+
MetadataUpdate.RemoveSnapshots expected, MetadataUpdate.RemoveSnapshots actual) {
1230+
assertThat(actual.snapshotIds())
12301231
.as("Snapshots to remove should be the same")
1231-
.isEqualTo(expected.snapshotId());
1232+
.isEqualTo(expected.snapshotIds());
12321233
}
12331234

12341235
private static void assertEqualsSetSnapshotRef(

core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.Mockito.when;
2626

2727
import java.util.List;
28+
import java.util.Set;
2829
import java.util.UUID;
2930
import org.apache.iceberg.catalog.Namespace;
3031
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -690,7 +691,7 @@ public void setAndRemoveStatistics() {
690691
}
691692

692693
@Test
693-
public void addAndRemoveSnapshot() {
694+
public void addAndRemoveSnapshots() {
694695
List<UpdateRequirement> requirements =
695696
UpdateRequirements.forUpdateTable(
696697
metadata, ImmutableList.of(new MetadataUpdate.AddSnapshot(mock(Snapshot.class))));
@@ -704,7 +705,7 @@ public void addAndRemoveSnapshot() {
704705

705706
requirements =
706707
UpdateRequirements.forUpdateTable(
707-
metadata, ImmutableList.of(new MetadataUpdate.RemoveSnapshot(0L)));
708+
metadata, ImmutableList.of(new MetadataUpdate.RemoveSnapshots(Set.of(0L))));
708709

709710
assertThat(requirements)
710711
.hasSize(1)
@@ -747,7 +748,7 @@ public void setAndRemoveSnapshotRef() {
747748

748749
requirements =
749750
UpdateRequirements.forUpdateTable(
750-
metadata, ImmutableList.of(new MetadataUpdate.RemoveSnapshot(0L)));
751+
metadata, ImmutableList.of(new MetadataUpdate.RemoveSnapshots(Set.of(0L))));
751752
requirements.forEach(req -> req.validate(metadata));
752753

753754
assertThat(requirements)

core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2323

2424
import com.fasterxml.jackson.databind.JsonNode;
25+
import java.util.Set;
2526
import org.apache.iceberg.MetadataUpdate;
2627
import org.apache.iceberg.UpdateRequirement;
2728
import org.apache.iceberg.catalog.TableIdentifier;
@@ -96,7 +97,8 @@ public void roundTripSerde() {
9697
new UpdateRequirement.AssertDefaultSpecID(4),
9798
new UpdateRequirement.AssertCurrentSchemaID(24)),
9899
ImmutableList.of(
99-
new MetadataUpdate.RemoveSnapshot(101L), new MetadataUpdate.SetCurrentSchema(25)));
100+
new MetadataUpdate.RemoveSnapshots(Set.of(101L)),
101+
new MetadataUpdate.SetCurrentSchema(25)));
100102

101103
CommitTransactionRequest request =
102104
new CommitTransactionRequest(

0 commit comments

Comments
 (0)