 package org.apache.fluss.lake.iceberg.maintenance;
 
 import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.data.Record;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration test for Iceberg compaction. */
 class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
     protected static final String DEFAULT_DB = "fluss";
 
     private static StreamExecutionEnvironment execEnv;
 
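+    // shared test schemas: pkSchema defines a primary-key table keyed on f_int,
+    // logSchema has the same columns without a primary key (append-only log table)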
+    private static final Schema pkSchema =
+            Schema.newBuilder()
+                    .column("f_int", DataTypes.INT())
+                    .column("f_string", DataTypes.STRING())
+                    .primaryKey("f_int")
+                    .build();
+
+    private static final Schema logSchema =
+            Schema.newBuilder()
+                    .column("f_int", DataTypes.INT())
+                    .column("f_string", DataTypes.STRING())
+                    .build();
+
     @BeforeAll
     protected static void beforeAll() {
         FlinkIcebergTieringTestBase.beforeAll();
@@ -48,42 +68,148 @@ protected static void beforeAll() {
         execEnv.enableCheckpointing(1000);
     }
 
+    @Test
+    void testPkTableCompaction() throws Exception {
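+        // start the Fluss-to-Iceberg tiering job; compaction is expected to kick in
+        // once enough data files have accumulated (see MIN_FILES_TO_COMPACT)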
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+            long t1Id = createPkTable(t1, 1, true, pkSchema);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
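+            // flussRows accumulates the rows expected to remain visible in Iceberg
+            // after the upserts and the compaction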
+            List<InternalRow> flussRows = new ArrayList<>();
+
+            List<InternalRow> rows = Collections.singletonList(row(1, "v1"));
+            writeIcebergTableRecords(t1, t1Bucket, 1, false, rows);
+            flussRows.addAll(rows);
+
+            rows = Collections.singletonList(row(2, "v1"));
+            writeIcebergTableRecords(t1, t1Bucket, 2, false, rows);
+            flussRows.addAll(rows);
+
+            // add pos-delete
+            rows = Arrays.asList(row(3, "v1"), row(3, "v2"));
+            writeIcebergTableRecords(t1, t1Bucket, 5, false, rows);
+            // one UPDATE_BEFORE and one UPDATE_AFTER
+            checkFileStatusInIcebergTable(t1, 3, true);
+            flussRows.add(rows.get(1));
+
+            // trigger compaction
+            rows = Collections.singletonList(row(4, "v1"));
+            writeIcebergTableRecords(t1, t1Bucket, 6, false, rows);
+            checkFileStatusInIcebergTable(t1, 2, false);
+            flussRows.addAll(rows);
+
+            checkRecords(getIcebergRecords(t1), flussRows);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
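+    // asserts that the Iceberg records match the expected Fluss rows: both sides are
+    // sorted by the int key (field 0) and then compared field by field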
+    private void checkRecords(List<Record> actualRows, List<InternalRow> expectedRows) {
+        // check records size
+        assertThat(actualRows.size()).isEqualTo(expectedRows.size());
+
+        // check records content
+        Iterator<Record> actualIterator =
+                actualRows.stream()
+                        .sorted(Comparator.comparingInt((Record r) -> (int) r.get(0)))
+                        .iterator();
+        Iterator<InternalRow> expectedIterator =
+                expectedRows.stream().sorted(Comparator.comparingInt(r -> r.getInt(0))).iterator();
+        while (actualIterator.hasNext() && expectedIterator.hasNext()) {
+            Record record = actualIterator.next();
+            InternalRow row = expectedIterator.next();
+            assertThat(record.get(0)).isEqualTo(row.getInt(0));
+            assertThat(record.get(1)).isEqualTo(row.getString(1).toString());
+        }
+        assertThat(actualIterator.hasNext()).isFalse();
+        assertThat(expectedIterator.hasNext()).isFalse();
+    }
+
+    @Test
+    void testPkTableCompactionWithConflict() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
+            long t1Id = createPkTable(t1, 1, true, pkSchema);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+            List<InternalRow> flussRows = new ArrayList<>();
+
+            List<InternalRow> rows = Collections.singletonList(row(1, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 1, false, rows));
+            checkFileStatusInIcebergTable(t1, 1, false);
+
+            rows = Collections.singletonList(row(2, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 2, false, rows));
+
+            rows = Collections.singletonList(row(3, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 3, false, rows));
+
+            // add pos-delete and trigger compaction
+            rows = Arrays.asList(row(4, "v1"), row(4, "v2"));
+            flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, rows).get(1));
+            // the rewritten files should fail to commit due to the conflict, so check records here
+            checkRecords(getIcebergRecords(t1), flussRows);
+            // 4 data files and 1 delete file
+            checkFileStatusInIcebergTable(t1, 4, true);
+
+            // a previous compaction conflict won't prevent further compaction, so check iceberg
+            // records again
+            rows = Collections.singletonList(row(5, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 7, false, rows));
+            checkRecords(getIcebergRecords(t1), flussRows);
+            checkFileStatusInIcebergTable(t1, 2, false);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
     @Test
     void testLogTableCompaction() throws Exception {
         JobClient jobClient = buildTieringJob(execEnv);
         try {
             TablePath t1 = TablePath.of(DEFAULT_DB, "log_table");
-            long t1Id = createLogTable(t1, true);
+            long t1Id = createLogTable(t1, 1, true, logSchema);
             TableBucket t1Bucket = new TableBucket(t1Id, 0);
 
             int i = 0;
             List<InternalRow> flussRows = new ArrayList<>();
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
 
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
 
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
-            checkFileCountInIcebergTable(t1, 3);
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
+            checkFileStatusInIcebergTable(t1, 3, false);
 
             // The write should trigger compaction now since the current data file count is
             // greater than or equal to MIN_FILES_TO_COMPACT
-            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+            flussRows.addAll(
+                    writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
             // Should only have two files now, one file is for the newly written data, one is for
             // the target compacted file
-            checkFileCountInIcebergTable(t1, 2);
+            checkFileStatusInIcebergTable(t1, 2, false);
 
             // check data in iceberg to make sure compaction won't lose data or duplicate data
-            checkDataInIcebergAppendOnlyTable(t1, flussRows, 0);
+            checkRecords(getIcebergRecords(t1), flussRows);
         } finally {
             jobClient.cancel().get();
         }
     }
 
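+    // writes the given rows to the Fluss table (the 'append' flag presumably selects
+    // append vs. upsert writes), waits for the bucket replica to reach the expected
+    // log end offset, and returns the written rows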
-    private List<InternalRow> writeLogTableRecords(
-            TablePath tablePath, TableBucket tableBucket, long expectedLogEndOffset)
+    private List<InternalRow> writeIcebergTableRecords(
+            TablePath tablePath,
+            TableBucket tableBucket,
+            long expectedLogEndOffset,
+            boolean append,
+            List<InternalRow> rows)
             throws Exception {
-        List<InternalRow> rows = Arrays.asList(row(1, "v1"));
-        writeRows(tablePath, rows, true);
+        writeRows(tablePath, rows, append);
         assertReplicaStatus(tableBucket, expectedLogEndOffset);
         return rows;
     }