Skip to content

Commit 7a7cb3f

Browse files
committed
fix
1 parent d640b2e commit 7a7cb3f

File tree

10 files changed

+120
-15
lines changed

10 files changed

+120
-15
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2929,6 +2929,10 @@ public String consumerId() {
29292929
return consumerId;
29302930
}
29312931

2932+
public boolean bucketAppendOrdered() {
2933+
return options.get(BUCKET_APPEND_ORDERED);
2934+
}
2935+
29322936
@Nullable
29332937
public Integer fullCompactionDeltaCommits() {
29342938
return options.get(FULL_COMPACTION_DELTA_COMMITS);

paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.io.IOException;
3939
import java.util.ArrayList;
4040
import java.util.Collection;
41+
import java.util.Comparator;
4142
import java.util.LinkedList;
4243
import java.util.List;
4344
import java.util.Optional;
@@ -46,7 +47,8 @@
4647
import java.util.concurrent.ExecutorService;
4748

4849
import static java.util.Collections.emptyList;
49-
import static org.apache.paimon.table.source.AppendOnlySplitGenerator.DATA_FILE_COMPARATOR;
50+
import static org.apache.paimon.table.source.AppendOnlySplitGenerator.CREATION_TIME_COMPARATOR;
51+
import static org.apache.paimon.table.source.AppendOnlySplitGenerator.SEQUENCE_NUMBER_COMPARATOR;
5052

5153
/** Compact manager for {@link AppendOnlyFileStore}. */
5254
public class BucketedAppendCompactManager extends CompactFutureManager {
@@ -55,6 +57,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
5557

5658
private static final int FULL_COMPACT_MIN_FILE = 3;
5759

60+
private final Comparator<DataFileMeta> comparator;
5861
private final ExecutorService executor;
5962
private final BucketedDvMaintainer dvMaintainer;
6063
private final PriorityQueue<DataFileMeta> toCompact;
@@ -74,11 +77,13 @@ public BucketedAppendCompactManager(
7477
int minFileNum,
7578
long targetFileSize,
7679
boolean forceRewriteAllFiles,
80+
boolean ordered,
7781
CompactRewriter rewriter,
7882
@Nullable CompactionMetrics.Reporter metricsReporter) {
83+
this.comparator = ordered ? SEQUENCE_NUMBER_COMPARATOR : CREATION_TIME_COMPARATOR;
7984
this.executor = executor;
8085
this.dvMaintainer = dvMaintainer;
81-
this.toCompact = new PriorityQueue<>(DATA_FILE_COMPARATOR);
86+
this.toCompact = new PriorityQueue<>(comparator);
8287
this.toCompact.addAll(restored);
8388
this.minFileNum = minFileNum;
8489
this.targetFileSize = targetFileSize;

paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,21 @@ public BucketedAppendFileStoreWrite(
6868
options,
6969
dvMaintainerFactory,
7070
tableName);
71-
super.withIgnorePreviousFiles(options.writeOnly());
71+
if (!options.bucketAppendOrdered()) {
72+
super.withIgnorePreviousFiles(options.writeOnly());
73+
}
7274
this.commitUser = commitUser;
7375
}
7476

7577
@Override
7678
public void withIgnorePreviousFiles(boolean ignorePrevious) {
77-
// for append table, we need all writers to be empty if write only
78-
super.withIgnorePreviousFiles(options.writeOnly());
79+
if (options.bucketAppendOrdered()) {
80+
super.withIgnorePreviousFiles(ignorePrevious);
81+
} else {
82+
// for unordered, don't need sequence number
83+
// all writers to be empty if write only
84+
super.withIgnorePreviousFiles(options.writeOnly());
85+
}
7986
}
8087

8188
@Override
@@ -99,6 +106,7 @@ protected CompactManager getCompactManager(
99106
options.compactionMinFileNum(),
100107
options.targetFileSize(false),
101108
options.forceRewriteAllFiles(),
109+
options.bucketAppendOrdered(),
102110
files -> compactRewrite(partition, bucket, dvFactory, files),
103111
compactionMetrics == null
104112
? null

paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.data.InternalRow;
2424
import org.apache.paimon.fs.FileIO;
2525
import org.apache.paimon.fs.Path;
26+
import org.apache.paimon.io.DataFileMeta;
2627
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
2728
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
2829
import org.apache.paimon.operation.FileStoreScan;
@@ -45,10 +46,14 @@
4546
import javax.annotation.Nullable;
4647

4748
import java.util.ArrayList;
49+
import java.util.Comparator;
4850
import java.util.List;
4951
import java.util.function.BiConsumer;
5052
import java.util.function.Function;
5153

54+
import static org.apache.paimon.table.source.AppendOnlySplitGenerator.CREATION_TIME_COMPARATOR;
55+
import static org.apache.paimon.table.source.AppendOnlySplitGenerator.SEQUENCE_NUMBER_COMPARATOR;
56+
5257
/** {@link FileStoreTable} for append table. */
5358
public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
5459

@@ -88,11 +93,17 @@ public AppendOnlyFileStore store() {
8893

8994
@Override
9095
protected SplitGenerator splitGenerator() {
91-
long targetSplitSize = store().options().splitTargetSize();
92-
long openFileCost = store().options().splitOpenFileCost();
96+
CoreOptions options = store().options();
97+
long targetSplitSize = options.splitTargetSize();
98+
long openFileCost = options.splitOpenFileCost();
99+
Comparator<DataFileMeta> comparator = CREATION_TIME_COMPARATOR;
100+
if (bucketMode() == BucketMode.HASH_FIXED && options.bucketAppendOrdered()) {
101+
comparator = SEQUENCE_NUMBER_COMPARATOR;
102+
}
93103
return coreOptions().dataEvolutionEnabled()
94104
? new DataEvolutionSplitGenerator(targetSplitSize, openFileCost)
95-
: new AppendOnlySplitGenerator(targetSplitSize, openFileCost, bucketMode());
105+
: new AppendOnlySplitGenerator(
106+
targetSplitSize, openFileCost, bucketMode(), comparator);
96107
}
97108

98109
@Override

paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,27 @@
3232
/** Append only implementation of {@link SplitGenerator}. */
3333
public class AppendOnlySplitGenerator implements SplitGenerator {
3434

35-
public static final Comparator<DataFileMeta> DATA_FILE_COMPARATOR =
35+
public static final Comparator<DataFileMeta> CREATION_TIME_COMPARATOR =
3636
Comparator.comparing(DataFileMeta::creationTime).thenComparing(DataFileMeta::fileName);
3737

38+
public static final Comparator<DataFileMeta> SEQUENCE_NUMBER_COMPARATOR =
39+
Comparator.comparing(DataFileMeta::minSequenceNumber)
40+
.thenComparing(DataFileMeta::fileName);
41+
3842
private final long targetSplitSize;
3943
private final long openFileCost;
4044
private final BucketMode bucketMode;
45+
private final Comparator<DataFileMeta> fileComparator;
4146

4247
public AppendOnlySplitGenerator(
43-
long targetSplitSize, long openFileCost, BucketMode bucketMode) {
48+
long targetSplitSize,
49+
long openFileCost,
50+
BucketMode bucketMode,
51+
Comparator<DataFileMeta> fileComparator) {
4452
this.targetSplitSize = targetSplitSize;
4553
this.openFileCost = openFileCost;
4654
this.bucketMode = bucketMode;
55+
this.fileComparator = fileComparator;
4756
}
4857

4958
@Override
@@ -54,7 +63,7 @@ public boolean alwaysRawConvertible() {
5463
@Override
5564
public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
5665
List<DataFileMeta> files = new ArrayList<>(input);
57-
files.sort(DATA_FILE_COMPARATOR);
66+
files.sort(fileComparator);
5867
Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openFileCost);
5968
return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
6069
.map(SplitGroup::rawConvertibleGroup)

paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,7 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
676676
MIN_FILE_NUM,
677677
targetFileSize,
678678
false,
679+
true,
679680
compactBefore -> {
680681
latch.await();
681682
return compactBefore.isEmpty()

paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ private void innerTest(
208208
minFileNum,
209209
targetFileSize,
210210
false,
211+
true,
211212
null, // not used
212213
null);
213214
Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();

paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
9090
null,
9191
0,
9292
new BucketedAppendCompactManager(
93-
null, toCompact, null, 4, 10, false, null, null), // not used
93+
null, toCompact, null, 4, 10, false, true, null, null), // not used
9494
null,
9595
false,
9696
dataFilePathFactory,

paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import org.apache.paimon.fs.local.LocalFileIO;
3838
import org.apache.paimon.io.BundleRecords;
3939
import org.apache.paimon.io.DataFileMeta;
40+
import org.apache.paimon.manifest.ManifestFile;
4041
import org.apache.paimon.manifest.ManifestFileMeta;
42+
import org.apache.paimon.manifest.ManifestList;
4143
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
4244
import org.apache.paimon.operation.FileStoreWrite;
4345
import org.apache.paimon.options.MemorySize;
@@ -103,6 +105,7 @@
103105
import java.util.stream.Collectors;
104106

105107
import static org.apache.paimon.CoreOptions.BUCKET;
108+
import static org.apache.paimon.CoreOptions.BUCKET_APPEND_ORDERED;
106109
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
107110
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
108111
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -121,6 +124,56 @@
121124
/** Tests for {@link AppendOnlyFileStoreTable}. */
122125
public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
123126

127+
@Test
128+
public void testBucketedAppendTableWriteWithInit() throws Exception {
129+
innerTestBucketedAppendTableWriteInit(true);
130+
}
131+
132+
@Test
133+
public void testBucketedAppendTableWriteNoInit() throws Exception {
134+
innerTestBucketedAppendTableWriteInit(false);
135+
}
136+
137+
public void innerTestBucketedAppendTableWriteInit(boolean ordered) throws Exception {
138+
FileStoreTable table =
139+
createFileStoreTable(
140+
options -> {
141+
options.set(BUCKET, 2);
142+
options.set(BUCKET_KEY, "a");
143+
options.set(WRITE_ONLY, true);
144+
options.set(BUCKET_APPEND_ORDERED, ordered);
145+
});
146+
147+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
148+
149+
// 1. first write
150+
try (BatchTableWrite write = writeBuilder.newWrite();
151+
BatchTableCommit commit = writeBuilder.newCommit()) {
152+
write.write(rowData(1, 10, 100L));
153+
commit.commit(write.prepareCommit());
154+
}
155+
156+
// 2. delete all manifests
157+
ManifestList manifestList = table.store().manifestListFactory().create();
158+
ManifestFile manifestFile = table.store().manifestFileFactory().create();
159+
List<ManifestFileMeta> manifests =
160+
manifestList.readAllManifests(table.latestSnapshot().get());
161+
for (ManifestFileMeta manifest : manifests) {
162+
manifestFile.delete(manifest.fileName());
163+
}
164+
165+
// 3. check new write
166+
try (BatchTableWrite write = writeBuilder.newWrite()) {
167+
if (ordered) {
168+
assertThatThrownBy(() -> write.write(rowData(1, 10, 100L)))
169+
.hasMessageContaining("FileNotFoundException");
170+
} else {
171+
// no exception
172+
write.write(rowData(1, 10, 100L));
173+
}
174+
}
175+
}
176+
124177
@Test
125178
public void testOverwriteNeverFail() throws Exception {
126179
FileStoreTable table = createFileStoreTable();

paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
3838
import static org.apache.paimon.io.DataFileTestUtils.fromMinMax;
3939
import static org.apache.paimon.io.DataFileTestUtils.newFile;
40+
import static org.apache.paimon.table.source.AppendOnlySplitGenerator.SEQUENCE_NUMBER_COMPARATOR;
4041
import static org.assertj.core.api.Assertions.assertThat;
4142

4243
/** Test for {@link AppendOnlySplitGenerator} and {@link MergeTreeSplitGenerator}. */
@@ -76,7 +77,11 @@ public void testAppend() {
7677
newFileFromSequence("6", 101, 61, 100));
7778
assertThat(
7879
toNames(
79-
new AppendOnlySplitGenerator(40, 2, BucketMode.HASH_FIXED)
80+
new AppendOnlySplitGenerator(
81+
40,
82+
2,
83+
BucketMode.HASH_FIXED,
84+
SEQUENCE_NUMBER_COMPARATOR)
8085
.splitForBatch(files)))
8186
.containsExactlyInAnyOrder(
8287
Arrays.asList("1", "2"),
@@ -86,7 +91,11 @@ public void testAppend() {
8691

8792
assertThat(
8893
toNames(
89-
new AppendOnlySplitGenerator(70, 2, BucketMode.HASH_FIXED)
94+
new AppendOnlySplitGenerator(
95+
70,
96+
2,
97+
BucketMode.HASH_FIXED,
98+
SEQUENCE_NUMBER_COMPARATOR)
9099
.splitForBatch(files)))
91100
.containsExactlyInAnyOrder(
92101
Arrays.asList("1", "2", "3"),
@@ -95,7 +104,11 @@ public void testAppend() {
95104

96105
assertThat(
97106
toNames(
98-
new AppendOnlySplitGenerator(40, 20, BucketMode.HASH_FIXED)
107+
new AppendOnlySplitGenerator(
108+
40,
109+
20,
110+
BucketMode.HASH_FIXED,
111+
SEQUENCE_NUMBER_COMPARATOR)
99112
.splitForBatch(files)))
100113
.containsExactlyInAnyOrder(
101114
Arrays.asList("1", "2"),

0 commit comments

Comments
 (0)