Skip to content

Commit 8280f47

Browse files
[FLUSS-2251] Support NestedRow types for paimon
- Created FlussNestedRowAsPaimonRow adapter class to convert Fluss nested rows to Paimon nested rows - Implemented getRow method in FlussRowAsPaimonRow to support nested row fields in tables - Implemented getRow method in FlussArrayAsPaimonArray to support arrays of nested rows - Added comprehensive test cases covering: * Simple nested rows with primitive types * Deeply nested rows (row within row) * Arrays of nested rows * Nested rows with array fields * Nested rows with all primitive types * Null nested rows * Nested rows with nullable fields * Nested rows with decimal and timestamp fields
1 parent 3be2f3d commit 8280f47

File tree

5 files changed

+450
-5
lines changed

5 files changed

+450
-5
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,11 @@ public InternalMap getMap(int pos) {
166166

167167
@Override
168168
public InternalRow getRow(int pos, int numFields) {
169-
throw new UnsupportedOperationException(
170-
"getRow is not supported for Fluss array currently.");
169+
org.apache.fluss.row.InternalRow nestedFlussRow = flussArray.getRow(pos, numFields);
170+
return nestedFlussRow == null
171+
? null
172+
: new FlussRowAsPaimonRow(
173+
nestedFlussRow, (org.apache.paimon.types.RowType) elementType);
171174
}
172175

173176
@Override

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,12 @@ public InternalMap getMap(int pos) {
175175
}
176176

177177
@Override
178-
public InternalRow getRow(int pos, int pos1) {
179-
throw new UnsupportedOperationException(
180-
"getRow is not support for Fluss record currently.");
178+
public InternalRow getRow(int pos, int numFields) {
179+
org.apache.fluss.row.InternalRow nestedFlussRow = internalRow.getRow(pos, numFields);
180+
return nestedFlussRow == null
181+
? null
182+
: new FlussRowAsPaimonRow(
183+
nestedFlussRow,
184+
(org.apache.paimon.types.RowType) tableRowType.getField(pos).type());
181185
}
182186
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,4 +545,220 @@ void testEmptyArray() {
545545
assertThat(array).isNotNull();
546546
assertThat(array.size()).isEqualTo(0);
547547
}
548+
549+
@Test
550+
void testNestedRowType() {
551+
int tableBucket = 0;
552+
553+
// Build nested row types
554+
org.apache.paimon.types.RowType simpleNestedRowType =
555+
org.apache.paimon.types.RowType.builder()
556+
.field("nested_int", new org.apache.paimon.types.IntType())
557+
.field("nested_string", new org.apache.paimon.types.VarCharType())
558+
.build();
559+
560+
org.apache.paimon.types.RowType allTypesNestedRowType =
561+
org.apache.paimon.types.RowType.builder()
562+
.field("bool_field", new org.apache.paimon.types.BooleanType())
563+
.field("byte_field", new org.apache.paimon.types.TinyIntType())
564+
.field("short_field", new org.apache.paimon.types.SmallIntType())
565+
.field("int_field", new org.apache.paimon.types.IntType())
566+
.field("long_field", new org.apache.paimon.types.BigIntType())
567+
.field("float_field", new org.apache.paimon.types.FloatType())
568+
.field("double_field", new org.apache.paimon.types.DoubleType())
569+
.field("string_field", new org.apache.paimon.types.VarCharType())
570+
.build();
571+
572+
org.apache.paimon.types.RowType decimalTimestampRowType =
573+
org.apache.paimon.types.RowType.builder()
574+
.field("decimal_field", new org.apache.paimon.types.DecimalType(10, 2))
575+
.field("timestamp_field", new org.apache.paimon.types.TimestampType(3))
576+
.build();
577+
578+
org.apache.paimon.types.RowType innerRowType =
579+
org.apache.paimon.types.RowType.builder()
580+
.field("inner_value", new org.apache.paimon.types.IntType())
581+
.build();
582+
583+
org.apache.paimon.types.RowType middleRowType =
584+
org.apache.paimon.types.RowType.builder()
585+
.field("middle_int", new org.apache.paimon.types.IntType())
586+
.field("inner_row", innerRowType)
587+
.build();
588+
589+
org.apache.paimon.types.RowType rowWithArrayType =
590+
org.apache.paimon.types.RowType.builder()
591+
.field("id", new org.apache.paimon.types.IntType())
592+
.field(
593+
"values",
594+
new org.apache.paimon.types.ArrayType(
595+
new org.apache.paimon.types.IntType()))
596+
.build();
597+
598+
org.apache.paimon.types.RowType arrayElementRowType =
599+
org.apache.paimon.types.RowType.builder()
600+
.field("id", new org.apache.paimon.types.IntType())
601+
.field("name", new org.apache.paimon.types.VarCharType())
602+
.build();
603+
604+
org.apache.paimon.types.RowType nullableFieldsRowType =
605+
org.apache.paimon.types.RowType.builder()
606+
.field("id", new org.apache.paimon.types.IntType())
607+
.field(
608+
"nullable_field",
609+
new org.apache.paimon.types.VarCharType().nullable())
610+
.build();
611+
612+
RowType tableRowType =
613+
RowType.of(
614+
simpleNestedRowType,
615+
allTypesNestedRowType,
616+
decimalTimestampRowType,
617+
middleRowType,
618+
rowWithArrayType,
619+
new org.apache.paimon.types.ArrayType(arrayElementRowType),
620+
simpleNestedRowType.nullable(),
621+
nullableFieldsRowType,
622+
// system columns
623+
new org.apache.paimon.types.IntType(),
624+
new org.apache.paimon.types.BigIntType(),
625+
new org.apache.paimon.types.LocalZonedTimestampType(3));
626+
627+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
628+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
629+
long logOffset = 0;
630+
long timeStamp = System.currentTimeMillis();
631+
GenericRow genericRow = new GenericRow(8);
632+
633+
// Simple nested row
634+
GenericRow simpleNestedRow = new GenericRow(2);
635+
simpleNestedRow.setField(0, 100);
636+
simpleNestedRow.setField(1, BinaryString.fromString("nested_value"));
637+
genericRow.setField(0, simpleNestedRow);
638+
639+
// Nested row with all primitive types
640+
GenericRow allTypesNestedRow = new GenericRow(8);
641+
allTypesNestedRow.setField(0, true);
642+
allTypesNestedRow.setField(1, (byte) 127);
643+
allTypesNestedRow.setField(2, (short) 32000);
644+
allTypesNestedRow.setField(3, 2147483647);
645+
allTypesNestedRow.setField(4, 9223372036854775807L);
646+
allTypesNestedRow.setField(5, 3.14f);
647+
allTypesNestedRow.setField(6, 2.718281828);
648+
allTypesNestedRow.setField(7, BinaryString.fromString("test_string"));
649+
genericRow.setField(1, allTypesNestedRow);
650+
651+
// Nested row with decimal and timestamp
652+
GenericRow decimalTimestampRow = new GenericRow(2);
653+
decimalTimestampRow.setField(0, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2));
654+
decimalTimestampRow.setField(1, TimestampNtz.fromMillis(1698235273182L));
655+
genericRow.setField(2, decimalTimestampRow);
656+
657+
// Deeply nested row
658+
GenericRow innerRow = new GenericRow(1);
659+
innerRow.setField(0, 999);
660+
GenericRow middleRow = new GenericRow(2);
661+
middleRow.setField(0, 500);
662+
middleRow.setField(1, innerRow);
663+
genericRow.setField(3, middleRow);
664+
665+
// Nested row with array field
666+
GenericRow nestedRowWithArray = new GenericRow(2);
667+
nestedRowWithArray.setField(0, 123);
668+
nestedRowWithArray.setField(1, new GenericArray(new int[] {10, 20, 30}));
669+
genericRow.setField(4, nestedRowWithArray);
670+
671+
// Array of nested rows
672+
GenericRow arrayRow1 = new GenericRow(2);
673+
arrayRow1.setField(0, 1);
674+
arrayRow1.setField(1, BinaryString.fromString("Alice"));
675+
GenericRow arrayRow2 = new GenericRow(2);
676+
arrayRow2.setField(0, 2);
677+
arrayRow2.setField(1, BinaryString.fromString("Bob"));
678+
genericRow.setField(5, new GenericArray(new Object[] {arrayRow1, arrayRow2}));
679+
680+
// Null nested row
681+
genericRow.setField(6, null);
682+
683+
// Nested row with nullable fields
684+
GenericRow nullableFieldsRow = new GenericRow(2);
685+
nullableFieldsRow.setField(0, 42);
686+
nullableFieldsRow.setField(1, null);
687+
genericRow.setField(7, nullableFieldsRow);
688+
689+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
690+
flussRecordAsPaimonRow.setFlussRecord(logRecord);
691+
692+
// Test simple nested row
693+
org.apache.paimon.data.InternalRow paimonSimpleRow = flussRecordAsPaimonRow.getRow(0, 2);
694+
assertThat(paimonSimpleRow).isNotNull();
695+
assertThat(paimonSimpleRow.getInt(0)).isEqualTo(100);
696+
assertThat(paimonSimpleRow.getString(1).toString()).isEqualTo("nested_value");
697+
698+
// Test nested row with all primitive types
699+
org.apache.paimon.data.InternalRow paimonAllTypesRow = flussRecordAsPaimonRow.getRow(1, 8);
700+
assertThat(paimonAllTypesRow).isNotNull();
701+
assertThat(paimonAllTypesRow.getBoolean(0)).isTrue();
702+
assertThat(paimonAllTypesRow.getByte(1)).isEqualTo((byte) 127);
703+
assertThat(paimonAllTypesRow.getShort(2)).isEqualTo((short) 32000);
704+
assertThat(paimonAllTypesRow.getInt(3)).isEqualTo(2147483647);
705+
assertThat(paimonAllTypesRow.getLong(4)).isEqualTo(9223372036854775807L);
706+
assertThat(paimonAllTypesRow.getFloat(5)).isEqualTo(3.14f);
707+
assertThat(paimonAllTypesRow.getDouble(6)).isEqualTo(2.718281828);
708+
assertThat(paimonAllTypesRow.getString(7).toString()).isEqualTo("test_string");
709+
710+
// Test nested row with decimal and timestamp
711+
org.apache.paimon.data.InternalRow paimonDecimalTimestampRow =
712+
flussRecordAsPaimonRow.getRow(2, 2);
713+
assertThat(paimonDecimalTimestampRow).isNotNull();
714+
assertThat(paimonDecimalTimestampRow.getDecimal(0, 10, 2).toBigDecimal())
715+
.isEqualTo(new BigDecimal("123.45"));
716+
assertThat(paimonDecimalTimestampRow.getTimestamp(1, 3).getMillisecond())
717+
.isEqualTo(1698235273182L);
718+
719+
// Test deeply nested row
720+
org.apache.paimon.data.InternalRow paimonMiddleRow = flussRecordAsPaimonRow.getRow(3, 2);
721+
assertThat(paimonMiddleRow).isNotNull();
722+
assertThat(paimonMiddleRow.getInt(0)).isEqualTo(500);
723+
org.apache.paimon.data.InternalRow paimonInnerRow = paimonMiddleRow.getRow(1, 1);
724+
assertThat(paimonInnerRow).isNotNull();
725+
assertThat(paimonInnerRow.getInt(0)).isEqualTo(999);
726+
727+
// Test nested row with array field
728+
org.apache.paimon.data.InternalRow paimonRowWithArray = flussRecordAsPaimonRow.getRow(4, 2);
729+
assertThat(paimonRowWithArray).isNotNull();
730+
assertThat(paimonRowWithArray.getInt(0)).isEqualTo(123);
731+
org.apache.paimon.data.InternalArray arrayInRow = paimonRowWithArray.getArray(1);
732+
assertThat(arrayInRow).isNotNull();
733+
assertThat(arrayInRow.size()).isEqualTo(3);
734+
assertThat(arrayInRow.getInt(0)).isEqualTo(10);
735+
assertThat(arrayInRow.getInt(1)).isEqualTo(20);
736+
assertThat(arrayInRow.getInt(2)).isEqualTo(30);
737+
738+
// Test array of nested rows
739+
org.apache.paimon.data.InternalArray arrayOfRows = flussRecordAsPaimonRow.getArray(5);
740+
assertThat(arrayOfRows).isNotNull();
741+
assertThat(arrayOfRows.size()).isEqualTo(2);
742+
org.apache.paimon.data.InternalRow paimonRow1 = arrayOfRows.getRow(0, 2);
743+
assertThat(paimonRow1.getInt(0)).isEqualTo(1);
744+
assertThat(paimonRow1.getString(1).toString()).isEqualTo("Alice");
745+
org.apache.paimon.data.InternalRow paimonRow2 = arrayOfRows.getRow(1, 2);
746+
assertThat(paimonRow2.getInt(0)).isEqualTo(2);
747+
assertThat(paimonRow2.getString(1).toString()).isEqualTo("Bob");
748+
749+
// Test null nested row
750+
assertThat(flussRecordAsPaimonRow.isNullAt(6)).isTrue();
751+
752+
// Test nested row with nullable fields
753+
org.apache.paimon.data.InternalRow paimonNullableFieldsRow =
754+
flussRecordAsPaimonRow.getRow(7, 2);
755+
assertThat(paimonNullableFieldsRow).isNotNull();
756+
assertThat(paimonNullableFieldsRow.getInt(0)).isEqualTo(42);
757+
assertThat(paimonNullableFieldsRow.isNullAt(1)).isTrue();
758+
759+
// Verify system columns
760+
assertThat(flussRecordAsPaimonRow.getInt(8)).isEqualTo(tableBucket);
761+
assertThat(flussRecordAsPaimonRow.getLong(9)).isEqualTo(logOffset);
762+
assertThat(flussRecordAsPaimonRow.getLong(10)).isEqualTo(timeStamp);
763+
}
548764
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,93 @@ private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
489489
return "[" + String.join(",", partitionOffsetStrs) + "]";
490490
}
491491

492+
private static Stream<Arguments> tieringNestedRowWriteArgs() {
493+
return Stream.of(Arguments.of(true), Arguments.of(false));
494+
}
495+
496+
@ParameterizedTest
497+
@MethodSource("tieringNestedRowWriteArgs")
498+
void testTieringForNestedRow(boolean isPrimaryKeyTable) throws Exception {
499+
TablePath t1 =
500+
TablePath.of(
501+
DEFAULT_DB,
502+
isPrimaryKeyTable ? "pkTableForNestedRow" : "logTableForNestedRow");
503+
504+
Schema.Builder builder =
505+
Schema.newBuilder()
506+
.column("c0", DataTypes.INT())
507+
.column("c1", DataTypes.STRING())
508+
.column(
509+
"c2",
510+
DataTypes.ROW(
511+
DataTypes.FIELD("nested_int", DataTypes.INT()),
512+
DataTypes.FIELD("nested_string", DataTypes.STRING()),
513+
DataTypes.FIELD("nested_double", DataTypes.DOUBLE())));
514+
515+
if (isPrimaryKeyTable) {
516+
builder.primaryKey("c0", "c1");
517+
}
518+
519+
TableDescriptor.Builder tableDescriptor =
520+
TableDescriptor.builder()
521+
.schema(builder.build())
522+
.distributedBy(1, "c0")
523+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
524+
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
525+
tableDescriptor.customProperties(Collections.emptyMap());
526+
tableDescriptor.properties(Collections.emptyMap());
527+
long t1Id = createTable(t1, tableDescriptor.build());
528+
TableBucket t1Bucket = new TableBucket(t1Id, 0);
529+
530+
List<InternalRow> rows =
531+
Collections.singletonList(
532+
row(
533+
builder.build().getRowType(),
534+
1,
535+
"outer_value",
536+
new Object[] {100, "nested_value", 3.14}));
537+
writeRows(t1, rows, !isPrimaryKeyTable);
538+
539+
if (isPrimaryKeyTable) {
540+
waitUntilSnapshot(t1Id, 1, 0);
541+
}
542+
543+
JobClient jobClient = buildTieringJob(execEnv);
544+
545+
try {
546+
assertReplicaStatus(t1Bucket, 1);
547+
548+
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
549+
getPaimonRowCloseableIterator(t1);
550+
for (InternalRow expectedRow : rows) {
551+
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
552+
assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
553+
assertThat(row.getString(1).toString())
554+
.isEqualTo(expectedRow.getString(1).toString());
555+
556+
org.apache.paimon.data.InternalRow paimonNestedRow = row.getRow(2, 3);
557+
assertThat(paimonNestedRow).isNotNull();
558+
org.apache.fluss.row.InternalRow flussNestedRow = expectedRow.getRow(2, 3);
559+
assertThat(paimonNestedRow.getInt(0)).isEqualTo(flussNestedRow.getInt(0));
560+
assertThat(paimonNestedRow.getString(1).toString())
561+
.isEqualTo(flussNestedRow.getString(1).toString());
562+
assertThat(paimonNestedRow.getDouble(2)).isEqualTo(flussNestedRow.getDouble(2));
563+
}
564+
565+
Map<String, String> properties =
566+
new HashMap<>() {
567+
{
568+
put(
569+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
570+
"[{\"bucket\":0,\"offset\":1}]");
571+
}
572+
};
573+
checkSnapshotPropertyInPaimon(t1, properties);
574+
} finally {
575+
jobClient.cancel().get();
576+
}
577+
}
578+
492579
@Test
493580
void testTieringToDvEnabledTable() throws Exception {
494581
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");

0 commit comments

Comments
 (0)