Skip to content

Commit 3e706fe

Browse files
authored
[spark] Support spark compaction for postpone bucket table (apache#7169)
1 parent 6c1415b commit 3e706fe

File tree

10 files changed

+475
-97
lines changed

10 files changed

+475
-97
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,12 @@
10311031
<td>Boolean</td>
10321032
<td>Whether to write the data into fixed bucket for batch writing a postpone bucket table.</td>
10331033
</tr>
1034+
<tr>
1035+
<td><h5>postpone.default-bucket-num</h5></td>
1036+
<td style="word-wrap: break-word;">1</td>
1037+
<td>Integer</td>
1038+
<td>Bucket number for the partitions compacted for the first time in postpone bucket tables.</td>
1039+
</tr>
10341040
<tr>
10351041
<td><h5>primary-key</h5></td>
10361042
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,6 @@
128128
<td>Duration</td>
129129
<td>You can specify time interval for partition, for example, daily partition is '1 d', hourly partition is '1 h'.</td>
130130
</tr>
131-
<tr>
132-
<td><h5>postpone.default-bucket-num</h5></td>
133-
<td style="word-wrap: break-word;">1</td>
134-
<td>Integer</td>
135-
<td>Bucket number for the partitions compacted for the first time in postpone bucket tables.</td>
136-
</tr>
137131
<tr>
138132
<td><h5>precommit-compact</h5></td>
139133
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2164,6 +2164,13 @@ public InlineElement getDescription() {
21642164
.withDescription(
21652165
"Whether to write the data into fixed bucket for batch writing a postpone bucket table.");
21662166

2167+
public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
2168+
key("postpone.default-bucket-num")
2169+
.intType()
2170+
.defaultValue(1)
2171+
.withDescription(
2172+
"Bucket number for the partitions compacted for the first time in postpone bucket tables.");
2173+
21672174
public static final ConfigOption<Long> GLOBAL_INDEX_ROW_COUNT_PER_SHARD =
21682175
key("global-index.row-count-per-shard")
21692176
.longType()
@@ -3394,6 +3401,10 @@ public boolean postponeBatchWriteFixedBucket() {
33943401
return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
33953402
}
33963403

3404+
public int postponeDefaultBucketNum() {
3405+
return options.get(POSTPONE_DEFAULT_BUCKET_NUM);
3406+
}
3407+
33973408
public long globalIndexRowCountPerShard() {
33983409
return options.get(GLOBAL_INDEX_ROW_COUNT_PER_SHARD);
33993410
}

paimon-core/src/main/java/org/apache/paimon/postpone/BucketFiles.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.postpone;
20+
21+
import org.apache.paimon.data.BinaryRow;
22+
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.fs.Path;
24+
import org.apache.paimon.index.IndexFileMeta;
25+
import org.apache.paimon.io.CompactIncrement;
26+
import org.apache.paimon.io.DataFileMeta;
27+
import org.apache.paimon.io.DataFilePathFactory;
28+
import org.apache.paimon.io.DataIncrement;
29+
import org.apache.paimon.table.sink.CommitMessageImpl;
30+
31+
import javax.annotation.Nullable;
32+
33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.LinkedHashMap;
36+
import java.util.List;
37+
import java.util.Map;
38+
39+
/**
40+
* A utility class to track and manage file changes for a specific bucket in a postpone bucket
41+
* table.
42+
*/
43+
public class BucketFiles {
44+
private final DataFilePathFactory pathFactory;
45+
private final FileIO fileIO;
46+
47+
private @Nullable Integer totalBuckets;
48+
private final Map<String, DataFileMeta> newFiles;
49+
private final List<DataFileMeta> compactBefore;
50+
private final List<DataFileMeta> compactAfter;
51+
private final List<DataFileMeta> changelogFiles;
52+
private final List<IndexFileMeta> newIndexFiles;
53+
private final List<IndexFileMeta> deletedIndexFiles;
54+
55+
public BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
56+
this.pathFactory = pathFactory;
57+
this.fileIO = fileIO;
58+
59+
this.newFiles = new LinkedHashMap<>();
60+
this.compactBefore = new ArrayList<>();
61+
this.compactAfter = new ArrayList<>();
62+
this.changelogFiles = new ArrayList<>();
63+
this.newIndexFiles = new ArrayList<>();
64+
this.deletedIndexFiles = new ArrayList<>();
65+
}
66+
67+
public void update(CommitMessageImpl message) {
68+
totalBuckets = message.totalBuckets();
69+
70+
for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
71+
newFiles.put(file.fileName(), file);
72+
}
73+
74+
Map<String, Path> toDelete = new HashMap<>();
75+
for (DataFileMeta file : message.compactIncrement().compactBefore()) {
76+
if (newFiles.containsKey(file.fileName())) {
77+
toDelete.put(file.fileName(), pathFactory.toPath(file));
78+
newFiles.remove(file.fileName());
79+
} else {
80+
compactBefore.add(file);
81+
}
82+
}
83+
84+
for (DataFileMeta file : message.compactIncrement().compactAfter()) {
85+
compactAfter.add(file);
86+
toDelete.remove(file.fileName());
87+
}
88+
89+
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
90+
changelogFiles.addAll(message.compactIncrement().changelogFiles());
91+
92+
newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
93+
deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
94+
95+
toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
96+
}
97+
98+
public CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
99+
List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
100+
realCompactAfter.addAll(compactAfter);
101+
return new CommitMessageImpl(
102+
partition,
103+
bucket,
104+
totalBuckets,
105+
DataIncrement.emptyIncrement(),
106+
new CompactIncrement(
107+
compactBefore,
108+
realCompactAfter,
109+
changelogFiles,
110+
newIndexFiles,
111+
deletedIndexFiles));
112+
}
113+
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -447,13 +447,6 @@ public class FlinkConnectorOptions {
447447
"Bounded mode for Paimon consumer. "
448448
+ "By default, Paimon automatically selects bounded mode based on the mode of the Flink job.");
449449

450-
public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
451-
key("postpone.default-bucket-num")
452-
.intType()
453-
.defaultValue(1)
454-
.withDescription(
455-
"Bucket number for the partitions compacted for the first time in postpone bucket tables.");
456-
457450
public static final ConfigOption<Boolean> SCAN_DEDICATED_SPLIT_GENERATION =
458451
key("scan.dedicated-split-generation")
459452
.booleanType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ protected boolean buildForPostponeBucketCompaction(
290290
"Postpone bucket compaction currently does not support predicates");
291291

292292
Options options = new Options(table.options());
293-
int defaultBucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
293+
int defaultBucketNum = options.get(CoreOptions.POSTPONE_DEFAULT_BUCKET_NUM);
294294

295295
// change bucket to a positive value, so we can scan files from the bucket = -2 directory
296296
Map<String, String> bucketOptions = new HashMap<>(table.options());

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java

Lines changed: 1 addition & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,13 @@
2020

2121
import org.apache.paimon.data.BinaryRow;
2222
import org.apache.paimon.flink.sink.Committable;
23-
import org.apache.paimon.fs.FileIO;
24-
import org.apache.paimon.fs.Path;
25-
import org.apache.paimon.index.IndexFileMeta;
26-
import org.apache.paimon.io.CompactIncrement;
27-
import org.apache.paimon.io.DataFileMeta;
28-
import org.apache.paimon.io.DataFilePathFactory;
29-
import org.apache.paimon.io.DataIncrement;
23+
import org.apache.paimon.postpone.BucketFiles;
3024
import org.apache.paimon.table.FileStoreTable;
3125
import org.apache.paimon.table.sink.CommitMessageImpl;
3226
import org.apache.paimon.utils.FileStorePathFactory;
3327

34-
import javax.annotation.Nullable;
35-
3628
import java.util.ArrayList;
3729
import java.util.HashMap;
38-
import java.util.LinkedHashMap;
3930
import java.util.List;
4031
import java.util.Map;
4132

@@ -84,77 +75,4 @@ public List<Committable> emitAll(long checkpointId) {
8475
buckets.clear();
8576
return result;
8677
}
87-
88-
private static class BucketFiles {
89-
90-
private final DataFilePathFactory pathFactory;
91-
private final FileIO fileIO;
92-
93-
private @Nullable Integer totalBuckets;
94-
private final Map<String, DataFileMeta> newFiles;
95-
private final List<DataFileMeta> compactBefore;
96-
private final List<DataFileMeta> compactAfter;
97-
private final List<DataFileMeta> changelogFiles;
98-
private final List<IndexFileMeta> newIndexFiles;
99-
private final List<IndexFileMeta> deletedIndexFiles;
100-
101-
private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
102-
this.pathFactory = pathFactory;
103-
this.fileIO = fileIO;
104-
105-
this.newFiles = new LinkedHashMap<>();
106-
this.compactBefore = new ArrayList<>();
107-
this.compactAfter = new ArrayList<>();
108-
this.changelogFiles = new ArrayList<>();
109-
this.newIndexFiles = new ArrayList<>();
110-
this.deletedIndexFiles = new ArrayList<>();
111-
}
112-
113-
private void update(CommitMessageImpl message) {
114-
totalBuckets = message.totalBuckets();
115-
116-
for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
117-
newFiles.put(file.fileName(), file);
118-
}
119-
120-
Map<String, Path> toDelete = new HashMap<>();
121-
for (DataFileMeta file : message.compactIncrement().compactBefore()) {
122-
if (newFiles.containsKey(file.fileName())) {
123-
toDelete.put(file.fileName(), pathFactory.toPath(file));
124-
newFiles.remove(file.fileName());
125-
} else {
126-
compactBefore.add(file);
127-
}
128-
}
129-
130-
for (DataFileMeta file : message.compactIncrement().compactAfter()) {
131-
compactAfter.add(file);
132-
toDelete.remove(file.fileName());
133-
}
134-
135-
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
136-
changelogFiles.addAll(message.compactIncrement().changelogFiles());
137-
138-
newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
139-
deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
140-
141-
toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
142-
}
143-
144-
private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
145-
List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
146-
realCompactAfter.addAll(compactAfter);
147-
return new CommitMessageImpl(
148-
partition,
149-
bucket,
150-
totalBuckets,
151-
DataIncrement.emptyIncrement(),
152-
new CompactIncrement(
153-
compactBefore,
154-
realCompactAfter,
155-
changelogFiles,
156-
newIndexFiles,
157-
deletedIndexFiles));
158-
}
159-
}
16078
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,11 @@ private boolean execute(
278278
table, partitionPredicate, partitionIdleTime, javaSparkContext);
279279
}
280280
break;
281+
case POSTPONE_MODE:
282+
SparkPostponeCompactProcedure.apply(
283+
table, spark(), partitionPredicate, relation)
284+
.execute();
285+
break;
281286
default:
282287
throw new UnsupportedOperationException(
283288
"Spark compact with " + bucketMode + " is not support yet.");

0 commit comments

Comments
 (0)