Skip to content

[Fix][JDBC] fix jdbc default connection parameter invalid #8185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ public void jettyShadeCheck() {
log.info("check jetty shade successfully");
}

@Test
public void hikariShadeCheck() {
Map<String, List<String>> errorMap =
checkImportClassPrefixWithAll(Collections.singletonList("com.zaxxer.hikari"));
Assertions.assertEquals(0, errorMap.size(), shadeErrorMsg("hikari", errorMap));
log.info("check hikari shade successfully");
}

@Test
public void janinoShadeCheck() {
Map<String, List<String>> errorMap =
Expand Down
17 changes: 7 additions & 10 deletions seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
Expand Down Expand Up @@ -98,10 +92,6 @@
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
Expand All @@ -116,6 +106,13 @@
<version>${commons-lang3.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hikari</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

package org.apache.seatunnel.connectors.cdc.base.relational.connection;

import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;

import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zaxxer.hikari.HikariDataSource;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.seatunnel.connectors.cdc.base.relational.connection;

import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariConfig;
import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;

/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
public abstract class JdbcConnectionPoolFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.seatunnel.connectors.cdc.base.relational.connection;

import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;

import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zaxxer.hikari.HikariDataSource;

import java.util.HashMap;
import java.util.Map;

Expand Down
46 changes: 4 additions & 42 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,10 @@
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikari.version}</version>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hikari</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
</dependency>

<dependency>
Expand Down Expand Up @@ -353,43 +354,4 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<createSourcesJar>false</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<!-- rename hikari to avoid jar conflict from spark -->
<relocation>
<pattern>com.zaxxer.hikari</pattern>
<shadedPattern>${seatunnel.shade.package}.com.zaxxer.hikari</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import com.zaxxer.hikari.HikariDataSource;
import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;

import lombok.Getter;

import java.sql.Connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;

import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
Expand All @@ -31,7 +33,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;

import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand Down Expand Up @@ -88,6 +89,7 @@ public MultiTableResourceManager<ConnectionPoolManager> initMultiTableResourceMa
ds.setPassword(jdbcSinkConfig.getJdbcConnectionConfig().getPassword().get());
}
ds.setAutoCommit(jdbcSinkConfig.getJdbcConnectionConfig().isAutoCommit());
jdbcSinkConfig.getJdbcConnectionConfig().getProperties().forEach(ds::addDataSourceProperty);
return new JdbcMultiTableResourceManager(new ConnectionPoolManager(ds));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.zaxxer.hikari.pool.HikariProxyConnection;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
Expand All @@ -33,6 +34,7 @@
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcMultiTableResourceManager;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter;
Expand Down Expand Up @@ -459,6 +461,7 @@ private String getUrl() {
public void parametersTest() throws Exception {
defaultSinkParametersTest();
defaultSourceParametersTest();
defaultMultiSinkParametersTest();
}

void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFoundException {
Expand Down Expand Up @@ -546,6 +549,118 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound
Assertions.assertEquals(connectionProperties4.get("rewriteBatchedStatements"), "false");
}

void defaultMultiSinkParametersTest() throws IOException, SQLException, ClassNotFoundException {
TableSchema tableSchema =
TableSchema.builder()
.column(
PhysicalColumn.of(
"c_bigint",
BasicType.LONG_TYPE,
22,
false,
null,
"c_bigint"))
.build();
CatalogTable catalogTable =
CatalogTable.of(
TableIdentifier.of("test_catalog", "seatunnel", "source"),
tableSchema,
new HashMap<>(),
new ArrayList<>(),
"User table");

// case1 url not contains parameters and properties not contains parameters
Map<String, Object> map1 = getDefaultConfigMap();
map1.put("url", getUrl());
ReadonlyConfig config1 = ReadonlyConfig.fromMap(map1);
TableSinkFactoryContext context1 =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
config1,
Thread.currentThread().getContextClassLoader(),
Collections.emptyList());
JdbcSink jdbcSink1 = (JdbcSink) new JdbcSinkFactory().createSink(context1).createSink();
JdbcMultiTableResourceManager multiTableResourceManager1 =
(JdbcMultiTableResourceManager)
jdbcSink1.createWriter(null).initMultiTableResourceManager(1, 1);
Properties connectionProperties1 = getMultiSinkProperties(multiTableResourceManager1);
Assertions.assertEquals(connectionProperties1.get("rewriteBatchedStatements"), "true");

// case2 url contains parameters and properties not contains parameters
Map<String, Object> map2 = getDefaultConfigMap();
map2.put("url", getUrl() + "?rewriteBatchedStatements=false");
ReadonlyConfig config2 = ReadonlyConfig.fromMap(map2);
TableSinkFactoryContext context2 =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
config2,
Thread.currentThread().getContextClassLoader(),
Collections.emptyList());
JdbcSink jdbcSink2 = (JdbcSink) new JdbcSinkFactory().createSink(context2).createSink();
JdbcMultiTableResourceManager multiTableResourceManager2 =
(JdbcMultiTableResourceManager)
jdbcSink2.createWriter(null).initMultiTableResourceManager(1, 1);
Properties connectionProperties2 = getMultiSinkProperties(multiTableResourceManager2);
Assertions.assertEquals(connectionProperties2.get("rewriteBatchedStatements"), "false");

// case3 url not contains parameters and properties not contains parameters
Map<String, Object> map3 = getDefaultConfigMap();
Map<String, String> properties3 = new HashMap<>();
properties3.put("rewriteBatchedStatements", "false");
map3.put("properties", properties3);
map3.put("url", getUrl());
ReadonlyConfig config3 = ReadonlyConfig.fromMap(map3);
TableSinkFactoryContext context3 =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
config3,
Thread.currentThread().getContextClassLoader(),
Collections.emptyList());
JdbcSink jdbcSink3 = (JdbcSink) new JdbcSinkFactory().createSink(context3).createSink();
JdbcMultiTableResourceManager multiTableResourceManager3 =
(JdbcMultiTableResourceManager)
jdbcSink3.createWriter(null).initMultiTableResourceManager(1, 1);
Properties connectionProperties3 = getMultiSinkProperties(multiTableResourceManager3);
Assertions.assertEquals(connectionProperties3.get("rewriteBatchedStatements"), "false");

// case4 url contains parameters and properties contains parameters
Map<String, Object> map4 = getDefaultConfigMap();
Map<String, String> properties4 = new HashMap<>();
properties4.put("useSSL", "true");
properties4.put("rewriteBatchedStatements", "false");
map4.put("properties", properties4);
map4.put("url", getUrl() + "?useSSL=false&rewriteBatchedStatements=true");
ReadonlyConfig config4 = ReadonlyConfig.fromMap(map4);
TableSinkFactoryContext context4 =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
config4,
Thread.currentThread().getContextClassLoader(),
Collections.emptyList());
JdbcSink jdbcSink4 = (JdbcSink) new JdbcSinkFactory().createSink(context4).createSink();
JdbcMultiTableResourceManager multiTableResourceManager4 =
(JdbcMultiTableResourceManager)
jdbcSink4.createWriter(null).initMultiTableResourceManager(1, 1);
Properties connectionProperties4 = getMultiSinkProperties(multiTableResourceManager4);
Assertions.assertEquals(connectionProperties4.get("useSSL"), "true");
Assertions.assertEquals(connectionProperties4.get("rewriteBatchedStatements"), "false");
}

private Properties getMultiSinkProperties(
JdbcMultiTableResourceManager multiTableResourceManager) throws SQLException {
HikariProxyConnection hikariProxyConnection =
(HikariProxyConnection)
multiTableResourceManager
.getSharedResource()
.get()
.getConnectionPool()
.getConnection();
Properties connectionProperties =
((ConnectionImpl) ReflectionUtils.getField(hikariProxyConnection, "delegate").get())
.getProperties();
return connectionProperties;
}

void defaultSourceParametersTest() throws Exception {
// case1 url not contains parameters and properties not contains parameters
Map<String, Object> map1 = getDefaultConfigMap();
Expand Down
1 change: 1 addition & 0 deletions seatunnel-shade/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<module>seatunnel-jetty9-9.4.56</module>
<module>seatunnel-hadoop-aws</module>
<module>seatunnel-arrow</module>
<module>seatunnel-hikari</module>
</modules>

<build>
Expand Down
Loading
Loading