Skip to content

Commit deeed7e

Browse files
authored
[core] Preserving old Iceberg Metadata files (apache#5228)
1 parent 737da60 commit deeed7e

File tree

4 files changed

+61
-9
lines changed

4 files changed

+61
-9
lines changed

docs/layouts/shortcodes/generated/iceberg_configuration.html

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,26 @@
3030
<td><h5>metadata.iceberg.compaction.max.file-num</h5></td>
3131
<td style="word-wrap: break-word;">50</td>
3232
<td>Integer</td>
33-
<td>If number of small Iceberg metadata files exceeds this limit, always trigger metadata compaction regardless of their total size.</td>
33+
<td>If number of small Iceberg manifest metadata files exceeds this limit, always trigger manifest metadata compaction regardless of their total size.</td>
3434
</tr>
3535
<tr>
3636
<td><h5>metadata.iceberg.compaction.min.file-num</h5></td>
3737
<td style="word-wrap: break-word;">10</td>
3838
<td>Integer</td>
39-
<td>Minimum number of Iceberg metadata files to trigger metadata compaction.</td>
39+
<td>Minimum number of Iceberg manifest metadata files to trigger manifest metadata compaction.</td>
4040
</tr>
4141
<tr>
4242
<td><h5>metadata.iceberg.database</h5></td>
4343
<td style="word-wrap: break-word;">(none)</td>
4444
<td>String</td>
4545
<td>Metastore database name for Iceberg Catalog. Set this as an iceberg database alias if using a centralized Catalog.</td>
4646
</tr>
47+
<tr>
48+
<td><h5>metadata.iceberg.delete-after-commit.enabled</h5></td>
49+
<td style="word-wrap: break-word;">true</td>
50+
<td>Boolean</td>
51+
<td>Whether to delete old metadata files after each table commit</td>
52+
</tr>
4753
<tr>
4854
<td><h5>metadata.iceberg.glue.skip-archive</h5></td>
4955
<td style="word-wrap: break-word;">false</td>
@@ -80,6 +86,12 @@
8086
<td>Boolean</td>
8187
<td>Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.</td>
8288
</tr>
89+
<tr>
90+
<td><h5>metadata.iceberg.previous-versions-max</h5></td>
91+
<td style="word-wrap: break-word;">0</td>
92+
<td>Integer</td>
93+
<td>The number of old metadata files to keep after each table commit</td>
94+
</tr>
8395
<tr>
8496
<td><h5>metadata.iceberg.storage</h5></td>
8597
<td style="word-wrap: break-word;">disabled</td>

paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ private void createMetadataWithBase(
424424
new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME),
425425
String.valueOf(snapshotId));
426426

427-
table.fileIO().deleteQuietly(baseMetadataPath);
427+
deleteApplicableMetadataFiles(snapshotId);
428428
for (int i = 0; i + 1 < toExpireExceptLast.size(); i++) {
429429
expireManifestList(
430430
new Path(toExpireExceptLast.get(i).manifestList()).getName(),
@@ -752,7 +752,25 @@ private void expireAllBefore(long snapshotId) throws IOException {
752752
}
753753
table.fileIO().deleteQuietly(listPath);
754754
}
755-
table.fileIO().deleteQuietly(path);
755+
deleteApplicableMetadataFiles(snapshotId);
756+
}
757+
}
758+
759+
private void deleteApplicableMetadataFiles(long snapshotId) throws IOException {
760+
Options options = new Options(table.options());
761+
if (options.get(IcebergOptions.METADATA_DELETE_AFTER_COMMIT)) {
762+
long earliestMetadataId =
763+
snapshotId - options.get(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX);
764+
if (earliestMetadataId > 0) {
765+
Iterator<Path> it =
766+
pathFactory
767+
.getAllMetadataPathBefore(table.fileIO(), earliestMetadataId)
768+
.iterator();
769+
while (it.hasNext()) {
770+
Path path = it.next();
771+
table.fileIO().deleteQuietly(path);
772+
}
773+
}
756774
}
757775
}
758776

paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,29 @@ public class IcebergOptions {
4242
.intType()
4343
.defaultValue(10)
4444
.withDescription(
45-
"Minimum number of Iceberg metadata files to trigger metadata compaction.");
45+
"Minimum number of Iceberg manifest metadata files to trigger manifest metadata compaction.");
4646

4747
public static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
4848
ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
4949
.intType()
5050
.defaultValue(50)
5151
.withDescription(
52-
"If number of small Iceberg metadata files exceeds this limit, "
53-
+ "always trigger metadata compaction regardless of their total size.");
52+
"If number of small Iceberg manifest metadata files exceeds this limit, "
53+
+ "always trigger manifest metadata compaction regardless of their total size.");
54+
55+
public static final ConfigOption<Boolean> METADATA_DELETE_AFTER_COMMIT =
56+
key("metadata.iceberg.delete-after-commit.enabled")
57+
.booleanType()
58+
.defaultValue(true)
59+
.withDescription(
60+
"Whether to delete old metadata files after each table commit");
61+
62+
public static final ConfigOption<Integer> METADATA_PREVIOUS_VERSIONS_MAX =
63+
key("metadata.iceberg.previous-versions-max")
64+
.intType()
65+
.defaultValue(0)
66+
.withDescription(
67+
"The number of old metadata files to keep after each table commit");
5468

5569
public static final ConfigOption<String> URI =
5670
key("metadata.iceberg.uri")

paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,15 +440,21 @@ public void testIcebergSnapshotExpire() throws Exception {
440440
write.close();
441441
commit.close();
442442

443-
// The old metadata.json is removed when the new metadata.json is created.
444-
for (int i = 1; i <= 4; i++) {
443+
// The old metadata.json is removed when the new metadata.json is created
444+
// depending on the old metadata retention configuration.
445+
for (int i = 1; i <= 3; i++) {
445446
unusedFiles.add(pathFactory.toMetadataPath(i).toString());
446447
}
447448

448449
for (String path : unusedFiles) {
449450
assertThat(fileIO.exists(new Path(path))).isFalse();
450451
}
451452

453+
// Check existence of retained Iceberg metadata.json files
454+
for (int i = 4; i <= 5; i++) {
455+
assertThat(fileIO.exists(new Path(pathFactory.toMetadataPath(i).toString()))).isTrue();
456+
}
457+
452458
// Test all existing Iceberg snapshots are valid.
453459
assertThat(getIcebergResult())
454460
.containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 21)", "Record(3, 31)");
@@ -961,6 +967,8 @@ private FileStoreTable createPaimonTable(
961967
options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
962968
options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
963969
options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 8);
970+
options.set(IcebergOptions.METADATA_DELETE_AFTER_COMMIT, true);
971+
options.set(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX, 1);
964972
options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
965973
Schema schema =
966974
new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");

0 commit comments

Comments
 (0)