Skip to content

Commit 9807fe4

Browse files
[core] Skip file index for changelog files (apache#7697)
Changelog files are sequentially consumed by streaming readers, file index (bloom filter/bitmap) provides no benefit. Skip generating .index files for changelog to avoid unnecessary storage overhead and potential orphan files after changelog expiration. These orphan .index files increase I/O pressure on the orphan file clean job.
1 parent ec15528 commit 9807fe4

2 files changed

Lines changed: 54 additions & 4 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ private KeyValueClusteringFileWriter createKvSeparatedFileWriter(
149149

150150
private KeyValueDataFileWriter createDataFileWriter(
151151
Path path, WriteFormatKey key, FileSource fileSource, boolean isExternalPath) {
152+
// Changelog is sequentially consumed, file index is unnecessary.
153+
FileIndexOptions indexOptions = key.isChangelog ? new FileIndexOptions() : fileIndexOptions;
152154
return formatContext.thinModeEnabled
153155
? new KeyValueThinDataFileWriterImpl(
154156
fileIO,
@@ -161,7 +163,7 @@ private KeyValueDataFileWriter createDataFileWriter(
161163
key.level,
162164
options,
163165
fileSource,
164-
fileIndexOptions,
166+
indexOptions,
165167
isExternalPath)
166168
: new KeyValueDataFileWriterImpl(
167169
fileIO,
@@ -174,7 +176,7 @@ private KeyValueDataFileWriter createDataFileWriter(
174176
key.level,
175177
options,
176178
fileSource,
177-
fileIndexOptions,
179+
indexOptions,
178180
isExternalPath);
179181
}
180182

paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,11 +308,16 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
308308
}
309309

310310
protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String format) {
311+
Options options = new Options();
312+
options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
313+
return createWriterFactory(pathStr, format, options);
314+
}
315+
316+
protected KeyValueFileWriterFactory createWriterFactory(
317+
String pathStr, String format, Options options) {
311318
Path path = new Path(pathStr);
312319
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
313320
FileIO fileIO = FileIOFinder.find(path);
314-
Options options = new Options();
315-
options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
316321

317322
Function<String, FileStorePathFactory> pathFactoryMap =
318323
format1 ->
@@ -455,6 +460,49 @@ private void checkRollingFiles(
455460
}
456461
}
457462

463+
@Test
464+
void testChangelogFile() throws Exception {
465+
Options options = new Options();
466+
options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
467+
options.setString("file-index.bloom-filter.columns", "comment");
468+
options.setString("file-index.in-manifest-threshold", "1B");
469+
470+
KeyValueFileWriterFactory writerFactory =
471+
createWriterFactory(tempDir.toString(), "avro", options);
472+
473+
DataFileTestDataGenerator.Data data = gen.next();
474+
RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
475+
writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
476+
dataWriter.write(CloseableIterator.fromList(data.content, kv -> {}));
477+
dataWriter.close();
478+
List<DataFileMeta> dataFileMetas = dataWriter.result();
479+
480+
assertThat(dataFileMetas).isNotEmpty();
481+
assertThat(
482+
dataFileMetas.stream()
483+
.anyMatch(
484+
meta ->
485+
meta.extraFiles().stream()
486+
.anyMatch(
487+
f ->
488+
f.endsWith(
489+
DataFilePathFactory
490+
.INDEX_PATH_SUFFIX))))
491+
.isTrue();
492+
493+
RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
494+
writerFactory.createRollingChangelogFileWriter(0);
495+
changelogWriter.write(CloseableIterator.fromList(data.content, kv -> {}));
496+
changelogWriter.close();
497+
List<DataFileMeta> changelogMetas = changelogWriter.result();
498+
499+
assertThat(changelogMetas).isNotEmpty();
500+
for (DataFileMeta meta : changelogMetas) {
501+
assertThat(meta.extraFiles())
502+
.noneMatch(f -> f.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX));
503+
}
504+
}
505+
458506
@ParameterizedTest
459507
@ValueSource(strings = {"parquet", "orc", "avro"})
460508
public void testReaderUseFileSizeFromMetadata(String format) throws Exception {

0 commit comments

Comments
 (0)