|
17 | 17 |
|
18 | 18 | package org.apache.fluss.flink.sink; |
19 | 19 |
|
| 20 | +import org.apache.fluss.exception.InvalidTableException; |
20 | 21 | import org.apache.fluss.server.testutils.FlussClusterExtension; |
21 | 22 |
|
22 | 23 | import org.apache.flink.api.common.RuntimeExecutionMode; |
| 24 | +import org.apache.flink.api.common.typeinfo.TypeInformation; |
| 25 | +import org.apache.flink.api.common.typeinfo.Types; |
| 26 | +import org.apache.flink.api.java.typeutils.RowTypeInfo; |
| 27 | +import org.apache.flink.streaming.api.datastream.DataStream; |
23 | 28 | import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| 29 | +import org.apache.flink.table.api.DataTypes; |
24 | 30 | import org.apache.flink.table.api.EnvironmentSettings; |
| 31 | +import org.apache.flink.table.api.Schema; |
25 | 32 | import org.apache.flink.table.api.TableEnvironment; |
26 | 33 | import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; |
27 | 34 | import org.apache.flink.table.api.config.ExecutionConfigOptions; |
@@ -131,6 +138,47 @@ void testArrayTypesInLogTable() throws Exception { |
131 | 138 | assertResultsIgnoreOrder(rowIter, expectedRows, true); |
132 | 139 | } |
133 | 140 |
|
| 141 | + @Test |
| 142 | + void testArrayTypesInPartitionedLogTable() throws Exception { |
| 143 | + tEnv.executeSql( |
| 144 | + "create table array_log_test (" |
| 145 | + + "id int, " |
| 146 | + + "dt string, " |
| 147 | + + "int_array array<int>, " |
| 148 | + + "bigint_array array<bigint>, " |
| 149 | + + "float_array array<float>, " |
| 150 | + + "double_array array<double>, " |
| 151 | + + "string_array array<string>, " |
| 152 | + + "boolean_array array<boolean>, " |
| 153 | + + "nested_int_array array<array<int>>, " |
| 154 | + + "nested_string_array array<array<string>>, " |
| 155 | + + "deeply_nested_array array<array<array<int>>>" |
| 156 | + + ") PARTITIONED BY (dt) " |
| 157 | + + "with ('bucket.num' = '3')"); |
| 158 | + |
| 159 | + tEnv.executeSql( |
| 160 | + "INSERT INTO array_log_test VALUES " |
| 161 | + + "(1, '2014', ARRAY[1, 2, CAST(NULL AS INT)], ARRAY[100, CAST(NULL AS BIGINT), 300], " |
| 162 | + + "ARRAY[CAST(1.1 AS FLOAT), CAST(NULL AS FLOAT)], ARRAY[2.2, 3.3, CAST(NULL AS DOUBLE)], " |
| 163 | + + "ARRAY['a', CAST(NULL AS STRING), 'c'], ARRAY[true, CAST(NULL AS BOOLEAN), false], " |
| 164 | + + "ARRAY[ARRAY[1, 2], CAST(NULL AS ARRAY<INT>), ARRAY[3]], " |
| 165 | + + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS STRING), 'y']], " |
| 166 | + + "ARRAY[ARRAY[ARRAY[1, 2]], ARRAY[ARRAY[3, 4, 5]]]), " |
| 167 | + + "(2, '2013', CAST(NULL AS ARRAY<INT>), ARRAY[400, 500], " |
| 168 | + + "ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], " |
| 169 | + + "ARRAY['d', 'e'], ARRAY[true], " |
| 170 | + + "ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']], " |
| 171 | + + "ARRAY[ARRAY[ARRAY[9]]])") |
| 172 | + .await(); |
| 173 | + |
| 174 | + CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_log_test").collect(); |
| 175 | + List<String> expectedRows = |
| 176 | + Arrays.asList( |
| 177 | + "+I[1, 2014, [1, 2, null], [100, null, 300], [1.1, null], [2.2, 3.3, null], [a, null, c], [true, null, false], [[1, 2], null, [3]], [[x], [null, y]], [[[1, 2]], [[3, 4, 5]]]]", |
| 178 | + "+I[2, 2013, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]], [[[9]]]]"); |
| 179 | + assertResultsIgnoreOrder(rowIter, expectedRows, true); |
| 180 | + } |
| 181 | + |
134 | 182 | @Test |
135 | 183 | void testArrayTypesInPrimaryKeyTable() throws Exception { |
136 | 184 | tEnv.executeSql( |
@@ -180,52 +228,83 @@ void testArrayTypesInPrimaryKeyTable() throws Exception { |
180 | 228 | "-U[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]", |
181 | 229 | "+U[1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]", |
182 | 230 | "+I[4, [20, 30], [2000, 3000], [20.2], [30.3], [new], [true], [[200], [300]], [[new1], [new2]]]"); |
| 231 | + assertResultsIgnoreOrder(rowIter, expectedRows, false); |
| 232 | + |
| 233 | + // insert into with partial update test |
| 234 | + tEnv.executeSql( |
| 235 | + "INSERT INTO array_pk_test (id, string_array, bigint_array) VALUES " |
| 236 | + + "(2, ARRAY['partially', 'updated'], ARRAY[9999])") |
| 237 | + .await(); |
| 238 | + |
| 239 | + expectedRows = |
| 240 | + Arrays.asList( |
| 241 | + "-U[2, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]]]", |
| 242 | + "+U[2, null, [9999], [4.4], [5.5], [partially, updated], [true], [[6, 7, 8]], [[z]]]"); |
183 | 243 | assertResultsIgnoreOrder(rowIter, expectedRows, true); |
| 244 | + |
| 245 | + // test lookup join with array type, test partitioned table |
| 246 | + Schema srcSchema = |
| 247 | + Schema.newBuilder() |
| 248 | + .column("a", DataTypes.INT()) |
| 249 | + .column("name", DataTypes.STRING()) |
| 250 | + .column("c", DataTypes.INT()) |
| 251 | + .columnByExpression("proc", "PROCTIME()") |
| 252 | + .build(); |
| 253 | + RowTypeInfo srcTestTypeInfo = |
| 254 | + new RowTypeInfo( |
| 255 | + new TypeInformation[] {Types.INT, Types.STRING, Types.INT}, |
| 256 | + new String[] {"a", "name", "c"}); |
| 257 | + List<Row> testData = |
| 258 | + Arrays.asList( |
| 259 | + Row.of(1, "name1", 11), |
| 260 | + Row.of(2, "name2", 2), |
| 261 | + Row.of(3, "name33", 33), |
| 262 | + Row.of(10, "name0", 44)); |
| 263 | + DataStream<Row> srcDs = env.fromCollection(testData).returns(srcTestTypeInfo); |
| 264 | + tEnv.dropTemporaryView("src"); |
| 265 | + tEnv.createTemporaryView("src", tEnv.fromDataStream(srcDs, srcSchema)); |
| 266 | + CloseableIterator<Row> collected = |
| 267 | + tEnv.executeSql( |
| 268 | + "SELECT a, name, array_pk_test.* FROM src " |
| 269 | + + "LEFT JOIN array_pk_test FOR SYSTEM_TIME AS OF src.proc " |
| 270 | + + "ON src.a = array_pk_test.id") |
| 271 | + .collect(); |
| 272 | + List<String> expected = |
| 273 | + Arrays.asList( |
| 274 | + "+I[1, name1, 1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]", |
| 275 | + "+I[2, name2, 2, null, [9999], [4.4], [5.5], [partially, updated], [true], [[6, 7, 8]], [[z]]]", |
| 276 | + "+I[3, name33, 3, [10], [600], [7.7], [8.8], [f], [false], [[9]], [[w]]]", |
| 277 | + "+I[10, name0, null, null, null, null, null, null, null, null, null]"); |
| 278 | + assertResultsIgnoreOrder(collected, expected, true); |
184 | 279 | } |
185 | 280 |
|
186 | 281 | @Test |
187 | | - void testArrayTypeAsPartitionKeyThrowsException() { |
| 282 | + void testExceptionsForArrayTypeUsage() { |
188 | 283 | assertThatThrownBy( |
189 | 284 | () -> |
190 | 285 | tEnv.executeSql( |
191 | 286 | "create table array_partition_test (" |
192 | 287 | + "id int, " |
193 | 288 | + "data string, " |
194 | 289 | + "tags array<string>, " |
195 | | - + "primary key(id) not enforced" |
| 290 | + + "primary key(id, tags) not enforced" |
196 | 291 | + ") partitioned by (tags)")) |
197 | | - .cause() |
198 | | - .isInstanceOf(UnsupportedOperationException.class) |
199 | | - .hasMessageContaining("is not supported"); |
200 | | - } |
| 292 | + .hasRootCauseInstanceOf(InvalidTableException.class) |
| 293 | + .hasRootCauseMessage( |
| 294 | + "Primary key column 'tags' has unsupported data type ARRAY<STRING> NOT NULL. " |
| 295 | + + "Currently, primary key column does not support types: [ARRAY, MAP, ROW]."); |
201 | 296 |
|
202 | | - @Test |
203 | | - void testArrayTypeAsPrimaryKeyThrowsException() { |
204 | | - assertThatThrownBy( |
205 | | - () -> |
206 | | - tEnv.executeSql( |
207 | | - "create table array_pk_invalid (" |
208 | | - + "id int, " |
209 | | - + "data array<string>, " |
210 | | - + "primary key(data) not enforced" |
211 | | - + ")")) |
212 | | - .cause() |
213 | | - .isInstanceOf(UnsupportedOperationException.class) |
214 | | - .hasMessageContaining("is not supported"); |
215 | | - } |
216 | | - |
217 | | - @Test |
218 | | - void testArrayTypeAsBucketKeyThrowsException() { |
219 | 297 | assertThatThrownBy( |
220 | 298 | () -> |
221 | 299 | tEnv.executeSql( |
222 | 300 | "create table array_bucket_test (" |
223 | 301 | + "id int, " |
224 | | - + "data array<string>, " |
225 | | - + "primary key(id) not enforced" |
226 | | - + ") with ('bucket.key' = 'data', 'bucket.num' = '3')")) |
227 | | - .cause() |
228 | | - .isInstanceOf(UnsupportedOperationException.class) |
229 | | - .hasMessageContaining("is not supported"); |
| 302 | + + "data string, " |
| 303 | + + "tags array<string> " |
| 304 | + + ") with ('bucket.key' = 'tags')")) |
| 305 | + .hasRootCauseInstanceOf(InvalidTableException.class) |
| 306 | + .hasRootCauseMessage( |
| 307 | + "Bucket key column 'tags' has unsupported data type ARRAY<STRING>. " |
| 308 | + + "Currently, bucket key column does not support types: [ARRAY, MAP, ROW]."); |
230 | 309 | } |
231 | 310 | } |
0 commit comments