|
19 | 19 | package org.apache.seatunnel.connectors.seatunnel.jdbc;
|
20 | 20 |
|
21 | 21 | import org.apache.seatunnel.shade.com.google.common.collect.Lists;
|
| 22 | +import org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariProxyConnection; |
22 | 23 |
|
23 | 24 | import org.apache.seatunnel.api.configuration.ReadonlyConfig;
|
24 | 25 | import org.apache.seatunnel.api.table.catalog.CatalogTable;
|
|
33 | 34 | import org.apache.seatunnel.common.utils.ReflectionUtils;
|
34 | 35 | import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
|
35 | 36 | import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
|
| 37 | +import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager; |
36 | 38 | import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink;
|
37 | 39 | import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkFactory;
|
38 | 40 | import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter;
|
@@ -459,6 +461,7 @@ private String getUrl() {
|
459 | 461 | public void parametersTest() throws Exception {
|
460 | 462 | defaultSinkParametersTest();
|
461 | 463 | defaultSourceParametersTest();
|
| 464 | + defaultMultiSinkParametersTest(); |
462 | 465 | }
|
463 | 466 |
|
464 | 467 | void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFoundException {
|
@@ -546,6 +549,118 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound
|
546 | 549 | Assertions.assertEquals(connectionProperties4.get("rewriteBatchedStatements"), "false");
|
547 | 550 | }
|
548 | 551 |
|
| 552 | + void defaultMultiSinkParametersTest() throws IOException, SQLException, ClassNotFoundException { |
| 553 | + TableSchema tableSchema = |
| 554 | + TableSchema.builder() |
| 555 | + .column( |
| 556 | + PhysicalColumn.of( |
| 557 | + "c_bigint", |
| 558 | + BasicType.LONG_TYPE, |
| 559 | + 22, |
| 560 | + false, |
| 561 | + null, |
| 562 | + "c_bigint")) |
| 563 | + .build(); |
| 564 | + CatalogTable catalogTable = |
| 565 | + CatalogTable.of( |
| 566 | + TableIdentifier.of("test_catalog", "seatunnel", "source"), |
| 567 | + tableSchema, |
| 568 | + new HashMap<>(), |
| 569 | + new ArrayList<>(), |
| 570 | + "User table"); |
| 571 | + |
| 572 | + // case1 url not contains parameters and properties not contains parameters |
| 573 | + Map<String, Object> map1 = getDefaultConfigMap(); |
| 574 | + map1.put("url", getUrl()); |
| 575 | + ReadonlyConfig config1 = ReadonlyConfig.fromMap(map1); |
| 576 | + TableSinkFactoryContext context1 = |
| 577 | + TableSinkFactoryContext.replacePlaceholderAndCreate( |
| 578 | + catalogTable, |
| 579 | + config1, |
| 580 | + Thread.currentThread().getContextClassLoader(), |
| 581 | + Collections.emptyList()); |
| 582 | + JdbcSink jdbcSink1 = (JdbcSink) new JdbcSinkFactory().createSink(context1).createSink(); |
| 583 | + JdbcMultiTableResourceManager multiTableResourceManager1 = |
| 584 | + (JdbcMultiTableResourceManager) |
| 585 | + jdbcSink1.createWriter(null).initMultiTableResourceManager(1, 1); |
| 586 | + Properties connectionProperties1 = getMultiSinkProperties(multiTableResourceManager1); |
| 587 | + Assertions.assertEquals(connectionProperties1.get("rewriteBatchedStatements"), "true"); |
| 588 | + |
| 589 | + // case2 url contains parameters and properties not contains parameters |
| 590 | + Map<String, Object> map2 = getDefaultConfigMap(); |
| 591 | + map2.put("url", getUrl() + "?rewriteBatchedStatements=false"); |
| 592 | + ReadonlyConfig config2 = ReadonlyConfig.fromMap(map2); |
| 593 | + TableSinkFactoryContext context2 = |
| 594 | + TableSinkFactoryContext.replacePlaceholderAndCreate( |
| 595 | + catalogTable, |
| 596 | + config2, |
| 597 | + Thread.currentThread().getContextClassLoader(), |
| 598 | + Collections.emptyList()); |
| 599 | + JdbcSink jdbcSink2 = (JdbcSink) new JdbcSinkFactory().createSink(context2).createSink(); |
| 600 | + JdbcMultiTableResourceManager multiTableResourceManager2 = |
| 601 | + (JdbcMultiTableResourceManager) |
| 602 | + jdbcSink2.createWriter(null).initMultiTableResourceManager(1, 1); |
| 603 | + Properties connectionProperties2 = getMultiSinkProperties(multiTableResourceManager2); |
| 604 | + Assertions.assertEquals(connectionProperties2.get("rewriteBatchedStatements"), "false"); |
| 605 | + |
| 606 | + // case3 url not contains parameters and properties not contains parameters |
| 607 | + Map<String, Object> map3 = getDefaultConfigMap(); |
| 608 | + Map<String, String> properties3 = new HashMap<>(); |
| 609 | + properties3.put("rewriteBatchedStatements", "false"); |
| 610 | + map3.put("properties", properties3); |
| 611 | + map3.put("url", getUrl()); |
| 612 | + ReadonlyConfig config3 = ReadonlyConfig.fromMap(map3); |
| 613 | + TableSinkFactoryContext context3 = |
| 614 | + TableSinkFactoryContext.replacePlaceholderAndCreate( |
| 615 | + catalogTable, |
| 616 | + config3, |
| 617 | + Thread.currentThread().getContextClassLoader(), |
| 618 | + Collections.emptyList()); |
| 619 | + JdbcSink jdbcSink3 = (JdbcSink) new JdbcSinkFactory().createSink(context3).createSink(); |
| 620 | + JdbcMultiTableResourceManager multiTableResourceManager3 = |
| 621 | + (JdbcMultiTableResourceManager) |
| 622 | + jdbcSink3.createWriter(null).initMultiTableResourceManager(1, 1); |
| 623 | + Properties connectionProperties3 = getMultiSinkProperties(multiTableResourceManager3); |
| 624 | + Assertions.assertEquals(connectionProperties3.get("rewriteBatchedStatements"), "false"); |
| 625 | + |
| 626 | + // case4 url contains parameters and properties contains parameters |
| 627 | + Map<String, Object> map4 = getDefaultConfigMap(); |
| 628 | + Map<String, String> properties4 = new HashMap<>(); |
| 629 | + properties4.put("useSSL", "true"); |
| 630 | + properties4.put("rewriteBatchedStatements", "false"); |
| 631 | + map4.put("properties", properties4); |
| 632 | + map4.put("url", getUrl() + "?useSSL=false&rewriteBatchedStatements=true"); |
| 633 | + ReadonlyConfig config4 = ReadonlyConfig.fromMap(map4); |
| 634 | + TableSinkFactoryContext context4 = |
| 635 | + TableSinkFactoryContext.replacePlaceholderAndCreate( |
| 636 | + catalogTable, |
| 637 | + config4, |
| 638 | + Thread.currentThread().getContextClassLoader(), |
| 639 | + Collections.emptyList()); |
| 640 | + JdbcSink jdbcSink4 = (JdbcSink) new JdbcSinkFactory().createSink(context4).createSink(); |
| 641 | + JdbcMultiTableResourceManager multiTableResourceManager4 = |
| 642 | + (JdbcMultiTableResourceManager) |
| 643 | + jdbcSink4.createWriter(null).initMultiTableResourceManager(1, 1); |
| 644 | + Properties connectionProperties4 = getMultiSinkProperties(multiTableResourceManager4); |
| 645 | + Assertions.assertEquals(connectionProperties4.get("useSSL"), "true"); |
| 646 | + Assertions.assertEquals(connectionProperties4.get("rewriteBatchedStatements"), "false"); |
| 647 | + } |
| 648 | + |
| 649 | + private Properties getMultiSinkProperties( |
| 650 | + JdbcMultiTableResourceManager multiTableResourceManager) throws SQLException { |
| 651 | + HikariProxyConnection hikariProxyConnection = |
| 652 | + (HikariProxyConnection) |
| 653 | + multiTableResourceManager |
| 654 | + .getSharedResource() |
| 655 | + .get() |
| 656 | + .getConnectionPool() |
| 657 | + .getConnection(); |
| 658 | + Properties connectionProperties = |
| 659 | + ((ConnectionImpl) ReflectionUtils.getField(hikariProxyConnection, "delegate").get()) |
| 660 | + .getProperties(); |
| 661 | + return connectionProperties; |
| 662 | + } |
| 663 | + |
549 | 664 | void defaultSourceParametersTest() throws Exception {
|
550 | 665 | // case1 url not contains parameters and properties not contains parameters
|
551 | 666 | Map<String, Object> map1 = getDefaultConfigMap();
|
|
0 commit comments