|
58 | 58 |
|
59 | 59 | import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; |
60 | 60 |
|
61 | | -import org.apache.calcite.sql.validate.SqlValidatorException; |
62 | 61 | import org.assertj.core.api.Assertions; |
63 | 62 | import org.assertj.core.api.ThrowableAssert; |
64 | 63 | import org.codehaus.commons.compiler.CompileException; |
@@ -241,6 +240,136 @@ void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception { |
241 | 240 | "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}")); |
242 | 241 | } |
243 | 242 |
|
| 243 | + @ParameterizedTest(name = "API version: {0}") |
| 244 | + @EnumSource(ValuesDataSink.SinkApi.class) |
| 245 | + void testFilterUpdateOpTypeConversion(ValuesDataSink.SinkApi sinkApi) throws Exception { |
| 246 | + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); |
| 247 | + |
| 248 | + Configuration sourceConfig = new Configuration(); |
| 249 | + sourceConfig.set( |
| 250 | + ValuesDataSourceOptions.EVENT_SET_ID, |
| 251 | + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); |
| 252 | + |
| 253 | + TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1"); |
| 254 | + Schema table1Schema = |
| 255 | + Schema.newBuilder() |
| 256 | + .physicalColumn("id", DataTypes.INT()) |
| 257 | + .physicalColumn("name", DataTypes.STRING()) |
| 258 | + .physicalColumn("age", DataTypes.INT()) |
| 259 | + .primaryKey("id") |
| 260 | + .build(); |
| 261 | + |
| 262 | + BinaryRecordDataGenerator generator = |
| 263 | + new BinaryRecordDataGenerator( |
| 264 | + table1Schema.getColumnDataTypes().toArray(new DataType[0])); |
| 265 | + |
| 266 | + List<Event> events = new ArrayList<>(); |
| 267 | + events.add(new CreateTableEvent(myTable1, table1Schema)); |
| 268 | + // Case 1: before=Y, after=Y -> UPDATE |
| 269 | + events.add( |
| 270 | + DataChangeEvent.insertEvent( |
| 271 | + myTable1, |
| 272 | + generator.generate( |
| 273 | + new Object[] {1, BinaryStringData.fromString("Alice"), 30}))); |
| 274 | + events.add( |
| 275 | + DataChangeEvent.updateEvent( |
| 276 | + myTable1, |
| 277 | + generator.generate( |
| 278 | + new Object[] {1, BinaryStringData.fromString("Alice"), 30}), |
| 279 | + generator.generate( |
| 280 | + new Object[] {1, BinaryStringData.fromString("Alice"), 40}))); |
| 281 | + // Case 2: before=Y, after=N -> DELETE |
| 282 | + events.add( |
| 283 | + DataChangeEvent.insertEvent( |
| 284 | + myTable1, |
| 285 | + generator.generate( |
| 286 | + new Object[] {2, BinaryStringData.fromString("Bob"), 30}))); |
| 287 | + events.add( |
| 288 | + DataChangeEvent.updateEvent( |
| 289 | + myTable1, |
| 290 | + generator.generate( |
| 291 | + new Object[] {2, BinaryStringData.fromString("Bob"), 30}), |
| 292 | + generator.generate( |
| 293 | + new Object[] {2, BinaryStringData.fromString("Bob"), 20}))); |
| 294 | + // Case 3: before=N, after=Y -> INSERT |
| 295 | + events.add( |
| 296 | + DataChangeEvent.insertEvent( |
| 297 | + myTable1, |
| 298 | + generator.generate( |
| 299 | + new Object[] {3, BinaryStringData.fromString("Carol"), 20}))); |
| 300 | + events.add( |
| 301 | + DataChangeEvent.updateEvent( |
| 302 | + myTable1, |
| 303 | + generator.generate( |
| 304 | + new Object[] {3, BinaryStringData.fromString("Carol"), 20}), |
| 305 | + generator.generate( |
| 306 | + new Object[] {3, BinaryStringData.fromString("Carol"), 35}))); |
| 307 | + // Case 4: before=N, after=N -> drop |
| 308 | + events.add( |
| 309 | + DataChangeEvent.insertEvent( |
| 310 | + myTable1, |
| 311 | + generator.generate( |
| 312 | + new Object[] {4, BinaryStringData.fromString("Dave"), 10}))); |
| 313 | + events.add( |
| 314 | + DataChangeEvent.updateEvent( |
| 315 | + myTable1, |
| 316 | + generator.generate( |
| 317 | + new Object[] {4, BinaryStringData.fromString("Dave"), 10}), |
| 318 | + generator.generate( |
| 319 | + new Object[] {4, BinaryStringData.fromString("Dave"), 15}))); |
| 320 | + |
| 321 | + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); |
| 322 | + |
| 323 | + SourceDef sourceDef = |
| 324 | + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); |
| 325 | + |
| 326 | + Configuration sinkConfig = new Configuration(); |
| 327 | + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); |
| 328 | + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); |
| 329 | + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); |
| 330 | + |
| 331 | + Configuration pipelineConfig = new Configuration(); |
| 332 | + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); |
| 333 | + PipelineDef pipelineDef = |
| 334 | + new PipelineDef( |
| 335 | + sourceDef, |
| 336 | + sinkDef, |
| 337 | + Collections.emptyList(), |
| 338 | + Collections.singletonList( |
| 339 | + new TransformDef( |
| 340 | + "default_namespace.default_schema.\\.*", |
| 341 | + null, |
| 342 | + "age > 25", |
| 343 | + null, |
| 344 | + null, |
| 345 | + null, |
| 346 | + null, |
| 347 | + null)), |
| 348 | + Collections.emptyList(), |
| 349 | + pipelineConfig); |
| 350 | + |
| 351 | + PipelineExecution execution = composer.compose(pipelineDef); |
| 352 | + execution.execute(); |
| 353 | + |
| 354 | + String[] outputEvents = outCaptor.toString().trim().split("\n"); |
| 355 | + |
| 356 | + assertThat(outputEvents) |
| 357 | + .containsExactlyInAnyOrder( |
| 358 | + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", |
| 359 | + // INSERT id=1 (age=30 passes) |
| 360 | + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 30], op=INSERT, meta=()}", |
| 361 | + // UPDATE id=1 (30->40): before=Y, after=Y -> UPDATE |
| 362 | + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[1, Alice, 30], after=[1, Alice, 40], op=UPDATE, meta=()}", |
| 363 | + // INSERT id=2 (age=30 passes) |
| 364 | + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 30], op=INSERT, meta=()}", |
| 365 | + // UPDATE id=2 (30->20): before=Y, after=N -> DELETE |
| 366 | + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 30], after=[], op=DELETE, meta=()}", |
| 367 | + // UPDATE id=3 (20->35): before=N, after=Y -> INSERT |
| 368 | + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Carol, 35], op=INSERT, meta=()}"); |
| 369 | + // INSERT id=3 (age=20 fails), INSERT id=4 (age=10 fails), |
| 370 | + // UPDATE id=4 (10->15, both fail) are all filtered out. |
| 371 | + } |
| 372 | + |
244 | 373 | /** |
245 | 374 | * This tests if transform rule could be used to classify source records based on filtering |
246 | 375 | * rules. |
@@ -2507,7 +2636,6 @@ void testTransformErrorMessage() { |
2507 | 2636 | + "to schema\n" |
2508 | 2637 | + "\t(Unknown).") |
2509 | 2638 | .rootCause() |
2510 | | - .isExactlyInstanceOf(SqlValidatorException.class) |
2511 | 2639 | .hasMessage("Column 'id1' not found in any table"); |
2512 | 2640 |
|
2513 | 2641 | // Unexpected column in filter rule |
|
0 commit comments