Skip to content

Commit 5a50eac

Browse files
committed
[core] Add supportsVersionManagement to Catalog
1 parent df10e65 commit 5a50eac

File tree

10 files changed

+64
-84
lines changed

10 files changed

+64
-84
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -486,12 +486,12 @@ public boolean supportsVersionManagement() {
486486
}
487487

488488
@Override
489-
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
490-
491-
}
489+
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
490+
throws TableNotExistException {}
492491

493492
@Override
494-
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
493+
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
494+
throws TableNotExistException {
495495
Table table = getTable(identifier);
496496
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
497497
commit.truncatePartitions(partitions);
@@ -501,9 +501,8 @@ public void dropPartitions(Identifier identifier, List<Map<String, String>> part
501501
}
502502

503503
@Override
504-
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions) throws TableNotExistException {
505-
506-
}
504+
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
505+
throws TableNotExistException {}
507506

508507
/**
509508
* Create a {@link FormatTable} identified by the given {@link Identifier}.

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.paimon.table.Instant;
2929
import org.apache.paimon.table.Table;
3030
import org.apache.paimon.table.TableSnapshot;
31-
import org.apache.paimon.table.sink.BatchTableCommit;
3231
import org.apache.paimon.view.View;
3332

3433
import javax.annotation.Nullable;
@@ -467,7 +466,8 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
467466
// ==================== Version management methods ==========================
468467

469468
/**
470-
* Whether this catalog supports version management for tables. If not, corresponding methods will throw an {@link UnsupportedOperationException}, affect the following methods:
469+
* Whether this catalog supports version management for tables. If not, corresponding methods
470+
* will throw an {@link UnsupportedOperationException}, affect the following methods:
471471
*
472472
* <ul>
473473
* <li>{@link #commitSnapshot(Identifier, Snapshot, List)}.
@@ -488,6 +488,8 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
488488
* @param statistics statistics information of this change
489489
* @return Success or not
490490
* @throws Catalog.TableNotExistException if the target does not exist
491+
* @throws UnsupportedOperationException if the catalog does not {@link
492+
* #supportsVersionManagement()}
491493
*/
492494
boolean commitSnapshot(
493495
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
@@ -499,6 +501,8 @@ boolean commitSnapshot(
499501
* @param identifier Path of the table
500502
* @return The requested snapshot of the table
501503
* @throws Catalog.TableNotExistException if the target does not exist
504+
* @throws UnsupportedOperationException if the catalog does not {@link
505+
* #supportsVersionManagement()}
502506
*/
503507
Optional<TableSnapshot> loadSnapshot(Identifier identifier)
504508
throws Catalog.TableNotExistException;
@@ -509,6 +513,8 @@ Optional<TableSnapshot> loadSnapshot(Identifier identifier)
509513
* @param identifier path of the table
510514
* @param instant like snapshotId or tagName
511515
* @throws Catalog.TableNotExistException if the table does not exist
516+
* @throws UnsupportedOperationException if the catalog does not {@link
517+
* #supportsVersionManagement()}
512518
*/
513519
void rollbackTo(Identifier identifier, Instant instant) throws Catalog.TableNotExistException;
514520

@@ -523,6 +529,8 @@ Optional<TableSnapshot> loadSnapshot(Identifier identifier)
523529
* @throws TableNotExistException if the table in identifier doesn't exist
524530
* @throws BranchAlreadyExistException if the branch already exists
525531
* @throws TagNotExistException if the tag doesn't exist
532+
* @throws UnsupportedOperationException if the catalog does not {@link
533+
* #supportsVersionManagement()}
526534
*/
527535
void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
528536
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException;
@@ -533,6 +541,8 @@ void createBranch(Identifier identifier, String branch, @Nullable String fromTag
533541
* @param identifier path of the table, cannot be system or branch name.
534542
* @param branch the branch name
535543
* @throws BranchNotExistException if the branch doesn't exist
544+
* @throws UnsupportedOperationException if the catalog does not {@link
545+
* #supportsVersionManagement()}
536546
*/
537547
void dropBranch(Identifier identifier, String branch) throws BranchNotExistException;
538548

@@ -542,6 +552,8 @@ void createBranch(Identifier identifier, String branch, @Nullable String fromTag
542552
* @param identifier path of the table, cannot be system or branch name.
543553
* @param branch the branch name
544554
* @throws BranchNotExistException if the branch doesn't exist
555+
* @throws UnsupportedOperationException if the catalog does not {@link
556+
* #supportsVersionManagement()}
545557
*/
546558
void fastForward(Identifier identifier, String branch) throws BranchNotExistException;
547559

@@ -550,6 +562,8 @@ void createBranch(Identifier identifier, String branch, @Nullable String fromTag
550562
*
551563
* @param identifier path of the table, cannot be system or branch name.
552564
* @throws TableNotExistException if the table in identifier doesn't exist
565+
* @throws UnsupportedOperationException if the catalog does not {@link
566+
* #supportsVersionManagement()}
553567
*/
554568
List<String> listBranches(Identifier identifier) throws TableNotExistException;
555569

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,25 @@
2222
import org.apache.paimon.partition.PartitionStatistics;
2323
import org.apache.paimon.utils.SnapshotManager;
2424

25-
import javax.annotation.Nullable;
26-
2725
import java.util.List;
2826

2927
/** A {@link SnapshotCommit} using {@link Catalog} to commit. */
3028
public class CatalogSnapshotCommit implements SnapshotCommit {
3129

3230
private final Catalog catalog;
3331
private final Identifier identifier;
34-
private final RenamingSnapshotCommit renamingCommit;
3532

36-
public CatalogSnapshotCommit(
37-
Catalog catalog, Identifier identifier, RenamingSnapshotCommit renamingCommit) {
33+
public CatalogSnapshotCommit(Catalog catalog, Identifier identifier) {
3834
this.catalog = catalog;
3935
this.identifier = identifier;
40-
this.renamingCommit = renamingCommit;
4136
}
4237

4338
@Override
4439
public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics> statistics)
4540
throws Exception {
46-
try {
47-
Identifier newIdentifier =
48-
new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branch);
49-
return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
50-
} catch (UnsupportedOperationException e) {
51-
return renamingCommit.commit(snapshot, branch, statistics);
52-
}
41+
Identifier newIdentifier =
42+
new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branch);
43+
return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
5344
}
5445

5546
@Override
@@ -63,24 +54,14 @@ public static class Factory implements SnapshotCommit.Factory {
6354
private static final long serialVersionUID = 1L;
6455

6556
private final CatalogLoader catalogLoader;
66-
@Nullable private final CatalogLockFactory lockFactory;
67-
@Nullable private final CatalogLockContext lockContext;
6857

69-
public Factory(
70-
CatalogLoader catalogLoader,
71-
@Nullable CatalogLockFactory lockFactory,
72-
@Nullable CatalogLockContext lockContext) {
58+
public Factory(CatalogLoader catalogLoader) {
7359
this.catalogLoader = catalogLoader;
74-
this.lockFactory = lockFactory;
75-
this.lockContext = lockContext;
7660
}
7761

7862
@Override
7963
public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager) {
80-
RenamingSnapshotCommit renamingCommit =
81-
new RenamingSnapshotCommit.Factory(lockFactory, lockContext)
82-
.create(identifier, snapshotManager);
83-
return new CatalogSnapshotCommit(catalogLoader.load(), identifier, renamingCommit);
64+
return new CatalogSnapshotCommit(catalogLoader.load(), identifier);
8465
}
8566
}
8667
}

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ public static Table loadTable(
194194
metadata.uuid(),
195195
catalog.catalogLoader(),
196196
lockFactory,
197-
lockContext);
197+
lockContext,
198+
catalog.supportsVersionManagement());
198199
Path path = new Path(schema.options().get(PATH.key()));
199200
FileStoreTable table =
200201
FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv);

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -722,12 +722,15 @@ public List<String> listBranches(Identifier identifier) throws TableNotExistExce
722722
}
723723

724724
@Override
725-
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
726-
// partitions of the REST Catalog server are automatically calculated and do not require special creating.
725+
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
726+
throws TableNotExistException {
727+
// partitions of the REST Catalog server are automatically calculated and do not require
728+
// special creating.
727729
}
728730

729731
@Override
730-
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
732+
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
733+
throws TableNotExistException {
731734
Table table = getTable(identifier);
732735
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
733736
commit.truncatePartitions(partitions);
@@ -737,8 +740,10 @@ public void dropPartitions(Identifier identifier, List<Map<String, String>> part
737740
}
738741

739742
@Override
740-
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions) throws TableNotExistException {
741-
// The partition statistics of the REST Catalog server are automatically calculated and do not require special reporting.
743+
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
744+
throws TableNotExistException {
745+
// The partition statistics of the REST Catalog server are automatically calculated and do
746+
// not require special reporting.
742747
}
743748

744749
@Override

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -711,14 +711,12 @@ public TagManager tagManager() {
711711

712712
@Override
713713
public BranchManager branchManager() {
714-
FileSystemBranchManager branchManager =
715-
new FileSystemBranchManager(
716-
fileIO, path, snapshotManager(), tagManager(), schemaManager());
717-
if (catalogEnvironment.catalogLoader() != null) {
718-
return new CatalogBranchManager(
719-
catalogEnvironment.catalogLoader(), identifier(), branchManager);
714+
if (catalogEnvironment.catalogLoader() != null
715+
&& catalogEnvironment.supportsVersionManagement()) {
716+
return new CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier());
720717
}
721-
return branchManager;
718+
return new FileSystemBranchManager(
719+
fileIO, path, snapshotManager(), tagManager(), schemaManager());
722720
}
723721

724722
@Override

paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class CatalogEnvironment implements Serializable {
4444
@Nullable private final CatalogLoader catalogLoader;
4545
@Nullable private final CatalogLockFactory lockFactory;
4646
@Nullable private final CatalogLockContext lockContext;
47+
private final boolean supportsVersionManagement;
4748

4849
public CatalogEnvironment(
4950
@Nullable Identifier identifier,
@@ -57,10 +58,11 @@ public CatalogEnvironment(
5758
this.catalogLoader = catalogLoader;
5859
this.lockFactory = lockFactory;
5960
this.lockContext = lockContext;
61+
this.supportsVersionManagement = supportsVersionManagement;
6062
}
6163

6264
public static CatalogEnvironment empty() {
63-
return new CatalogEnvironment(null, null, null, null, null);
65+
return new CatalogEnvironment(null, null, null, null, null, false);
6466
}
6567

6668
@Nullable
@@ -82,13 +84,17 @@ public PartitionHandler partitionHandler() {
8284
return PartitionHandler.create(catalog, identifier);
8385
}
8486

87+
public boolean supportsVersionManagement() {
88+
return supportsVersionManagement;
89+
}
90+
8591
@Nullable
8692
public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) {
8793
SnapshotCommit.Factory factory;
88-
if (catalogLoader == null) {
89-
factory = new RenamingSnapshotCommit.Factory(lockFactory, lockContext);
94+
if (catalogLoader != null && supportsVersionManagement) {
95+
factory = new CatalogSnapshotCommit.Factory(catalogLoader);
9096
} else {
91-
factory = new CatalogSnapshotCommit.Factory(catalogLoader, lockFactory, lockContext);
97+
factory = new RenamingSnapshotCommit.Factory(lockFactory, lockContext);
9298
}
9399
return factory.create(identifier, snapshotManager);
94100
}

paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,10 @@ public class CatalogBranchManager implements BranchManager {
3131

3232
private final CatalogLoader catalogLoader;
3333
private final Identifier identifier;
34-
private final FileSystemBranchManager branchManager;
3534

36-
public CatalogBranchManager(
37-
CatalogLoader catalogLoader,
38-
Identifier identifier,
39-
FileSystemBranchManager branchManager) {
35+
public CatalogBranchManager(CatalogLoader catalogLoader, Identifier identifier) {
4036
this.catalogLoader = catalogLoader;
4137
this.identifier = identifier;
42-
this.branchManager = branchManager;
4338
}
4439

4540
private void executePost(ThrowingConsumer<Catalog, Exception> func) {
@@ -62,46 +57,26 @@ private <T> T executeGet(FunctionWithException<Catalog, T, Exception> func) {
6257

6358
@Override
6459
public void createBranch(String branchName) {
65-
try {
66-
executePost(catalog -> catalog.createBranch(identifier, branchName, null));
67-
} catch (UnsupportedOperationException e) {
68-
branchManager.createBranch(branchName);
69-
}
60+
executePost(catalog -> catalog.createBranch(identifier, branchName, null));
7061
}
7162

7263
@Override
7364
public void createBranch(String branchName, @Nullable String tagName) {
74-
try {
75-
executePost(catalog -> catalog.createBranch(identifier, branchName, tagName));
76-
} catch (UnsupportedOperationException e) {
77-
branchManager.createBranch(branchName, tagName);
78-
}
65+
executePost(catalog -> catalog.createBranch(identifier, branchName, tagName));
7966
}
8067

8168
@Override
8269
public void dropBranch(String branchName) {
83-
try {
84-
executePost(catalog -> catalog.dropBranch(identifier, branchName));
85-
} catch (UnsupportedOperationException e) {
86-
branchManager.dropBranch(branchName);
87-
}
70+
executePost(catalog -> catalog.dropBranch(identifier, branchName));
8871
}
8972

9073
@Override
9174
public void fastForward(String branchName) {
92-
try {
93-
executePost(catalog -> catalog.fastForward(identifier, branchName));
94-
} catch (UnsupportedOperationException e) {
95-
branchManager.fastForward(branchName);
96-
}
75+
executePost(catalog -> catalog.fastForward(identifier, branchName));
9776
}
9877

9978
@Override
10079
public List<String> branches() {
101-
try {
102-
return executeGet(catalog -> catalog.listBranches(identifier));
103-
} catch (UnsupportedOperationException e) {
104-
return branchManager.branches();
105-
}
80+
return executeGet(catalog -> catalog.listBranches(identifier));
10681
}
10782
}

paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void close() throws Exception {}
134134
};
135135

136136
CatalogEnvironment env =
137-
new CatalogEnvironment(null, null, null, null, null) {
137+
new CatalogEnvironment(null, null, null, null, null, false) {
138138

139139
@Override
140140
public PartitionHandler partitionHandler() {

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1541,7 +1541,8 @@ private FileStoreTable getFileTable(Identifier identifier)
15411541
tableMetadata.uuid(),
15421542
catalog.catalogLoader(),
15431543
catalog.lockFactory().orElse(null),
1544-
catalog.lockContext().orElse(null));
1544+
catalog.lockContext().orElse(null),
1545+
false);
15451546
Path path = new Path(schema.options().get(PATH.key()));
15461547
FileIO dataFileIO = catalog.fileIO();
15471548
FileStoreTable table =

0 commit comments

Comments
 (0)