[common] Introduce ROW type for ARROW, COMPACTED and INDEXED formats #2079
Conversation
Force-pushed from 0634a97 to b2d938b
@XuQianJin-Stars this PR is a godsend. With a few small tweaks, I was able to get nested rows inside array fields working in a Fluss PK table, with tiering to Paimon and union read enabled, all running in the Flink SQL client. Is there a way we could merge my changes into your pull request and combine our efforts?
Hi @binary-signal, sure. You can also wait for this PR to be approved and then submit a PR to improve it.
Thanks for the contribution @XuQianJin-Stars .
I think there are the following four problems in this PR.
(1) The current implementation uses the AlignedRow format as the nested row format. We should use the same row format as the parent row: if the parent is CompactedRow, then the nested row (e.g. array<row>) should also use CompactedRow.
(2) The current implementation doesn't support nested rows in arrays.
(3) Please add tests! You can add tests to verify nested row types (see the sketch below)
- row<bool, int, long, string, bytes, timestamp_ltz, timestamp_ntz, date, time>
- array<row<array>>
- row<array<row>>
in the following tests:
- DefaultCompletedFetchTest#testComplexTypeFetch
- FlinkComplexTypeITCase
(4) The dependency and license changes seem unnecessary.
I have appended a commit to fix problems (1) and (2). Could you fix the other problems?
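A minimal sketch of how the three suggested nested types could be declared, reusing the DataTypes/DataField style that appears elsewhere in this PR; the import paths, the exact ROW(...) factory signature, and the timestamp type names are assumptions about Fluss's type API:

```java
// Package paths are assumptions; adjust to the actual Fluss modules.
import com.alibaba.fluss.types.DataField;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.DataTypes;

public class NestedRowTestTypes {

    // row<bool, int, long, string, bytes, timestamp_ltz, timestamp_ntz, date, time>
    static final DataType FLAT_NESTED_ROW =
            DataTypes.ROW(
                    new DataField("f0", DataTypes.BOOLEAN()),
                    new DataField("f1", DataTypes.INT()),
                    new DataField("f2", DataTypes.BIGINT()),
                    new DataField("f3", DataTypes.STRING()),
                    new DataField("f4", DataTypes.BYTES()),
                    new DataField("f5", DataTypes.TIMESTAMP_LTZ()),
                    new DataField("f6", DataTypes.TIMESTAMP()),
                    new DataField("f7", DataTypes.DATE()),
                    new DataField("f8", DataTypes.TIME()));

    // array<row<array>>
    static final DataType ARRAY_OF_ROW_OF_ARRAY =
            DataTypes.ARRAY(
                    DataTypes.ROW(
                            new DataField("a", DataTypes.ARRAY(DataTypes.INT()))));

    // row<array<row>>
    static final DataType ROW_OF_ARRAY_OF_ROW =
            DataTypes.ROW(
                    new DataField(
                            "arr",
                            DataTypes.ARRAY(
                                    DataTypes.ROW(new DataField("i", DataTypes.INT())))));
}
```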
fluss-client/pom.xml (outdated)
```xml
<filter>
    <artifact>*</artifact>
    <excludes>
        <exclude>LICENSE*</exclude>
```
Why exclude all the LICENSE files?
```java
 *
 * @param fieldType the field type of the indexed row
 */
public static FieldWriter createFieldWriter(DataType fieldType) {
```
This is a duplicated code block; it can be replaced by BinaryWriter.createValueWriter(fieldType).
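A minimal sketch of that replacement, assuming FieldWriter and the ValueWriter returned by BinaryWriter.createValueWriter share a writeValue-style shape (the adapter below is hypothetical):

```java
public static FieldWriter createFieldWriter(DataType fieldType) {
    // Reuse the shared factory instead of duplicating the per-type switch;
    // the ValueWriter method name and argument order are assumptions.
    BinaryWriter.ValueWriter valueWriter = BinaryWriter.createValueWriter(fieldType);
    return (writer, pos, value) -> valueWriter.writeValue(writer, pos, value);
}
```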
```java
        return (flussField) -> FlinkArrayConverter.deserialize(flussDataType, flussField);
    case MAP:
        // TODO: Add Map type support in future
        throw new UnsupportedOperationException("Map type not supported yet");
    case ROW:
        return (flussField) -> FlinkRowConverter.deserialize(flussDataType, flussField);
```
Actually, I still prefer the original implementation of the conversion, which is very clean thanks to the elementGetter and elementConverter. Introducing FlinkArrayConverter and FlinkRowConverter seems unnecessary, and they don't need to implement the ArrayData and RowData interfaces.
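A minimal sketch of that element-wise style, assuming helper names such as createInternalConverter and FlussDeserializationConverter from the surrounding converter code:

```java
private static ArrayData toFlinkArray(InternalArray flussArray, DataType elementType) {
    // Read each element with the framework-provided getter, then convert it
    // with the per-type element converter; no FlinkArrayConverter class needed.
    InternalArray.ElementGetter elementGetter =
            InternalArray.createElementGetter(elementType);
    FlussDeserializationConverter elementConverter = createInternalConverter(elementType);
    Object[] converted = new Object[flussArray.size()];
    for (int i = 0; i < flussArray.size(); i++) {
        Object element = elementGetter.getElementOrNull(flussArray, i);
        converted[i] = element == null ? null : elementConverter.deserialize(element);
    }
    return new GenericArrayData(converted);
}
```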
```java
    return new FlinkArrayConverter(flussDataType, flussField).getArrayData();
}

private static Object getFieldValue(InternalArray array, int pos, DataType dataType) {
```
We don't need to implement the element getter again; just use InternalArray.createElementGetter.
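A minimal sketch of the simplified method, assuming the getElementOrNull signature of the returned getter:

```java
private static Object getFieldValue(InternalArray array, int pos, DataType dataType) {
    // Delegate to the framework's element getter instead of re-deriving
    // the per-type access logic here.
    return InternalArray.createElementGetter(dataType).getElementOrNull(array, pos);
}
```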
```java
if (!eleType.isNullable()) {
    switch (eleType.getTypeRoot()) {
        case BOOLEAN:
            return new GenericArrayData(from.toBooleanArray());
        case TINYINT:
            return new GenericArrayData(from.toByteArray());
        case SMALLINT:
            return new GenericArrayData(from.toShortArray());
        case INTEGER:
        case DATE:
        case TIME_WITHOUT_TIME_ZONE:
            return new GenericArrayData(from.toIntArray());
        case BIGINT:
            return new GenericArrayData(from.toLongArray());
        case FLOAT:
            return new GenericArrayData(from.toFloatArray());
        case DOUBLE:
            return new GenericArrayData(from.toDoubleArray());
    }
}
```
Actually, the nullability information in Flink’s DataType is not always reliable. To ensure robustness, we should wrap this code block in a try-catch and fall back to using the element converter if an exception occurs. This guarantees safe execution even when nullability metadata is inaccurate.
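A minimal sketch of that defensive pattern; convertElementWise is a hypothetical stand-in for the existing element-converter fallback:

```java
if (!eleType.isNullable()) {
    try {
        // Primitive fast path, as in the diff above.
        switch (eleType.getTypeRoot()) {
            case BOOLEAN:
                return new GenericArrayData(from.toBooleanArray());
            // ... remaining primitive cases ...
        }
    } catch (RuntimeException e) {
        // The nullability metadata was inaccurate (e.g. a null element);
        // fall through to the element-wise conversion below.
    }
}
return convertElementWise(from, eleType); // hypothetical fallback helper
```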
fluss-common/pom.xml (outdated)
```xml
<dependency>
    <groupId>org.eclipse.collections</groupId>
    <artifactId>eclipse-collections</artifactId>
    <version>11.1.0</version>
```
We don't need this; it should be removed.
> We don't need this, this should be removed.
Reason 1: Purpose
Arrow uses eclipse-collections to provide primitive-type collections (such as IntList and LongList) that optimize memory usage and performance by avoiding the overhead of Java boxing/unboxing.
Reason 2: Why an Explicit Declaration is Required
Although the project uses fluss-shaded-arrow, the eclipse-collections dependency was not included during shading, so it must be declared explicitly as a separate dependency.
@XuQianJin-Stars, sorry, I couldn't find where Arrow uses eclipse-collections. I removed all the license file and pom changes. If the CI passes, I will merge the PR.
| "f20", | ||
| DataTypes.ARRAY(DataTypes.FLOAT().copy(false))), // vector embedding type | ||
| new DataField( | ||
| "f21", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))) // nested array |
Why remove the nested array test data? I think it is very useful.
```java
        20,
        new GenericArrayData(
                new float[] {0.1f, 1.1f, -0.5f, 6.6f, Float.MAX_VALUE, Float.MIN_VALUE}));
genericRowData.setField(
        21,
        new GenericArrayData(
                new GenericArrayData[] {
                    new GenericArrayData(
                            new StringData[] {fromString("a"), null, fromString("c")}),
                    null,
                    new GenericArrayData(
                            new StringData[] {fromString("hello"), fromString("world")})
                }));
```
Why remove the original array test data? I think it is useful for our tests.
```java
new FlinkAsFlussArray(flinkRow.getArray(21).getArray(2))
        .toObjectArray(DataTypes.STRING());
assertThat(stringArray2)
        .isEqualTo(new BinaryString[] {fromString("hello"), fromString("world")});
```
Ditto; this shouldn't be removed.
```java
@Override
public InternalRow getRow(int pos, int numFields) {
    // TODO: Support Row type conversion from Iceberg to Fluss
    return null;
```
Throw an UnsupportedOperationException instead.
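A minimal sketch of the requested change:

```java
@Override
public InternalRow getRow(int pos, int numFields) {
    // Fail loudly instead of silently returning null for an
    // unimplemented conversion.
    throw new UnsupportedOperationException(
            "ROW type conversion from Iceberg to Fluss is not supported yet");
}
```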
Force-pushed from b2d938b to 0d29df6
Well, I will fix the other problems.
Force-pushed from e8bbdf6 to 158b328
Force-pushed from 158b328 to cfc1a30
Force-pushed from cfc1a30 to 143dac8
…lace dependency on eclipse-collections by arrow
Force-pushed from 143dac8 to a3bc24e
Purpose
Linked issue: close #1974
This PR introduces support for nested ROW type in ARROW, COMPACTED, and INDEXED formats, enabling Fluss to handle complex nested data structures including nested rows and nested arrays.
Brief change log
Core Changes:
- `ArrowRowColumnVector` to support reading ROW type from Arrow format
- `ArrowRowWriter` to support writing ROW type to Arrow format
- `RowColumnVector` interface for columnar row operations
- `RowSerializer` for ROW type serialization/deserialization
- `IndexedRowWriter` and `IndexedRowReader` to support nested ROW type
- `CompactedRowWriter` and `CompactedRowReader` to support nested ROW type

Type System Enhancements:
- `InternalRow` and `InternalArray` interfaces with `getRow()` method
- `DataGetters` to include ROW type getter
- `ArrowUtils` to create Arrow column vectors for ROW type

Connector Integration:
- `FlinkRowConverter` and `FlinkArrayConverter` for Flink-Fluss type conversion
- `PaimonRowAsFlussRow` and `PaimonArrayAsFlussArray` for Paimon-Fluss type conversion

Test Coverage:
- `ArrowReaderWriterTest` with nested ROW and nested ARRAY test cases
- `IndexedRowTest` and `IndexedRowReaderTest` for ROW type validation

This change enables Fluss to store and process complex nested data structures, which is essential for advanced analytics and complex data modeling scenarios.
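As an illustration of the new capability, a nested row value matching the ROW(INT, ROW(INT, STRING, BIGINT), STRING) test type could be built roughly like this; GenericRow and BinaryString.fromString follow the Flink-style API used elsewhere in this PR, and the exact Fluss names are assumptions:

```java
// Inner row: ROW(INT, STRING, BIGINT)
GenericRow inner = new GenericRow(3);
inner.setField(0, 7);
inner.setField(1, BinaryString.fromString("nested"));
inner.setField(2, 100L);

// Outer row: ROW(INT, ROW(...), STRING), with a ROW nested inside a ROW
GenericRow outer = new GenericRow(3);
outer.setField(0, 42);
outer.setField(1, inner);
outer.setField(2, BinaryString.fromString("outer"));
```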
Tests
This PR includes the following unit tests to verify the nested ROW and ARRAY type support in Arrow format:
Unit Tests:
- `ArrowReaderWriterTest#testReaderWriter()` - validates that the Arrow reader and writer can correctly handle nested ROW and nested ARRAY types, including:
  - `ARRAY(ARRAY(STRING))`
  - `ROW(INT, ROW(INT, STRING, BIGINT), STRING)`
- `IndexedRowReaderTest` - verifies IndexedRow format support for ROW type read/write operations
- `IndexedRowTest` - validates IndexedRow handling of nested types in various scenarios

Test Coverage:
Test Data:
All test cases pass with `mvn clean verify`.

API and Format
This change affects the storage format.
Documentation
This change introduces a new feature (nested ROW type support). Documentation is not required as per user request.