Skip to content

Commit abef2a9

Browse files
committed
[core] Simplify FileStoreCommitImpl to extract some classes
1 parent 08aeb86 commit abef2a9

File tree

11 files changed

+715
-421
lines changed

11 files changed

+715
-421
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 98 additions & 419 deletions
Large diffs are not rendered by default.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.manifest.IndexManifestEntry;
22+
import org.apache.paimon.manifest.ManifestEntry;
23+
24+
import java.util.List;
25+
26+
/** Commit changes. */
27+
public class CommitChanges {
28+
29+
public final List<ManifestEntry> tableFiles;
30+
public final List<ManifestEntry> changelogFiles;
31+
public final List<IndexManifestEntry> indexFiles;
32+
33+
public CommitChanges(
34+
List<ManifestEntry> tableFiles,
35+
List<ManifestEntry> changelogFiles,
36+
List<IndexManifestEntry> indexFiles) {
37+
this.tableFiles = tableFiles;
38+
this.changelogFiles = changelogFiles;
39+
this.indexFiles = indexFiles;
40+
}
41+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.manifest.IndexManifestEntry;
23+
import org.apache.paimon.manifest.ManifestEntry;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.util.List;
28+
29+
/** Provider to provide {@link CommitChanges}. */
30+
@FunctionalInterface
31+
public interface CommitChangesProvider {
32+
33+
CommitChanges provide(@Nullable Snapshot latestSnapshot);
34+
35+
static CommitChangesProvider provider(
36+
List<ManifestEntry> tableFiles,
37+
List<ManifestEntry> changelogFiles,
38+
List<IndexManifestEntry> indexFiles) {
39+
return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles);
40+
}
41+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.manifest.IndexManifestFile;
22+
import org.apache.paimon.manifest.ManifestFile;
23+
import org.apache.paimon.manifest.ManifestFileMeta;
24+
import org.apache.paimon.manifest.ManifestList;
25+
import org.apache.paimon.utils.Pair;
26+
27+
import java.util.List;
28+
import java.util.Objects;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
31+
32+
/** A cleaner to clean commit tmp files. */
33+
public class CommitCleaner {
34+
35+
private final ManifestList manifestList;
36+
private final ManifestFile manifestFile;
37+
private final IndexManifestFile indexManifestFile;
38+
39+
public CommitCleaner(
40+
ManifestList manifestList,
41+
ManifestFile manifestFile,
42+
IndexManifestFile indexManifestFile) {
43+
this.manifestList = manifestList;
44+
this.manifestFile = manifestFile;
45+
this.indexManifestFile = indexManifestFile;
46+
}
47+
48+
public void cleanUpReuseTmpManifests(
49+
Pair<String, Long> deltaManifestList,
50+
Pair<String, Long> changelogManifestList,
51+
String oldIndexManifest,
52+
String newIndexManifest) {
53+
if (deltaManifestList != null) {
54+
for (ManifestFileMeta manifest : manifestList.read(deltaManifestList.getKey())) {
55+
manifestFile.delete(manifest.fileName());
56+
}
57+
manifestList.delete(deltaManifestList.getKey());
58+
}
59+
60+
if (changelogManifestList != null) {
61+
for (ManifestFileMeta manifest : manifestList.read(changelogManifestList.getKey())) {
62+
manifestFile.delete(manifest.fileName());
63+
}
64+
manifestList.delete(changelogManifestList.getKey());
65+
}
66+
67+
cleanIndexManifest(oldIndexManifest, newIndexManifest);
68+
}
69+
70+
public void cleanUpNoReuseTmpManifests(
71+
Pair<String, Long> baseManifestList,
72+
List<ManifestFileMeta> mergeBeforeManifests,
73+
List<ManifestFileMeta> mergeAfterManifests) {
74+
if (baseManifestList != null) {
75+
manifestList.delete(baseManifestList.getKey());
76+
}
77+
Set<String> oldMetaSet =
78+
mergeBeforeManifests.stream()
79+
.map(ManifestFileMeta::fileName)
80+
.collect(Collectors.toSet());
81+
for (ManifestFileMeta suspect : mergeAfterManifests) {
82+
if (!oldMetaSet.contains(suspect.fileName())) {
83+
manifestFile.delete(suspect.fileName());
84+
}
85+
}
86+
}
87+
88+
private void cleanIndexManifest(String oldIndexManifest, String newIndexManifest) {
89+
if (newIndexManifest != null && !Objects.equals(oldIndexManifest, newIndexManifest)) {
90+
indexManifestFile.delete(newIndexManifest);
91+
}
92+
}
93+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.Snapshot.CommitKind;
22+
23+
/** Provider to provide {@link CommitKind}. */
24+
@FunctionalInterface
25+
public interface CommitKindProvider {
26+
27+
CommitKind provide(CommitChanges changes);
28+
29+
static CommitKindProvider provider(CommitKind kind) {
30+
return changes -> kind;
31+
}
32+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
/** Result of a commit. */
22+
public interface CommitResult {
23+
boolean isSuccess();
24+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.data.BinaryRow;
24+
import org.apache.paimon.manifest.FileKind;
25+
import org.apache.paimon.manifest.IndexManifestEntry;
26+
import org.apache.paimon.manifest.IndexManifestFile;
27+
import org.apache.paimon.manifest.ManifestEntry;
28+
import org.apache.paimon.manifest.SimpleFileEntry;
29+
import org.apache.paimon.operation.FileStoreScan;
30+
import org.apache.paimon.partition.PartitionPredicate;
31+
import org.apache.paimon.table.BucketMode;
32+
import org.apache.paimon.table.source.ScanMode;
33+
34+
import javax.annotation.Nullable;
35+
36+
import java.util.ArrayList;
37+
import java.util.List;
38+
39+
import static java.util.Collections.emptyList;
40+
41+
/** Manifest entries scanner for commit. */
42+
public class CommitScanner {
43+
44+
private final FileStoreScan scan;
45+
private final IndexManifestFile indexManifestFile;
46+
47+
public CommitScanner(
48+
FileStoreScan scan, IndexManifestFile indexManifestFile, CoreOptions options) {
49+
this.scan = scan;
50+
this.indexManifestFile = indexManifestFile;
51+
// Stats in DELETE Manifest Entries is useless
52+
if (options.manifestDeleteFileDropStats()) {
53+
this.scan.dropStats();
54+
}
55+
}
56+
57+
public List<SimpleFileEntry> readIncrementalChanges(
58+
Snapshot from, Snapshot to, List<BinaryRow> changedPartitions) {
59+
List<SimpleFileEntry> entries = new ArrayList<>();
60+
for (long i = from.id() + 1; i <= to.id(); i++) {
61+
List<SimpleFileEntry> delta =
62+
scan.withSnapshot(i)
63+
.withKind(ScanMode.DELTA)
64+
.withPartitionFilter(changedPartitions)
65+
.readSimpleEntries();
66+
entries.addAll(delta);
67+
}
68+
return entries;
69+
}
70+
71+
public List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
72+
Snapshot snapshot, List<BinaryRow> changedPartitions) {
73+
try {
74+
return scan.withSnapshot(snapshot)
75+
.withKind(ScanMode.ALL)
76+
.withPartitionFilter(changedPartitions)
77+
.readSimpleEntries();
78+
} catch (Throwable e) {
79+
throw new RuntimeException("Cannot read manifest entries from changed partitions.", e);
80+
}
81+
}
82+
83+
public CommitChanges readOverwriteChanges(
84+
int numBucket,
85+
List<ManifestEntry> changes,
86+
List<IndexManifestEntry> indexFiles,
87+
@Nullable Snapshot latestSnapshot,
88+
@Nullable PartitionPredicate partitionFilter) {
89+
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
90+
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
91+
if (latestSnapshot != null) {
92+
scan.withSnapshot(latestSnapshot)
93+
.withPartitionFilter(partitionFilter)
94+
.withKind(ScanMode.ALL);
95+
if (numBucket != BucketMode.POSTPONE_BUCKET) {
96+
// bucket = -2 can only be overwritten in postpone bucket tables
97+
scan.withBucketFilter(bucket -> bucket >= 0);
98+
}
99+
List<ManifestEntry> currentEntries = scan.plan().files();
100+
for (ManifestEntry entry : currentEntries) {
101+
changesWithOverwrite.add(
102+
ManifestEntry.create(
103+
FileKind.DELETE,
104+
entry.partition(),
105+
entry.bucket(),
106+
entry.totalBuckets(),
107+
entry.file()));
108+
}
109+
110+
// collect index files
111+
if (latestSnapshot.indexManifest() != null) {
112+
List<IndexManifestEntry> entries =
113+
indexManifestFile.read(latestSnapshot.indexManifest());
114+
for (IndexManifestEntry entry : entries) {
115+
if (partitionFilter == null || partitionFilter.test(entry.partition())) {
116+
indexChangesWithOverwrite.add(entry.toDeleteEntry());
117+
}
118+
}
119+
}
120+
}
121+
changesWithOverwrite.addAll(changes);
122+
indexChangesWithOverwrite.addAll(indexFiles);
123+
return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite);
124+
}
125+
126+
public long totalRecordCount(Snapshot snapshot) {
127+
return scan.totalRecordCount(snapshot);
128+
}
129+
}

0 commit comments

Comments
 (0)