Skip to content

Commit d640b2e

Browse files
committed
[core] Make bucketed append table write initialization lighter
1 parent d923861 commit d640b2e

File tree

3 files changed

+14
-36
lines changed

3 files changed

+14
-36
lines changed

paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.io.IOException;
3939
import java.util.ArrayList;
4040
import java.util.Collection;
41-
import java.util.Comparator;
4241
import java.util.LinkedList;
4342
import java.util.List;
4443
import java.util.Optional;
@@ -47,6 +46,7 @@
4746
import java.util.concurrent.ExecutorService;
4847

4948
import static java.util.Collections.emptyList;
49+
import static org.apache.paimon.table.source.AppendOnlySplitGenerator.DATA_FILE_COMPARATOR;
5050

5151
/** Compact manager for {@link AppendOnlyFileStore}. */
5252
public class BucketedAppendCompactManager extends CompactFutureManager {
@@ -78,7 +78,7 @@ public BucketedAppendCompactManager(
7878
@Nullable CompactionMetrics.Reporter metricsReporter) {
7979
this.executor = executor;
8080
this.dvMaintainer = dvMaintainer;
81-
this.toCompact = new PriorityQueue<>(fileComparator(false));
81+
this.toCompact = new PriorityQueue<>(DATA_FILE_COMPARATOR);
8282
this.toCompact.addAll(restored);
8383
this.minFileNum = minFileNum;
8484
this.targetFileSize = targetFileSize;
@@ -357,35 +357,4 @@ private static CompactResult result(List<DataFileMeta> before, List<DataFileMeta
357357
public interface CompactRewriter {
358358
List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
359359
}
360-
361-
/**
362-
* New files may be created during the compaction process, then the results of the compaction
363-
* may be put after the new files, and this order will be disrupted. We need to ensure this
364-
* order, so we force the order by sequence.
365-
*/
366-
public static Comparator<DataFileMeta> fileComparator(boolean ignoreOverlap) {
367-
return (o1, o2) -> {
368-
if (o1 == o2) {
369-
return 0;
370-
}
371-
372-
if (!ignoreOverlap && isOverlap(o1, o2)) {
373-
LOG.warn(
374-
String.format(
375-
"There should no overlap in append files, but Range1(%s, %s), Range2(%s, %s),"
376-
+ " check if you have multiple write jobs.",
377-
o1.minSequenceNumber(),
378-
o1.maxSequenceNumber(),
379-
o2.minSequenceNumber(),
380-
o2.maxSequenceNumber()));
381-
}
382-
383-
return Long.compare(o1.minSequenceNumber(), o2.minSequenceNumber());
384-
};
385-
}
386-
387-
private static boolean isOverlap(DataFileMeta o1, DataFileMeta o2) {
388-
return o2.minSequenceNumber() <= o1.maxSequenceNumber()
389-
&& o2.maxSequenceNumber() >= o1.minSequenceNumber();
390-
}
391360
}

paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,16 @@ public BucketedAppendFileStoreWrite(
6868
options,
6969
dvMaintainerFactory,
7070
tableName);
71+
super.withIgnorePreviousFiles(options.writeOnly());
7172
this.commitUser = commitUser;
7273
}
7374

75+
@Override
76+
public void withIgnorePreviousFiles(boolean ignorePrevious) {
77+
// For append tables, the ignorePrevious argument is not used: all writers must start empty exactly when options.writeOnly() is set.
78+
super.withIgnorePreviousFiles(options.writeOnly());
79+
}
80+
7481
@Override
7582
protected CompactManager getCompactManager(
7683
BinaryRow partition,

paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.Collections;
27+
import java.util.Comparator;
2728
import java.util.List;
2829
import java.util.function.Function;
2930
import java.util.stream.Collectors;
3031

31-
import static org.apache.paimon.append.BucketedAppendCompactManager.fileComparator;
32-
3332
/** Append only implementation of {@link SplitGenerator}. */
3433
public class AppendOnlySplitGenerator implements SplitGenerator {
3534

35+
public static final Comparator<DataFileMeta> DATA_FILE_COMPARATOR =
36+
Comparator.comparing(DataFileMeta::creationTime).thenComparing(DataFileMeta::fileName);
37+
3638
private final long targetSplitSize;
3739
private final long openFileCost;
3840
private final BucketMode bucketMode;
@@ -52,7 +54,7 @@ public boolean alwaysRawConvertible() {
5254
@Override
5355
public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
5456
List<DataFileMeta> files = new ArrayList<>(input);
55-
files.sort(fileComparator(bucketMode == BucketMode.BUCKET_UNAWARE));
57+
files.sort(DATA_FILE_COMPARATOR);
5658
Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openFileCost);
5759
return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
5860
.map(SplitGroup::rawConvertibleGroup)

0 commit comments

Comments
 (0)