[FLINK-39106] Support ROW and ARRAY<ROW> data types in DynamoDB API / SQL sink #234
ddebowczyk92 wants to merge 4 commits into
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
private static AttributeConverter<Row> createRowDocumentConverter(
        TableSchema<Row> tableSchema) {
    return new AttributeConverter<>() {
I think it should be
return new AttributeConverter<Row>()
instead.
Hey @gguptp! It's the diamond operator: Java infers the generic type from the return type declaration, so <Row> would be redundant here.
Yep, I'm aware, but for some reason it was throwing an error in my IDE.
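A possible explanation for the IDE error, offered as an assumption since the thread does not say which language level the IDE was set to: the diamond operator on an anonymous class is only accepted from Java 9 onward, so a project or IDE configured for Java 8 rejects it while a newer compiler accepts it. The sketch below uses a hypothetical Converter interface as a stand-in for AttributeConverter; it is not the SDK type.

```java
public class DiamondDemo {
    // Hypothetical stand-in for the SDK's AttributeConverter<T>; not the real interface.
    interface Converter<T> {
        String describe(T value);
    }

    // Explicit type argument: accepted at any language level that has generics.
    static Converter<String> explicit() {
        return new Converter<String>() {
            @Override
            public String describe(String value) {
                return "value=" + value;
            }
        };
    }

    // Diamond on an anonymous class: accepted only at language level 9 or later.
    // The type argument is inferred from the declared return type.
    static Converter<String> diamond() {
        return new Converter<>() {
            @Override
            public String describe(String value) {
                return "value=" + value;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(explicit().describe("a"));
        System.out.println(diamond().describe("a"));
    }
}
```

Both factory methods behave identically at runtime; the only difference is which compiler language levels accept the source.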
@ferenc-csaky can we have this merged?
}

private java.util.Optional<AttributeConverter> buildRowAttributeConverter(DataType dataType) {
    if (LogicalTypeRoot.ROW.equals(dataType.getLogicalType().getTypeRoot())) {
Enums can be compared with the equality operator, so I'd rather use that instead of .equals. Same in L117 and L159
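To illustrate the suggestion, here is a minimal sketch with a hypothetical TypeRoot enum standing in for LogicalTypeRoot. Both forms give the same answer for enum values; the practical difference is that == is checked against the operand types at compile time, whereas equals accepts any Object and silently returns false for a mismatched type.

```java
public class EnumCompareDemo {
    // Hypothetical enum standing in for Flink's LogicalTypeRoot.
    enum TypeRoot { ROW, ARRAY, VARCHAR }

    // Identity comparison: each enum constant is a singleton, so == is sufficient.
    // It is also null-safe on both sides.
    static boolean isRowWithOperator(TypeRoot root) {
        return root == TypeRoot.ROW;
    }

    // equals() also works, but it would compile even if 'root' were a String.
    static boolean isRowWithEquals(TypeRoot root) {
        return TypeRoot.ROW.equals(root);
    }

    public static void main(String[] args) {
        System.out.println(isRowWithOperator(TypeRoot.ROW));
        System.out.println(isRowWithOperator(null));
        System.out.println(isRowWithEquals(TypeRoot.ARRAY));
        // TypeRoot.ROW.equals("ROW") compiles and is simply false;
        // TypeRoot.ROW == "ROW" is rejected by the compiler.
    }
}
```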
    });
}

private java.util.Optional<AttributeConverter> buildRowAttributeConverter(DataType dataType) {
Let's add a qualified import for Optional.
builder.attributeConverterProviders(
        newAttributeConverterProvider, AttributeConverterProvider.defaultProvider());

DataTypes.Field[] fields = DataType.getFields(dataType).toArray(new DataTypes.Field[0]);
Any reason for the array conversion? getFields returns an ArrayList, so calling .get on the list element has no downside, hence I do not see a reason to make it an array.
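A small sketch of the point being made, using a hypothetical Field class rather than Flink's DataTypes.Field: indexed access through List.get yields the same result as copying into an array first, so the extra toArray call only adds an allocation.

```java
import java.util.Arrays;
import java.util.List;

public class FieldAccessDemo {
    // Hypothetical stand-in for DataTypes.Field: just a name and a type label.
    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Reads the fields directly from the list by index.
    static String describeViaList(List<Field> fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            Field f = fields.get(i);
            if (i > 0) sb.append(',');
            sb.append(f.name).append(':').append(f.type);
        }
        return sb.toString();
    }

    // Copies into an array first, as the reviewed line does, then reads by index.
    static String describeViaArray(List<Field> fields) {
        Field[] array = fields.toArray(new Field[0]);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < array.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(array[i].name).append(':').append(array[i].type);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Field> fields = Arrays.asList(new Field("name", "STRING"), new Field("value", "INT"));
        System.out.println(describeViaList(fields));
        System.out.println(describeViaArray(fields));
    }
}
```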
Purpose of the change
Adds support for ROW and ARRAY<ROW> data types in the DynamoDB Table API / SQL sink. DynamoDB's Map (M) / document attribute type is a natural fit for Flink's ROW type, but it was previously unsupported: the converter would fall through to EnhancedType.of(RowData.class), which has no built-in AttributeConverter, causing a runtime failure. Recursive nesting (ROW within ROW, ARRAY<ROW> within ROW) is fully supported.

Verifying this change
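As a rough picture of the shape being verified here, the sketch below models the recursive mapping with plain Java collections: a row becomes a map, a list of rows becomes a list of maps, and a null field becomes a null marker. SimpleRow, NULL_MARKER, and toDocument are hypothetical names for illustration only; this is not the converter from the PR and does not use the DynamoDB SDK types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowToDocumentSketch {
    // Hypothetical placeholder for a DynamoDB NULL attribute.
    static final String NULL_MARKER = "NULL";

    // Hypothetical minimal row: parallel arrays of field names and values.
    static final class SimpleRow {
        final String[] names;
        final Object[] values;

        SimpleRow(String[] names, Object[] values) {
            this.names = names;
            this.values = values;
        }
    }

    // Recursively converts a value: rows to maps, lists to lists, scalars unchanged.
    static Object toDocument(Object value) {
        if (value == null) {
            return NULL_MARKER;
        }
        if (value instanceof SimpleRow) {
            SimpleRow row = (SimpleRow) value;
            Map<String, Object> map = new LinkedHashMap<>();
            for (int i = 0; i < row.names.length; i++) {
                map.put(row.names[i], toDocument(row.values[i]));
            }
            return map;
        }
        if (value instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object element : (List<?>) value) {
                out.add(toDocument(element));
            }
            return out;
        }
        return value;
    }

    public static void main(String[] args) {
        SimpleRow leaf = new SimpleRow(new String[] {"n"}, new Object[] {null});
        SimpleRow outer = new SimpleRow(new String[] {"id", "leaf"}, new Object[] {"k", leaf});
        System.out.println(toDocument(outer));
    }
}
```

The recursion is what makes arbitrary nesting depth work: each nested row or list element goes back through the same function.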
This change added tests and can be verified as follows:
- testRowDataType: single-level nested ROW with STRING and INT fields
- testRowDataTypeNullInnerField: null field inside nested ROW renders as AttributeValue(NUL=true)
- testNestedRowDataType: 3-level deep nesting (outer -> middle -> leaf)
- testRowDataTypeArray: ARRAY<ROW<name STRING, value INT>> with 2 elements, asserts DynamoDB List of Maps
- Added ROW and ARRAY<ROW> columns to the DynamoDbDynamicSinkFactoryTest schema and the IT case SQL definition

Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
@Public(Evolving))