@@ -166,8 +166,11 @@ public InternalMap getMap(int pos) {

@Override
public InternalRow getRow(int pos, int numFields) {
throw new UnsupportedOperationException(
"getRow is not supported for Fluss array currently.");
org.apache.fluss.row.InternalRow nestedFlussRow = flussArray.getRow(pos, numFields);
return nestedFlussRow == null
? null
: new FlussRowAsPaimonRow(
nestedFlussRow, (org.apache.paimon.types.RowType) elementType);
}

@Override
@@ -175,8 +175,12 @@ public InternalMap getMap(int pos) {
}

@Override
public InternalRow getRow(int pos, int pos1) {
throw new UnsupportedOperationException(
"getRow is not support for Fluss record currently.");
public InternalRow getRow(int pos, int numFields) {
org.apache.fluss.row.InternalRow nestedFlussRow = internalRow.getRow(pos, numFields);
return nestedFlussRow == null
? null
: new FlussRowAsPaimonRow(
nestedFlussRow,
(org.apache.paimon.types.RowType) tableRowType.getField(pos).type());
}
}
@@ -545,4 +545,220 @@ void testEmptyArray() {
assertThat(array).isNotNull();
assertThat(array.size()).isEqualTo(0);
}

@Test
void testNestedRowType() {
int tableBucket = 0;

// Build nested row types
org.apache.paimon.types.RowType simpleNestedRowType =
org.apache.paimon.types.RowType.builder()
.field("nested_int", new org.apache.paimon.types.IntType())
.field("nested_string", new org.apache.paimon.types.VarCharType())
.build();

org.apache.paimon.types.RowType allTypesNestedRowType =
org.apache.paimon.types.RowType.builder()
.field("bool_field", new org.apache.paimon.types.BooleanType())
.field("byte_field", new org.apache.paimon.types.TinyIntType())
.field("short_field", new org.apache.paimon.types.SmallIntType())
.field("int_field", new org.apache.paimon.types.IntType())
.field("long_field", new org.apache.paimon.types.BigIntType())
.field("float_field", new org.apache.paimon.types.FloatType())
.field("double_field", new org.apache.paimon.types.DoubleType())
.field("string_field", new org.apache.paimon.types.VarCharType())
.build();

org.apache.paimon.types.RowType decimalTimestampRowType =
org.apache.paimon.types.RowType.builder()
.field("decimal_field", new org.apache.paimon.types.DecimalType(10, 2))
.field("timestamp_field", new org.apache.paimon.types.TimestampType(3))
.build();

org.apache.paimon.types.RowType innerRowType =
org.apache.paimon.types.RowType.builder()
.field("inner_value", new org.apache.paimon.types.IntType())
.build();

org.apache.paimon.types.RowType middleRowType =
org.apache.paimon.types.RowType.builder()
.field("middle_int", new org.apache.paimon.types.IntType())
.field("inner_row", innerRowType)
.build();

org.apache.paimon.types.RowType rowWithArrayType =
org.apache.paimon.types.RowType.builder()
.field("id", new org.apache.paimon.types.IntType())
.field(
"values",
new org.apache.paimon.types.ArrayType(
new org.apache.paimon.types.IntType()))
.build();

org.apache.paimon.types.RowType arrayElementRowType =
org.apache.paimon.types.RowType.builder()
.field("id", new org.apache.paimon.types.IntType())
.field("name", new org.apache.paimon.types.VarCharType())
.build();

org.apache.paimon.types.RowType nullableFieldsRowType =
org.apache.paimon.types.RowType.builder()
.field("id", new org.apache.paimon.types.IntType())
.field(
"nullable_field",
new org.apache.paimon.types.VarCharType().nullable())
.build();

RowType tableRowType =
RowType.of(
simpleNestedRowType,
allTypesNestedRowType,
decimalTimestampRowType,
middleRowType,
rowWithArrayType,
new org.apache.paimon.types.ArrayType(arrayElementRowType),
simpleNestedRowType.nullable(),
nullableFieldsRowType,
// system columns
new org.apache.paimon.types.IntType(),
new org.apache.paimon.types.BigIntType(),
new org.apache.paimon.types.LocalZonedTimestampType(3));

FlussRecordAsPaimonRow flussRecordAsPaimonRow =
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
long logOffset = 0;
long timeStamp = System.currentTimeMillis();
GenericRow genericRow = new GenericRow(8);

// Simple nested row
GenericRow simpleNestedRow = new GenericRow(2);
simpleNestedRow.setField(0, 100);
simpleNestedRow.setField(1, BinaryString.fromString("nested_value"));
genericRow.setField(0, simpleNestedRow);

// Nested row with all primitive types
GenericRow allTypesNestedRow = new GenericRow(8);
allTypesNestedRow.setField(0, true);
allTypesNestedRow.setField(1, (byte) 127);
allTypesNestedRow.setField(2, (short) 32000);
allTypesNestedRow.setField(3, 2147483647);
allTypesNestedRow.setField(4, 9223372036854775807L);
allTypesNestedRow.setField(5, 3.14f);
allTypesNestedRow.setField(6, 2.718281828);
allTypesNestedRow.setField(7, BinaryString.fromString("test_string"));
genericRow.setField(1, allTypesNestedRow);

// Nested row with decimal and timestamp
GenericRow decimalTimestampRow = new GenericRow(2);
decimalTimestampRow.setField(0, Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2));
decimalTimestampRow.setField(1, TimestampNtz.fromMillis(1698235273182L));
genericRow.setField(2, decimalTimestampRow);

// Deeply nested row
GenericRow innerRow = new GenericRow(1);
innerRow.setField(0, 999);
GenericRow middleRow = new GenericRow(2);
middleRow.setField(0, 500);
middleRow.setField(1, innerRow);
genericRow.setField(3, middleRow);

// Nested row with array field
GenericRow nestedRowWithArray = new GenericRow(2);
nestedRowWithArray.setField(0, 123);
nestedRowWithArray.setField(1, new GenericArray(new int[] {10, 20, 30}));
genericRow.setField(4, nestedRowWithArray);

// Array of nested rows
GenericRow arrayRow1 = new GenericRow(2);
arrayRow1.setField(0, 1);
arrayRow1.setField(1, BinaryString.fromString("Alice"));
GenericRow arrayRow2 = new GenericRow(2);
arrayRow2.setField(0, 2);
arrayRow2.setField(1, BinaryString.fromString("Bob"));
genericRow.setField(5, new GenericArray(new Object[] {arrayRow1, arrayRow2}));

// Null nested row
genericRow.setField(6, null);

// Nested row with nullable fields
GenericRow nullableFieldsRow = new GenericRow(2);
nullableFieldsRow.setField(0, 42);
nullableFieldsRow.setField(1, null);
genericRow.setField(7, nullableFieldsRow);

LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
flussRecordAsPaimonRow.setFlussRecord(logRecord);

// Test simple nested row
org.apache.paimon.data.InternalRow paimonSimpleRow = flussRecordAsPaimonRow.getRow(0, 2);
assertThat(paimonSimpleRow).isNotNull();
assertThat(paimonSimpleRow.getInt(0)).isEqualTo(100);
assertThat(paimonSimpleRow.getString(1).toString()).isEqualTo("nested_value");

// Test nested row with all primitive types
org.apache.paimon.data.InternalRow paimonAllTypesRow = flussRecordAsPaimonRow.getRow(1, 8);
assertThat(paimonAllTypesRow).isNotNull();
assertThat(paimonAllTypesRow.getBoolean(0)).isTrue();
assertThat(paimonAllTypesRow.getByte(1)).isEqualTo((byte) 127);
assertThat(paimonAllTypesRow.getShort(2)).isEqualTo((short) 32000);
assertThat(paimonAllTypesRow.getInt(3)).isEqualTo(2147483647);
assertThat(paimonAllTypesRow.getLong(4)).isEqualTo(9223372036854775807L);
assertThat(paimonAllTypesRow.getFloat(5)).isEqualTo(3.14f);
assertThat(paimonAllTypesRow.getDouble(6)).isEqualTo(2.718281828);
assertThat(paimonAllTypesRow.getString(7).toString()).isEqualTo("test_string");

// Test nested row with decimal and timestamp
org.apache.paimon.data.InternalRow paimonDecimalTimestampRow =
flussRecordAsPaimonRow.getRow(2, 2);
assertThat(paimonDecimalTimestampRow).isNotNull();
assertThat(paimonDecimalTimestampRow.getDecimal(0, 10, 2).toBigDecimal())
.isEqualTo(new BigDecimal("123.45"));
assertThat(paimonDecimalTimestampRow.getTimestamp(1, 3).getMillisecond())
.isEqualTo(1698235273182L);

// Test deeply nested row
org.apache.paimon.data.InternalRow paimonMiddleRow = flussRecordAsPaimonRow.getRow(3, 2);
assertThat(paimonMiddleRow).isNotNull();
assertThat(paimonMiddleRow.getInt(0)).isEqualTo(500);
org.apache.paimon.data.InternalRow paimonInnerRow = paimonMiddleRow.getRow(1, 1);
assertThat(paimonInnerRow).isNotNull();
assertThat(paimonInnerRow.getInt(0)).isEqualTo(999);

// Test nested row with array field
org.apache.paimon.data.InternalRow paimonRowWithArray = flussRecordAsPaimonRow.getRow(4, 2);
assertThat(paimonRowWithArray).isNotNull();
assertThat(paimonRowWithArray.getInt(0)).isEqualTo(123);
org.apache.paimon.data.InternalArray arrayInRow = paimonRowWithArray.getArray(1);
assertThat(arrayInRow).isNotNull();
assertThat(arrayInRow.size()).isEqualTo(3);
assertThat(arrayInRow.getInt(0)).isEqualTo(10);
assertThat(arrayInRow.getInt(1)).isEqualTo(20);
assertThat(arrayInRow.getInt(2)).isEqualTo(30);

// Test array of nested rows
org.apache.paimon.data.InternalArray arrayOfRows = flussRecordAsPaimonRow.getArray(5);
assertThat(arrayOfRows).isNotNull();
assertThat(arrayOfRows.size()).isEqualTo(2);
org.apache.paimon.data.InternalRow paimonRow1 = arrayOfRows.getRow(0, 2);
assertThat(paimonRow1.getInt(0)).isEqualTo(1);
assertThat(paimonRow1.getString(1).toString()).isEqualTo("Alice");
org.apache.paimon.data.InternalRow paimonRow2 = arrayOfRows.getRow(1, 2);
assertThat(paimonRow2.getInt(0)).isEqualTo(2);
assertThat(paimonRow2.getString(1).toString()).isEqualTo("Bob");

// Test null nested row
assertThat(flussRecordAsPaimonRow.isNullAt(6)).isTrue();

// Test nested row with nullable fields
org.apache.paimon.data.InternalRow paimonNullableFieldsRow =
flussRecordAsPaimonRow.getRow(7, 2);
assertThat(paimonNullableFieldsRow).isNotNull();
assertThat(paimonNullableFieldsRow.getInt(0)).isEqualTo(42);
assertThat(paimonNullableFieldsRow.isNullAt(1)).isTrue();

// Verify system columns
assertThat(flussRecordAsPaimonRow.getInt(8)).isEqualTo(tableBucket);
assertThat(flussRecordAsPaimonRow.getLong(9)).isEqualTo(logOffset);
assertThat(flussRecordAsPaimonRow.getLong(10)).isEqualTo(timeStamp);
}
}
@@ -489,6 +489,93 @@ private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
return "[" + String.join(",", partitionOffsetStrs) + "]";
}

private static Stream<Arguments> tieringNestedRowWriteArgs() {
return Stream.of(Arguments.of(true), Arguments.of(false));
}

@ParameterizedTest
@MethodSource("tieringNestedRowWriteArgs")
Review comment (Member):
Replace with @ValueSource(booleans = {true, false}) and remove the tieringNestedRowWriteArgs method.

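A minimal sketch of the suggested change, assuming JUnit 5's @ValueSource annotation (org.junit.jupiter.params.provider.ValueSource) is available on the test classpath:

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTieringForNestedRow(boolean isPrimaryKeyTable) throws Exception {
    // test body unchanged; the tieringNestedRowWriteArgs provider method is then no longer needed
}
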
void testTieringForNestedRow(boolean isPrimaryKeyTable) throws Exception {
Review comment (Member):
The IT cases are already quite heavy. This PR adds two new IT cases, but we should avoid introducing additional ones if we can reuse existing tests instead.

In particular, it would be more efficient to add the new row-based test scenarios directly into FlinkUnionReadPrimaryKeyTableITCase by simply introducing a new field. This approach adds minimal overhead to test execution time while keeping the test suite lean.

For reference, see how this was done in: #2166.

Reply (Contributor Author):
Okay, I'll make the changes.
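For illustration, a hedged sketch of the suggested consolidation, reusing the Schema and DataTypes builders shown elsewhere in this diff; the column names and the existing schema of FlinkUnionReadPrimaryKeyTableITCase are assumptions, not taken from that test:

// Hypothetical: extend the existing IT case's schema with one nested ROW column
// instead of adding a new IT case; field names here are placeholders.
Schema.Builder builder =
        Schema.newBuilder()
                .column("c0", DataTypes.INT())
                .column("c1", DataTypes.STRING())
                .column(
                        "nested_row",
                        DataTypes.ROW(
                                DataTypes.FIELD("nested_int", DataTypes.INT()),
                                DataTypes.FIELD("nested_string", DataTypes.STRING())))
                .primaryKey("c0", "c1");
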

TablePath t1 =
TablePath.of(
DEFAULT_DB,
isPrimaryKeyTable ? "pkTableForNestedRow" : "logTableForNestedRow");

Schema.Builder builder =
Schema.newBuilder()
.column("c0", DataTypes.INT())
.column("c1", DataTypes.STRING())
.column(
"c2",
DataTypes.ROW(
DataTypes.FIELD("nested_int", DataTypes.INT()),
DataTypes.FIELD("nested_string", DataTypes.STRING()),
DataTypes.FIELD("nested_double", DataTypes.DOUBLE())));

if (isPrimaryKeyTable) {
builder.primaryKey("c0", "c1");
}

TableDescriptor.Builder tableDescriptor =
TableDescriptor.builder()
.schema(builder.build())
.distributedBy(1, "c0")
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
tableDescriptor.customProperties(Collections.emptyMap());
tableDescriptor.properties(Collections.emptyMap());
long t1Id = createTable(t1, tableDescriptor.build());
TableBucket t1Bucket = new TableBucket(t1Id, 0);

List<InternalRow> rows =
Collections.singletonList(
row(
builder.build().getRowType(),
1,
"outer_value",
new Object[] {100, "nested_value", 3.14}));
writeRows(t1, rows, !isPrimaryKeyTable);

if (isPrimaryKeyTable) {
waitUntilSnapshot(t1Id, 1, 0);
}

JobClient jobClient = buildTieringJob(execEnv);

try {
assertReplicaStatus(t1Bucket, 1);

Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(t1);
for (InternalRow expectedRow : rows) {
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
assertThat(row.getString(1).toString())
.isEqualTo(expectedRow.getString(1).toString());

org.apache.paimon.data.InternalRow paimonNestedRow = row.getRow(2, 3);
assertThat(paimonNestedRow).isNotNull();
org.apache.fluss.row.InternalRow flussNestedRow = expectedRow.getRow(2, 3);
assertThat(paimonNestedRow.getInt(0)).isEqualTo(flussNestedRow.getInt(0));
assertThat(paimonNestedRow.getString(1).toString())
.isEqualTo(flussNestedRow.getString(1).toString());
assertThat(paimonNestedRow.getDouble(2)).isEqualTo(flussNestedRow.getDouble(2));
}

Map<String, String> properties =
new HashMap<String, String>() {
{
put(
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
"[{\"bucket\":0,\"offset\":1}]");
}
};
checkSnapshotPropertyInPaimon(t1, properties);
} finally {
jobClient.cancel().get();
}
}

@Test
void testTieringToDvEnabledTable() throws Exception {
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");