Skip to content

Commit f5ef8a4

Browse files
[FLUSS-2251] Support NestedRow types for paimon
- Created FlussNestedRowAsPaimonRow adapter class to convert Fluss nested rows to Paimon nested rows - Implemented getRow method in FlussRowAsPaimonRow to support nested row fields in tables - Implemented getRow method in FlussArrayAsPaimonArray to support arrays of nested rows - Added comprehensive test cases covering: * Simple nested rows with primitive types * Deeply nested rows (row within row) * Arrays of nested rows * Nested rows with array fields * Nested rows with all primitive types * Null nested rows * Nested rows with nullable fields * Nested rows with decimal and timestamp fields
1 parent 3be2f3d commit f5ef8a4

File tree

3 files changed

+361
-5
lines changed

3 files changed

+361
-5
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,11 @@ public InternalMap getMap(int pos) {
166166

167167
@Override
168168
public InternalRow getRow(int pos, int numFields) {
169-
throw new UnsupportedOperationException(
170-
"getRow is not supported for Fluss array currently.");
169+
org.apache.fluss.row.InternalRow nestedFlussRow = flussArray.getRow(pos, numFields);
170+
return nestedFlussRow == null
171+
? null
172+
: new FlussRowAsPaimonRow(
173+
nestedFlussRow, (org.apache.paimon.types.RowType) elementType);
171174
}
172175

173176
@Override

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,12 @@ public InternalMap getMap(int pos) {
175175
}
176176

177177
@Override
178-
public InternalRow getRow(int pos, int pos1) {
179-
throw new UnsupportedOperationException(
180-
"getRow is not support for Fluss record currently.");
178+
public InternalRow getRow(int pos, int numFields) {
179+
org.apache.fluss.row.InternalRow nestedFlussRow = internalRow.getRow(pos, numFields);
180+
return nestedFlussRow == null
181+
? null
182+
: new FlussRowAsPaimonRow(
183+
nestedFlussRow,
184+
(org.apache.paimon.types.RowType) tableRowType.getField(pos).type());
181185
}
182186
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,4 +545,353 @@ void testEmptyArray() {
545545
assertThat(array).isNotNull();
546546
assertThat(array.size()).isEqualTo(0);
547547
}
548+
549+
@Test
550+
void testSimpleNestedRow() {
551+
int tableBucket = 0;
552+
org.apache.paimon.types.RowType nestedRowType =
553+
org.apache.paimon.types.RowType.builder()
554+
.field("nested_int", new org.apache.paimon.types.IntType())
555+
.field("nested_string", new org.apache.paimon.types.VarCharType())
556+
.build();
557+
558+
RowType tableRowType =
559+
RowType.of(
560+
new org.apache.paimon.types.IntType(),
561+
nestedRowType,
562+
// system columns
563+
new org.apache.paimon.types.IntType(),
564+
new org.apache.paimon.types.BigIntType(),
565+
new org.apache.paimon.types.LocalZonedTimestampType(3));
566+
567+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
568+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
569+
long logOffset = 0;
570+
long timeStamp = System.currentTimeMillis();
571+
GenericRow genericRow = new GenericRow(2);
572+
genericRow.setField(0, 42);
573+
574+
GenericRow nestedRow = new GenericRow(2);
575+
nestedRow.setField(0, 100);
576+
nestedRow.setField(1, BinaryString.fromString("nested_value"));
577+
genericRow.setField(1, nestedRow);
578+
579+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
580+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
581+
582+
assertThat(flussRecordAsPaimonRow.getInt(0)).isEqualTo(42);
583+
org.apache.paimon.data.InternalRow paimonNestedRow = flussRecordAsPaimonRow.getRow(1, 2);
584+
assertThat(paimonNestedRow).isNotNull();
585+
assertThat(paimonNestedRow.getInt(0)).isEqualTo(100);
586+
assertThat(paimonNestedRow.getString(1).toString()).isEqualTo("nested_value");
587+
}
588+
589+
@Test
590+
void testDeeplyNestedRow() {
591+
int tableBucket = 0;
592+
org.apache.paimon.types.RowType innerRowType =
593+
org.apache.paimon.types.RowType.builder()
594+
.field("inner_value", new org.apache.paimon.types.IntType())
595+
.build();
596+
597+
org.apache.paimon.types.RowType middleRowType =
598+
org.apache.paimon.types.RowType.builder()
599+
.field("middle_int", new org.apache.paimon.types.IntType())
600+
.field("inner_row", innerRowType)
601+
.build();
602+
603+
RowType tableRowType =
604+
RowType.of(
605+
middleRowType,
606+
// system columns
607+
new org.apache.paimon.types.IntType(),
608+
new org.apache.paimon.types.BigIntType(),
609+
new org.apache.paimon.types.LocalZonedTimestampType(3));
610+
611+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
612+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
613+
long logOffset = 0;
614+
long timeStamp = System.currentTimeMillis();
615+
GenericRow genericRow = new GenericRow(1);
616+
617+
GenericRow innerRow = new GenericRow(1);
618+
innerRow.setField(0, 999);
619+
620+
GenericRow middleRow = new GenericRow(2);
621+
middleRow.setField(0, 500);
622+
middleRow.setField(1, innerRow);
623+
624+
genericRow.setField(0, middleRow);
625+
626+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
627+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
628+
629+
org.apache.paimon.data.InternalRow paimonMiddleRow = flussRecordAsPaimonRow.getRow(0, 2);
630+
assertThat(paimonMiddleRow).isNotNull();
631+
assertThat(paimonMiddleRow.getInt(0)).isEqualTo(500);
632+
633+
org.apache.paimon.data.InternalRow paimonInnerRow = paimonMiddleRow.getRow(1, 1);
634+
assertThat(paimonInnerRow).isNotNull();
635+
assertThat(paimonInnerRow.getInt(0)).isEqualTo(999);
636+
}
637+
638+
@Test
639+
void testArrayOfNestedRows() {
640+
int tableBucket = 0;
641+
org.apache.paimon.types.RowType nestedRowType =
642+
org.apache.paimon.types.RowType.builder()
643+
.field("id", new org.apache.paimon.types.IntType())
644+
.field("name", new org.apache.paimon.types.VarCharType())
645+
.build();
646+
647+
RowType tableRowType =
648+
RowType.of(
649+
new org.apache.paimon.types.ArrayType(nestedRowType),
650+
// system columns
651+
new org.apache.paimon.types.IntType(),
652+
new org.apache.paimon.types.BigIntType(),
653+
new org.apache.paimon.types.LocalZonedTimestampType(3));
654+
655+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
656+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
657+
long logOffset = 0;
658+
long timeStamp = System.currentTimeMillis();
659+
GenericRow genericRow = new GenericRow(1);
660+
661+
GenericRow row1 = new GenericRow(2);
662+
row1.setField(0, 1);
663+
row1.setField(1, BinaryString.fromString("Alice"));
664+
665+
GenericRow row2 = new GenericRow(2);
666+
row2.setField(0, 2);
667+
row2.setField(1, BinaryString.fromString("Bob"));
668+
669+
genericRow.setField(0, new GenericArray(new Object[] {row1, row2}));
670+
671+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
672+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
673+
674+
org.apache.paimon.data.InternalArray array = flussRecordAsPaimonRow.getArray(0);
675+
assertThat(array).isNotNull();
676+
assertThat(array.size()).isEqualTo(2);
677+
678+
org.apache.paimon.data.InternalRow paimonRow1 = array.getRow(0, 2);
679+
assertThat(paimonRow1.getInt(0)).isEqualTo(1);
680+
assertThat(paimonRow1.getString(1).toString()).isEqualTo("Alice");
681+
682+
org.apache.paimon.data.InternalRow paimonRow2 = array.getRow(1, 2);
683+
assertThat(paimonRow2.getInt(0)).isEqualTo(2);
684+
assertThat(paimonRow2.getString(1).toString()).isEqualTo("Bob");
685+
}
686+
687+
@Test
688+
void testNestedRowWithArrayField() {
689+
int tableBucket = 0;
690+
org.apache.paimon.types.RowType nestedRowType =
691+
org.apache.paimon.types.RowType.builder()
692+
.field("id", new org.apache.paimon.types.IntType())
693+
.field(
694+
"values",
695+
new org.apache.paimon.types.ArrayType(
696+
new org.apache.paimon.types.IntType()))
697+
.build();
698+
699+
RowType tableRowType =
700+
RowType.of(
701+
nestedRowType,
702+
// system columns
703+
new org.apache.paimon.types.IntType(),
704+
new org.apache.paimon.types.BigIntType(),
705+
new org.apache.paimon.types.LocalZonedTimestampType(3));
706+
707+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
708+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
709+
long logOffset = 0;
710+
long timeStamp = System.currentTimeMillis();
711+
GenericRow genericRow = new GenericRow(1);
712+
713+
GenericRow nestedRow = new GenericRow(2);
714+
nestedRow.setField(0, 123);
715+
nestedRow.setField(1, new GenericArray(new int[] {10, 20, 30}));
716+
717+
genericRow.setField(0, nestedRow);
718+
719+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
720+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
721+
722+
org.apache.paimon.data.InternalRow paimonNestedRow = flussRecordAsPaimonRow.getRow(0, 2);
723+
assertThat(paimonNestedRow).isNotNull();
724+
assertThat(paimonNestedRow.getInt(0)).isEqualTo(123);
725+
726+
org.apache.paimon.data.InternalArray array = paimonNestedRow.getArray(1);
727+
assertThat(array).isNotNull();
728+
assertThat(array.size()).isEqualTo(3);
729+
assertThat(array.getInt(0)).isEqualTo(10);
730+
assertThat(array.getInt(1)).isEqualTo(20);
731+
assertThat(array.getInt(2)).isEqualTo(30);
732+
}
733+
734+
@Test
735+
void testNestedRowWithAllPrimitiveTypes() {
736+
int tableBucket = 0;
737+
org.apache.paimon.types.RowType nestedRowType =
738+
org.apache.paimon.types.RowType.builder()
739+
.field("bool_field", new org.apache.paimon.types.BooleanType())
740+
.field("byte_field", new org.apache.paimon.types.TinyIntType())
741+
.field("short_field", new org.apache.paimon.types.SmallIntType())
742+
.field("int_field", new org.apache.paimon.types.IntType())
743+
.field("long_field", new org.apache.paimon.types.BigIntType())
744+
.field("float_field", new org.apache.paimon.types.FloatType())
745+
.field("double_field", new org.apache.paimon.types.DoubleType())
746+
.field("string_field", new org.apache.paimon.types.VarCharType())
747+
.build();
748+
749+
RowType tableRowType =
750+
RowType.of(
751+
nestedRowType,
752+
// system columns
753+
new org.apache.paimon.types.IntType(),
754+
new org.apache.paimon.types.BigIntType(),
755+
new org.apache.paimon.types.LocalZonedTimestampType(3));
756+
757+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
758+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
759+
long logOffset = 0;
760+
long timeStamp = System.currentTimeMillis();
761+
GenericRow genericRow = new GenericRow(1);
762+
763+
GenericRow nestedRow = new GenericRow(8);
764+
nestedRow.setField(0, true);
765+
nestedRow.setField(1, (byte) 127);
766+
nestedRow.setField(2, (short) 32000);
767+
nestedRow.setField(3, 2147483647);
768+
nestedRow.setField(4, 9223372036854775807L);
769+
nestedRow.setField(5, 3.14f);
770+
nestedRow.setField(6, 2.718281828);
771+
nestedRow.setField(7, BinaryString.fromString("test_string"));
772+
773+
genericRow.setField(0, nestedRow);
774+
775+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
776+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
777+
778+
org.apache.paimon.data.InternalRow paimonNestedRow = flussRecordAsPaimonRow.getRow(0, 8);
779+
assertThat(paimonNestedRow).isNotNull();
780+
assertThat(paimonNestedRow.getBoolean(0)).isTrue();
781+
assertThat(paimonNestedRow.getByte(1)).isEqualTo((byte) 127);
782+
assertThat(paimonNestedRow.getShort(2)).isEqualTo((short) 32000);
783+
assertThat(paimonNestedRow.getInt(3)).isEqualTo(2147483647);
784+
assertThat(paimonNestedRow.getLong(4)).isEqualTo(9223372036854775807L);
785+
assertThat(paimonNestedRow.getFloat(5)).isEqualTo(3.14f);
786+
assertThat(paimonNestedRow.getDouble(6)).isEqualTo(2.718281828);
787+
assertThat(paimonNestedRow.getString(7).toString()).isEqualTo("test_string");
788+
}
789+
790+
@Test
791+
void testNullNestedRow() {
792+
int tableBucket = 0;
793+
org.apache.paimon.types.RowType nestedRowType =
794+
org.apache.paimon.types.RowType.builder()
795+
.field("field1", new org.apache.paimon.types.IntType())
796+
.build();
797+
798+
RowType tableRowType =
799+
RowType.of(
800+
nestedRowType.nullable(),
801+
// system columns
802+
new org.apache.paimon.types.IntType(),
803+
new org.apache.paimon.types.BigIntType(),
804+
new org.apache.paimon.types.LocalZonedTimestampType(3));
805+
806+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
807+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
808+
long logOffset = 0;
809+
long timeStamp = System.currentTimeMillis();
810+
GenericRow genericRow = new GenericRow(1);
811+
genericRow.setField(0, null);
812+
813+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
814+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
815+
816+
assertThat(flussRecordAsPaimonRow.isNullAt(0)).isTrue();
817+
}
818+
819+
@Test
820+
void testNestedRowWithNullableFields() {
821+
int tableBucket = 0;
822+
org.apache.paimon.types.RowType nestedRowType =
823+
org.apache.paimon.types.RowType.builder()
824+
.field("id", new org.apache.paimon.types.IntType())
825+
.field(
826+
"nullable_field",
827+
new org.apache.paimon.types.VarCharType().nullable())
828+
.build();
829+
830+
RowType tableRowType =
831+
RowType.of(
832+
nestedRowType,
833+
// system columns
834+
new org.apache.paimon.types.IntType(),
835+
new org.apache.paimon.types.BigIntType(),
836+
new org.apache.paimon.types.LocalZonedTimestampType(3));
837+
838+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
839+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
840+
long logOffset = 0;
841+
long timeStamp = System.currentTimeMillis();
842+
GenericRow genericRow = new GenericRow(1);
843+
844+
GenericRow nestedRow = new GenericRow(2);
845+
nestedRow.setField(0, 42);
846+
nestedRow.setField(1, null);
847+
848+
genericRow.setField(0, nestedRow);
849+
850+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
851+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
852+
853+
org.apache.paimon.data.InternalRow paimonNestedRow = flussRecordAsPaimonRow.getRow(0, 2);
854+
assertThat(paimonNestedRow).isNotNull();
855+
assertThat(paimonNestedRow.getInt(0)).isEqualTo(42);
856+
assertThat(paimonNestedRow.isNullAt(1)).isTrue();
857+
}
858+
859+
@Test
860+
void testNestedRowWithDecimalAndTimestamp() {
861+
int tableBucket = 0;
862+
org.apache.paimon.types.RowType nestedRowType =
863+
org.apache.paimon.types.RowType.builder()
864+
.field("decimal_field", new org.apache.paimon.types.DecimalType(10, 2))
865+
.field("timestamp_field", new org.apache.paimon.types.TimestampType(3))
866+
.build();
867+
868+
RowType tableRowType =
869+
RowType.of(
870+
nestedRowType,
871+
// system columns
872+
new org.apache.paimon.types.IntType(),
873+
new org.apache.paimon.types.BigIntType(),
874+
new org.apache.paimon.types.LocalZonedTimestampType(3));
875+
876+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
877+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
878+
long logOffset = 0;
879+
long timeStamp = System.currentTimeMillis();
880+
GenericRow genericRow = new GenericRow(1);
881+
882+
GenericRow nestedRow = new GenericRow(2);
883+
nestedRow.setField(0, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2));
884+
nestedRow.setField(1, TimestampNtz.fromMillis(1698235273182L));
885+
886+
genericRow.setField(0, nestedRow);
887+
888+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
889+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
890+
891+
org.apache.paimon.data.InternalRow paimonNestedRow = flussRecordAsPaimonRow.getRow(0, 2);
892+
assertThat(paimonNestedRow).isNotNull();
893+
assertThat(paimonNestedRow.getDecimal(0, 10, 2).toBigDecimal())
894+
.isEqualTo(new BigDecimal("123.45"));
895+
assertThat(paimonNestedRow.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235273182L);
896+
}
548897
}

0 commit comments

Comments
 (0)