
Commit eb8736b

[server][flink] Validate array type in server side and add more array type Flink IT cases (apache#2040)
1 parent fe7d0cb commit eb8736b

5 files changed: +150 -395 lines changed

fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java

Lines changed: 2 additions & 14 deletions
@@ -22,30 +22,23 @@
 import org.apache.fluss.row.columnar.ColumnVector;
 import org.apache.fluss.row.columnar.ColumnarRow;
 import org.apache.fluss.row.columnar.VectorizedColumnBatch;
-import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** {@link ArrowReader} which read the underlying Arrow format data as {@link InternalRow}. */
 @Internal
 public class ArrowReader {
 
-    /**
-     * The arrow root which holds vector resources and should be released when the reader is closed.
-     */
-    private final VectorSchemaRoot root;
-
     /**
      * An array of vectors which are responsible for the deserialization of each column of the rows.
      */
     private final ColumnVector[] columnVectors;
 
     private final int rowCount;
 
-    public ArrowReader(VectorSchemaRoot root, ColumnVector[] columnVectors) {
-        this.root = root;
+    public ArrowReader(ColumnVector[] columnVectors, int rowCount) {
         this.columnVectors = checkNotNull(columnVectors);
-        this.rowCount = root.getRowCount();
+        this.rowCount = rowCount;
     }
 
     public int getRowCount() {
@@ -56,9 +49,4 @@ public int getRowCount() {
     public ColumnarRow read(int rowId) {
         return new ColumnarRow(new VectorizedColumnBatch(columnVectors), rowId);
     }
-
-    public void close() {
-        // Do not close the VectorSchemaRoot here.
-        // The root will be closed when LogRecordReadContext closes.
-    }
 }
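
With this change, ArrowReader no longer holds the VectorSchemaRoot: the caller passes pre-built column vectors plus the row count, and the root's lifecycle stays with whoever created it (per the removed close() comment, the LogRecordReadContext). A minimal sketch of an adapted call site follows; createColumnVectors is a hypothetical helper and the loop is only illustrative, while the ArrowReader constructor, getRowCount(), and read() calls match the diff above.

    // Sketch only: createColumnVectors is a hypothetical helper, not part of this commit.
    ColumnVector[] vectors = createColumnVectors(root);
    ArrowReader reader = new ArrowReader(vectors, root.getRowCount());
    for (int i = 0; i < reader.getRowCount(); i++) {
        ColumnarRow row = reader.read(i);
        // ... consume the row ...
    }
    // No reader.close() anymore; the VectorSchemaRoot is released by its owner
    // (e.g. the LogRecordReadContext) when that owner closes.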

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java

Lines changed: 0 additions & 122 deletions
This file was deleted.
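
With this connector-side validator removed, the unsupported-type checks for primary key and bucket key columns are performed on the server side and surface to Flink as an InvalidTableException root cause, as the updated IT cases below assert. A rough sketch of how this looks to a Flink SQL user; the DDL and catch block here are illustrative only, while the root-cause message is taken from the new test expectations.

    // Illustrative only: table name and exception handling are not from the diff.
    try {
        tEnv.executeSql(
                "create table t (id int, tags array<string>, primary key(id, tags) not enforced)");
    } catch (Exception e) {
        // Root cause: org.apache.fluss.exception.InvalidTableException with message
        // "Primary key column 'tags' has unsupported data type ARRAY<STRING> NOT NULL. "
        //     + "Currently, primary key column does not support types: [ARRAY, MAP, ROW]."
    }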

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java

Lines changed: 108 additions & 29 deletions
@@ -17,11 +17,18 @@
 
 package org.apache.fluss.flink.sink;
 
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -131,6 +138,47 @@ void testArrayTypesInLogTable() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
+    @Test
+    void testArrayTypesInPartitionedLogTable() throws Exception {
+        tEnv.executeSql(
+                "create table array_log_test ("
+                        + "id int, "
+                        + "dt string, "
+                        + "int_array array<int>, "
+                        + "bigint_array array<bigint>, "
+                        + "float_array array<float>, "
+                        + "double_array array<double>, "
+                        + "string_array array<string>, "
+                        + "boolean_array array<boolean>, "
+                        + "nested_int_array array<array<int>>, "
+                        + "nested_string_array array<array<string>>, "
+                        + "deeply_nested_array array<array<array<int>>>"
+                        + ") PARTITIONED BY (dt) "
+                        + "with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO array_log_test VALUES "
+                                + "(1, '2014', ARRAY[1, 2, CAST(NULL AS INT)], ARRAY[100, CAST(NULL AS BIGINT), 300], "
+                                + "ARRAY[CAST(1.1 AS FLOAT), CAST(NULL AS FLOAT)], ARRAY[2.2, 3.3, CAST(NULL AS DOUBLE)], "
+                                + "ARRAY['a', CAST(NULL AS STRING), 'c'], ARRAY[true, CAST(NULL AS BOOLEAN), false], "
+                                + "ARRAY[ARRAY[1, 2], CAST(NULL AS ARRAY<INT>), ARRAY[3]], "
+                                + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS STRING), 'y']], "
+                                + "ARRAY[ARRAY[ARRAY[1, 2]], ARRAY[ARRAY[3, 4, 5]]]), "
+                                + "(2, '2013', CAST(NULL AS ARRAY<INT>), ARRAY[400, 500], "
+                                + "ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], "
+                                + "ARRAY['d', 'e'], ARRAY[true], "
+                                + "ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']], "
+                                + "ARRAY[ARRAY[ARRAY[9]]])")
+                .await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_log_test").collect();
+        List<String> expectedRows =
+                Arrays.asList(
+                        "+I[1, 2014, [1, 2, null], [100, null, 300], [1.1, null], [2.2, 3.3, null], [a, null, c], [true, null, false], [[1, 2], null, [3]], [[x], [null, y]], [[[1, 2]], [[3, 4, 5]]]]",
+                        "+I[2, 2013, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]], [[[9]]]]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+    }
+
     @Test
     void testArrayTypesInPrimaryKeyTable() throws Exception {
         tEnv.executeSql(
@@ -180,52 +228,83 @@ void testArrayTypesInPrimaryKeyTable() throws Exception {
                         "-U[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]",
                         "+U[1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]",
                         "+I[4, [20, 30], [2000, 3000], [20.2], [30.3], [new], [true], [[200], [300]], [[new1], [new2]]]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+        // insert into with partial update test
+        tEnv.executeSql(
+                        "INSERT INTO array_pk_test (id, string_array, bigint_array) VALUES "
+                                + "(2, ARRAY['partially', 'updated'], ARRAY[9999])")
+                .await();
+
+        expectedRows =
+                Arrays.asList(
+                        "-U[2, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]]]",
+                        "+U[2, null, [9999], [4.4], [5.5], [partially, updated], [true], [[6, 7, 8]], [[z]]]");
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+        // test lookup join with array type, test partitioned table
+        Schema srcSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("c", DataTypes.INT())
+                        .columnByExpression("proc", "PROCTIME()")
+                        .build();
+        RowTypeInfo srcTestTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {Types.INT, Types.STRING, Types.INT},
+                        new String[] {"a", "name", "c"});
+        List<Row> testData =
+                Arrays.asList(
+                        Row.of(1, "name1", 11),
+                        Row.of(2, "name2", 2),
+                        Row.of(3, "name33", 33),
+                        Row.of(10, "name0", 44));
+        DataStream<Row> srcDs = env.fromCollection(testData).returns(srcTestTypeInfo);
+        tEnv.dropTemporaryView("src");
+        tEnv.createTemporaryView("src", tEnv.fromDataStream(srcDs, srcSchema));
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(
+                                "SELECT a, name, array_pk_test.* FROM src "
+                                        + "LEFT JOIN array_pk_test FOR SYSTEM_TIME AS OF src.proc "
+                                        + "ON src.a = array_pk_test.id")
+                        .collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, name1, 1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]",
+                        "+I[2, name2, 2, null, [9999], [4.4], [5.5], [partially, updated], [true], [[6, 7, 8]], [[z]]]",
+                        "+I[3, name33, 3, [10], [600], [7.7], [8.8], [f], [false], [[9]], [[w]]]",
+                        "+I[10, name0, null, null, null, null, null, null, null, null, null]");
+        assertResultsIgnoreOrder(collected, expected, true);
     }
 
     @Test
-    void testArrayTypeAsPartitionKeyThrowsException() {
+    void testExceptionsForArrayTypeUsage() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
                                         "create table array_partition_test ("
                                                 + "id int, "
                                                 + "data string, "
                                                 + "tags array<string>, "
-                                                + "primary key(id) not enforced"
+                                                + "primary key(id, tags) not enforced"
                                                 + ") partitioned by (tags)"))
-                .cause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("is not supported");
-    }
+                .hasRootCauseInstanceOf(InvalidTableException.class)
+                .hasRootCauseMessage(
+                        "Primary key column 'tags' has unsupported data type ARRAY<STRING> NOT NULL. "
+                                + "Currently, primary key column does not support types: [ARRAY, MAP, ROW].");
 
-    @Test
-    void testArrayTypeAsPrimaryKeyThrowsException() {
-        assertThatThrownBy(
-                        () ->
-                                tEnv.executeSql(
-                                        "create table array_pk_invalid ("
-                                                + "id int, "
-                                                + "data array<string>, "
-                                                + "primary key(data) not enforced"
-                                                + ")"))
-                .cause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("is not supported");
-    }
-
-    @Test
-    void testArrayTypeAsBucketKeyThrowsException() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
                                         "create table array_bucket_test ("
                                                 + "id int, "
-                                                + "data array<string>, "
-                                                + "primary key(id) not enforced"
-                                                + ") with ('bucket.key' = 'data', 'bucket.num' = '3')"))
-                .cause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("is not supported");
+                                                + "data string, "
+                                                + "tags array<string> "
+                                                + ") with ('bucket.key' = 'tags')"))
+                .hasRootCauseInstanceOf(InvalidTableException.class)
+                .hasRootCauseMessage(
+                        "Bucket key column 'tags' has unsupported data type ARRAY<STRING>. "
+                                + "Currently, bucket key column does not support types: [ARRAY, MAP, ROW].");
     }
 }
