Skip to content

Commit 2e76639

Browse files
committed
[core] Fast forward branch should refresh table in CachingCatalog
1 parent b5ee79a commit 2e76639

File tree

16 files changed

+291
-213
lines changed

16 files changed

+291
-213
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,14 @@
3737
import org.apache.paimon.table.sink.BatchTableCommit;
3838
import org.apache.paimon.table.system.SystemTableLoader;
3939
import org.apache.paimon.types.RowType;
40+
import org.apache.paimon.utils.BranchManager;
41+
import org.apache.paimon.utils.FileSystemBranchManager;
4042

4143
import org.slf4j.Logger;
4244
import org.slf4j.LoggerFactory;
4345

46+
import javax.annotation.Nullable;
47+
4448
import java.io.IOException;
4549
import java.util.ArrayList;
4650
import java.util.HashMap;
@@ -456,6 +460,56 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
456460
lockContext().orElse(null));
457461
}
458462

463+
@Override
464+
public void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
465+
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException {
466+
BranchManager branchManager = fileSystemBranchManager(identifier);
467+
if (fromTag == null) {
468+
branchManager.createBranch(branch);
469+
} else {
470+
branchManager.createBranch(branch, fromTag);
471+
}
472+
}
473+
474+
@Override
475+
public void dropBranch(Identifier identifier, String branch) throws BranchNotExistException {
476+
try {
477+
fileSystemBranchManager(identifier).dropBranch(branch);
478+
} catch (TableNotExistException e) {
479+
throw new BranchNotExistException(identifier, branch, e);
480+
}
481+
}
482+
483+
@Override
484+
public void fastForward(Identifier identifier, String branch) throws BranchNotExistException {
485+
try {
486+
fileSystemBranchManager(identifier).fastForward(branch);
487+
} catch (TableNotExistException e) {
488+
throw new BranchNotExistException(identifier, branch, e);
489+
}
490+
}
491+
492+
@Override
493+
public List<String> listBranches(Identifier identifier) throws TableNotExistException {
494+
return fileSystemBranchManager(identifier).branches();
495+
}
496+
497+
private BranchManager fileSystemBranchManager(Identifier identifier)
498+
throws TableNotExistException {
499+
Table originTable = getTable(identifier);
500+
if (!(originTable instanceof FileStoreTable)) {
501+
throw new UnsupportedOperationException(
502+
"Unsupported table type: " + originTable.getClass());
503+
}
504+
FileStoreTable table = (FileStoreTable) originTable;
505+
return new FileSystemBranchManager(
506+
table.fileIO(),
507+
table.location(),
508+
table.snapshotManager(),
509+
table.tagManager(),
510+
table.schemaManager());
511+
}
512+
459513
/**
460514
* Create a {@link FormatTable} identified by the given {@link Identifier}.
461515
*

paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,12 @@ public void alterPartitions(Identifier identifier, List<Partition> partitions)
289289
}
290290
}
291291

292+
@Override
293+
public void fastForward(Identifier identifier, String branch) throws BranchNotExistException {
294+
super.fastForward(identifier, branch);
295+
invalidateTable(identifier);
296+
}
297+
292298
@Override
293299
public void invalidateTable(Identifier identifier) {
294300
tableCache.invalidate(identifier);

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,49 @@ default void renameView(Identifier fromView, Identifier toView, boolean ignoreIf
463463
throw new UnsupportedOperationException();
464464
}
465465

466+
// ======================= branch methods ===============================
467+
468+
/**
469+
* Create a new branch for this table. By default, an empty branch will be created using the
470+
* latest schema. If you provide {@code #fromTag}, a branch will be created from the tag and the
471+
* data files will be inherited from it.
472+
*
473+
* @param identifier path of the table, cannot be system or branch name.
474+
* @param branch the branch name
475+
* @param fromTag from the tag
476+
* @throws TableNotExistException if the table in identifier doesn't exist
477+
* @throws BranchAlreadyExistException if the branch already exists
478+
* @throws TagNotExistException if the tag doesn't exist
479+
*/
480+
void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
481+
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException;
482+
483+
/**
484+
* Drop the branch for this table.
485+
*
486+
* @param identifier path of the table, cannot be system or branch name.
487+
* @param branch the branch name
488+
* @throws BranchNotExistException if the branch doesn't exist
489+
*/
490+
void dropBranch(Identifier identifier, String branch) throws BranchNotExistException;
491+
492+
/**
493+
* Fast-forward a branch to main branch.
494+
*
495+
* @param identifier path of the table, cannot be system or branch name.
496+
* @param branch the branch name
497+
* @throws BranchNotExistException if the branch doesn't exist
498+
*/
499+
void fastForward(Identifier identifier, String branch) throws BranchNotExistException;
500+
501+
/**
502+
* List all branches of the table.
503+
*
504+
* @param identifier path of the table, cannot be system or branch name.
505+
* @throws TableNotExistException if the table in identifier doesn't exist
506+
*/
507+
List<String> listBranches(Identifier identifier) throws TableNotExistException;
508+
466509
// ======================= repair methods ===============================
467510

468511
/**
@@ -772,4 +815,85 @@ public Identifier identifier() {
772815
return identifier;
773816
}
774817
}
818+
819+
/** Exception for trying to create a branch that already exists. */
820+
class BranchAlreadyExistException extends Exception {
821+
822+
private static final String MSG = "Branch %s in table %s already exists.";
823+
824+
private final Identifier identifier;
825+
private final String branch;
826+
827+
public BranchAlreadyExistException(Identifier identifier, String branch) {
828+
this(identifier, branch, null);
829+
}
830+
831+
public BranchAlreadyExistException(Identifier identifier, String branch, Throwable cause) {
832+
super(String.format(MSG, branch, identifier.getFullName()), cause);
833+
this.identifier = identifier;
834+
this.branch = branch;
835+
}
836+
837+
public Identifier identifier() {
838+
return identifier;
839+
}
840+
841+
public String branch() {
842+
return branch;
843+
}
844+
}
845+
846+
/** Exception for trying to operate on a branch that doesn't exist. */
847+
class BranchNotExistException extends Exception {
848+
849+
private static final String MSG = "Branch %s in table %s doesn't exist.";
850+
851+
private final Identifier identifier;
852+
private final String branch;
853+
854+
public BranchNotExistException(Identifier identifier, String branch) {
855+
this(identifier, branch, null);
856+
}
857+
858+
public BranchNotExistException(Identifier identifier, String branch, Throwable cause) {
859+
super(String.format(MSG, branch, identifier.getFullName()), cause);
860+
this.identifier = identifier;
861+
this.branch = branch;
862+
}
863+
864+
public Identifier identifier() {
865+
return identifier;
866+
}
867+
868+
public String branch() {
869+
return branch;
870+
}
871+
}
872+
873+
/** Exception for trying to operate on a tag that doesn't exist. */
874+
class TagNotExistException extends Exception {
875+
876+
private static final String MSG = "Tag %s in table %s doesn't exist.";
877+
878+
private final Identifier identifier;
879+
private final String tag;
880+
881+
public TagNotExistException(Identifier identifier, String tag) {
882+
this(identifier, tag, null);
883+
}
884+
885+
public TagNotExistException(Identifier identifier, String tag, Throwable cause) {
886+
super(String.format(MSG, tag, identifier.getFullName()), cause);
887+
this.identifier = identifier;
888+
this.tag = tag;
889+
}
890+
891+
public Identifier identifier() {
892+
return identifier;
893+
}
894+
895+
public String tag() {
896+
return tag;
897+
}
898+
}
775899
}

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,7 @@ public static Table loadTable(
195195
catalog.catalogLoader(),
196196
lockFactory,
197197
lockContext,
198-
catalog instanceof SupportsSnapshots,
199-
catalog instanceof SupportsBranches);
198+
catalog instanceof SupportsSnapshots);
200199
Path path = new Path(schema.options().get(PATH.key()));
201200
FileStoreTable table =
202201
FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv);

paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.paimon.table.Table;
2626
import org.apache.paimon.view.View;
2727

28+
import javax.annotation.Nullable;
29+
2830
import java.util.List;
2931
import java.util.Map;
3032

@@ -211,6 +213,27 @@ public PagedList<Partition> listPartitionsPaged(
211213
return wrapped.listPartitionsPaged(identifier, maxResults, pageToken);
212214
}
213215

216+
@Override
217+
public void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
218+
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException {
219+
wrapped.createBranch(identifier, branch, fromTag);
220+
}
221+
222+
@Override
223+
public void dropBranch(Identifier identifier, String branch) throws BranchNotExistException {
224+
wrapped.dropBranch(identifier, branch);
225+
}
226+
227+
@Override
228+
public void fastForward(Identifier identifier, String branch) throws BranchNotExistException {
229+
wrapped.fastForward(identifier, branch);
230+
}
231+
232+
@Override
233+
public List<String> listBranches(Identifier identifier) throws TableNotExistException {
234+
return wrapped.listBranches(identifier);
235+
}
236+
214237
@Override
215238
public void repairCatalog() {
216239
wrapped.repairCatalog();

0 commit comments

Comments
 (0)