Skip to content

Commit 49f55e6

Browse files
committed
[core] Introduce supportsPartitionModification to Catalog
1 parent 9ca80b3 commit 49f55e6

File tree

25 files changed

+269
-177
lines changed

25 files changed

+269
-177
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
import org.apache.paimon.stats.StatsFileHandler;
5757
import org.apache.paimon.table.CatalogEnvironment;
5858
import org.apache.paimon.table.FileStoreTable;
59-
import org.apache.paimon.table.PartitionHandler;
59+
import org.apache.paimon.table.PartitionModification;
6060
import org.apache.paimon.table.sink.CallbackUtils;
6161
import org.apache.paimon.table.sink.CommitCallback;
6262
import org.apache.paimon.table.sink.CommitPreCallback;
@@ -385,23 +385,25 @@ private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreT
385385
List<CommitCallback> callbacks = new ArrayList<>();
386386

387387
if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
388-
PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
389-
if (partitionHandler != null) {
388+
PartitionModification partitionModification =
389+
catalogEnvironment.partitionModification();
390+
if (partitionModification != null) {
390391
callbacks.add(
391-
new AddPartitionCommitCallback(partitionHandler, partitionComputer()));
392+
new AddPartitionCommitCallback(partitionModification, partitionComputer()));
392393
}
393394
}
394395

395396
TagPreview tagPreview = TagPreview.create(options);
396397
if (options.tagToPartitionField() != null
397398
&& tagPreview != null
398399
&& schema.partitionKeys().isEmpty()) {
399-
PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
400-
if (partitionHandler != null) {
400+
PartitionModification partitionModification =
401+
catalogEnvironment.partitionModification();
402+
if (partitionModification != null) {
401403
TagPreviewCommitCallback callback =
402404
new TagPreviewCommitCallback(
403405
new AddPartitionTagCallback(
404-
partitionHandler, options.tagToPartitionField()),
406+
partitionModification, options.tagToPartitionField()),
405407
tagPreview);
406408
callbacks.add(callback);
407409
}
@@ -447,9 +449,9 @@ public PartitionExpire newPartitionExpire(
447449
Duration expirationTime,
448450
Duration checkInterval,
449451
PartitionExpireStrategy expireStrategy) {
450-
PartitionHandler partitionHandler = null;
452+
PartitionModification partitionModification = null;
451453
if (options.partitionedTableInMetastore()) {
452-
partitionHandler = catalogEnvironment.partitionHandler();
454+
partitionModification = catalogEnvironment.partitionModification();
453455
}
454456

455457
return new PartitionExpire(
@@ -458,7 +460,7 @@ public PartitionExpire newPartitionExpire(
458460
expireStrategy,
459461
newScan(),
460462
newCommit(commitUser, table),
461-
partitionHandler,
463+
partitionModification,
462464
options.endInputCheckPartitionExpire(),
463465
options.partitionExpireMaxNum(),
464466
options.partitionExpireBatchSize());
@@ -480,9 +482,10 @@ public List<TagCallback> createTagCallbacks(FileStoreTable table) {
480482
String partitionField = options.tagToPartitionField();
481483

482484
if (partitionField != null) {
483-
PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
484-
if (partitionHandler != null) {
485-
callbacks.add(new AddPartitionTagCallback(partitionHandler, partitionField));
485+
PartitionModification partitionModification =
486+
catalogEnvironment.partitionModification();
487+
if (partitionModification != null) {
488+
callbacks.add(new AddPartitionTagCallback(partitionModification, partitionField));
486489
}
487490
}
488491
if (options.tagCreateSuccessFile()) {

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.paimon.table.Instant;
4141
import org.apache.paimon.table.Table;
4242
import org.apache.paimon.table.TableSnapshot;
43-
import org.apache.paimon.table.sink.BatchTableCommit;
4443
import org.apache.paimon.table.system.SystemTableLoader;
4544
import org.apache.paimon.utils.SnapshotNotExistException;
4645

@@ -582,29 +581,15 @@ public boolean supportsVersionManagement() {
582581
}
583582

584583
@Override
585-
public TableQueryAuthResult authTableQuery(Identifier identifier, List<String> select) {
586-
throw new UnsupportedOperationException();
584+
public boolean supportsPartitionModification() {
585+
return false;
587586
}
588587

589588
@Override
590-
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
591-
throws TableNotExistException {}
592-
593-
@Override
594-
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
595-
throws TableNotExistException {
596-
Table table = getTable(identifier);
597-
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
598-
commit.truncatePartitions(partitions);
599-
} catch (Exception e) {
600-
throw new RuntimeException(e);
601-
}
589+
public TableQueryAuthResult authTableQuery(Identifier identifier, List<String> select) {
590+
throw new UnsupportedOperationException();
602591
}
603592

604-
@Override
605-
public void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
606-
throws TableNotExistException {}
607-
608593
@Override
609594
public List<String> listFunctions(String databaseName) {
610595
return Collections.emptyList();

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.paimon.table.Instant;
3333
import org.apache.paimon.table.Table;
3434
import org.apache.paimon.table.TableSnapshot;
35+
import org.apache.paimon.table.sink.BatchTableCommit;
3536
import org.apache.paimon.utils.SnapshotNotExistException;
3637
import org.apache.paimon.view.View;
3738
import org.apache.paimon.view.ViewChange;
@@ -869,15 +870,34 @@ void deleteTag(Identifier identifier, String tagName)
869870

870871
// ==================== Partition Modifications ==========================
871872

873+
/**
874+
* Whether this catalog supports partition modification for tables.
875+
*
876+
* <p>If not, following methods will do nothing:
877+
*
878+
* <ul>
879+
* <li>{@link #createPartitions(Identifier, List)}.
880+
* <li>{@link #alterPartitions(Identifier, List)}.
881+
* </ul>
882+
*
883+
* <p>If not, following method will be exactly the same as directly using {@link
884+
* BatchTableCommit#truncatePartitions}:
885+
*
886+
* <ul>
887+
* <li>{@link #dropPartitions(Identifier, List)}.
888+
* </ul>
889+
*/
890+
boolean supportsPartitionModification();
891+
872892
/**
873893
* Create partitions of the specify table. Ignore existing partitions.
874894
*
875895
* @param identifier path of the table to create partitions
876896
* @param partitions partitions to be created
877897
* @throws TableNotExistException if the table does not exist
878898
*/
879-
void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
880-
throws TableNotExistException;
899+
default void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
900+
throws TableNotExistException {}
881901

882902
/**
883903
* Drop partitions of the specify table. Ignore non-existent partitions.
@@ -886,8 +906,15 @@ void createPartitions(Identifier identifier, List<Map<String, String>> partition
886906
* @param partitions partitions to be deleted
887907
* @throws TableNotExistException if the table does not exist
888908
*/
889-
void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
890-
throws TableNotExistException;
909+
default void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
910+
throws TableNotExistException {
911+
Table table = getTable(identifier);
912+
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
913+
commit.truncatePartitions(partitions);
914+
} catch (Exception e) {
915+
throw new RuntimeException(e);
916+
}
917+
}
891918

892919
/**
893920
* Alter partitions of the specify table. For non-existent partitions, partitions will be
@@ -897,8 +924,8 @@ void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
897924
* @param partitions partitions to be altered
898925
* @throws TableNotExistException if the table does not exist
899926
*/
900-
void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
901-
throws TableNotExistException;
927+
default void alterPartitions(Identifier identifier, List<PartitionStatistics> partitions)
928+
throws TableNotExistException {}
902929

903930
// ======================= Function methods ===============================
904931

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ public static Table loadTable(
284284
isRestCatalog ? null : lockFactory,
285285
isRestCatalog ? null : lockContext,
286286
catalogContext,
287-
catalog.supportsVersionManagement());
287+
catalog.supportsVersionManagement(),
288+
catalog.supportsPartitionModification());
288289
Path path = new Path(schema.options().get(PATH.key()));
289290
FileStoreTable table =
290291
FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv);

paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,11 @@ public boolean commitSnapshot(
281281
return wrapped.commitSnapshot(identifier, tableUuid, snapshot, statistics);
282282
}
283283

284+
@Override
285+
public boolean supportsPartitionModification() {
286+
return wrapped.supportsPartitionModification();
287+
}
288+
284289
@Override
285290
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
286291
throws TableNotExistException {

paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.paimon.manifest.ManifestCommittable;
2626
import org.apache.paimon.manifest.ManifestEntry;
2727
import org.apache.paimon.manifest.SimpleFileEntry;
28-
import org.apache.paimon.table.PartitionHandler;
28+
import org.apache.paimon.table.PartitionModification;
2929
import org.apache.paimon.table.sink.CommitCallback;
3030
import org.apache.paimon.table.sink.CommitMessage;
3131
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -51,12 +51,13 @@ public class AddPartitionCommitCallback implements CommitCallback {
5151
.softValues()
5252
.build();
5353

54-
private final PartitionHandler partitionHandler;
54+
private final PartitionModification partitionModification;
5555
private final InternalRowPartitionComputer partitionComputer;
5656

5757
public AddPartitionCommitCallback(
58-
PartitionHandler partitionHandler, InternalRowPartitionComputer partitionComputer) {
59-
this.partitionHandler = partitionHandler;
58+
PartitionModification partitionModification,
59+
InternalRowPartitionComputer partitionComputer) {
60+
this.partitionModification = partitionModification;
6061
this.partitionComputer = partitionComputer;
6162
}
6263

@@ -92,7 +93,7 @@ private void addPartitions(Set<BinaryRow> partitions) {
9293
}
9394
}
9495
if (!newPartitions.isEmpty()) {
95-
partitionHandler.createPartitions(
96+
partitionModification.createPartitions(
9697
newPartitions.stream()
9798
.map(partitionComputer::generatePartValues)
9899
.collect(Collectors.toList()));
@@ -105,6 +106,6 @@ private void addPartitions(Set<BinaryRow> partitions) {
105106

106107
@Override
107108
public void close() throws Exception {
108-
partitionHandler.close();
109+
partitionModification.close();
109110
}
110111
}

paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.paimon.metastore;
2020

21-
import org.apache.paimon.table.PartitionHandler;
21+
import org.apache.paimon.table.PartitionModification;
2222
import org.apache.paimon.table.sink.TagCallback;
2323

2424
import java.util.Collections;
@@ -27,11 +27,12 @@
2727
/** A {@link TagCallback} to add newly created partitions to metastore. */
2828
public class AddPartitionTagCallback implements TagCallback {
2929

30-
private final PartitionHandler partitionHandler;
30+
private final PartitionModification partitionModification;
3131
private final String partitionField;
3232

33-
public AddPartitionTagCallback(PartitionHandler partitionHandler, String partitionField) {
34-
this.partitionHandler = partitionHandler;
33+
public AddPartitionTagCallback(
34+
PartitionModification partitionModification, String partitionField) {
35+
this.partitionModification = partitionModification;
3536
this.partitionField = partitionField;
3637
}
3738

@@ -40,7 +41,7 @@ public void notifyCreation(String tagName) {
4041
LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
4142
partitionSpec.put(partitionField, tagName);
4243
try {
43-
partitionHandler.createPartitions(Collections.singletonList(partitionSpec));
44+
partitionModification.createPartitions(Collections.singletonList(partitionSpec));
4445
} catch (Exception e) {
4546
throw new RuntimeException(e);
4647
}
@@ -51,14 +52,14 @@ public void notifyDeletion(String tagName) {
5152
LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
5253
partitionSpec.put(partitionField, tagName);
5354
try {
54-
partitionHandler.dropPartitions(Collections.singletonList(partitionSpec));
55+
partitionModification.dropPartitions(Collections.singletonList(partitionSpec));
5556
} catch (Exception e) {
5657
throw new RuntimeException(e);
5758
}
5859
}
5960

6061
@Override
6162
public void close() throws Exception {
62-
partitionHandler.close();
63+
partitionModification.close();
6364
}
6465
}

paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.paimon.manifest.PartitionEntry;
2525
import org.apache.paimon.partition.PartitionExpireStrategy;
2626
import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
27-
import org.apache.paimon.table.PartitionHandler;
27+
import org.apache.paimon.table.PartitionModification;
2828

2929
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
3030

@@ -53,7 +53,7 @@ public class PartitionExpire {
5353
private final Duration checkInterval;
5454
private final FileStoreScan scan;
5555
private final FileStoreCommit commit;
56-
@Nullable private final PartitionHandler partitionHandler;
56+
@Nullable private final PartitionModification partitionModification;
5757
private LocalDateTime lastCheck;
5858
private final PartitionExpireStrategy strategy;
5959
private final boolean endInputCheckPartitionExpire;
@@ -66,7 +66,7 @@ public PartitionExpire(
6666
PartitionExpireStrategy strategy,
6767
FileStoreScan scan,
6868
FileStoreCommit commit,
69-
@Nullable PartitionHandler partitionHandler,
69+
@Nullable PartitionModification partitionModification,
7070
boolean endInputCheckPartitionExpire,
7171
int maxExpireNum,
7272
int expireBatchSize) {
@@ -75,7 +75,7 @@ public PartitionExpire(
7575
this.strategy = strategy;
7676
this.scan = scan;
7777
this.commit = commit;
78-
this.partitionHandler = partitionHandler;
78+
this.partitionModification = partitionModification;
7979
// Avoid the execution time of stream jobs from being too short and preventing partition
8080
// expiration
8181
long rndSeconds = 0;
@@ -95,7 +95,7 @@ public PartitionExpire(
9595
PartitionExpireStrategy strategy,
9696
FileStoreScan scan,
9797
FileStoreCommit commit,
98-
@Nullable PartitionHandler partitionHandler,
98+
@Nullable PartitionModification partitionModification,
9999
int maxExpireNum,
100100
int expireBatchSize) {
101101
this(
@@ -104,7 +104,7 @@ public PartitionExpire(
104104
strategy,
105105
scan,
106106
commit,
107-
partitionHandler,
107+
partitionModification,
108108
false,
109109
maxExpireNum,
110110
expireBatchSize);
@@ -177,9 +177,9 @@ private List<Map<String, String>> doExpire(
177177

178178
private void doBatchExpire(
179179
List<Map<String, String>> expiredBatchPartitions, long commitIdentifier) {
180-
if (partitionHandler != null) {
180+
if (partitionModification != null) {
181181
try {
182-
partitionHandler.dropPartitions(expiredBatchPartitions);
182+
partitionModification.dropPartitions(expiredBatchPartitions);
183183
} catch (Catalog.TableNotExistException e) {
184184
throw new RuntimeException(e);
185185
}

0 commit comments

Comments
 (0)