Skip to content

Commit 9708d74

Browse files
authored
[flink] add flink support for VectorType (apache#7238)
1 parent 4149e72 commit 9708d74

File tree

9 files changed

+420
-6
lines changed

9 files changed

+420
-6
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataTypeToLogicalType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ public LogicalType visit(ArrayType arrayType) {
162162

163163
@Override
164164
public LogicalType visit(VectorType vectorType) {
165-
throw new UnsupportedOperationException("Not support VectorType yet.");
165+
return new org.apache.flink.table.types.logical.ArrayType(
166+
vectorType.isNullable(), vectorType.getElementType().accept(this));
166167
}
167168

168169
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
156156
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
157157
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
158+
import static org.apache.paimon.flink.LogicalTypeConversion.toVectorType;
158159
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
159160
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
160161
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
@@ -1040,14 +1041,32 @@ public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
10401041
field ->
10411042
schemaBuilder.column(
10421043
field.getName(),
1043-
blobFields.contains(field.getName())
1044-
? toBlobType(field.getType())
1045-
: toDataType(field.getType()),
1044+
resolveDataType(field.getName(), field.getType(), options),
10461045
columnComments.get(field.getName())));
10471046

10481047
return schemaBuilder.build();
10491048
}
10501049

1050+
private static org.apache.paimon.types.DataType resolveDataType(
1051+
String fieldName,
1052+
org.apache.flink.table.types.logical.LogicalType logicalType,
1053+
Map<String, String> options) {
1054+
List<String> blobFields = CoreOptions.blobField(options);
1055+
if (blobFields.contains(fieldName)) {
1056+
return toBlobType(logicalType);
1057+
}
1058+
if (logicalType instanceof org.apache.flink.table.types.logical.ArrayType) {
1059+
String vectorDim = options.get(String.format("field.%s.vector-dim", fieldName));
1060+
if (vectorDim != null) {
1061+
org.apache.flink.table.types.logical.LogicalType elementType =
1062+
((org.apache.flink.table.types.logical.ArrayType) logicalType)
1063+
.getElementType();
1064+
return toVectorType(elementType, vectorDim);
1065+
}
1066+
}
1067+
return toDataType(logicalType);
1068+
}
1069+
10511070
private static Map<String, String> getColumnComments(CatalogBaseTable catalogTable) {
10521071
return catalogTable.getUnresolvedSchema().getColumns().stream()
10531072
.filter(c -> c.getComment().isPresent())

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public InternalArray getArray(int pos) {
163163

164164
@Override
165165
public InternalVector getVector(int pos) {
166-
throw new UnsupportedOperationException("Not support VectorType yet.");
166+
return new FlinkVectorWrapper(row.getArray(pos));
167167
}
168168

169169
@Override
@@ -268,7 +268,7 @@ public InternalArray getArray(int pos) {
268268

269269
@Override
270270
public InternalVector getVector(int pos) {
271-
throw new UnsupportedOperationException("Not support VectorType yet.");
271+
return new FlinkVectorWrapper(array.getArray(pos));
272272
}
273273

274274
@Override
@@ -317,6 +317,12 @@ public double[] toDoubleArray() {
317317
}
318318
}
319319

320+
private static class FlinkVectorWrapper extends FlinkArrayWrapper implements InternalVector {
321+
private FlinkVectorWrapper(org.apache.flink.table.data.ArrayData array) {
322+
super(array);
323+
}
324+
}
325+
320326
private static class FlinkMapWrapper implements InternalMap {
321327

322328
private final org.apache.flink.table.data.MapData map;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.apache.paimon.types.BlobType;
2222
import org.apache.paimon.types.DataType;
23+
import org.apache.paimon.types.DataTypes;
2324
import org.apache.paimon.types.RowType;
25+
import org.apache.paimon.types.VectorType;
2426

2527
import org.apache.flink.table.types.logical.BinaryType;
2628
import org.apache.flink.table.types.logical.LogicalType;
@@ -49,6 +51,19 @@ public static BlobType toBlobType(LogicalType logicalType) {
4951
return new BlobType();
5052
}
5153

54+
public static VectorType toVectorType(LogicalType elementType, String vectorDim) {
55+
checkArgument(
56+
!vectorDim.trim().isEmpty(),
57+
"Expected an integer for vector-dim, but got empty value.");
58+
try {
59+
int dim = Integer.parseInt(vectorDim);
60+
return DataTypes.VECTOR(dim, toDataType(elementType));
61+
} catch (NumberFormatException e) {
62+
throw new IllegalArgumentException(
63+
"Expected an integer for vector-dim, but got: " + vectorDim);
64+
}
65+
}
66+
5267
public static RowType toDataType(org.apache.flink.table.types.logical.RowType logicalType) {
5368
return (RowType) toDataType(logicalType, new AtomicInteger(-1));
5469
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink;
20+
21+
import org.apache.paimon.data.BinaryVector;
22+
import org.apache.paimon.data.GenericRow;
23+
import org.apache.paimon.data.InternalRow;
24+
25+
import org.apache.flink.table.data.ArrayData;
26+
import org.junit.jupiter.api.Test;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/** Tests for {@link FlinkRowData} and vector type. */
31+
public class FlinkRowDataWithVectorTest {
32+
33+
@Test
34+
public void testVectorAsArrayData() {
35+
float[] values = new float[] {1.0f, 2.0f, 3.0f};
36+
InternalRow row = GenericRow.of(1, BinaryVector.fromPrimitiveArray(values));
37+
38+
FlinkRowData rowData = new FlinkRowData(row);
39+
ArrayData arrayData = rowData.getArray(1);
40+
41+
assertThat(arrayData.toFloatArray()).isEqualTo(values);
42+
}
43+
44+
@Test
45+
public void testNullVector() {
46+
InternalRow row = GenericRow.of(1, null);
47+
FlinkRowData rowData = new FlinkRowData(row);
48+
49+
assertThat(rowData.isNullAt(1)).isTrue();
50+
}
51+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink;
20+
21+
import org.apache.flink.table.data.GenericArrayData;
22+
import org.apache.flink.table.data.GenericRowData;
23+
import org.junit.jupiter.api.Test;
24+
25+
import static org.assertj.core.api.Assertions.assertThat;
26+
27+
/** Tests for {@link FlinkRowWrapper} and vector type. */
28+
public class FlinkRowWrapperWithVectorTest {
29+
30+
@Test
31+
public void testVectorAccess() {
32+
GenericRowData row = new GenericRowData(2);
33+
row.setField(0, 1);
34+
row.setField(1, new GenericArrayData(new float[] {1.0f, 2.0f, 3.0f}));
35+
36+
FlinkRowWrapper wrapper = new FlinkRowWrapper(row);
37+
assertThat(wrapper.getVector(1).toFloatArray()).isEqualTo(new float[] {1.0f, 2.0f, 3.0f});
38+
}
39+
40+
@Test
41+
public void testNullVector() {
42+
GenericRowData row = new GenericRowData(1);
43+
row.setField(0, null);
44+
45+
FlinkRowWrapper wrapper = new FlinkRowWrapper(row);
46+
assertThat(wrapper.isNullAt(0)).isTrue();
47+
}
48+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink;
20+
21+
import org.apache.paimon.types.DataTypes;
22+
import org.apache.paimon.types.VectorType;
23+
24+
import org.apache.flink.table.types.logical.ArrayType;
25+
import org.apache.flink.table.types.logical.FloatType;
26+
import org.apache.flink.table.types.logical.LogicalType;
27+
import org.junit.jupiter.api.Test;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31+
32+
/** Tests for {@link LogicalTypeConversion}. */
33+
public class LogicalTypeConversionTest {
34+
35+
@Test
36+
public void testToVectorType() {
37+
VectorType vectorType = LogicalTypeConversion.toVectorType(new FloatType(), "3");
38+
assertThat(vectorType).isEqualTo(DataTypes.VECTOR(3, DataTypes.FLOAT()));
39+
}
40+
41+
@Test
42+
public void testToVectorTypeInvalidDim() {
43+
assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), ""))
44+
.isInstanceOf(IllegalArgumentException.class);
45+
assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), "abc"))
46+
.isInstanceOf(IllegalArgumentException.class);
47+
assertThatThrownBy(() -> LogicalTypeConversion.toVectorType(new FloatType(), "0"))
48+
.isInstanceOf(IllegalArgumentException.class);
49+
}
50+
51+
@Test
52+
public void testVectorTypeToLogicalType() {
53+
LogicalType logicalType =
54+
LogicalTypeConversion.toLogicalType(DataTypes.VECTOR(4, DataTypes.FLOAT()));
55+
assertThat(logicalType).isInstanceOf(ArrayType.class);
56+
ArrayType arrayType = (ArrayType) logicalType;
57+
assertThat(arrayType.getElementType()).isInstanceOf(FloatType.class);
58+
}
59+
}

0 commit comments

Comments
 (0)