@@ -282,4 +282,267 @@ void testNestedArrayType() {
282282 assertThat (innerArray2 .size ()).isEqualTo (3 );
283283 assertThat (innerArray2 .getInt (0 )).isEqualTo (3 );
284284 }
285+
286+ @ Test
287+ void testArrayWithAllPrimitiveTypes () {
288+ int tableBucket = 0 ;
289+ RowType tableRowType =
290+ RowType .of (
291+ new org .apache .paimon .types .ArrayType (
292+ new org .apache .paimon .types .BooleanType ()),
293+ new org .apache .paimon .types .ArrayType (
294+ new org .apache .paimon .types .TinyIntType ()),
295+ new org .apache .paimon .types .ArrayType (
296+ new org .apache .paimon .types .SmallIntType ()),
297+ new org .apache .paimon .types .ArrayType (
298+ new org .apache .paimon .types .IntType ()),
299+ new org .apache .paimon .types .ArrayType (
300+ new org .apache .paimon .types .BigIntType ()),
301+ new org .apache .paimon .types .ArrayType (
302+ new org .apache .paimon .types .FloatType ()),
303+ new org .apache .paimon .types .ArrayType (
304+ new org .apache .paimon .types .DoubleType ()),
305+ // system columns: __bucket, __offset, __timestamp
306+ new org .apache .paimon .types .IntType (),
307+ new org .apache .paimon .types .BigIntType (),
308+ new org .apache .paimon .types .LocalZonedTimestampType (3 ));
309+
310+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
311+ new FlussRecordAsPaimonRow (tableBucket , tableRowType );
312+ long logOffset = 0 ;
313+ long timeStamp = System .currentTimeMillis ();
314+ GenericRow genericRow = new GenericRow (7 );
315+ genericRow .setField (0 , new GenericArray (new boolean [] {true , false , true }));
316+ genericRow .setField (1 , new GenericArray (new byte [] {1 , 2 , 3 }));
317+ genericRow .setField (2 , new GenericArray (new short [] {100 , 200 , 300 }));
318+ genericRow .setField (3 , new GenericArray (new int [] {1000 , 2000 , 3000 }));
319+ genericRow .setField (4 , new GenericArray (new long [] {10000L , 20000L , 30000L }));
320+ genericRow .setField (5 , new GenericArray (new float [] {1.1f , 2.2f , 3.3f }));
321+ genericRow .setField (6 , new GenericArray (new double [] {1.11 , 2.22 , 3.33 }));
322+
323+ LogRecord logRecord = new GenericRecord (logOffset , timeStamp , APPEND_ONLY , genericRow );
324+ flussRecordAsPaimonRow .setFlussRecord (logRecord );
325+
326+ // Test boolean array
327+ InternalArray boolArray = flussRecordAsPaimonRow .getArray (0 );
328+ assertThat (boolArray .size ()).isEqualTo (3 );
329+ assertThat (boolArray .getBoolean (0 )).isTrue ();
330+ assertThat (boolArray .getBoolean (1 )).isFalse ();
331+ assertThat (boolArray .getBoolean (2 )).isTrue ();
332+ assertThat (boolArray .toBooleanArray ()).isEqualTo (new boolean [] {true , false , true });
333+
334+ // Test byte array
335+ InternalArray byteArray = flussRecordAsPaimonRow .getArray (1 );
336+ assertThat (byteArray .size ()).isEqualTo (3 );
337+ assertThat (byteArray .getByte (0 )).isEqualTo ((byte ) 1 );
338+ assertThat (byteArray .getByte (1 )).isEqualTo ((byte ) 2 );
339+ assertThat (byteArray .toByteArray ()).isEqualTo (new byte [] {1 , 2 , 3 });
340+
341+ // Test short array
342+ InternalArray shortArray = flussRecordAsPaimonRow .getArray (2 );
343+ assertThat (shortArray .size ()).isEqualTo (3 );
344+ assertThat (shortArray .getShort (0 )).isEqualTo ((short ) 100 );
345+ assertThat (shortArray .toShortArray ()).isEqualTo (new short [] {100 , 200 , 300 });
346+
347+ // Test int array
348+ InternalArray intArray = flussRecordAsPaimonRow .getArray (3 );
349+ assertThat (intArray .toIntArray ()).isEqualTo (new int [] {1000 , 2000 , 3000 });
350+
351+ // Test long array
352+ InternalArray longArray = flussRecordAsPaimonRow .getArray (4 );
353+ assertThat (longArray .getLong (0 )).isEqualTo (10000L );
354+ assertThat (longArray .toLongArray ()).isEqualTo (new long [] {10000L , 20000L , 30000L });
355+
356+ // Test float array
357+ InternalArray floatArray = flussRecordAsPaimonRow .getArray (5 );
358+ assertThat (floatArray .getFloat (0 )).isEqualTo (1.1f );
359+ assertThat (floatArray .toFloatArray ()).isEqualTo (new float [] {1.1f , 2.2f , 3.3f });
360+
361+ // Test double array
362+ InternalArray doubleArray = flussRecordAsPaimonRow .getArray (6 );
363+ assertThat (doubleArray .getDouble (0 )).isEqualTo (1.11 );
364+ assertThat (doubleArray .toDoubleArray ()).isEqualTo (new double [] {1.11 , 2.22 , 3.33 });
365+
366+ // Verify system columns
367+ assertThat (flussRecordAsPaimonRow .getInt (7 )).isEqualTo (tableBucket );
368+ assertThat (flussRecordAsPaimonRow .getLong (8 )).isEqualTo (logOffset );
369+ }
370+
371+ @ Test
372+ void testArrayWithDecimalElements () {
373+ int tableBucket = 0 ;
374+ RowType tableRowType =
375+ RowType .of (
376+ new org .apache .paimon .types .ArrayType (
377+ new org .apache .paimon .types .DecimalType (10 , 2 )),
378+ // system columns
379+ new org .apache .paimon .types .IntType (),
380+ new org .apache .paimon .types .BigIntType (),
381+ new org .apache .paimon .types .LocalZonedTimestampType (3 ));
382+
383+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
384+ new FlussRecordAsPaimonRow (tableBucket , tableRowType );
385+ long logOffset = 0 ;
386+ long timeStamp = System .currentTimeMillis ();
387+ GenericRow genericRow = new GenericRow (1 );
388+ genericRow .setField (
389+ 0 ,
390+ new GenericArray (
391+ new Object [] {
392+ Decimal .fromBigDecimal (new BigDecimal ("123.45" ), 10 , 2 ),
393+ Decimal .fromBigDecimal (new BigDecimal ("678.90" ), 10 , 2 )
394+ }));
395+
396+ LogRecord logRecord = new GenericRecord (logOffset , timeStamp , APPEND_ONLY , genericRow );
397+ flussRecordAsPaimonRow .setFlussRecord (logRecord );
398+
399+ InternalArray array = flussRecordAsPaimonRow .getArray (0 );
400+ assertThat (array .size ()).isEqualTo (2 );
401+ assertThat (array .getDecimal (0 , 10 , 2 ).toBigDecimal ()).isEqualTo (new BigDecimal ("123.45" ));
402+ assertThat (array .getDecimal (1 , 10 , 2 ).toBigDecimal ()).isEqualTo (new BigDecimal ("678.90" ));
403+ }
404+
405+ @ Test
406+ void testArrayWithTimestampElements () {
407+ int tableBucket = 0 ;
408+ RowType tableRowType =
409+ RowType .of (
410+ new org .apache .paimon .types .ArrayType (
411+ new org .apache .paimon .types .TimestampType (3 )),
412+ // system columns
413+ new org .apache .paimon .types .IntType (),
414+ new org .apache .paimon .types .BigIntType (),
415+ new org .apache .paimon .types .LocalZonedTimestampType (3 ));
416+
417+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
418+ new FlussRecordAsPaimonRow (tableBucket , tableRowType );
419+ long logOffset = 0 ;
420+ long timeStamp = System .currentTimeMillis ();
421+ GenericRow genericRow = new GenericRow (1 );
422+ genericRow .setField (
423+ 0 ,
424+ new GenericArray (
425+ new Object [] {
426+ TimestampNtz .fromMillis (1698235273182L ),
427+ TimestampNtz .fromMillis (1698235274000L )
428+ }));
429+
430+ LogRecord logRecord = new GenericRecord (logOffset , timeStamp , APPEND_ONLY , genericRow );
431+ flussRecordAsPaimonRow .setFlussRecord (logRecord );
432+
433+ InternalArray array = flussRecordAsPaimonRow .getArray (0 );
434+ assertThat (array .size ()).isEqualTo (2 );
435+ assertThat (array .getTimestamp (0 , 3 ).getMillisecond ()).isEqualTo (1698235273182L );
436+ assertThat (array .getTimestamp (1 , 3 ).getMillisecond ()).isEqualTo (1698235274000L );
437+ }
438+
439+ @ Test
440+ void testArrayWithBinaryElements () {
441+ int tableBucket = 0 ;
442+ RowType tableRowType =
443+ RowType .of (
444+ new org .apache .paimon .types .ArrayType (
445+ new org .apache .paimon .types .VarBinaryType ()),
446+ // system columns
447+ new org .apache .paimon .types .IntType (),
448+ new org .apache .paimon .types .BigIntType (),
449+ new org .apache .paimon .types .LocalZonedTimestampType (3 ));
450+
451+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
452+ new FlussRecordAsPaimonRow (tableBucket , tableRowType );
453+ long logOffset = 0 ;
454+ long timeStamp = System .currentTimeMillis ();
455+ GenericRow genericRow = new GenericRow (1 );
456+ genericRow .setField (
457+ 0 , new GenericArray (new Object [] {new byte [] {1 , 2 , 3 }, new byte [] {4 , 5 , 6 , 7 }}));
458+
459+ LogRecord logRecord = new GenericRecord (logOffset , timeStamp , APPEND_ONLY , genericRow );
460+ flussRecordAsPaimonRow .setFlussRecord (logRecord );
461+
462+ InternalArray array = flussRecordAsPaimonRow .getArray (0 );
463+ assertThat (array .size ()).isEqualTo (2 );
464+ assertThat (array .getBinary (0 )).isEqualTo (new byte [] {1 , 2 , 3 });
465+ assertThat (array .getBinary (1 )).isEqualTo (new byte [] {4 , 5 , 6 , 7 });
466+ }
467+
468+ @ Test
469+ void testNullArray () {
470+ int tableBucket = 0 ;
471+ RowType tableRowType =
472+ RowType .of (
473+ new org .apache .paimon .types .ArrayType (new org .apache .paimon .types .IntType ())
474+ .nullable (),
475+ // system columns
476+ new org .apache .paimon .types .IntType (),
477+ new org .apache .paimon .types .BigIntType (),
478+ new org .apache .paimon .types .LocalZonedTimestampType (3 ));
479+
480+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
481+ new FlussRecordAsPaimonRow (tableBucket , tableRowType );
482+ long logOffset = 0 ;
483+ long timeStamp = System .currentTimeMillis ();
484+ GenericRow genericRow = new GenericRow (1 );
485+ genericRow .setField (0 , null );
486+
487+ LogRecord logRecord = new GenericRecord (logOffset , timeStamp , APPEND_ONLY , genericRow );
488+ flussRecordAsPaimonRow .setFlussRecord (logRecord );
489+
490+ assertThat (flussRecordAsPaimonRow .isNullAt (0 )).isTrue ();
491+ }
492+
493+ @ Test
494+ void testArrayWithNullableElements () {
495+ int tableBucket = 0 ;
496+ RowType tableRowType =
497+ RowType .of (
498+ new org .apache .paimon .types .ArrayType (
499+ new org .apache .paimon .types .IntType ().nullable ()),
500+ // system columns
501+ new org .apache .paimon .types .IntType (),
502+ new org .apache .paimon .types .BigIntType (),
503+ new org .apache .paimon .types .LocalZonedTimestampType (3 ));
504+
505+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
506+ new FlussRecordAsPaimonRow (tableBucket , tableRowType );
507+ long logOffset = 0 ;
508+ long timeStamp = System .currentTimeMillis ();
509+ GenericRow genericRow = new GenericRow (1 );
510+ genericRow .setField (0 , new GenericArray (new Object [] {1 , null , 3 }));
511+
512+ LogRecord logRecord = new GenericRecord (logOffset , timeStamp , APPEND_ONLY , genericRow );
513+ flussRecordAsPaimonRow .setFlussRecord (logRecord );
514+
515+ InternalArray array = flussRecordAsPaimonRow .getArray (0 );
516+ assertThat (array .size ()).isEqualTo (3 );
517+ assertThat (array .getInt (0 )).isEqualTo (1 );
518+ assertThat (array .isNullAt (1 )).isTrue ();
519+ assertThat (array .getInt (2 )).isEqualTo (3 );
520+ }
521+
522+ @ Test
523+ void testEmptyArray () {
524+ int tableBucket = 0 ;
525+ RowType tableRowType =
526+ RowType .of (
527+ new org .apache .paimon .types .ArrayType (
528+ new org .apache .paimon .types .IntType ()),
529+ // system columns
530+ new org .apache .paimon .types .IntType (),
531+ new org .apache .paimon .types .BigIntType (),
532+ new org .apache .paimon .types .LocalZonedTimestampType (3 ));
533+
534+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
535+ new FlussRecordAsPaimonRow (tableBucket , tableRowType );
536+ long logOffset = 0 ;
537+ long timeStamp = System .currentTimeMillis ();
538+ GenericRow genericRow = new GenericRow (1 );
539+ genericRow .setField (0 , new GenericArray (new int [] {}));
540+
541+ LogRecord logRecord = new GenericRecord (logOffset , timeStamp , APPEND_ONLY , genericRow );
542+ flussRecordAsPaimonRow .setFlussRecord (logRecord );
543+
544+ InternalArray array = flussRecordAsPaimonRow .getArray (0 );
545+ assertThat (array ).isNotNull ();
546+ assertThat (array .size ()).isEqualTo (0 );
547+ }
285548}
0 commit comments