Skip to content

Commit df10e65

Browse files
committed
fix
1 parent a5dc3ef commit df10e65

File tree

5 files changed

+112
-49
lines changed

5 files changed

+112
-49
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.paimon.table.Table;
3939
import org.apache.paimon.table.TableSnapshot;
4040
import org.apache.paimon.table.object.ObjectTable;
41+
import org.apache.paimon.table.sink.BatchTableCommit;
4142
import org.apache.paimon.table.system.SystemTableLoader;
4243
import org.apache.paimon.types.RowType;
4344

@@ -479,6 +480,31 @@ public void rollbackTo(Identifier identifier, Instant instant)
479480
throw new UnsupportedOperationException();
480481
}
481482

483+
@Override
484+
public boolean supportsVersionManagement() {
485+
return false;
486+
}
487+
488+
@Override
489+
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
490+
491+
}
492+
493+
@Override
494+
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
495+
Table table = getTable(identifier);
496+
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
497+
commit.truncatePartitions(partitions);
498+
} catch (Exception e) {
499+
throw new RuntimeException(e);
500+
}
501+
}
502+
503+
@Override
504+
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions) throws TableNotExistException {
505+
506+
}
507+
482508
/**
483509
* Create a {@link FormatTable} identified by the given {@link Identifier}.
484510
*

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 53 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,53 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
464464
throw new UnsupportedOperationException();
465465
}
466466

467-
// ==================== Branch methods ==========================
467+
// ==================== Version management methods ==========================
468+
469+
/**
470+
* Whether this catalog supports version management for tables. If not, corresponding methods will throw an {@link UnsupportedOperationException}, affect the following methods:
471+
*
472+
* <ul>
473+
* <li>{@link #commitSnapshot(Identifier, Snapshot, List)}.
474+
* <li>{@link #loadSnapshot(Identifier)}.
475+
* <li>{@link #rollbackTo(Identifier, Instant)}.
476+
* <li>{@link #createBranch(Identifier, String, String)}.
477+
* <li>{@link #dropBranch(Identifier, String)}.
478+
* <li>{@link #listBranches(Identifier)}.
479+
* </ul>
480+
*/
481+
boolean supportsVersionManagement();
482+
483+
/**
484+
* Commit the {@link Snapshot} for table identified by the given {@link Identifier}.
485+
*
486+
* @param identifier Path of the table
487+
* @param snapshot Snapshot to be committed
488+
* @param statistics statistics information of this change
489+
* @return Success or not
490+
* @throws Catalog.TableNotExistException if the target does not exist
491+
*/
492+
boolean commitSnapshot(
493+
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
494+
throws Catalog.TableNotExistException;
495+
496+
/**
497+
* Return the snapshot of table identified by the given {@link Identifier}.
498+
*
499+
* @param identifier Path of the table
500+
* @return The requested snapshot of the table
501+
* @throws Catalog.TableNotExistException if the target does not exist
502+
*/
503+
Optional<TableSnapshot> loadSnapshot(Identifier identifier)
504+
throws Catalog.TableNotExistException;
505+
506+
/**
507+
* rollback table by the given {@link Identifier} and instant.
508+
*
509+
* @param identifier path of the table
510+
* @param instant like snapshotId or tagName
511+
* @throws Catalog.TableNotExistException if the table does not exist
512+
*/
513+
void rollbackTo(Identifier identifier, Instant instant) throws Catalog.TableNotExistException;
468514

469515
/**
470516
* Create a new branch for this table. By default, an empty branch will be created using the
@@ -507,40 +553,6 @@ void createBranch(Identifier identifier, String branch, @Nullable String fromTag
507553
*/
508554
List<String> listBranches(Identifier identifier) throws TableNotExistException;
509555

510-
// ==================== Snapshot Operations ==========================
511-
512-
/**
513-
* Commit the {@link Snapshot} for table identified by the given {@link Identifier}.
514-
*
515-
* @param identifier Path of the table
516-
* @param snapshot Snapshot to be committed
517-
* @param statistics statistics information of this change
518-
* @return Success or not
519-
* @throws Catalog.TableNotExistException if the target does not exist
520-
*/
521-
boolean commitSnapshot(
522-
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
523-
throws Catalog.TableNotExistException;
524-
525-
/**
526-
* Return the snapshot of table identified by the given {@link Identifier}.
527-
*
528-
* @param identifier Path of the table
529-
* @return The requested snapshot of the table
530-
* @throws Catalog.TableNotExistException if the target does not exist
531-
*/
532-
Optional<TableSnapshot> loadSnapshot(Identifier identifier)
533-
throws Catalog.TableNotExistException;
534-
535-
/**
536-
* rollback table by the given {@link Identifier} and instant.
537-
*
538-
* @param identifier path of the table
539-
* @param instant like snapshotId or tagName
540-
* @throws Catalog.TableNotExistException if the table does not exist
541-
*/
542-
void rollbackTo(Identifier identifier, Instant instant) throws Catalog.TableNotExistException;
543-
544556
// ==================== Partition Modifications ==========================
545557

546558
/**
@@ -550,8 +562,8 @@ Optional<TableSnapshot> loadSnapshot(Identifier identifier)
550562
* @param partitions partitions to be created
551563
* @throws TableNotExistException if the table does not exist
552564
*/
553-
default void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
554-
throws TableNotExistException {}
565+
void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
566+
throws TableNotExistException;
555567

556568
/**
557569
* Drop partitions of the specify table. Ignore non-existent partitions.
@@ -560,15 +572,8 @@ default void createPartitions(Identifier identifier, List<Map<String, String>> p
560572
* @param partitions partitions to be deleted
561573
* @throws TableNotExistException if the table does not exist
562574
*/
563-
default void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
564-
throws TableNotExistException {
565-
Table table = getTable(identifier);
566-
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
567-
commit.truncatePartitions(partitions);
568-
} catch (Exception e) {
569-
throw new RuntimeException(e);
570-
}
571-
}
575+
void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
576+
throws TableNotExistException;
572577

573578
/**
574579
* Alter partitions of the specify table. For non-existent partitions, partitions will be
@@ -578,8 +583,8 @@ default void dropPartitions(Identifier identifier, List<Map<String, String>> par
578583
* @param partitions partitions to be altered
579584
* @throws TableNotExistException if the table does not exist
580585
*/
581-
default void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
582-
throws TableNotExistException {}
586+
void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
587+
throws TableNotExistException;
583588

584589
// ==================== Catalog Information ==========================
585590

paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ public void alterTable(
135135
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
136136
}
137137

138+
@Override
139+
public boolean supportsVersionManagement() {
140+
return wrapped.supportsVersionManagement();
141+
}
142+
138143
@Override
139144
public Optional<TableSnapshot> loadSnapshot(Identifier identifier)
140145
throws TableNotExistException {

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.apache.paimon.table.Instant;
7777
import org.apache.paimon.table.Table;
7878
import org.apache.paimon.table.TableSnapshot;
79+
import org.apache.paimon.table.sink.BatchTableCommit;
7980
import org.apache.paimon.table.system.SystemTableLoader;
8081
import org.apache.paimon.utils.Pair;
8182
import org.apache.paimon.view.View;
@@ -378,6 +379,11 @@ public Optional<TableSnapshot> loadSnapshot(Identifier identifier)
378379
return Optional.of(response.getSnapshot());
379380
}
380381

382+
@Override
383+
public boolean supportsVersionManagement() {
384+
return true;
385+
}
386+
381387
@Override
382388
public boolean commitSnapshot(
383389
Identifier identifier, Snapshot snapshot, List<PartitionStatistics> statistics)
@@ -715,6 +721,26 @@ public List<String> listBranches(Identifier identifier) throws TableNotExistExce
715721
}
716722
}
717723

724+
@Override
725+
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
726+
// partitions of the REST Catalog server are automatically calculated and do not require special creating.
727+
}
728+
729+
@Override
730+
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions) throws TableNotExistException {
731+
Table table = getTable(identifier);
732+
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
733+
commit.truncatePartitions(partitions);
734+
} catch (Exception e) {
735+
throw new RuntimeException(e);
736+
}
737+
}
738+
739+
@Override
740+
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions) throws TableNotExistException {
741+
// The partition statistics of the REST Catalog server are automatically calculated and do not require special reporting.
742+
}
743+
718744
@Override
719745
public View getView(Identifier identifier) throws ViewNotExistException {
720746
try {

paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public CatalogEnvironment(
5050
@Nullable String uuid,
5151
@Nullable CatalogLoader catalogLoader,
5252
@Nullable CatalogLockFactory lockFactory,
53-
@Nullable CatalogLockContext lockContext) {
53+
@Nullable CatalogLockContext lockContext,
54+
boolean supportsVersionManagement) {
5455
this.identifier = identifier;
5556
this.uuid = uuid;
5657
this.catalogLoader = catalogLoader;

0 commit comments

Comments
 (0)