Skip to content

Commit 01d4601

Browse files
committed
[core] Refactor IndexFileHandler to use dvIndex and hashIndex
1 parent 161ff4a commit 01d4601

File tree

17 files changed

+115
-127
lines changed

17 files changed

+115
-127
lines changed

paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.paimon.compact;
2020

2121
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
22-
import org.apache.paimon.index.IndexFileHandler;
22+
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
2323
import org.apache.paimon.index.IndexFileMeta;
2424

2525
import javax.annotation.Nullable;
@@ -42,7 +42,7 @@ public interface CompactDeletionFile {
4242
*/
4343
static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) {
4444
Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex();
45-
return new GeneratedDeletionFile(file.orElse(null), maintainer.indexFileHandler());
45+
return new GeneratedDeletionFile(file.orElse(null), maintainer.dvIndexFile());
4646
}
4747

4848
/** For sync compaction, only create deletion files when prepareCommit. */
@@ -54,14 +54,14 @@ static CompactDeletionFile lazyGeneration(BucketedDvMaintainer maintainer) {
5454
class GeneratedDeletionFile implements CompactDeletionFile {
5555

5656
@Nullable private final IndexFileMeta deletionFile;
57-
private final IndexFileHandler fileHandler;
57+
private final DeletionVectorsIndexFile dvIndexFile;
5858

5959
private boolean getInvoked = false;
6060

6161
public GeneratedDeletionFile(
62-
@Nullable IndexFileMeta deletionFile, IndexFileHandler fileHandler) {
62+
@Nullable IndexFileMeta deletionFile, DeletionVectorsIndexFile dvIndexFile) {
6363
this.deletionFile = deletionFile;
64-
this.fileHandler = fileHandler;
64+
this.dvIndexFile = dvIndexFile;
6565
}
6666

6767
@Override
@@ -92,7 +92,7 @@ public CompactDeletionFile mergeOldFile(CompactDeletionFile old) {
9292
@Override
9393
public void clean() {
9494
if (deletionFile != null) {
95-
fileHandler.deleteIndexFile(deletionFile);
95+
dvIndexFile.delete(deletionFile.fileName());
9696
}
9797
}
9898
}

paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,16 @@
3333
/** Maintainer of deletionVectors index. */
3434
public class BucketedDvMaintainer {
3535

36-
private final IndexFileHandler indexFileHandler;
36+
private final DeletionVectorsIndexFile dvIndexFile;
3737
private final Map<String, DeletionVector> deletionVectors;
3838
protected final boolean bitmap64;
3939
private boolean modified;
4040

4141
private BucketedDvMaintainer(
42-
IndexFileHandler fileHandler, Map<String, DeletionVector> deletionVectors) {
43-
this.indexFileHandler = fileHandler;
42+
DeletionVectorsIndexFile dvIndexFile, Map<String, DeletionVector> deletionVectors) {
43+
this.dvIndexFile = dvIndexFile;
4444
this.deletionVectors = deletionVectors;
45-
this.bitmap64 = indexFileHandler.deletionVectorsIndex().bitmap64();
45+
this.bitmap64 = dvIndexFile.bitmap64();
4646
this.modified = false;
4747
}
4848

@@ -114,8 +114,7 @@ public void removeDeletionVectorOf(String fileName) {
114114
public Optional<IndexFileMeta> writeDeletionVectorsIndex() {
115115
if (modified) {
116116
modified = false;
117-
return Optional.of(
118-
indexFileHandler.deletionVectorsIndex().writeSingleFile(deletionVectors));
117+
return Optional.of(dvIndexFile.writeSingleFile(deletionVectors));
119118
}
120119
return Optional.empty();
121120
}
@@ -131,8 +130,8 @@ public Optional<DeletionVector> deletionVectorOf(String fileName) {
131130
return Optional.ofNullable(deletionVectors.get(fileName));
132131
}
133132

134-
public IndexFileHandler indexFileHandler() {
135-
return indexFileHandler;
133+
public DeletionVectorsIndexFile dvIndexFile() {
134+
return dvIndexFile;
136135
}
137136

138137
@VisibleForTesting
@@ -170,12 +169,8 @@ public BucketedDvMaintainer create(@Nullable List<IndexFileMeta> restoredFiles)
170169
return create(deletionVectors);
171170
}
172171

173-
public BucketedDvMaintainer create() {
174-
return create(new HashMap<>());
175-
}
176-
177172
public BucketedDvMaintainer create(Map<String, DeletionVector> deletionVectors) {
178-
return new BucketedDvMaintainer(handler, deletionVectors);
173+
return new BucketedDvMaintainer(handler.dvIndex(), deletionVectors);
179174
}
180175
}
181176
}

paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,20 +80,20 @@ static AppendDeleteFileMaintainer forUnawareAppend(
8080
.collect(Collectors.toList());
8181
Map<String, DeletionFile> deletionFiles = new HashMap<>();
8282
for (IndexManifestEntry file : manifestEntries) {
83-
IndexFileMeta meta = file.indexFile();
84-
LinkedHashMap<String, DeletionVectorMeta> dvMetas = meta.deletionVectorMetas();
83+
LinkedHashMap<String, DeletionVectorMeta> dvMetas =
84+
file.indexFile().deletionVectorMetas();
8585
checkNotNull(dvMetas);
8686
for (DeletionVectorMeta dvMeta : dvMetas.values()) {
8787
deletionFiles.put(
8888
dvMeta.dataFileName(),
8989
new DeletionFile(
90-
indexFileHandler.filePath(meta).toString(),
90+
indexFileHandler.filePath(file).toString(),
9191
dvMeta.offset(),
9292
dvMeta.length(),
9393
dvMeta.cardinality()));
9494
}
9595
}
9696
return new AppendDeleteFileMaintainer(
97-
indexFileHandler.deletionVectorsIndex(), partition, manifestEntries, deletionFiles);
97+
indexFileHandler.dvIndex(), partition, manifestEntries, deletionFiles);
9898
}
9999
}

paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
4444
import org.apache.paimon.index.DeletionVectorMeta;
4545
import org.apache.paimon.index.IndexFileHandler;
46-
import org.apache.paimon.index.IndexFileMeta;
4746
import org.apache.paimon.io.DataFileMeta;
4847
import org.apache.paimon.io.DataFilePathFactory;
4948
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -1148,8 +1147,8 @@ private List<IcebergManifestFileMeta> createDvManifestFileMetas(Snapshot snapsho
11481147
return Collections.emptyList();
11491148
}
11501149
for (IndexManifestEntry entry : newIndexes) {
1151-
IndexFileMeta indexFileMeta = entry.indexFile();
1152-
LinkedHashMap<String, DeletionVectorMeta> dvMetas = indexFileMeta.deletionVectorMetas();
1150+
LinkedHashMap<String, DeletionVectorMeta> dvMetas =
1151+
entry.indexFile().deletionVectorMetas();
11531152
Path bucketPath = fileStorePathFactory.bucketPath(entry.partition(), entry.bucket());
11541153
if (dvMetas != null) {
11551154
for (DeletionVectorMeta dvMeta : dvMetas.values()) {
@@ -1162,16 +1161,16 @@ private List<IcebergManifestFileMeta> createDvManifestFileMetas(Snapshot snapsho
11621161
"cardinality in DeletionVector is null, stop generate dv for iceberg. "
11631162
+ "dataFile path is {}, indexFile path is {}",
11641163
new Path(bucketPath, dvMeta.dataFileName()),
1165-
indexFileHandler.filePath(indexFileMeta).toString());
1164+
indexFileHandler.filePath(entry).toString());
11661165

11671166
IcebergDataFileMeta deleteFileMeta =
11681167
IcebergDataFileMeta.createForDeleteFile(
11691168
IcebergDataFileMeta.Content.POSITION_DELETES,
1170-
indexFileHandler.filePath(indexFileMeta).toString(),
1169+
indexFileHandler.filePath(entry).toString(),
11711170
PUFFIN_FORMAT,
11721171
entry.partition(),
11731172
dvMeta.cardinality(),
1174-
indexFileMeta.fileSize(),
1173+
entry.indexFile().fileSize(),
11751174
new Path(bucketPath, dvMeta.dataFileName()).toString(),
11761175
(long) dvMeta.offset(),
11771176
(long) dvMeta.length());

paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,25 @@
3636
/** An Index Maintainer for dynamic bucket to maintain key hashcode in a bucket. */
3737
public class DynamicBucketIndexMaintainer {
3838

39-
private final IndexFileHandler fileHandler;
39+
private final HashIndexFile indexFile;
4040
private final IntHashSet hashcode;
4141

4242
private boolean modified;
4343

4444
private DynamicBucketIndexMaintainer(
45-
IndexFileHandler fileHandler, @Nullable IndexFileMeta restoredFile) {
46-
this.fileHandler = fileHandler;
45+
HashIndexFile indexFile, @Nullable IndexFileMeta restoredFile) {
46+
this.indexFile = indexFile;
4747
IntHashSet hashcode = new IntHashSet();
4848
if (restoredFile != null) {
4949
hashcode = new IntHashSet((int) restoredFile.rowCount());
50-
restore(fileHandler, hashcode, restoredFile);
50+
restore(indexFile, hashcode, restoredFile);
5151
}
5252
this.hashcode = hashcode;
5353
this.modified = false;
5454
}
5555

56-
private void restore(IndexFileHandler fileHandler, IntHashSet hashcode, IndexFileMeta file) {
57-
try (IntIterator iterator = fileHandler.readHashIndex(file)) {
56+
private void restore(HashIndexFile indexFile, IntHashSet hashcode, IndexFileMeta file) {
57+
try (IntIterator iterator = indexFile.read(file.fileName())) {
5858
while (true) {
5959
try {
6060
hashcode.add(iterator.next());
@@ -80,8 +80,12 @@ public void notifyNewRecord(KeyValue record) {
8080

8181
public List<IndexFileMeta> prepareCommit() {
8282
if (modified) {
83-
IndexFileMeta entry =
84-
fileHandler.writeHashIndex(hashcode.size(), hashcode.toIntIterator());
83+
IndexFileMeta entry;
84+
try {
85+
entry = indexFile.write(hashcode.size(), hashcode.toIntIterator());
86+
} catch (IOException e) {
87+
throw new RuntimeException(e);
88+
}
8589
modified = false;
8690
return Collections.singletonList(entry);
8791
}
@@ -107,7 +111,7 @@ public IndexFileHandler indexFileHandler() {
107111
}
108112

109113
public DynamicBucketIndexMaintainer create(@Nullable IndexFileMeta restoredFile) {
110-
return new DynamicBucketIndexMaintainer(handler, restoredFile);
114+
return new DynamicBucketIndexMaintainer(handler.hashIndex(), restoredFile);
111115
}
112116
}
113117
}

paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.utils.PathFactory;
2525

2626
import java.io.IOException;
27+
import java.util.List;
2728

2829
import static org.apache.paimon.utils.IntFileUtils.readInts;
2930
import static org.apache.paimon.utils.IntFileUtils.writeInts;
@@ -45,9 +46,22 @@ public IntIterator read(String fileName) throws IOException {
4546
return readInts(fileIO, pathFactory.toPath(fileName));
4647
}
4748

49+
public List<Integer> readList(String fileName) throws IOException {
50+
return IntIterator.toIntList(read(fileName));
51+
}
52+
4853
public String write(IntIterator input) throws IOException {
4954
Path path = pathFactory.newPath();
5055
writeInts(fileIO, path, input);
5156
return path.getName();
5257
}
58+
59+
public IndexFileMeta write(int size, IntIterator input) throws IOException {
60+
String fileName = write(input);
61+
return new IndexFileMeta(HASH_INDEX, fileName, fileSize(fileName), size);
62+
}
63+
64+
public IndexFileMeta write(int[] ints) throws IOException {
65+
return write(ints.length, IntIterator.create(ints));
66+
}
5367
}

paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.index;
2020

2121
import org.apache.paimon.fs.FileIO;
22+
import org.apache.paimon.fs.Path;
2223
import org.apache.paimon.utils.PathFactory;
2324

2425
import java.io.IOException;
@@ -36,21 +37,25 @@ public IndexFile(FileIO fileIO, PathFactory pathFactory) {
3637
this.pathFactory = pathFactory;
3738
}
3839

40+
public Path path(String fileName) {
41+
return pathFactory.toPath(fileName);
42+
}
43+
3944
public long fileSize(String fileName) {
4045
try {
41-
return fileIO.getFileSize(pathFactory.toPath(fileName));
46+
return fileIO.getFileSize(path(fileName));
4247
} catch (IOException e) {
4348
throw new UncheckedIOException(e);
4449
}
4550
}
4651

4752
public void delete(String fileName) {
48-
fileIO.deleteQuietly(pathFactory.toPath(fileName));
53+
fileIO.deleteQuietly(path(fileName));
4954
}
5055

5156
public boolean exists(String fileName) {
5257
try {
53-
return fileIO.exists(pathFactory.toPath(fileName));
58+
return fileIO.exists(path(fileName));
5459
} catch (IOException e) {
5560
throw new UncheckedIOException(e);
5661
}

0 commit comments

Comments
 (0)