Skip to content

Commit 365cba4

Browse files
committed
[core] Purge files use Table API and no dry run
1 parent 6d844c2 commit 365cba4

File tree

15 files changed

+133
-216
lines changed

15 files changed

+133
-216
lines changed

docs/content/flink/procedures.md

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -486,29 +486,15 @@ All available procedures are listed below.
486486
<tr>
487487
<td>purge_files</td>
488488
<td>
489-
-- for Flink 1.18<br/>
490-
-- clear table with purge files directly.<br/>
489+
-- clear table with purge files.<br/>
491490
CALL [catalog.]sys.purge_files('identifier')<br/>
492-
-- only check what dirs will be deleted, but not really delete them.<br/>
493-
CALL [catalog.]sys.purge_files('identifier', true)<br/><br/>
494-
-- for Flink 1.19 and later<br/>
495-
-- clear table with purge files directly.<br/>
496-
CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/>
497-
-- only check what dirs will be deleted, but not really delete them.<br/>
498-
CALL [catalog.]sys.purge_files(`table` => 'default.T', `dry_run` => true)<br/><br/>
499491
</td>
500492
<td>
501-
To clear table with purge files directly. Argument:
493+
To clear table with purge files. Argument:
502494
<li>table: the target table identifier. Cannot be empty.</li>
503-
<li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
504495
</td>
505496
<td>
506-
-- for Flink 1.18<br/>
507497
CALL sys.purge_files('default.T')<br/>
508-
CALL sys.purge_files('default.T', true)<br/><br/>
509-
-- for Flink 1.19 and later<br/>
510-
CALL sys.purge_files(`table` => 'default.T')<br/>
511-
CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)
512498
</td>
513499
</tr>
514500
<tr>

docs/content/spark/procedures.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,11 @@ This section introduce all available spark procedures about paimon.
198198
<tr>
199199
<td>purge_files</td>
200200
<td>
201-
To clear table with purge files directly. Argument:
201+
To clear table with purge files. Argument:
202202
<li>table: the target table identifier. Cannot be empty.</li>
203-
<li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
204203
</td>
205204
<td>
206205
CALL sys.purge_files(table => 'default.T')<br/>
207-
CALL sys.purge_files(table => 'default.T', dry_run => true)
208206
</td>
209207
</tr>
210208
<tr>

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,11 @@ public TableCommitImpl newCommit(String commitUser) {
452452
options.forceCreatingSnapshot());
453453
}
454454

455+
/**
 * Creates a new {@link ConsumerManager} from this table's {@code fileIO}, {@code path} and the
 * branch reported by {@code snapshotManager()}. A fresh instance is built on every call.
 */
@Override
456+
public ConsumerManager consumerManager() {
457+
return new ConsumerManager(fileIO, path, snapshotManager().branch());
458+
}
459+
455460
@Nullable
456461
protected Runnable newExpireRunnable() {
457462
CoreOptions options = coreOptions();

paimon-core/src/main/java/org/apache/paimon/table/DataTable.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.table;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.consumer.ConsumerManager;
2223
import org.apache.paimon.fs.Path;
2324
import org.apache.paimon.schema.SchemaManager;
2425
import org.apache.paimon.table.source.DataTableScan;
@@ -42,6 +43,8 @@ public interface DataTable extends InnerTable {
4243

4344
ChangelogManager changelogManager();
4445

46+
ConsumerManager consumerManager();
47+
4548
SchemaManager schemaManager();
4649

4750
TagManager tagManager();

paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.FileStore;
2323
import org.apache.paimon.Snapshot;
24+
import org.apache.paimon.consumer.ConsumerManager;
2425
import org.apache.paimon.fs.FileIO;
2526
import org.apache.paimon.fs.Path;
2627
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -105,6 +106,11 @@ public SchemaManager schemaManager() {
105106
return wrapped.schemaManager();
106107
}
107108

109+
/** Delegates to {@code wrapped.consumerManager()} and returns its result unchanged. */
@Override
110+
public ConsumerManager consumerManager() {
111+
return wrapped.consumerManager();
112+
}
113+
108114
@Override
109115
public TagManager tagManager() {
110116
return wrapped.tagManager();
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table;
20+
21+
import org.apache.paimon.consumer.ConsumerManager;
22+
import org.apache.paimon.operation.LocalOrphanFilesClean;
23+
import org.apache.paimon.options.ExpireConfig;
24+
import org.apache.paimon.table.sink.BatchTableCommit;
25+
import org.apache.paimon.utils.BranchManager;
26+
import org.apache.paimon.utils.TagManager;
27+
28+
import java.time.Duration;
29+
30+
/** Utils for {@link FileStoreTable}. */
31+
public class FileStoreTableUtils {
32+
33+
public static void purgeFiles(FileStoreTable table) throws Exception {
34+
// clear branches
35+
BranchManager branchManager = table.branchManager();
36+
branchManager.branches().forEach(branchManager::dropBranch);
37+
38+
// clear tags
39+
TagManager tagManager = table.tagManager();
40+
tagManager.allTagNames().forEach(table::deleteTag);
41+
42+
// clear consumers
43+
ConsumerManager consumerManager = table.consumerManager();
44+
consumerManager.consumers().keySet().forEach(consumerManager::deleteConsumer);
45+
46+
// truncate table
47+
try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
48+
commit.truncateTable();
49+
}
50+
51+
// clear snapshots and changelogs
52+
ExpireConfig expireConfig =
53+
ExpireConfig.builder()
54+
.changelogMaxDeletes(Integer.MAX_VALUE)
55+
.changelogRetainMax(1)
56+
.changelogRetainMin(1)
57+
.changelogTimeRetain(Duration.ZERO)
58+
.snapshotMaxDeletes(Integer.MAX_VALUE)
59+
.snapshotRetainMax(1)
60+
.snapshotRetainMin(1)
61+
.snapshotTimeRetain(Duration.ZERO)
62+
.build();
63+
table.newExpireChangelog().config(expireConfig).expire();
64+
table.newExpireSnapshots().config(expireConfig).expire();
65+
66+
// clear orphan files
67+
LocalOrphanFilesClean clean = new LocalOrphanFilesClean(table, System.currentTimeMillis());
68+
clean.clean();
69+
}
70+
}

paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ public ChangelogManager changelogManager() {
193193
return wrapped.changelogManager();
194194
}
195195

196+
/** Delegates to {@code wrapped.consumerManager()} and returns its result unchanged. */
@Override
197+
public ConsumerManager consumerManager() {
198+
return wrapped.consumerManager();
199+
}
200+
196201
@Override
197202
public SchemaManager schemaManager() {
198203
return wrapped.schemaManager();

paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.consumer.ConsumerManager;
2324
import org.apache.paimon.data.BinaryString;
2425
import org.apache.paimon.data.GenericRow;
2526
import org.apache.paimon.data.InternalRow;
@@ -152,6 +153,11 @@ public ChangelogManager changelogManager() {
152153
return wrapped.changelogManager();
153154
}
154155

156+
/** Delegates to {@code wrapped.consumerManager()} and returns its result unchanged. */
@Override
157+
public ConsumerManager consumerManager() {
158+
return wrapped.consumerManager();
159+
}
160+
155161
@Override
156162
public SchemaManager schemaManager() {
157163
return wrapped.schemaManager();

paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.Snapshot;
2323
import org.apache.paimon.annotation.Experimental;
24+
import org.apache.paimon.consumer.ConsumerManager;
2425
import org.apache.paimon.data.BinaryRow;
2526
import org.apache.paimon.data.GenericRow;
2627
import org.apache.paimon.data.InternalRow;
@@ -138,6 +139,11 @@ public ChangelogManager changelogManager() {
138139
return wrapped.changelogManager();
139140
}
140141

142+
/** Delegates to {@code wrapped.consumerManager()} and returns its result unchanged. */
@Override
143+
public ConsumerManager consumerManager() {
144+
return wrapped.consumerManager();
145+
}
146+
141147
@Override
142148
public SchemaManager schemaManager() {
143149
return wrapped.schemaManager();

paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.consumer.ConsumerManager;
2324
import org.apache.paimon.fs.FileIO;
2425
import org.apache.paimon.fs.Path;
2526
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -173,6 +174,11 @@ public ChangelogManager changelogManager() {
173174
return wrapped.changelogManager();
174175
}
175176

177+
/** Delegates to {@code wrapped.consumerManager()} and returns its result unchanged. */
@Override
178+
public ConsumerManager consumerManager() {
179+
return wrapped.consumerManager();
180+
}
181+
176182
@Override
177183
public SchemaManager schemaManager() {
178184
return wrapped.schemaManager();

0 commit comments

Comments
 (0)