Skip to content

Commit d88cb83

Browse files
committed
[core][rest] Add tableId to commit snapshot to avoid wrong commit
1 parent ada017e commit d88cb83

File tree

11 files changed

+76
-22
lines changed

11 files changed

+76
-22
lines changed

docs/static/rest-catalog-open-api.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2395,6 +2395,8 @@ components:
23952395
CommitTableRequest:
23962396
type: object
23972397
properties:
2398+
tableId:
2399+
type: string
23982400
snapshot:
23992401
$ref: '#/components/schemas/Snapshot'
24002402
statistics:

paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,7 @@ public PagedList<Snapshot> listSnapshotsPaged(
544544
* Commit snapshot for table.
545545
*
546546
* @param identifier database name and table name.
547+
* @param tableUuid Uuid of the table to avoid wrong commit
547548
* @param snapshot snapshot for committing
548549
* @param statistics statistics for this snapshot incremental
549550
* @return true if commit success
@@ -552,8 +553,11 @@ public PagedList<Snapshot> listSnapshotsPaged(
552553
* this table
553554
*/
554555
public boolean commitSnapshot(
555-
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics) {
556-
CommitTableRequest request = new CommitTableRequest(snapshot, statistics);
556+
Identifier identifier,
557+
@Nullable String tableUuid,
558+
Snapshot snapshot,
559+
List<PartitionStatistics> statistics) {
560+
CommitTableRequest request = new CommitTableRequest(tableUuid, snapshot, statistics);
557561
CommitTableResponse response =
558562
client.post(
559563
resourcePaths.commitTable(

paimon-api/src/main/java/org/apache/paimon/rest/requests/CommitTableRequest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,13 @@
3333
@JsonIgnoreProperties(ignoreUnknown = true)
3434
public class CommitTableRequest implements RESTRequest {
3535

36+
private static final String FIELD_TABLE_ID = "tableId";
3637
private static final String FIELD_SNAPSHOT = "snapshot";
3738
private static final String FIELD_STATISTICS = "statistics";
3839

40+
@JsonProperty(FIELD_TABLE_ID)
41+
private final String tableId;
42+
3943
@JsonProperty(FIELD_SNAPSHOT)
4044
private final Snapshot snapshot;
4145

@@ -44,12 +48,19 @@ public class CommitTableRequest implements RESTRequest {
4448

4549
@JsonCreator
4650
public CommitTableRequest(
51+
@JsonProperty(FIELD_TABLE_ID) String tableId,
4752
@JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot,
4853
@JsonProperty(FIELD_STATISTICS) List<PartitionStatistics> statistics) {
54+
this.tableId = tableId;
4955
this.snapshot = snapshot;
5056
this.statistics = statistics;
5157
}
5258

59+
@JsonGetter(FIELD_TABLE_ID)
60+
public String getTableId() {
61+
return tableId;
62+
}
63+
5364
@JsonGetter(FIELD_SNAPSHOT)
5465
public Snapshot getSnapshot() {
5566
return snapshot;

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,10 @@ public List<String> listBranches(Identifier identifier) throws TableNotExistExce
477477

478478
@Override
479479
public boolean commitSnapshot(
480-
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics) {
480+
Identifier identifier,
481+
@Nullable String tableUuid,
482+
Snapshot snapshot,
483+
List<PartitionStatistics> statistics) {
481484
throw new UnsupportedOperationException();
482485
}
483486

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,7 @@ default boolean supportsListByPattern() {
617617
* Commit the {@link Snapshot} for table identified by the given {@link Identifier}.
618618
*
619619
* @param identifier Path of the table
620+
* @param tableUuid Uuid of the table to avoid wrong commit
620621
* @param snapshot Snapshot to be committed
621622
* @param statistics statistics information of this change
622623
* @return Success or not
@@ -625,7 +626,10 @@ default boolean supportsListByPattern() {
625626
* #supportsVersionManagement()}
626627
*/
627628
boolean commitSnapshot(
628-
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
629+
Identifier identifier,
630+
@Nullable String tableUuid,
631+
Snapshot snapshot,
632+
List<PartitionStatistics> statistics)
629633
throws Catalog.TableNotExistException;
630634

631635
/**

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,29 @@
2222
import org.apache.paimon.partition.PartitionStatistics;
2323
import org.apache.paimon.utils.SnapshotManager;
2424

25+
import javax.annotation.Nullable;
26+
2527
import java.util.List;
2628

2729
/** A {@link SnapshotCommit} using {@link Catalog} to commit. */
2830
public class CatalogSnapshotCommit implements SnapshotCommit {
2931

3032
private final Catalog catalog;
3133
private final Identifier identifier;
34+
@Nullable private final String uuid;
3235

33-
public CatalogSnapshotCommit(Catalog catalog, Identifier identifier) {
36+
public CatalogSnapshotCommit(Catalog catalog, Identifier identifier, @Nullable String uuid) {
3437
this.catalog = catalog;
3538
this.identifier = identifier;
39+
this.uuid = uuid;
3640
}
3741

3842
@Override
3943
public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics> statistics)
4044
throws Exception {
4145
Identifier newIdentifier =
4246
new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branch);
43-
return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
47+
return catalog.commitSnapshot(newIdentifier, uuid, snapshot, statistics);
4448
}
4549

4650
@Override
@@ -54,14 +58,16 @@ public static class Factory implements SnapshotCommit.Factory {
5458
private static final long serialVersionUID = 1L;
5559

5660
private final CatalogLoader catalogLoader;
61+
@Nullable private final String uuid;
5762

58-
public Factory(CatalogLoader catalogLoader) {
63+
public Factory(CatalogLoader catalogLoader, @Nullable String uuid) {
5964
this.catalogLoader = catalogLoader;
65+
this.uuid = uuid;
6066
}
6167

6268
@Override
6369
public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager) {
64-
return new CatalogSnapshotCommit(catalogLoader.load(), identifier);
70+
return new CatalogSnapshotCommit(catalogLoader.load(), identifier, uuid);
6571
}
6672
}
6773
}

paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,12 @@ public List<String> listBranches(Identifier identifier) throws TableNotExistExce
212212

213213
@Override
214214
public boolean commitSnapshot(
215-
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
215+
Identifier identifier,
216+
@Nullable String tableUuid,
217+
Snapshot snapshot,
218+
List<PartitionStatistics> statistics)
216219
throws TableNotExistException {
217-
return wrapped.commitSnapshot(identifier, snapshot, statistics);
220+
return wrapped.commitSnapshot(identifier, tableUuid, snapshot, statistics);
218221
}
219222

220223
@Override

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
4040
import org.apache.paimon.rest.exceptions.NotImplementedException;
4141
import org.apache.paimon.rest.exceptions.ServiceFailureException;
42-
import org.apache.paimon.rest.requests.RollbackTableRequest;
4342
import org.apache.paimon.rest.responses.ErrorResponse;
4443
import org.apache.paimon.rest.responses.GetDatabaseResponse;
4544
import org.apache.paimon.rest.responses.GetFunctionResponse;
@@ -331,10 +330,13 @@ public boolean supportsVersionManagement() {
331330

332331
@Override
333332
public boolean commitSnapshot(
334-
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
333+
Identifier identifier,
334+
@Nullable String tableUuid,
335+
Snapshot snapshot,
336+
List<PartitionStatistics> statistics)
335337
throws TableNotExistException {
336338
try {
337-
return api.commitSnapshot(identifier, snapshot, statistics);
339+
return api.commitSnapshot(identifier, tableUuid, snapshot, statistics);
338340
} catch (NoSuchResourceException e) {
339341
throw new TableNotExistException(identifier);
340342
} catch (ForbiddenException e) {
@@ -347,7 +349,6 @@ public boolean commitSnapshot(
347349
@Override
348350
public void rollbackTo(Identifier identifier, Instant instant)
349351
throws Catalog.TableNotExistException {
350-
RollbackTableRequest request = new RollbackTableRequest(instant);
351352
try {
352353
api.rollbackTo(identifier, instant);
353354
} catch (NoSuchResourceException e) {

paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public boolean supportsVersionManagement() {
9595
public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) {
9696
SnapshotCommit.Factory factory;
9797
if (catalogLoader != null && supportsVersionManagement) {
98-
factory = new CatalogSnapshotCommit.Factory(catalogLoader);
98+
factory = new CatalogSnapshotCommit.Factory(catalogLoader, uuid);
9999
} else {
100100
factory = new RenamingSnapshotCommit.Factory(lockFactory, lockContext);
101101
}

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -773,10 +773,11 @@ private MockResponse commitTableHandle(Identifier identifier, String data) throw
773773
if (!tableMetadataStore.containsKey(identifier.getFullName())) {
774774
throw new Catalog.TableNotExistException(identifier);
775775
}
776-
boolean success =
777-
commitSnapshot(identifier, requestBody.getSnapshot(), requestBody.getStatistics());
778-
CommitTableResponse response = new CommitTableResponse(success);
779-
return mockResponse(response, 200);
776+
return commitSnapshot(
777+
identifier,
778+
requestBody.getTableId(),
779+
requestBody.getSnapshot(),
780+
requestBody.getStatistics());
780781
}
781782

782783
private MockResponse rollbackTableByIdHandle(Identifier identifier, long snapshotId)
@@ -1864,10 +1865,16 @@ private String geTableFullNameWithSnapshotId(Identifier identifier, long snapsho
18641865
return String.format("%s-%d", identifier.getFullName(), snapshotId);
18651866
}
18661867

1867-
private boolean commitSnapshot(
1868-
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
1868+
private MockResponse commitSnapshot(
1869+
Identifier identifier,
1870+
String tableId,
1871+
Snapshot snapshot,
1872+
List<PartitionStatistics> statistics)
18691873
throws Catalog.TableNotExistException {
18701874
FileStoreTable table = getFileTable(identifier);
1875+
if (!tableId.equals(table.catalogEnvironment().uuid())) {
1876+
throw new Catalog.TableNotExistException(identifier);
1877+
}
18711878
RenamingSnapshotCommit commit =
18721879
new RenamingSnapshotCommit(table.snapshotManager(), Lock.empty());
18731880
String branchName = identifier.getBranchName();
@@ -1989,7 +1996,8 @@ private boolean commitSnapshot(
19891996
&& partition.recordCount() <= 0);
19901997
return partitions.isEmpty();
19911998
});
1992-
return success;
1999+
CommitTableResponse response = new CommitTableResponse(success);
2000+
return mockResponse(response, 200);
19932001
} catch (Exception e) {
19942002
throw new RuntimeException(e);
19952003
}

0 commit comments

Comments
 (0)