 import com.alibaba.fluss.row.BinaryString;
 import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;

 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;

 import javax.annotation.Nullable;

 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;

 import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
 import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static com.alibaba.fluss.record.ChangeType.DELETE;
+import static com.alibaba.fluss.record.ChangeType.INSERT;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
 import static org.assertj.core.api.Assertions.assertThat;

 /** Unit test for tiering to Iceberg via {@link IcebergLakeTieringFactory}. */
@@ -87,10 +98,15 @@ void beforeEach() {
         icebergLakeTieringFactory = new IcebergLakeTieringFactory(configuration);
     }

-    @Test
-    void testTieringWriteTable() throws Exception {
+    private static Stream<Arguments> tieringWriteArgs() {
+        return Stream.of(Arguments.of(true), Arguments.of(false));
+    }
+
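+    // Exercises the tiering round trip for both a primary-key table (true) and an
+    // append-only log table (false).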
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
         TablePath tablePath = TablePath.of("iceberg", "test_table");
-        createTable(tablePath);
+        createTable(tablePath, isPrimaryKeyTable);

         Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));

@@ -107,11 +123,17 @@ void testTieringWriteTable() throws Exception {
         // first, write data
         for (int bucket = 0; bucket < bucketNum; bucket++) {
             try (LakeWriter<IcebergWriteResult> writer = createLakeWriter(tablePath, bucket)) {
-                List<LogRecord> records = genLogTableRecords(bucket, 5);
-                for (LogRecord record : records) {
+                Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                        isPrimaryKeyTable
+                                ? genPrimaryKeyTableRecords(bucket)
+                                : genLogTableRecords(bucket, 5);
+
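+                // f0: the changelog records to write; f1: the records expected to be
+                // read back after tiering.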
+                List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                for (LogRecord record : writtenRecords) {
                     writer.write(record);
                 }
-                recordsByBucket.put(bucket, records);
+                recordsByBucket.put(bucket, expectRecords);
                 IcebergWriteResult result = writer.complete();
                 byte[] serialized = writeResultSerializer.serialize(result);
                 icebergWriteResults.add(
@@ -142,7 +164,11 @@ void testTieringWriteTable() throws Exception {
         for (int bucket = 0; bucket < 3; bucket++) {
             List<LogRecord> expectRecords = recordsByBucket.get(bucket);
             CloseableIterator<Record> actualRecords = getIcebergRows(icebergTable, bucket);
-            verifyLogTableRecords(actualRecords, bucket, expectRecords);
+            if (isPrimaryKeyTable) {
+                verifyPrimaryKeyTableRecord(actualRecords, expectRecords, bucket);
+            } else {
+                verifyLogTableRecords(actualRecords, bucket, expectRecords);
+            }
         }
     }

@@ -187,7 +213,8 @@ private LakeCommitter<IcebergWriteResult, IcebergCommittable> createLakeCommitte
         return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
     }

-    private List<LogRecord> genLogTableRecords(int bucket, int numRecords) {
+    private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
+            int bucket, int numRecords) {
         List<LogRecord> logRecords = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
             GenericRow genericRow = new GenericRow(3);
@@ -200,10 +227,67 @@ private List<LogRecord> genLogTableRecords(int bucket, int numRecords) {
                             i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
             logRecords.add(logRecord);
         }
-        return logRecords;
+        return Tuple2.of(logRecords, logRecords);
+    }
+
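+    // Builds a per-key changelog for one bucket together with the subset of records
+    // expected to survive merging: the last +I/+U image of each live key (key 0 ends
+    // with -D, so it contributes no expected row).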
+    private Tuple2<List<LogRecord>, List<LogRecord>> genPrimaryKeyTableRecords(int bucket) {
+        int offset = -1;
+        // gen +I, -U, +U, -D
+        List<GenericRow> rows = genKvRow(bucket, 0, 0, 4);
+        List<LogRecord> writtenLogRecords =
+                new ArrayList<>(
+                        Arrays.asList(
+                                toRecord(++offset, rows.get(0), INSERT),
+                                toRecord(++offset, rows.get(1), UPDATE_BEFORE),
+                                toRecord(++offset, rows.get(2), UPDATE_AFTER),
+                                toRecord(++offset, rows.get(3), DELETE)));
+        List<LogRecord> expectLogRecords = new ArrayList<>();
+
+        // gen +I, -U, +U
+        rows = genKvRow(bucket, 1, 4, 7);
+        writtenLogRecords.addAll(
+                Arrays.asList(
+                        toRecord(++offset, rows.get(0), INSERT),
+                        toRecord(++offset, rows.get(1), UPDATE_BEFORE),
+                        toRecord(++offset, rows.get(2), UPDATE_AFTER)));
+        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+        // gen +I, +U
+        rows = genKvRow(bucket, 2, 7, 9);
+        writtenLogRecords.addAll(
+                Arrays.asList(
+                        toRecord(++offset, rows.get(0), INSERT),
+                        toRecord(++offset, rows.get(1), UPDATE_AFTER)));
+        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+        // gen +I
+        rows = genKvRow(bucket, 3, 9, 10);
+        writtenLogRecords.add(toRecord(++offset, rows.get(0), INSERT));
+        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+        return Tuple2.of(writtenLogRecords, expectLogRecords);
+    }
+
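+    // Generates rows for a single key: field 0 is the key, fields 1-2 are
+    // bucket-tagged string values.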
+    private List<GenericRow> genKvRow(int bucket, int key, int from, int to) {
+        List<GenericRow> rows = new ArrayList<>();
+        for (int i = from; i < to; i++) {
+            // Non-partitioned table
+            GenericRow genericRow = new GenericRow(3);
+            genericRow.setField(0, key);
+            genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
+            genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+
+            rows.add(genericRow);
+        }
+        return rows;
+    }
+
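+    // Wraps a row into a changelog record carrying the given offset, the current
+    // timestamp, and the change type.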
+    private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeType) {
+        return new GenericRecord(offset, System.currentTimeMillis(), changeType, row);
     }

-    private void createTable(TablePath tablePath) throws Exception {
+    private void createTable(TablePath tablePath, boolean isPrimaryKeyTable) throws Exception {
         Namespace namespace = Namespace.of(tablePath.getDatabaseName());
         if (icebergCatalog instanceof SupportsNamespaces) {
             SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
@@ -212,15 +296,24 @@ private void createTable(TablePath tablePath) throws Exception {
             }
         }

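+        // For a primary-key table, register c1 (field id 1) as the Iceberg identifier
+        // field; identifier fields must be non-null, so c1 is declared required in the
+        // schema below.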
+        Set<Integer> identifierFieldIds = new HashSet<>();
+        if (isPrimaryKeyTable) {
+            identifierFieldIds.add(1);
+        }
+
         org.apache.iceberg.Schema schema =
                 new org.apache.iceberg.Schema(
-                        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
-                        Types.NestedField.optional(2, "c2", Types.StringType.get()),
-                        Types.NestedField.optional(3, "c3", Types.StringType.get()),
-                        Types.NestedField.required(4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
-                        Types.NestedField.required(5, OFFSET_COLUMN_NAME, Types.LongType.get()),
-                        Types.NestedField.required(
-                                6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone()));
+                        Arrays.asList(
+                                Types.NestedField.required(1, "c1", Types.IntegerType.get()),
+                                Types.NestedField.optional(2, "c2", Types.StringType.get()),
+                                Types.NestedField.optional(3, "c3", Types.StringType.get()),
+                                Types.NestedField.required(
+                                        4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+                                Types.NestedField.required(
+                                        5, OFFSET_COLUMN_NAME, Types.LongType.get()),
+                                Types.NestedField.required(
+                                        6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
+                        identifierFieldIds);

         TableIdentifier tableId =
                 TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
@@ -258,4 +351,34 @@ private void verifyLogTableRecords(
                     .isEqualTo(expectRecord.timestamp());
         }
     }
+
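+    // Verifies that the tiered rows match the expected post-merge records in order,
+    // including the __bucket/__offset/__timestamp system columns, and that no extra
+    // rows exist for the bucket.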
+    private void verifyPrimaryKeyTableRecord(
+            CloseableIterator<Record> actualRecords,
+            List<LogRecord> expectRecords,
+            int expectBucket)
+            throws Exception {
+        for (LogRecord expectRecord : expectRecords) {
+            Record actualRow = actualRecords.next();
+            // check business columns:
+            assertThat(actualRow.get(0)).isEqualTo(expectRecord.getRow().getInt(0));
+            assertThat(actualRow.get(1).toString())
+                    .isEqualTo(expectRecord.getRow().getString(1).toString());
+
+            // For non-partitioned tables
+            assertThat(actualRow.get(2).toString())
+                    .isEqualTo(expectRecord.getRow().getString(2).toString());
+            // check system columns: __bucket, __offset, __timestamp
+            assertThat(actualRow.get(3)).isEqualTo(expectBucket);
+            assertThat(actualRow.get(4)).isEqualTo(expectRecord.logOffset());
+            assertThat(
+                            actualRow
+                                    .get(5, OffsetDateTime.class)
+                                    .atZoneSameInstant(ZoneOffset.UTC)
+                                    .toInstant()
+                                    .toEpochMilli())
+                    .isEqualTo(expectRecord.timestamp());
+        }
+        assertThat(actualRecords.hasNext()).isFalse();
+        actualRecords.close();
+    }
 }