Skip to content

Commit efc504c

Browse files
committed
[rest] Add fromSnapshot to rollback
1 parent 08aeb86 commit efc504c

File tree

10 files changed

+106
-30
lines changed

10 files changed

+106
-30
lines changed

docs/static/rest-catalog-open-api.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2763,6 +2763,10 @@ components:
27632763
properties:
27642764
instant:
27652765
$ref: '#/components/schemas/Instant'
2766+
fromSnapshot:
2767+
type: integer
2768+
format: int64
2769+
nullable: true
27662770
Instant:
27672771
anyOf:
27682772
- $ref: '#/components/schemas/SnapshotInstant'

paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,23 @@ public boolean commitSnapshot(
595595
* this table
596596
*/
597597
public void rollbackTo(Identifier identifier, Instant instant) {
598-
RollbackTableRequest request = new RollbackTableRequest(instant);
598+
rollbackTo(identifier, instant, null);
599+
}
600+
601+
/**
602+
* Rollback instant for table.
603+
*
604+
* @param identifier database name and table name.
605+
* @param instant instant to rollback
606+
* @param fromSnapshot snapshot from, success only occurs when the latest snapshot is this
607+
* snapshot.
608+
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the table or the snapshot
609+
* or the tag not exists
610+
* @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
611+
* this table
612+
*/
613+
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot) {
614+
RollbackTableRequest request = new RollbackTableRequest(instant, fromSnapshot);
599615
client.post(
600616
resourcePaths.rollbackTable(
601617
identifier.getDatabaseName(), identifier.getObjectName()),

paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackTableRequest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,42 @@
2424
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
2525
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
2626
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
27+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
2728
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
2829

30+
import javax.annotation.Nullable;
31+
2932
/** Request for rollback table. */
3033
@JsonIgnoreProperties(ignoreUnknown = true)
3134
public class RollbackTableRequest implements RESTRequest {
3235

3336
private static final String FIELD_INSTANT = "instant";
37+
private static final String FIELD_FROM_SNAPSHOT = "fromSnapshot";
3438

3539
@JsonProperty(FIELD_INSTANT)
3640
private final Instant instant;
3741

42+
@JsonProperty(FIELD_FROM_SNAPSHOT)
43+
@JsonInclude(JsonInclude.Include.NON_NULL)
44+
@Nullable
45+
private final Long fromSnapshot;
46+
3847
@JsonCreator
39-
public RollbackTableRequest(@JsonProperty(FIELD_INSTANT) Instant instant) {
48+
public RollbackTableRequest(
49+
@JsonProperty(FIELD_INSTANT) Instant instant,
50+
@JsonProperty(FIELD_FROM_SNAPSHOT) @Nullable Long fromSnapshot) {
4051
this.instant = instant;
52+
this.fromSnapshot = fromSnapshot;
4153
}
4254

4355
@JsonGetter(FIELD_INSTANT)
4456
public Instant getInstant() {
4557
return instant;
4658
}
59+
60+
@JsonGetter(FIELD_FROM_SNAPSHOT)
61+
@Nullable
62+
public Long getFromSnapshot() {
63+
return fromSnapshot;
64+
}
4765
}

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ public PagedList<Snapshot> listSnapshotsPaged(
558558
}
559559

560560
@Override
561-
public void rollbackTo(Identifier identifier, Instant instant)
561+
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
562562
throws Catalog.TableNotExistException {
563563
throw new UnsupportedOperationException();
564564
}

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,24 @@ PagedList<Snapshot> listSnapshotsPaged(
717717
* @throws UnsupportedOperationException if the catalog does not {@link
718718
* #supportsVersionManagement()}
719719
*/
720-
void rollbackTo(Identifier identifier, Instant instant) throws Catalog.TableNotExistException;
720+
default void rollbackTo(Identifier identifier, Instant instant)
721+
throws Catalog.TableNotExistException {
722+
rollbackTo(identifier, instant, null);
723+
}
724+
725+
/**
726+
* rollback table by the given {@link Identifier} and instant.
727+
*
728+
* @param identifier path of the table
729+
* @param instant like snapshotId or tagName
730+
* @param fromSnapshot snapshot from, success only occurs when the latest snapshot is this
731+
* snapshot.
732+
* @throws Catalog.TableNotExistException if the table does not exist
733+
* @throws UnsupportedOperationException if the catalog does not {@link
734+
* #supportsVersionManagement()}
735+
*/
736+
void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
737+
throws Catalog.TableNotExistException;
721738

722739
/**
723740
* Create a new branch for this table. By default, an empty branch will be created using the

paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,9 @@ public PagedList<Snapshot> listSnapshotsPaged(
207207
}
208208

209209
@Override
210-
public void rollbackTo(Identifier identifier, Instant instant)
210+
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
211211
throws Catalog.TableNotExistException {
212-
wrapped.rollbackTo(identifier, instant);
212+
wrapped.rollbackTo(identifier, instant, fromSnapshot);
213213
}
214214

215215
@Override

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,10 @@ public boolean commitSnapshot(
373373
}
374374

375375
@Override
376-
public void rollbackTo(Identifier identifier, Instant instant)
376+
public void rollbackTo(Identifier identifier, Instant instant, @Nullable Long fromSnapshot)
377377
throws Catalog.TableNotExistException {
378378
try {
379-
api.rollbackTo(identifier, instant);
379+
api.rollbackTo(identifier, instant, fromSnapshot);
380380
} catch (NoSuchResourceException e) {
381381
if (StringUtils.equals(e.resourceType(), ErrorResponse.RESOURCE_TYPE_SNAPSHOT)) {
382382
throw new IllegalArgumentException(

paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,11 @@ public static GetTableTokenResponse getTableCredentialsResponse() {
267267
}
268268

269269
public static RollbackTableRequest rollbackTableRequestBySnapshot(long snapshotId) {
270-
return new RollbackTableRequest(Instant.snapshot(snapshotId));
270+
return new RollbackTableRequest(Instant.snapshot(snapshotId), null);
271271
}
272272

273273
public static RollbackTableRequest rollbackTableRequestByTag(String tagName) {
274-
return new RollbackTableRequest(Instant.tag(tagName));
274+
return new RollbackTableRequest(Instant.tag(tagName), null);
275275
}
276276

277277
public static AlterViewRequest alterViewRequest() {

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@
118118
import org.slf4j.LoggerFactory;
119119
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
120120

121+
import javax.annotation.Nullable;
122+
121123
import java.io.FileNotFoundException;
122124
import java.io.IOException;
123125
import java.io.UncheckedIOException;
@@ -478,7 +480,8 @@ public MockResponse dispatch(RecordedRequest request) {
478480
long snapshotId =
479481
((Instant.SnapshotInstant) requestBody.getInstant())
480482
.getSnapshotId();
481-
return rollbackTableByIdHandle(identifier, snapshotId);
483+
return rollbackTableByIdHandle(
484+
identifier, snapshotId, requestBody.getFromSnapshot());
482485
} else if (requestBody.getInstant() instanceof Instant.TagInstant) {
483486
String tagName =
484487
((Instant.TagInstant) requestBody.getInstant())
@@ -844,26 +847,35 @@ private MockResponse commitTableHandle(Identifier identifier, String data) throw
844847
requestBody.getStatistics());
845848
}
846849

847-
private MockResponse rollbackTableByIdHandle(Identifier identifier, long snapshotId)
848-
throws Exception {
850+
private MockResponse rollbackTableByIdHandle(
851+
Identifier identifier, long snapshotId, @Nullable Long fromSnapshot) throws Exception {
849852
FileStoreTable table = getFileTable(identifier);
850853
String identifierWithSnapshotId = geTableFullNameWithSnapshotId(identifier, snapshotId);
851-
if (tableWithSnapshotId2SnapshotStore.containsKey(identifierWithSnapshotId)) {
852-
table =
853-
table.copy(
854-
Collections.singletonMap(
855-
SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true"));
856-
long latestSnapshotId = table.snapshotManager().latestSnapshotId();
857-
table.rollbackTo(snapshotId);
858-
cleanSnapshot(identifier, snapshotId, latestSnapshotId);
859-
tableLatestSnapshotStore.put(
860-
identifier.getFullName(),
861-
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
862-
return new MockResponse().setResponseCode(200);
854+
TableSnapshot toSnapshot = tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId);
855+
if (toSnapshot == null) {
856+
return mockResponse(
857+
new ErrorResponse(
858+
ErrorResponse.RESOURCE_TYPE_SNAPSHOT, "" + snapshotId, "", 404),
859+
404);
863860
}
864-
return mockResponse(
865-
new ErrorResponse(ErrorResponse.RESOURCE_TYPE_SNAPSHOT, "" + snapshotId, "", 404),
866-
404);
861+
long latestSnapshotId = table.snapshotManager().latestSnapshotId();
862+
if (fromSnapshot != null && fromSnapshot != latestSnapshotId) {
863+
return mockResponse(
864+
new ErrorResponse(
865+
null,
866+
null,
867+
String.format(
868+
"Latest snapshot %s is not %s", latestSnapshotId, fromSnapshot),
869+
500),
870+
500);
871+
}
872+
table =
873+
table.copy(
874+
Collections.singletonMap(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true"));
875+
table.rollbackTo(snapshotId);
876+
cleanSnapshot(identifier, snapshotId, latestSnapshotId);
877+
tableLatestSnapshotStore.put(identifier.getFullName(), toSnapshot);
878+
return new MockResponse().setResponseCode(200);
867879
}
868880

869881
private MockResponse rollbackTableByTagNameHandle(Identifier identifier, String tagName)

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.paimon.schema.SchemaChange;
5151
import org.apache.paimon.schema.SchemaManager;
5252
import org.apache.paimon.table.FileStoreTable;
53+
import org.apache.paimon.table.Instant;
5354
import org.apache.paimon.table.Table;
5455
import org.apache.paimon.table.TableSnapshot;
5556
import org.apache.paimon.table.object.ObjectTable;
@@ -1751,24 +1752,32 @@ public void testTableRollback() throws Exception {
17511752
GenericRow record = GenericRow.of(i);
17521753
write.write(record);
17531754
commit.commit(i, write.prepareCommit(false, i));
1754-
table.createTag("tag-" + i);
1755+
table.createTag("tag-" + (i + 1));
17551756
}
17561757
write.close();
17571758
commit.close();
1759+
1760+
// rollback to snapshot 4
17581761
long rollbackToSnapshotId = 4;
17591762
table.rollbackTo(rollbackToSnapshotId);
17601763
assertThat(table.snapshotManager().snapshot(rollbackToSnapshotId))
17611764
.isEqualTo(restCatalog.loadSnapshot(identifier).get().snapshot());
17621765
assertThat(table.tagManager().tagExists("tag-" + (rollbackToSnapshotId + 2))).isFalse();
17631766
assertThat(table.snapshotManager().snapshotExists(rollbackToSnapshotId + 1)).isFalse();
1764-
17651767
assertThrows(
17661768
IllegalArgumentException.class, () -> table.rollbackTo(rollbackToSnapshotId + 1));
17671769

1770+
// rollback to snapshot 3
17681771
String rollbackToTagName = "tag-" + (rollbackToSnapshotId - 1);
17691772
table.rollbackTo(rollbackToTagName);
17701773
Snapshot tagSnapshot = table.tagManager().getOrThrow(rollbackToTagName).trimToSnapshot();
17711774
assertThat(tagSnapshot).isEqualTo(restCatalog.loadSnapshot(identifier).get().snapshot());
1775+
1776+
// rollback to snapshot 2 from snapshot
1777+
assertThatThrownBy(() -> catalog.rollbackTo(identifier, Instant.snapshot(2L), 4L))
1778+
.hasMessageContaining("Latest snapshot 3 is not 4");
1779+
catalog.rollbackTo(identifier, Instant.snapshot(2L), 3L);
1780+
assertThat(table.latestSnapshot().get().id()).isEqualTo(2);
17721781
}
17731782

17741783
@Test

0 commit comments

Comments
 (0)