Skip to content

Commit 9660379

Browse files
committed
[core] Introduce SupportsBranches to put branch management to Catalog
1 parent 07b9776 commit 9660379

File tree

28 files changed

+612
-351
lines changed

28 files changed

+612
-351
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
6060
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
6161
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
62-
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
62+
import static org.apache.paimon.utils.FileSystemBranchManager.DEFAULT_MAIN_BRANCH;
6363
import static org.apache.paimon.utils.Preconditions.checkArgument;
6464

6565
/** Common implementation of {@link Catalog}. */
@@ -366,16 +366,14 @@ protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange>
366366

367367
@Override
368368
public Table getTable(Identifier identifier) throws TableNotExistException {
369-
SnapshotCommit.Factory commitFactory =
370-
new RenamingSnapshotCommit.Factory(
371-
lockFactory().orElse(null), lockContext().orElse(null));
372369
return CatalogUtils.loadTable(
373370
this,
374371
identifier,
375372
p -> fileIO(),
376373
this::fileIO,
377374
this::loadTableMetadata,
378-
commitFactory);
375+
lockFactory().orElse(null),
376+
lockContext().orElse(null));
379377
}
380378

381379
/**
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.catalog;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.partition.Partition;
23+
import org.apache.paimon.utils.SnapshotManager;
24+
25+
import java.util.List;
26+
27+
/** A {@link SnapshotCommit} using {@link Catalog} to commit. */
28+
public class CatalogSnapshotCommit implements SnapshotCommit {
29+
30+
private final SupportsSnapshots supportsSnapshots;
31+
private final Identifier identifier;
32+
33+
public CatalogSnapshotCommit(SupportsSnapshots supportsSnapshots, Identifier identifier) {
34+
this.supportsSnapshots = supportsSnapshots;
35+
this.identifier = identifier;
36+
}
37+
38+
@Override
39+
public boolean commit(Snapshot snapshot, String branch, List<Partition> statistics)
40+
throws Exception {
41+
Identifier newIdentifier =
42+
new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branch);
43+
return supportsSnapshots.commitSnapshot(newIdentifier, snapshot, statistics);
44+
}
45+
46+
@Override
47+
public void close() throws Exception {
48+
supportsSnapshots.close();
49+
}
50+
51+
/** Factory to create {@link CatalogSnapshotCommit}. */
52+
public static class Factory implements SnapshotCommit.Factory {
53+
54+
private static final long serialVersionUID = 1L;
55+
56+
private final CatalogLoader catalogLoader;
57+
58+
public Factory(CatalogLoader catalogLoader) {
59+
this.catalogLoader = catalogLoader;
60+
}
61+
62+
@Override
63+
public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager) {
64+
return new CatalogSnapshotCommit((SupportsSnapshots) catalogLoader.load(), identifier);
65+
}
66+
}
67+
}

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.apache.paimon.utils.InternalRowPartitionComputer;
4040
import org.apache.paimon.utils.Preconditions;
4141

42+
import javax.annotation.Nullable;
43+
4244
import java.util.ArrayList;
4345
import java.util.HashMap;
4446
import java.util.List;
@@ -169,7 +171,8 @@ public static Table loadTable(
169171
Function<Path, FileIO> internalFileIO,
170172
Function<Path, FileIO> externalFileIO,
171173
TableMetadata.Loader metadataLoader,
172-
SnapshotCommit.Factory commitFactory)
174+
@Nullable CatalogLockFactory lockFactory,
175+
@Nullable CatalogLockContext lockContext)
173176
throws Catalog.TableNotExistException {
174177
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
175178
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
@@ -190,8 +193,10 @@ public static Table loadTable(
190193
identifier,
191194
metadata.uuid(),
192195
catalog.catalogLoader(),
193-
commitFactory,
194-
catalog instanceof SupportsSnapshots);
196+
lockFactory,
197+
lockContext,
198+
catalog instanceof SupportsSnapshots,
199+
catalog instanceof SupportsBranches);
195200
Path path = new Path(schema.options().get(PATH.key()));
196201
FileStoreTable table =
197202
FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv);

paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.paimon.types.DataField;
2323
import org.apache.paimon.types.DataTypes;
2424
import org.apache.paimon.types.RowType;
25-
import org.apache.paimon.utils.BranchManager;
25+
import org.apache.paimon.utils.FileSystemBranchManager;
2626
import org.apache.paimon.utils.Preconditions;
2727
import org.apache.paimon.utils.StringUtils;
2828

@@ -139,7 +139,7 @@ public String getTableName() {
139139
@JsonIgnore
140140
public String getBranchNameOrDefault() {
141141
String branch = getBranchName();
142-
return branch == null ? BranchManager.DEFAULT_MAIN_BRANCH : branch;
142+
return branch == null ? FileSystemBranchManager.DEFAULT_MAIN_BRANCH : branch;
143143
}
144144

145145
@JsonIgnore
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.catalog;
20+
21+
import javax.annotation.Nullable;
22+
23+
import java.util.List;
24+
25+
/** A {@link Catalog} supports creating and dropping table branches. */
26+
public interface SupportsBranches extends Catalog {
27+
28+
/**
29+
* Create a new branch for this table. By default, an empty branch will be created using the
30+
* latest schema. If you provide {@code #fromTag}, a branch will be created from the tag and the
31+
* data files will be inherited from it.
32+
*
33+
* @param identifier path of the table, cannot be system or branch name.
34+
* @param branch the branch name
35+
* @param fromTag from the tag
36+
* @throws TableNotExistException if the table in identifier doesn't exist
37+
* @throws DatabaseNotExistException if the database in identifier doesn't exist
38+
*/
39+
void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
40+
throws TableNotExistException, DatabaseNotExistException;
41+
42+
/**
43+
* Drop the branch for this table.
44+
*
45+
* @param identifier path of the table, cannot be system or branch name.
46+
* @param branch the branch name
47+
* @throws TableNotExistException if the table in identifier doesn't exist
48+
* @throws DatabaseNotExistException if the database in identifier doesn't exist
49+
*/
50+
void dropBranch(Identifier identifier, String branch)
51+
throws TableNotExistException, DatabaseNotExistException;
52+
53+
/**
54+
* Fast-forward a branch to main branch.
55+
*
56+
* @param identifier path of the table, cannot be system or branch name.
57+
* @param branch the branch name
58+
* @throws TableNotExistException if the table in identifier doesn't exist
59+
* @throws DatabaseNotExistException if the database in identifier doesn't exist
60+
*/
61+
void fastForward(Identifier identifier, String branch)
62+
throws TableNotExistException, DatabaseNotExistException;
63+
64+
/**
65+
* List all branches of the table.
66+
*
67+
* @param identifier path of the table, cannot be system or branch name.
68+
* @throws TableNotExistException if the table in identifier doesn't exist
69+
* @throws DatabaseNotExistException if the database in identifier doesn't exist
70+
*/
71+
List<String> listBranches(Identifier identifier)
72+
throws TableNotExistException, DatabaseNotExistException;
73+
}

paimon-core/src/main/java/org/apache/paimon/catalog/SupportsSnapshots.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,25 @@
1919
package org.apache.paimon.catalog;
2020

2121
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.partition.Partition;
2223

24+
import java.util.List;
2325
import java.util.Optional;
2426

25-
/** A {@link Catalog} supports loading table snapshots. */
26-
public interface SupportsSnapshots {
27+
/** A {@link Catalog} supports committing and loading table snapshots. */
28+
public interface SupportsSnapshots extends Catalog {
29+
30+
/**
31+
* Commit the {@link Snapshot} for table identified by the given {@link Identifier}.
32+
*
33+
* @param identifier Path of the table
34+
* @param snapshot Snapshot to be committed
35+
* @param statistics statistics information of this change
36+
* @return Success or not
37+
* @throws Catalog.TableNotExistException if the target does not exist
38+
*/
39+
boolean commitSnapshot(Identifier identifier, Snapshot snapshot, List<Partition> statistics)
40+
throws Catalog.TableNotExistException;
2741

2842
/**
2943
* Return the snapshot of table identified by the given {@link Identifier}.

paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.paimon.manifest.ManifestList;
3131
import org.apache.paimon.schema.SchemaManager;
3232
import org.apache.paimon.table.FileStoreTable;
33-
import org.apache.paimon.utils.BranchManager;
3433
import org.apache.paimon.utils.DateTimeUtils;
3534
import org.apache.paimon.utils.FileStorePathFactory;
35+
import org.apache.paimon.utils.FileSystemBranchManager;
3636
import org.apache.paimon.utils.Pair;
3737
import org.apache.paimon.utils.Preconditions;
3838
import org.apache.paimon.utils.SnapshotManager;
@@ -103,7 +103,7 @@ public OrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryR
103103

104104
protected List<String> validBranches() {
105105
List<String> branches = table.branchManager().branches();
106-
branches.add(BranchManager.DEFAULT_MAIN_BRANCH);
106+
branches.add(FileSystemBranchManager.DEFAULT_MAIN_BRANCH);
107107

108108
List<String> abnormalBranches = new ArrayList<>();
109109
for (String branch : branches) {

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
283283
path -> fileIOForData(path, identifier),
284284
this::fileIOFromOptions,
285285
this::loadTableMetadata,
286-
new RESTSnapshotCommitFactory(catalogLoader()));
286+
null,
287+
null);
287288
}
288289

289290
private FileIO fileIOForData(Path path, Identifier identifier) {
@@ -340,15 +341,26 @@ public Optional<Snapshot> loadSnapshot(Identifier identifier) throws TableNotExi
340341
return Optional.of(response.getSnapshot());
341342
}
342343

344+
@Override
343345
public boolean commitSnapshot(
344-
Identifier identifier, Snapshot snapshot, List<Partition> statistics) {
346+
Identifier identifier, Snapshot snapshot, List<Partition> statistics)
347+
throws TableNotExistException {
345348
CommitTableRequest request = new CommitTableRequest(identifier, snapshot, statistics);
346-
CommitTableResponse response =
347-
client.post(
348-
resourcePaths.commitTable(identifier.getDatabaseName()),
349-
request,
350-
CommitTableResponse.class,
351-
restAuthFunction);
349+
CommitTableResponse response;
350+
351+
try {
352+
response =
353+
client.post(
354+
resourcePaths.commitTable(identifier.getDatabaseName()),
355+
request,
356+
CommitTableResponse.class,
357+
restAuthFunction);
358+
} catch (NoSuchResourceException e) {
359+
throw new TableNotExistException(identifier);
360+
} catch (ForbiddenException e) {
361+
throw new TableNoPermissionException(identifier, e);
362+
}
363+
352364
return response.isSuccess();
353365
}
354366

paimon-core/src/main/java/org/apache/paimon/rest/RESTSnapshotCommitFactory.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments
 (0)