Skip to content

Commit d2ccd1b

Browse files
committed
[core] Custom Partition expire factory should be invoked first
1 parent 87bafe8 commit d2ccd1b

File tree

7 files changed

+49
-62
lines changed

7 files changed

+49
-62
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -895,9 +895,9 @@
895895
</tr>
896896
<tr>
897897
<td><h5>partition.expiration-strategy</h5></td>
898-
<td style="word-wrap: break-word;">values-time</td>
899-
<td><p>Enum</p></td>
900-
<td>The strategy determines how to extract the partition time and compare it with the current time.<br /><br />Possible values:<ul><li>"values-time": This strategy compares the time extracted from the partition value with the current time.</li><li>"update-time": This strategy compares the last update time of the partition with the current time.</li><li>"custom": This strategy use custom class to expire partitions.</li></ul></td>
898+
<td style="word-wrap: break-word;">"values-time"</td>
899+
<td>String</td>
900+
<td>The strategy determines how to extract the partition time and compare it with the current time.<ul><li>"values-time": This strategy compares the time extracted from the partition value with the current time.</li><li>"update-time": This strategy compares the last update time of the partition with the current time.</li></ul></td>
901901
</tr>
902902
<tr>
903903
<td><h5>partition.expiration-time</h5></td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,12 +1007,20 @@ public InlineElement getDescription() {
10071007
"Whether only overwrite dynamic partition when overwriting a partitioned table with "
10081008
+ "dynamic partition columns. Works only when the table has partition keys.");
10091009

1010-
public static final ConfigOption<PartitionExpireStrategy> PARTITION_EXPIRATION_STRATEGY =
1010+
public static final ConfigOption<String> PARTITION_EXPIRATION_STRATEGY =
10111011
key("partition.expiration-strategy")
1012-
.enumType(PartitionExpireStrategy.class)
1013-
.defaultValue(PartitionExpireStrategy.VALUES_TIME)
1012+
.stringType()
1013+
.defaultValue("values-time")
10141014
.withDescription(
1015-
"The strategy determines how to extract the partition time and compare it with the current time.");
1015+
Description.builder()
1016+
.text(
1017+
"The strategy determines how to extract the partition time and compare it with the current time.")
1018+
.list(
1019+
text(
1020+
"\"values-time\": This strategy compares the time extracted from the partition value with the current time."),
1021+
text(
1022+
"\"update-time\": This strategy compares the last update time of the partition with the current time."))
1023+
.build());
10161024

10171025
public static final ConfigOption<Duration> PARTITION_EXPIRATION_TIME =
10181026
key("partition.expiration-time")
@@ -2848,7 +2856,7 @@ public int partitionExpireBatchSize() {
28482856
.orElse(options.get(PARTITION_EXPIRATION_MAX_NUM));
28492857
}
28502858

2851-
public PartitionExpireStrategy partitionExpireStrategy() {
2859+
public String partitionExpireStrategy() {
28522860
return options.get(PARTITION_EXPIRATION_STRATEGY);
28532861
}
28542862

@@ -3802,38 +3810,6 @@ public InlineElement getDescription() {
38023810
}
38033811
}
38043812

3805-
/** Specifies the expiration strategy for partition expiration. */
3806-
public enum PartitionExpireStrategy implements DescribedEnum {
3807-
VALUES_TIME(
3808-
"values-time",
3809-
"This strategy compares the time extracted from the partition value with the current time."),
3810-
3811-
UPDATE_TIME(
3812-
"update-time",
3813-
"This strategy compares the last update time of the partition with the current time."),
3814-
3815-
CUSTOM("custom", "This strategy use custom class to expire partitions.");
3816-
3817-
private final String value;
3818-
3819-
private final String description;
3820-
3821-
PartitionExpireStrategy(String value, String description) {
3822-
this.value = value;
3823-
this.description = description;
3824-
}
3825-
3826-
@Override
3827-
public String toString() {
3828-
return value;
3829-
}
3830-
3831-
@Override
3832-
public InlineElement getDescription() {
3833-
return text(description);
3834-
}
3835-
}
3836-
38373813
/** Specifies the strategy for selecting external storage paths. */
38383814
public enum ExternalPathStrategy implements DescribedEnum {
38393815
NONE(

paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.ArrayList;
2828
import java.util.Iterator;
2929
import java.util.List;
30+
import java.util.Optional;
3031
import java.util.ServiceLoader;
3132
import java.util.stream.Collectors;
3233

@@ -149,13 +150,11 @@ public static <T> List<T> discoverFactories(ClassLoader classLoader, Class<T> kl
149150
* @param <T> the type of the factory
150151
* @return the factory
151152
*/
152-
public static <T> T discoverSingletonFactory(ClassLoader classLoader, Class<T> klass) {
153+
public static <T> Optional<T> discoverSingletonFactory(
154+
ClassLoader classLoader, Class<T> klass) {
153155
List<T> factories = FactoryUtil.discoverFactories(classLoader, klass);
154156
if (factories.isEmpty()) {
155-
throw new FactoryException(
156-
String.format(
157-
"Could not find any factories that implement '%s' in the classpath.",
158-
klass.getName()));
157+
return Optional.empty();
159158
}
160159

161160
if (factories.size() > 1) {
@@ -171,6 +170,6 @@ public static <T> T discoverSingletonFactory(ClassLoader classLoader, Class<T> k
171170
.collect(Collectors.joining("\n"))));
172171
}
173172

174-
return factories.get(0);
173+
return Optional.of(factories.get(0));
175174
}
176175
}

paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.LinkedHashMap;
3535
import java.util.List;
3636
import java.util.Map;
37+
import java.util.Optional;
3738

3839
/** Strategy for partition expiration. */
3940
public abstract class PartitionExpireStrategy {
@@ -80,18 +81,23 @@ public static PartitionExpireStrategy createPartitionExpireStrategy(
8081
RowType partitionType,
8182
@Nullable CatalogLoader catalogLoader,
8283
@Nullable Identifier identifier) {
83-
switch (options.partitionExpireStrategy()) {
84-
case UPDATE_TIME:
84+
Optional<PartitionExpireStrategyFactory> custom =
85+
PartitionExpireStrategyFactory.INSTANCE.get();
86+
if (custom.isPresent()) {
87+
try {
88+
return custom.get().create(catalogLoader, identifier, options, partitionType);
89+
} catch (UnsupportedOperationException ignored) {
90+
}
91+
}
92+
93+
String strategy = options.partitionExpireStrategy();
94+
switch (strategy) {
95+
case "update-time":
8596
return new PartitionUpdateTimeExpireStrategy(options, partitionType);
86-
case VALUES_TIME:
97+
case "values-time":
8798
return new PartitionValuesTimeExpireStrategy(options, partitionType);
88-
case CUSTOM:
89-
return PartitionExpireStrategyFactory.INSTANCE
90-
.get()
91-
.create(catalogLoader, identifier, options, partitionType);
9299
default:
93-
throw new IllegalArgumentException(
94-
"Unknown partitionExpireStrategy: " + options.partitionExpireStrategy());
100+
throw new IllegalArgumentException("Unknown partitionExpireStrategy: " + strategy);
95101
}
96102
}
97103
}

paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategyFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.paimon.shade.guava30.com.google.common.base.Supplier;
2828
import org.apache.paimon.shade.guava30.com.google.common.base.Suppliers;
2929

30+
import java.util.Optional;
31+
3032
/** Factory to create a {@link PartitionExpireStrategy}. */
3133
public interface PartitionExpireStrategyFactory {
3234

@@ -36,7 +38,7 @@ PartitionExpireStrategy create(
3638
CoreOptions options,
3739
RowType partitionType);
3840

39-
Supplier<PartitionExpireStrategyFactory> INSTANCE =
41+
Supplier<Optional<PartitionExpireStrategyFactory>> INSTANCE =
4042
Suppliers.memoize(
4143
() ->
4244
FactoryUtil.discoverSingletonFactory(

paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.stream.Collectors;
3737
import java.util.stream.IntStream;
3838

39+
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_STRATEGY;
40+
3941
/**
4042
* A partition expiration policy that compare the time extracted from the partition with the current
4143
* time.
@@ -86,19 +88,17 @@ public boolean test(BinaryRow partition) {
8688
+ " 1. Check the expiration configuration.\n"
8789
+ " 2. Manually delete the partition using the drop-partition command if the partition"
8890
+ " value is non-date formatted.\n"
89-
+ " 3. Use '{}' expiration strategy by set '{}', which supports non-date formatted partition.",
91+
+ " 3. Use 'update-time' expiration strategy by set '{}', which supports non-date formatted partition.",
9092
formatPartitionInfo(array),
91-
CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
92-
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
93+
PARTITION_EXPIRATION_STRATEGY.key());
9394
return false;
9495
} catch (NullPointerException e) {
9596
// there might exist NULL partition value
9697
LOG.warn(
9798
"This partition {} cannot be expired because it contains null value. "
98-
+ "You can try to drop it manually or use '{}' expiration strategy by set '{}'.",
99+
+ "You can try to drop it manually or use 'update-time' expiration strategy by set '{}'.",
99100
formatPartitionInfo(array),
100-
CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
101-
CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
101+
PARTITION_EXPIRATION_STRATEGY.key());
102102
return false;
103103
}
104104
}

paimon-core/src/test/java/org/apache/paimon/partition/CustomPartitionExpirationFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ public PartitionExpireStrategy create(
4343
Identifier identifier,
4444
CoreOptions options,
4545
RowType partitionType) {
46+
String strategy = options.partitionExpireStrategy();
47+
if (!"custom".equals(strategy)) {
48+
throw new UnsupportedOperationException();
49+
}
4650
return new PartitionExpireStrategy(partitionType, options.partitionDefaultName()) {
4751
@Override
4852
public List<PartitionEntry> selectExpiredPartitions(

0 commit comments

Comments
 (0)