1919package org .apache .paimon .flink ;
2020
2121import org .apache .paimon .CoreOptions ;
22+ import org .apache .paimon .Snapshot ;
23+ import org .apache .paimon .catalog .Catalog ;
24+ import org .apache .paimon .deletionvectors .DeletionVector ;
2225import org .apache .paimon .flink .util .AbstractTestBase ;
26+ import org .apache .paimon .index .IndexFileMeta ;
27+ import org .apache .paimon .manifest .IndexManifestEntry ;
2328import org .apache .paimon .table .FileStoreTable ;
2429import org .apache .paimon .table .Table ;
2530import org .apache .paimon .table .source .snapshot .TimeTravelUtil ;
4045
4146import java .math .BigDecimal ;
4247import java .util .Arrays ;
43- import java .util .Collections ;
4448import java .util .HashMap ;
4549import java .util .List ;
4650import java .util .Map ;
4751import java .util .concurrent .ThreadLocalRandom ;
4852import java .util .stream .Collectors ;
4953
54+ import static java .util .Collections .singletonList ;
5055import static org .apache .paimon .testutils .assertj .PaimonAssertions .anyCauseMatches ;
5156import static org .assertj .core .api .Assertions .assertThat ;
5257import static org .assertj .core .api .Assertions .assertThatThrownBy ;
@@ -56,7 +61,56 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
5661
5762 @ Override
5863 protected List <String > ddl () {
59- return Collections .singletonList ("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)" );
64+ return singletonList ("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)" );
65+ }
66+
67+ @ Test
68+ public void testFullCompactionNoDv () throws Catalog .TableNotExistException {
69+ sql (
70+ "CREATE TEMPORARY TABLE GEN (a INT) WITH ("
71+ + "'connector'='datagen', "
72+ + "'number-of-rows'='1000', "
73+ + "'fields.a.kind'='sequence', "
74+ + "'fields.a.start'='0', "
75+ + "'fields.a.end'='1000')" );
76+ sql (
77+ "CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING, c STRING) WITH ("
78+ + "'bucket' = '1', "
79+ + "'file.format' = 'avro', "
80+ + "'file.compression' = 'null', "
81+ + "'deletion-vectors.enabled' = 'true')" );
82+ batchSql ("INSERT INTO T1 SELECT a, 'unknown', 'unknown' FROM GEN" );
83+
84+ // first insert, producing dv files
85+ batchSql ("INSERT INTO T1 VALUES (1, '22', '33')" );
86+ FileStoreTable table = paimonTable ("T1" );
87+ Snapshot snapshot = table .latestSnapshot ().get ();
88+ assertThat (deletionVectors (table , snapshot )).hasSize (1 );
89+ assertThat (batchSql ("SELECT * FROM T1 WHERE a = 1" )).containsExactly (Row .of (1 , "22" , "33" ));
90+
91+ // second insert, producing no dv files
92+ batchSql ("ALTER TABLE T1 SET ('compaction.total-size-threshold' = '1m')" );
93+ batchSql ("INSERT INTO T1 VALUES (1, '44', '55')" );
94+ snapshot = table .latestSnapshot ().get ();
95+ assertThat (deletionVectors (table , snapshot )).hasSize (0 );
96+ assertThat (batchSql ("SELECT * FROM T1 WHERE a = 1" )).containsExactly (Row .of (1 , "44" , "55" ));
97+
98+ // third insert, producing no dv files, same index manifest
99+ batchSql ("INSERT INTO T1 VALUES (1, '66', '77')" );
100+ assertThat (table .latestSnapshot ().get ().indexManifest ())
101+ .isEqualTo (snapshot .indexManifest ());
102+ assertThat (batchSql ("SELECT * FROM T1 WHERE a = 1" )).containsExactly (Row .of (1 , "66" , "77" ));
103+ }
104+
105+ private Map <String , DeletionVector > deletionVectors (FileStoreTable table , Snapshot snapshot ) {
106+ assertThat (snapshot .indexManifest ()).isNotNull ();
107+ List <IndexManifestEntry > indexManifestEntries =
108+ table .indexManifestFileReader ().read (snapshot .indexManifest ());
109+ assertThat (indexManifestEntries .size ()).isEqualTo (1 );
110+ IndexFileMeta indexFileMeta = indexManifestEntries .get (0 ).indexFile ();
111+ return table .store ()
112+ .newIndexFileHandler ()
113+ .readAllDeletionVectors (singletonList (indexFileMeta ));
60114 }
61115
62116 @ Test
@@ -794,7 +848,7 @@ public void testIncrementScanMode() throws Exception {
794848 // snapshot 5,6
795849 String dataId =
796850 TestValuesTableFactory .registerData (
797- Collections . singletonList (Row .ofKind (RowKind .DELETE , 2 , "B" )));
851+ singletonList (Row .ofKind (RowKind .DELETE , 2 , "B" )));
798852 sEnv .executeSql (
799853 "CREATE TEMPORARY TABLE source (id INT, v STRING) "
800854 + "WITH ('connector' = 'values', 'bounded' = 'true', 'data-id' = '"
0 commit comments