Skip to content

Commit 2f6f682

Browse files
committed
[core] Introduce custom PartitionExpireStrategy
1 parent 10b8dfa commit 2f6f682

File tree

9 files changed

+240
-5
lines changed

9 files changed

+240
-5
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3248,7 +3248,9 @@ public enum PartitionExpireStrategy implements DescribedEnum {
32483248

32493249
UPDATE_TIME(
32503250
"update-time",
3251-
"This strategy compares the last update time of the partition with the current time.");
3251+
"This strategy compares the last update time of the partition with the current time."),
3252+
3253+
CUSTOM("custom", "This strategy use custom class to expire partitions.");
32523254

32533255
private final String value;
32543256

paimon-api/src/main/java/org/apache/paimon/factories/FactoryUtil.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,37 @@ public static <T> List<T> discoverFactories(ClassLoader classLoader, Class<T> kl
140140

141141
return loadResults;
142142
}
143+
144+
/**
145+
* Discover a singleton factory.
146+
*
147+
* @param classLoader the class loader
148+
* @param klass the klass
149+
* @param <T> the type of the factory
150+
* @return the factory
151+
*/
152+
public static <T> T discoverSingletonFactory(ClassLoader classLoader, Class<T> klass) {
153+
List<T> factories = FactoryUtil.discoverFactories(classLoader, klass);
154+
if (factories.isEmpty()) {
155+
throw new FactoryException(
156+
String.format(
157+
"Could not find any factories that implement '%s' in the classpath.",
158+
klass.getName()));
159+
}
160+
161+
if (factories.size() > 1) {
162+
throw new FactoryException(
163+
String.format(
164+
"Multiple factories that implement '%s' found in the classpath.\n\n"
165+
+ "Ambiguous factory classes are:\n\n"
166+
+ "%s",
167+
klass.getName(),
168+
factories.stream()
169+
.map(f -> f.getClass().getName())
170+
.sorted()
171+
.collect(Collectors.joining("\n"))));
172+
}
173+
174+
return factories.get(0);
175+
}
143176
}

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import java.util.Comparator;
8080
import java.util.List;
8181

82+
import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
8283
import static org.apache.paimon.utils.Preconditions.checkArgument;
8384

8485
/**
@@ -423,7 +424,11 @@ public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable tabl
423424
table,
424425
partitionExpireTime,
425426
options.partitionExpireCheckInterval(),
426-
PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()));
427+
createPartitionExpireStrategy(
428+
options,
429+
partitionType(),
430+
catalogEnvironment.catalogLoader(),
431+
catalogEnvironment.identifier()));
427432
}
428433

429434
@Override

paimon-core/src/main/java/org/apache/paimon/partition/PartitionExpireStrategy.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
package org.apache.paimon.partition;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.catalog.CatalogLoader;
23+
import org.apache.paimon.catalog.Identifier;
2224
import org.apache.paimon.data.BinaryRow;
2325
import org.apache.paimon.manifest.PartitionEntry;
2426
import org.apache.paimon.operation.FileStoreScan;
2527
import org.apache.paimon.types.RowType;
2628
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
2729

30+
import javax.annotation.Nullable;
31+
2832
import java.time.LocalDateTime;
2933
import java.util.ArrayList;
3034
import java.util.LinkedHashMap;
@@ -66,13 +70,22 @@ public abstract List<PartitionEntry> selectExpiredPartitions(
6670
FileStoreScan scan, LocalDateTime expirationTime);
6771

6872
public static PartitionExpireStrategy createPartitionExpireStrategy(
69-
CoreOptions options, RowType partitionType) {
73+
CoreOptions options,
74+
RowType partitionType,
75+
@Nullable CatalogLoader catalogLoader,
76+
@Nullable Identifier identifier) {
7077
switch (options.partitionExpireStrategy()) {
7178
case UPDATE_TIME:
7279
return new PartitionUpdateTimeExpireStrategy(partitionType);
7380
case VALUES_TIME:
74-
default:
7581
return new PartitionValuesTimeExpireStrategy(options, partitionType);
82+
case CUSTOM:
83+
return PartitionExpireStrategyFactory.INSTANCE
84+
.get()
85+
.create(catalogLoader, identifier, partitionType);
86+
default:
87+
throw new IllegalArgumentException(
88+
"Unknown partitionExpireStrategy: " + options.partitionExpireStrategy());
7689
}
7790
}
7891
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.partition;
20+
21+
import org.apache.paimon.catalog.CatalogLoader;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.factories.FactoryUtil;
24+
import org.apache.paimon.types.RowType;
25+
26+
import org.apache.paimon.shade.guava30.com.google.common.base.Supplier;
27+
import org.apache.paimon.shade.guava30.com.google.common.base.Suppliers;
28+
29+
/** Factory to create a {@link PartitionExpireStrategy}. */
30+
public interface PartitionExpireStrategyFactory {
31+
32+
PartitionExpireStrategy create(
33+
CatalogLoader catalogLoader, Identifier identifier, RowType partitionType);
34+
35+
Supplier<PartitionExpireStrategyFactory> INSTANCE =
36+
Suppliers.memoize(
37+
() ->
38+
FactoryUtil.discoverSingletonFactory(
39+
PartitionExpireStrategy.class.getClassLoader(),
40+
PartitionExpireStrategyFactory.class));
41+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.partition;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.CatalogLoader;
23+
import org.apache.paimon.catalog.Identifier;
24+
import org.apache.paimon.manifest.PartitionEntry;
25+
import org.apache.paimon.operation.FileStoreScan;
26+
import org.apache.paimon.table.Table;
27+
import org.apache.paimon.types.RowType;
28+
29+
import java.time.LocalDateTime;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
/** Custom {@link PartitionExpireStrategyFactory}. */
35+
public class CustomPartitionExpirationFactory implements PartitionExpireStrategyFactory {
36+
37+
public static final Map<String, List<PartitionEntry>> TABLE_EXPIRE_PARTITIONS = new HashMap<>();
38+
39+
@Override
40+
public PartitionExpireStrategy create(
41+
CatalogLoader catalogLoader, Identifier identifier, RowType partitionType) {
42+
return new PartitionExpireStrategy(partitionType) {
43+
@Override
44+
public List<PartitionEntry> selectExpiredPartitions(
45+
FileStoreScan scan, LocalDateTime expirationTime) {
46+
Table table;
47+
try (Catalog catalog = catalogLoader.load()) {
48+
table = catalog.getTable(identifier);
49+
} catch (Exception e) {
50+
throw new RuntimeException(e);
51+
}
52+
return TABLE_EXPIRE_PARTITIONS.get(table.options().get("path"));
53+
}
54+
};
55+
}
56+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.partition;
20+
21+
import org.apache.paimon.data.BinaryRow;
22+
import org.apache.paimon.data.GenericRow;
23+
import org.apache.paimon.manifest.PartitionEntry;
24+
import org.apache.paimon.schema.Schema;
25+
import org.apache.paimon.table.Table;
26+
import org.apache.paimon.table.TableTestBase;
27+
import org.apache.paimon.types.DataTypes;
28+
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.Collections;
32+
33+
import static org.apache.paimon.CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE;
34+
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_STRATEGY;
35+
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
class PartitionExpireTableTest extends TableTestBase {
39+
40+
@Test
41+
public void testCustomExpire() throws Exception {
42+
Schema.Builder schemaBuilder = Schema.newBuilder();
43+
schemaBuilder.column("f0", DataTypes.INT());
44+
schemaBuilder.column("pt", DataTypes.INT());
45+
schemaBuilder.partitionKeys("pt");
46+
schemaBuilder.option(PARTITION_EXPIRATION_STRATEGY.key(), "custom");
47+
schemaBuilder.option(PARTITION_EXPIRATION_TIME.key(), "1 d");
48+
schemaBuilder.option(END_INPUT_CHECK_PARTITION_EXPIRE.key(), "true");
49+
catalog.createTable(identifier(), schemaBuilder.build(), true);
50+
51+
Table table = catalog.getTable(identifier());
52+
write(table, GenericRow.of(1, 1));
53+
write(table, GenericRow.of(2, 2));
54+
assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(1, 1), GenericRow.of(2, 2));
55+
56+
PartitionEntry expire = new PartitionEntry(BinaryRow.singleColumn(1), 1, 1, 1, 1);
57+
CustomPartitionExpirationFactory.TABLE_EXPIRE_PARTITIONS.put(
58+
table.options().get("path"), Collections.singletonList(expire));
59+
write(table, GenericRow.of(3, 3));
60+
assertThat(read(table)).containsExactlyInAnyOrder(GenericRow.of(3, 3), GenericRow.of(2, 2));
61+
}
62+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
org.apache.paimon.partition.CustomPartitionExpirationFactory

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.FileStore;
23+
import org.apache.paimon.catalog.Identifier;
2324
import org.apache.paimon.operation.PartitionExpire;
2425
import org.apache.paimon.table.FileStoreTable;
2526
import org.apache.paimon.utils.TimeUtils;
@@ -63,7 +64,10 @@ public ExpirePartitionsAction(
6364
TimeUtils.parseDuration(expirationTime),
6465
Duration.ofMillis(0L),
6566
createPartitionExpireStrategy(
66-
CoreOptions.fromMap(map), fileStore.partitionType()));
67+
CoreOptions.fromMap(map),
68+
fileStore.partitionType(),
69+
catalogLoader(),
70+
new Identifier(databaseName, tableName)));
6771
}
6872

6973
@Override

0 commit comments

Comments
 (0)