|
24 | 24 | import org.apache.paimon.Snapshot; |
25 | 25 | import org.apache.paimon.TestFileStore; |
26 | 26 | import org.apache.paimon.TestKeyValueGenerator; |
| 27 | +import org.apache.paimon.consumer.Consumer; |
| 28 | +import org.apache.paimon.consumer.ConsumerManager; |
27 | 29 | import org.apache.paimon.data.BinaryRow; |
28 | 30 | import org.apache.paimon.data.Timestamp; |
29 | 31 | import org.apache.paimon.fs.FileIO; |
@@ -661,6 +663,48 @@ public void testManifestFileSkippingSetFileNotFoundException() throws Exception |
661 | 663 | assertSnapshot(latestSnapshotId, allData, snapshotPositions); |
662 | 664 | } |
663 | 665 |
|
| 666 | + @Test |
| 667 | + public void testConsumerChangelogOnly() throws Exception { |
| 668 | + List<KeyValue> allData = new ArrayList<>(); |
| 669 | + List<Integer> snapshotPositions = new ArrayList<>(); |
| 670 | + commit(10, allData, snapshotPositions); |
| 671 | + |
| 672 | + // create a consumer at snapshot 3 |
| 673 | + ConsumerManager consumerManager = new ConsumerManager(fileIO, new Path(tempDir.toUri())); |
| 674 | + consumerManager.resetConsumer("myConsumer", new Consumer(3)); |
| 675 | + |
| 676 | + // without consumerChangelogOnly, consumer should prevent snapshot expiration |
| 677 | + ExpireConfig configDefault = |
| 678 | + ExpireConfig.builder() |
| 679 | + .snapshotRetainMin(1) |
| 680 | + .snapshotRetainMax(1) |
| 681 | + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) |
| 682 | + .build(); |
| 683 | + store.newExpire(configDefault).expire(); |
| 684 | + |
| 685 | + // earliest snapshot should be 3 (protected by consumer) |
| 686 | + assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(3L); |
| 687 | + |
| 688 | + // with consumerChangelogOnly=true, consumer should NOT prevent snapshot expiration |
| 689 | + ExpireConfig configChangelogOnly = |
| 690 | + ExpireConfig.builder() |
| 691 | + .snapshotRetainMin(1) |
| 692 | + .snapshotRetainMax(1) |
| 693 | + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) |
| 694 | + .consumerChangelogOnly(true) |
| 695 | + .build(); |
| 696 | + store.newExpire(configChangelogOnly).expire(); |
| 697 | + |
| 698 | + int latestSnapshotId2 = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); |
| 699 | + // earliest snapshot should be latestSnapshotId (consumer no longer protects snapshots) |
| 700 | + assertThat(snapshotManager.earliestSnapshotId()).isEqualTo((long) latestSnapshotId2); |
| 701 | + assertSnapshot(latestSnapshotId2, allData, snapshotPositions); |
| 702 | + |
| 703 | + // clean up consumer file so assertCleaned passes |
| 704 | + consumerManager.deleteConsumer("myConsumer"); |
| 705 | + store.assertCleaned(); |
| 706 | + } |
| 707 | + |
664 | 708 | private TestFileStore createStore() { |
665 | 709 | ThreadLocalRandom random = ThreadLocalRandom.current(); |
666 | 710 |
|
|
0 commit comments