
Commit add8d42

Commit message: fix
1 parent efab033 commit add8d42

4 files changed: 146 additions & 63 deletions


paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java

Lines changed: 19 additions & 9 deletions
@@ -43,8 +43,10 @@

 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.IntStream;
@@ -204,10 +206,14 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
         // Snapshot sorted files before Phase 1 to avoid including newly created files in Phase 2
         List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
         for (DataFileMeta file : unsortedFiles) {
+            Set<String> originalFileNames = Collections.singleton(file.fileName());
             List<DataFileMeta> sortedFiles =
                     fileRewriter.sortAndRewriteFiles(
-                            singletonList(file), kvSerializer, kvSchemaType);
-            keyIndex.updateIndex(file, sortedFiles);
+                            singletonList(file),
+                            kvSerializer,
+                            kvSchemaType,
+                            keyIndex,
+                            originalFileNames);
             result.before().add(file);
             result.after().addAll(sortedFiles);
         }
@@ -232,19 +238,23 @@ private CompactResult compact(boolean fullCompaction) throws Exception {
                     keyIndex.rebuildIndex(newFile);
                 }
                 // Remove stale deletion vectors for merged-away files
-                for (DataFileMeta file : mergeGroup) {
-                    dvMaintainer.removeDeletionVectorOf(file.fileName());
+                if (dvMaintainer != null) {
+                    for (DataFileMeta file : mergeGroup) {
+                        dvMaintainer.removeDeletionVectorOf(file.fileName());
+                    }
                 }
                 result.before().addAll(mergeGroup);
                 result.after().addAll(mergedFiles);
             }
         }

-        CompactDeletionFile deletionFile =
-                lazyGenDeletionFile
-                        ? CompactDeletionFile.lazyGeneration(dvMaintainer)
-                        : CompactDeletionFile.generateFiles(dvMaintainer);
-        result.setDeletionFile(deletionFile);
+        if (dvMaintainer != null) {
+            CompactDeletionFile deletionFile =
+                    lazyGenDeletionFile
+                            ? CompactDeletionFile.lazyGeneration(dvMaintainer)
+                            : CompactDeletionFile.generateFiles(dvMaintainer);
+            result.setDeletionFile(deletionFile);
+        }
         return result;
     }
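With index maintenance folded into the rewriter, the Phase 1 loop in the manager reduces to building the singleton set of the file name being replaced and handing it, together with the key index, to sortAndRewriteFiles; the old follow-up keyIndex.updateIndex pass is gone. A minimal sketch of that call shape follows; FileMeta, KeyIndex and FileRewriter are simplified stand-ins for illustration, not the Paimon classes.

import java.util.Collections;
import java.util.List;
import java.util.Set;

public class PhaseOneSketch {

    interface FileMeta {
        String fileName();
    }

    interface KeyIndex {}

    /** Stand-in for the rewriter with the widened sortAndRewriteFiles signature. */
    interface FileRewriter {
        List<FileMeta> sortAndRewriteFiles(
                List<FileMeta> inputFiles, KeyIndex keyIndex, Set<String> originalFileNames);
    }

    /** One rewrite call per unsorted file; index checking and updates happen inside the rewriter. */
    static void phaseOne(List<FileMeta> unsortedFiles, FileRewriter rewriter, KeyIndex keyIndex) {
        for (FileMeta file : unsortedFiles) {
            // Entries that point at the file being rewritten must not count as duplicates,
            // so its own name is passed along for the index check to ignore.
            Set<String> originalFileNames = Collections.singleton(file.fileName());
            List<FileMeta> sortedFiles =
                    rewriter.sortAndRewriteFiles(
                            Collections.singletonList(file), keyIndex, originalFileNames);
            System.out.println(file.fileName() + " -> " + sortedFiles.size() + " sorted file(s)");
        }
    }

    public static void main(String[] args) {
        FileRewriter passThrough = (inputs, index, names) -> inputs; // pretend rewrite
        FileMeta f0 = () -> "unsorted-0.orc";
        FileMeta f1 = () -> "unsorted-1.orc";
        phaseOne(List.of(f0, f1), passThrough, new KeyIndex() {});
    }
}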

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java

Lines changed: 39 additions & 4 deletions
@@ -27,6 +27,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.disk.ChannelReaderInputView;
 import org.apache.paimon.disk.ChannelReaderInputViewIterator;
 import org.apache.paimon.disk.ChannelWithMeta;
@@ -47,13 +48,16 @@
 import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
 import org.apache.paimon.utils.MutableObjectIterator;

+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.Set;

 /**
  * Handles file rewriting for clustering compaction, including sorting unsorted files (Phase 1) and
@@ -112,10 +116,19 @@ public ClusteringFileRewriter(

     /**
      * Sort and rewrite unsorted files by clustering columns. Reads all KeyValue records, sorts them
-     * using an external sort buffer, and writes to new level-1 files.
+     * using an external sort buffer, and writes to new level-1 files. Checks the key index inline
+     * during writing to handle deduplication (FIRST_ROW skips duplicates, DEDUPLICATE marks old
+     * positions in DV) and updates the index without re-reading the output files.
+     *
+     * @param keyIndex the key index for inline checking and batch update, or null to skip
+     * @param originalFileNames file names of the original files being replaced (for index check)
      */
     public List<DataFileMeta> sortAndRewriteFiles(
-            List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer, RowType kvSchemaType)
+            List<DataFileMeta> inputFiles,
+            KeyValueSerializer kvSerializer,
+            RowType kvSchemaType,
+            @Nullable ClusteringKeyIndex keyIndex,
+            Set<String> originalFileNames)
             throws Exception {
         int[] sortFieldsInKeyValue =
                 Arrays.stream(clusteringColumns)
@@ -145,17 +158,29 @@ public List<DataFileMeta> sortAndRewriteFiles(
            }
        }

+        RowCompactedSerializer keySerializer =
+                keyIndex != null ? new RowCompactedSerializer(keyType) : null;
+        List<byte[]> collectedKeys = keyIndex != null ? new ArrayList<>() : null;
+
         RollingFileWriter<KeyValue, DataFileMeta> writer =
                 writerFactory.createRollingClusteringFileWriter();
         try {
             MutableObjectIterator<BinaryRow> sortedIterator = sortBuffer.sortedIterator();
             BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount());
             while ((binaryRow = sortedIterator.next(binaryRow)) != null) {
                 KeyValue kv = kvSerializer.fromRow(binaryRow);
-                writer.write(
+                KeyValue copied =
                         kv.copy(
                                 new InternalRowSerializer(keyType),
-                                new InternalRowSerializer(valueType)));
+                                new InternalRowSerializer(valueType));
+                if (keyIndex != null) {
+                    byte[] keyBytes = keySerializer.serializeToBytes(copied.key());
+                    if (!keyIndex.checkKey(keyBytes, originalFileNames)) {
+                        continue;
+                    }
+                    collectedKeys.add(keyBytes);
+                }
+                writer.write(copied);
             }
         } finally {
             sortBuffer.clear();
@@ -170,6 +195,16 @@ public List<DataFileMeta> sortAndRewriteFiles(
             fileLevels.addNewFile(newFile);
         }

+        // Batch update index using collected keys, split by file rowCount
+        if (keyIndex != null) {
+            int offset = 0;
+            for (DataFileMeta newFile : newFiles) {
+                int count = (int) newFile.rowCount();
+                keyIndex.batchPutIndex(newFile, collectedKeys.subList(offset, offset + count));
+                offset += count;
+            }
+        }
+
         return newFiles;
     }
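One detail of the batch update is easy to miss: collectedKeys holds, in write order, exactly one serialized key per row that survived the inline check, so after the rolling writer closes, each output file's keys are recovered as a contiguous subList sized by its rowCount. The self-contained sketch below models only that partitioning; OutFile and the string keys are stand-ins for DataFileMeta and the serialized key bytes, not the Paimon types.

import java.util.ArrayList;
import java.util.List;

public class BatchPutPartitionSketch {

    /** Stand-in for DataFileMeta: only the name and the row count matter for the split. */
    static class OutFile {
        final String name;
        final long rowCount;

        OutFile(String name, long rowCount) {
            this.name = name;
            this.rowCount = rowCount;
        }
    }

    /** Assign each output file the contiguous run of collected keys that it contains. */
    static void batchPut(List<OutFile> newFiles, List<String> collectedKeys) {
        int offset = 0;
        for (OutFile newFile : newFiles) {
            int count = (int) newFile.rowCount;
            // Keys were appended in the same order rows were written, so slicing by
            // rowCount reproduces the (file, position) mapping without re-reading files.
            List<String> keysForFile = collectedKeys.subList(offset, offset + count);
            for (int position = 0; position < keysForFile.size(); position++) {
                System.out.println(keysForFile.get(position) + " -> " + newFile.name + ":" + position);
            }
            offset += count;
        }
    }

    public static void main(String[] args) {
        // Suppose the rolling writer rolled after 3 rows: 5 surviving keys span two files.
        List<String> collectedKeys = new ArrayList<>(List.of("k1", "k2", "k3", "k4", "k5"));
        batchPut(List.of(new OutFile("sorted-0", 3), new OutFile("sorted-1", 2)), collectedKeys);
    }
}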

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java

Lines changed: 34 additions & 50 deletions
@@ -45,8 +45,6 @@
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -209,61 +207,47 @@ public Map.Entry<byte[], byte[]> next() {
     }

     /**
-     * Update the key index after a single original file is replaced by new sorted files.
+     * Check a key against the index during sort-and-rewrite writing.
      *
-     * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
+     * <p>For FIRST_ROW mode: if key exists pointing to a non-original file, return false (skip
+     * writing this record — it's a duplicate).
      *
-     * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
-     * first/old one); if key is new, store the new position.
+     * <p>For DEDUPLICATE mode: if key exists pointing to a non-original file, mark the old position
+     * in deletion vectors, return true (write the new record).
+     *
+     * @param keyBytes serialized key bytes
+     * @param originalFileNames file names of the original unsorted files being replaced
+     * @return true if the record should be written, false to skip (FIRST_ROW dedup)
      */
-    public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        updateIndex(Collections.singletonList(originalFile), newSortedFiles);
+    public boolean checkKey(byte[] keyBytes, Set<String> originalFileNames) throws Exception {
+        byte[] oldValue = kvDb.get(keyBytes);
+        if (oldValue != null) {
+            ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
+            int oldFileId = decodeInt(valueIn);
+            int oldPosition = decodeInt(valueIn);
+            DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
+            if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
+                if (firstRow) {
+                    return false;
+                } else {
+                    dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
+                }
+            }
+        }
+        return true;
     }

     /**
-     * Update the key index after multiple original files are replaced by new sorted files.
-     *
-     * @see #updateIndex(DataFileMeta, List)
+     * Batch update the key index for a new sorted file using pre-collected key bytes. Avoids
+     * re-reading the file.
      */
-    public void updateIndex(List<DataFileMeta> originalFiles, List<DataFileMeta> newSortedFiles)
-            throws Exception {
-        RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
-
-        Set<String> originalFileNames = new HashSet<>();
-        for (DataFileMeta file : originalFiles) {
-            originalFileNames.add(file.fileName());
-        }
-
-        for (DataFileMeta sortedFile : newSortedFiles) {
-            int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
-            int position = 0;
-            try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
-                while (iterator.hasNext()) {
-                    byte[] key = keySerializer.serializeToBytes(iterator.next());
-                    byte[] oldValue = kvDb.get(key);
-                    if (oldValue != null) {
-                        ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
-                        int oldFileId = decodeInt(valueIn);
-                        int oldPosition = decodeInt(valueIn);
-                        DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
-                        if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
-                            if (firstRow) {
-                                dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
-                                position++;
-                                continue;
-                            } else {
-                                dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
-                            }
-                        }
-                    }
-                    ByteArrayOutputStream value = new ByteArrayOutputStream(8);
-                    encodeInt(value, fileId);
-                    encodeInt(value, position);
-                    kvDb.put(key, value.toByteArray());
-                    position++;
-                }
-            }
+    public void batchPutIndex(DataFileMeta sortedFile, List<byte[]> keyBytesList) throws Exception {
+        int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
+        for (int position = 0; position < keyBytesList.size(); position++) {
+            ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+            encodeInt(value, fileId);
+            encodeInt(value, position);
+            kvDb.put(keyBytesList.get(position), value.toByteArray());
         }
     }
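Taken together, checkKey and batchPutIndex split the old updateIndex into a per-record decision plus a bulk write. The decision itself can be modeled with an in-memory map in place of kvDb; the sketch below is a simplified model of the semantics described in the javadoc above (firstRow and originalFileNames keep the same meanings), not the real ClusteringKeyIndex.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CheckKeyModel {

    /** Index entry: which file and row position currently own a key. */
    static class Entry {
        final String fileName;
        final int position;

        Entry(String fileName, int position) {
            this.fileName = fileName;
            this.position = position;
        }
    }

    final Map<String, Entry> index = new HashMap<>();        // stand-in for kvDb
    final Set<String> deletions = new HashSet<>();           // stand-in for the DV maintainer
    final boolean firstRow;

    CheckKeyModel(boolean firstRow) {
        this.firstRow = firstRow;
    }

    /** Mirrors checkKey: true = write the record, false = drop it (FIRST_ROW duplicate). */
    boolean checkKey(String key, Set<String> originalFileNames) {
        Entry old = index.get(key);
        if (old != null && !originalFileNames.contains(old.fileName)) {
            if (firstRow) {
                return false;                                // keep the existing first row
            }
            deletions.add(old.fileName + ":" + old.position); // DEDUPLICATE: shadow the old row
        }
        return true;
    }

    public static void main(String[] args) {
        CheckKeyModel model = new CheckKeyModel(true);       // FIRST_ROW mode
        model.index.put("k1", new Entry("sorted-0", 7));     // k1 already lives in another file
        Set<String> originals = Set.of("unsorted-3");        // the file being rewritten
        System.out.println(model.checkKey("k1", originals)); // false: duplicate is dropped
        System.out.println(model.checkKey("k2", originals)); // true: new key is written
    }
}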

paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java

Lines changed: 54 additions & 0 deletions
@@ -28,6 +28,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -633,6 +634,59 @@ public void testFirstRowDeletionVectorCorrectness() throws Exception {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 10), GenericRow.of(2, 20));
     }

+    /**
+     * Test that FIRST_ROW inline dedup actually reduces the number of records written. Duplicate
+     * keys should be dropped during sort-and-rewrite, resulting in fewer total rows across data
+     * files compared to the number of rows written.
+     */
+    @Test
+    public void testFirstRowInlineDedupReducesFileRows() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // Commit 1: write 5 unique keys
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50)));
+
+        // Commit 2: write 5 duplicate keys (all should be dropped inline)
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, 99),
+                        GenericRow.of(2, 99),
+                        GenericRow.of(3, 99),
+                        GenericRow.of(4, 99),
+                        GenericRow.of(5, 99)));
+
+        // Verify correctness: still see first values
+        assertThat(readRows(firstRowTable))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(3, 30),
+                        GenericRow.of(4, 40),
+                        GenericRow.of(5, 50));
+
+        // Verify optimization: total row count across all data files should be exactly 5
+        // (duplicates dropped during writing, not just DV-marked)
+        List<Split> splits = firstRowTable.newReadBuilder().newScan().plan().splits();
+        long totalRows =
+                splits.stream()
+                        .mapToLong(
+                                split ->
+                                        ((DataSplit) split)
+                                                .dataFiles().stream()
+                                                .mapToLong(DataFileMeta::rowCount)
+                                                .sum())
+                        .sum();
+        assertThat(totalRows).isEqualTo(5);
+    }
+
     /** Test first-row mode with many writes to trigger compaction. */
     @Test
     public void testFirstRowManyWrites() throws Exception {
