Skip to content

Commit 2d760ba

Browse files
committed
fix
1 parent bc59de3 commit 2d760ba

File tree

22 files changed

+401
-370
lines changed

22 files changed

+401
-370
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,28 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.PagedList;
23+
import org.apache.paimon.Snapshot;
2324
import org.apache.paimon.factories.FactoryUtil;
2425
import org.apache.paimon.fs.FileIO;
2526
import org.apache.paimon.fs.FileStatus;
2627
import org.apache.paimon.fs.Path;
2728
import org.apache.paimon.options.Options;
2829
import org.apache.paimon.partition.Partition;
30+
import org.apache.paimon.partition.PartitionStatistics;
2931
import org.apache.paimon.schema.Schema;
3032
import org.apache.paimon.schema.SchemaChange;
3133
import org.apache.paimon.schema.SchemaManager;
3234
import org.apache.paimon.schema.TableSchema;
3335
import org.apache.paimon.table.FileStoreTable;
3436
import org.apache.paimon.table.FormatTable;
3537
import org.apache.paimon.table.Table;
38+
import org.apache.paimon.table.TableSnapshot;
3639
import org.apache.paimon.table.object.ObjectTable;
3740
import org.apache.paimon.table.system.SystemTableLoader;
3841
import org.apache.paimon.types.RowType;
42+
import org.apache.paimon.utils.FileSystemBranchManager;
3943

44+
import org.jetbrains.annotations.Nullable;
4045
import org.slf4j.Logger;
4146
import org.slf4j.LoggerFactory;
4247

@@ -435,6 +440,66 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
435440
lockContext().orElse(null));
436441
}
437442

443+
private FileSystemBranchManager fileSystemBranchManager(Identifier identifier)
444+
throws TableNotExistException {
445+
FileStoreTable table = (FileStoreTable) getTable(identifier);
446+
return new FileSystemBranchManager(
447+
table.fileIO(),
448+
table.location(),
449+
table.snapshotManager(),
450+
table.tagManager(),
451+
table.schemaManager());
452+
}
453+
454+
@Override
455+
public void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
456+
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException {
457+
FileSystemBranchManager branchManager = fileSystemBranchManager(identifier);
458+
if (fromTag == null) {
459+
branchManager.createBranch(branch);
460+
} else {
461+
branchManager.createBranch(branch, fromTag);
462+
}
463+
}
464+
465+
@Override
466+
public void dropBranch(Identifier identifier, String branch) throws BranchNotExistException {
467+
FileSystemBranchManager branchManager;
468+
try {
469+
branchManager = fileSystemBranchManager(identifier);
470+
} catch (TableNotExistException e) {
471+
throw new BranchNotExistException(identifier, branch);
472+
}
473+
branchManager.dropBranch(branch);
474+
}
475+
476+
@Override
477+
public void fastForward(Identifier identifier, String branch) throws BranchNotExistException {
478+
FileSystemBranchManager branchManager;
479+
try {
480+
branchManager = fileSystemBranchManager(identifier);
481+
} catch (TableNotExistException e) {
482+
throw new BranchNotExistException(identifier, branch);
483+
}
484+
branchManager.fastForward(branch);
485+
}
486+
487+
@Override
488+
public List<String> listBranches(Identifier identifier) throws TableNotExistException {
489+
return fileSystemBranchManager(identifier).branches();
490+
}
491+
492+
@Override
493+
public boolean commitSnapshot(
494+
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics) {
495+
throw new UnsupportedOperationException();
496+
}
497+
498+
@Override
499+
public Optional<TableSnapshot> loadSnapshot(Identifier identifier) {
500+
throw new UnsupportedOperationException();
501+
}
502+
438503
/**
439504
* Create a {@link FormatTable} identified by the given {@link Identifier}.
440505
*

paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.options.MemorySize;
2424
import org.apache.paimon.options.Options;
2525
import org.apache.paimon.partition.Partition;
26+
import org.apache.paimon.partition.PartitionStatistics;
2627
import org.apache.paimon.schema.SchemaChange;
2728
import org.apache.paimon.table.FileStoreTable;
2829
import org.apache.paimon.table.Table;
@@ -271,6 +272,24 @@ public List<Partition> listPartitions(Identifier identifier) throws TableNotExis
271272
return result;
272273
}
273274

275+
@Override
276+
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
277+
throws TableNotExistException {
278+
wrapped.dropPartitions(identifier, partitions);
279+
if (partitionCache != null) {
280+
partitionCache.invalidate(identifier);
281+
}
282+
}
283+
284+
@Override
285+
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
286+
throws TableNotExistException {
287+
wrapped.alterPartitions(identifier, partitions);
288+
if (partitionCache != null) {
289+
partitionCache.invalidate(identifier);
290+
}
291+
}
292+
274293
@Override
275294
public void invalidateTable(Identifier identifier) {
276295
tableCache.invalidate(identifier);

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,23 @@
1919
package org.apache.paimon.catalog;
2020

2121
import org.apache.paimon.PagedList;
22+
import org.apache.paimon.Snapshot;
2223
import org.apache.paimon.annotation.Public;
2324
import org.apache.paimon.partition.Partition;
25+
import org.apache.paimon.partition.PartitionStatistics;
2426
import org.apache.paimon.schema.Schema;
2527
import org.apache.paimon.schema.SchemaChange;
2628
import org.apache.paimon.table.Table;
29+
import org.apache.paimon.table.TableSnapshot;
30+
import org.apache.paimon.table.sink.BatchTableCommit;
2731
import org.apache.paimon.view.View;
2832

2933
import javax.annotation.Nullable;
3034

3135
import java.util.Collections;
3236
import java.util.List;
3337
import java.util.Map;
38+
import java.util.Optional;
3439

3540
/**
3641
* This interface is responsible for reading and writing metadata such as database/table from a
@@ -458,6 +463,114 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
458463
throw new UnsupportedOperationException();
459464
}
460465

466+
// ==================== Branch methods ==========================
467+
468+
/**
469+
* Create a new branch for this table. By default, an empty branch will be created using the
470+
* latest schema. If you provide {@code #fromTag}, a branch will be created from the tag and the
471+
* data files will be inherited from it.
472+
*
473+
* @param identifier path of the table, cannot be system or branch name.
474+
* @param branch the branch name
475+
* @param fromTag from the tag
476+
* @throws TableNotExistException if the table in identifier doesn't exist
477+
* @throws BranchAlreadyExistException if the branch already exists
478+
* @throws TagNotExistException if the tag doesn't exist
479+
*/
480+
void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
481+
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException;
482+
483+
/**
484+
* Drop the branch for this table.
485+
*
486+
* @param identifier path of the table, cannot be system or branch name.
487+
* @param branch the branch name
488+
* @throws BranchNotExistException if the branch doesn't exist
489+
*/
490+
void dropBranch(Identifier identifier, String branch) throws BranchNotExistException;
491+
492+
/**
493+
* Fast-forward a branch to main branch.
494+
*
495+
* @param identifier path of the table, cannot be system or branch name.
496+
* @param branch the branch name
497+
* @throws BranchNotExistException if the branch doesn't exist
498+
*/
499+
void fastForward(Identifier identifier, String branch) throws BranchNotExistException;
500+
501+
/**
502+
* List all branches of the table.
503+
*
504+
* @param identifier path of the table, cannot be system or branch name.
505+
* @throws TableNotExistException if the table in identifier doesn't exist
506+
*/
507+
List<String> listBranches(Identifier identifier) throws TableNotExistException;
508+
509+
// ==================== Snapshot Operations ==========================
510+
511+
/**
512+
* Commit the {@link Snapshot} for table identified by the given {@link Identifier}.
513+
*
514+
* @param identifier Path of the table
515+
* @param snapshot Snapshot to be committed
516+
* @param statistics statistics information of this change
517+
* @return Success or not
518+
* @throws Catalog.TableNotExistException if the target does not exist
519+
*/
520+
boolean commitSnapshot(
521+
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
522+
throws Catalog.TableNotExistException;
523+
524+
/**
525+
* Return the snapshot of table identified by the given {@link Identifier}.
526+
*
527+
* @param identifier Path of the table
528+
* @return The requested snapshot of the table
529+
* @throws Catalog.TableNotExistException if the target does not exist
530+
*/
531+
Optional<TableSnapshot> loadSnapshot(Identifier identifier)
532+
throws Catalog.TableNotExistException;
533+
534+
// ==================== Partition Modifications ==========================
535+
536+
/**
537+
* Create partitions of the specify table. Ignore existing partitions.
538+
*
539+
* @param identifier path of the table to create partitions
540+
* @param partitions partitions to be created
541+
* @throws TableNotExistException if the table does not exist
542+
*/
543+
default void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
544+
throws TableNotExistException {}
545+
546+
/**
547+
* Drop partitions of the specify table. Ignore non-existent partitions.
548+
*
549+
* @param identifier path of the table to drop partitions
550+
* @param partitions partitions to be deleted
551+
* @throws TableNotExistException if the table does not exist
552+
*/
553+
default void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
554+
throws TableNotExistException {
555+
Table table = getTable(identifier);
556+
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
557+
commit.truncatePartitions(partitions);
558+
} catch (Exception e) {
559+
throw new RuntimeException(e);
560+
}
561+
}
562+
563+
/**
564+
* Alter partitions of the specify table. For non-existent partitions, partitions will be
565+
* created directly.
566+
*
567+
* @param identifier path of the table to alter partitions
568+
* @param partitions partitions to be altered
569+
* @throws TableNotExistException if the table does not exist
570+
*/
571+
default void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
572+
throws TableNotExistException {}
573+
461574
// ==================== Catalog Information ==========================
462575

463576
/** Catalog options for re-creating this catalog. */
@@ -741,4 +854,85 @@ public Identifier identifier() {
741854
return identifier;
742855
}
743856
}
857+
858+
/** Exception for trying to create a branch that already exists. */
859+
class BranchAlreadyExistException extends Exception {
860+
861+
private static final String MSG = "Branch %s in table %s already exists.";
862+
863+
private final Identifier identifier;
864+
private final String branch;
865+
866+
public BranchAlreadyExistException(Identifier identifier, String branch) {
867+
this(identifier, branch, null);
868+
}
869+
870+
public BranchAlreadyExistException(Identifier identifier, String branch, Throwable cause) {
871+
super(String.format(MSG, branch, identifier.getFullName()), cause);
872+
this.identifier = identifier;
873+
this.branch = branch;
874+
}
875+
876+
public Identifier identifier() {
877+
return identifier;
878+
}
879+
880+
public String branch() {
881+
return branch;
882+
}
883+
}
884+
885+
/** Exception for trying to operate on a branch that doesn't exist. */
886+
class BranchNotExistException extends Exception {
887+
888+
private static final String MSG = "Branch %s in table %s doesn't exist.";
889+
890+
private final Identifier identifier;
891+
private final String branch;
892+
893+
public BranchNotExistException(Identifier identifier, String branch) {
894+
this(identifier, branch, null);
895+
}
896+
897+
public BranchNotExistException(Identifier identifier, String branch, Throwable cause) {
898+
super(String.format(MSG, branch, identifier.getFullName()), cause);
899+
this.identifier = identifier;
900+
this.branch = branch;
901+
}
902+
903+
public Identifier identifier() {
904+
return identifier;
905+
}
906+
907+
public String branch() {
908+
return branch;
909+
}
910+
}
911+
912+
/** Exception for trying to operate on a tag that doesn't exist. */
913+
class TagNotExistException extends Exception {
914+
915+
private static final String MSG = "Tag %s in table %s doesn't exist.";
916+
917+
private final Identifier identifier;
918+
private final String tag;
919+
920+
public TagNotExistException(Identifier identifier, String tag) {
921+
this(identifier, tag, null);
922+
}
923+
924+
public TagNotExistException(Identifier identifier, String tag, Throwable cause) {
925+
super(String.format(MSG, tag, identifier.getFullName()), cause);
926+
this.identifier = identifier;
927+
this.tag = tag;
928+
}
929+
930+
public Identifier identifier() {
931+
return identifier;
932+
}
933+
934+
public String tag() {
935+
return tag;
936+
}
937+
}
744938
}

0 commit comments

Comments
 (0)