Skip to content

Commit 400e417

Browse files
authored
[flink] Clean empty directory in FlinkOrphanFilesClean (apache#5521)
1 parent 10ce5aa commit 400e417

File tree

3 files changed

+78
-105
lines changed

3 files changed

+78
-105
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java

Lines changed: 10 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -169,35 +169,22 @@ protected void cleanBranchSnapshotDir(
169169

170170
private List<Pair<Path, Long>> tryGetNonSnapshotFiles(
171171
Path snapshotDirectory, Predicate<FileStatus> fileStatusFilter) {
172-
return listPathWithFilter(
173-
fileIO, snapshotDirectory, fileStatusFilter, nonSnapshotFileFilter());
172+
return listPathWithFilter(snapshotDirectory, fileStatusFilter, nonSnapshotFileFilter());
174173
}
175174

176175
private List<Pair<Path, Long>> tryGetNonChangelogFiles(
177176
Path changelogDirectory, Predicate<FileStatus> fileStatusFilter) {
178-
return listPathWithFilter(
179-
fileIO, changelogDirectory, fileStatusFilter, nonChangelogFileFilter());
177+
return listPathWithFilter(changelogDirectory, fileStatusFilter, nonChangelogFileFilter());
180178
}
181179

182-
private static List<Pair<Path, Long>> listPathWithFilter(
183-
FileIO fileIO,
184-
Path directory,
185-
Predicate<FileStatus> fileStatusFilter,
186-
Predicate<Path> fileFilter) {
187-
try {
188-
FileStatus[] statuses = fileIO.listStatus(directory);
189-
if (statuses == null) {
190-
return Collections.emptyList();
191-
}
192-
193-
return Arrays.stream(statuses)
194-
.filter(fileStatusFilter)
195-
.filter(status -> fileFilter.test(status.getPath()))
196-
.map(status -> Pair.of(status.getPath(), status.getLen()))
197-
.collect(Collectors.toList());
198-
} catch (IOException ignored) {
199-
return Collections.emptyList();
200-
}
180+
private List<Pair<Path, Long>> listPathWithFilter(
181+
Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
182+
List<FileStatus> statuses = tryBestListingDirs(directory);
183+
return statuses.stream()
184+
.filter(fileStatusFilter)
185+
.filter(status -> fileFilter.test(status.getPath()))
186+
.map(status -> Pair.of(status.getPath(), status.getLen()))
187+
.collect(Collectors.toList());
201188
}
202189

203190
private static Predicate<Path> nonSnapshotFileFilter() {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java

Lines changed: 50 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -35,10 +35,8 @@
3535

3636
import org.apache.flink.api.common.RuntimeExecutionMode;
3737
import org.apache.flink.api.common.functions.ReduceFunction;
38-
import org.apache.flink.api.common.typeinfo.TypeHint;
3938
import org.apache.flink.api.common.typeinfo.TypeInformation;
4039
import org.apache.flink.api.java.tuple.Tuple2;
41-
import org.apache.flink.api.java.tuple.Tuple7;
4240
import org.apache.flink.configuration.Configuration;
4341
import org.apache.flink.configuration.CoreOptions;
4442
import org.apache.flink.configuration.ExecutionOptions;
@@ -58,7 +56,6 @@
5856

5957
import java.io.IOException;
6058
import java.util.ArrayList;
61-
import java.util.Arrays;
6259
import java.util.Collections;
6360
import java.util.HashMap;
6461
import java.util.HashSet;
@@ -118,8 +115,7 @@ public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironme
118115
public void processElement(
119116
String branch,
120117
ProcessFunction<String, Tuple2<Long, Long>>.Context ctx,
121-
Collector<Tuple2<Long, Long>> out)
122-
throws Exception {
118+
Collector<Tuple2<Long, Long>> out) {
123119
AtomicLong deletedFilesCount = new AtomicLong(0);
124120
AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
125121
cleanBranchSnapshotDir(
@@ -239,88 +235,17 @@ public void endInput() throws IOException {
239235
});
240236

241237
usedFiles = usedFiles.union(usedManifestFiles);
242-
FileStorePathFactory pathFactory = table.store().pathFactory();
243-
List<Tuple7<String, String, String, String, String, Integer, String>> tablePaths =
244-
Arrays.asList(
245-
new Tuple7<>(
246-
table.fullName(),
247-
pathFactory.manifestPath().toString(),
248-
pathFactory.indexPath().toString(),
249-
pathFactory.statisticsPath().toString(),
250-
pathFactory.dataFilePath().toString(),
251-
partitionKeysNum,
252-
table.store().options().dataFileExternalPaths()));
253238
DataStream<Tuple2<String, Long>> candidates =
254-
env.fromCollection(
255-
tablePaths,
256-
TypeInformation.of(
257-
new TypeHint<
258-
Tuple7<
259-
String,
260-
String,
261-
String,
262-
String,
263-
String,
264-
Integer,
265-
String>>() {}))
239+
env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
266240
.process(
267-
new ProcessFunction<
268-
Tuple7<
269-
String,
270-
String,
271-
String,
272-
String,
273-
String,
274-
Integer,
275-
String>,
276-
Tuple2<String, Long>>() {
241+
new ProcessFunction<Integer, Tuple2<String, Long>>() {
277242
@Override
278243
public void processElement(
279-
Tuple7<
280-
String,
281-
String,
282-
String,
283-
String,
284-
String,
285-
Integer,
286-
String>
287-
paths,
288-
ProcessFunction<
289-
Tuple7<
290-
String,
291-
String,
292-
String,
293-
String,
294-
String,
295-
Integer,
296-
String>,
297-
Tuple2<String, Long>>
298-
.Context
244+
Integer i,
245+
ProcessFunction<Integer, Tuple2<String, Long>>.Context
299246
ctx,
300247
Collector<Tuple2<String, Long>> out) {
301-
List<String> dirs =
302-
listPaimonFileDirs(
303-
paths.f0, paths.f1, paths.f2,
304-
paths.f3, paths.f4, paths.f5,
305-
paths.f6)
306-
.stream()
307-
.map(Path::toUri)
308-
.map(Object::toString)
309-
.collect(Collectors.toList());
310-
for (String dir : dirs) {
311-
for (FileStatus fileStatus :
312-
tryBestListingDirs(new Path(dir))) {
313-
if (oldEnough(fileStatus)) {
314-
out.collect(
315-
new Tuple2(
316-
fileStatus
317-
.getPath()
318-
.toUri()
319-
.toString(),
320-
fileStatus.getLen()));
321-
}
322-
}
323-
}
248+
listPaimonFilesForTable(out);
324249
}
325250
})
326251
.setParallelism(1);
@@ -398,6 +323,50 @@ public void processElement2(
398323
return deleted;
399324
}
400325

326+
private void listPaimonFilesForTable(Collector<Tuple2<String, Long>> out) {
327+
FileStorePathFactory pathFactory = table.store().pathFactory();
328+
List<String> dirs =
329+
listPaimonFileDirs(
330+
table.fullName(),
331+
pathFactory.manifestPath().toString(),
332+
pathFactory.indexPath().toString(),
333+
pathFactory.statisticsPath().toString(),
334+
pathFactory.dataFilePath().toString(),
335+
partitionKeysNum,
336+
table.coreOptions().dataFileExternalPaths())
337+
.stream()
338+
.map(Path::toUri)
339+
.map(Object::toString)
340+
.collect(Collectors.toList());
341+
Set<Path> emptyDirs = new HashSet<>();
342+
for (String dir : dirs) {
343+
Path dirPath = new Path(dir);
344+
List<FileStatus> files = tryBestListingDirs(dirPath);
345+
for (FileStatus file : files) {
346+
if (oldEnough(file)) {
347+
out.collect(new Tuple2<>(file.getPath().toUri().toString(), file.getLen()));
348+
}
349+
}
350+
if (files.isEmpty()) {
351+
emptyDirs.add(dirPath);
352+
}
353+
}
354+
355+
// delete empty dir
356+
while (!emptyDirs.isEmpty()) {
357+
Set<Path> newEmptyDir = new HashSet<>();
358+
for (Path emptyDir : emptyDirs) {
359+
try {
360+
fileIO.delete(emptyDir, false);
361+
// recursive cleaning
362+
newEmptyDir.add(emptyDir.getParent());
363+
} catch (IOException ignored) {
364+
}
365+
}
366+
emptyDirs = newEmptyDir;
367+
}
368+
}
369+
401370
public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
402371
StreamExecutionEnvironment env,
403372
Catalog catalog,

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java

Lines changed: 18 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,7 @@ private Path getOrphanFilePath(FileStoreTable table, String orphanFile) {
103103
public void testRunWithoutException(boolean isNamedArgument) throws Exception {
104104
assumeTrue(!isNamedArgument || supportNamedArgument());
105105

106-
createTableAndWriteData(tableName);
106+
FileStoreTable table = createTableAndWriteData(tableName);
107107

108108
List<String> args =
109109
new ArrayList<>(
@@ -158,6 +158,23 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception {
158158
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));
159159

160160
assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
161+
162+
// test clean empty directories
163+
FileIO fileIO = table.fileIO();
164+
Path location = table.location();
165+
Path bucketDir = new Path(location, "bucket-0");
166+
167+
// delete snapshots and clean orphan files
168+
fileIO.delete(new Path(location, "snapshot"), true);
169+
ImmutableList.copyOf(executeSQL(withOlderThan));
170+
assertThat(fileIO.exists(bucketDir)).isTrue();
171+
assertThat(fileIO.listDirectories(bucketDir)).isEmpty();
172+
173+
// clean empty directories
174+
ImmutableList.copyOf(executeSQL(withOlderThan));
175+
assertThat(fileIO.exists(bucketDir)).isFalse();
176+
// table should not be deleted
177+
assertThat(fileIO.exists(location)).isTrue();
161178
}
162179

163180
@ParameterizedTest

0 commit comments

Comments
 (0)