Commit ca4a65f

feat: jdbc support copy in statement. (#6443)
1 parent cf45d1d commit ca4a65f

13 files changed (+403 -8 lines)

docs/en/connector-v2/sink/Jdbc.md (+7)

@@ -56,6 +56,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
 | data_save_mode | Enum | No | APPEND_DATA |
 | custom_sql | String | No | - |
 | enable_upsert | Boolean | No | true |
+| use_copy_statement | Boolean | No | false |
 
 ### driver [string]
 
@@ -197,6 +198,12 @@ When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL
 
 Enable upsert by primary_keys exist, If the task has no key duplicate data, setting this parameter to `false` can speed up data import
 
+### use_copy_statement [boolean]
+
+Use `COPY ${table} FROM STDIN` statement to import data. Only drivers with `getCopyAPI()` method connections are supported. e.g.: Postgresql driver `org.postgresql.Driver`.
+
+NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.
+
 ## tips
 
 In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup :
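For background, the new option depends on the PostgreSQL driver exposing a CopyManager via `getCopyAPI()`. The standalone sketch below illustrates how that driver API is typically driven directly; it is not part of this commit, and the JDBC URL, credentials, table name, and columns are hypothetical:

```java
// Background sketch (not part of this commit): feeding CSV data through the
// PostgreSQL driver's CopyManager, the API that use_copy_statement relies on.
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;

public class CopyInBackgroundSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection details, for illustration only.
        try (Connection conn =
                DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/test", "user", "password")) {
            // Only PostgreSQL connections can be unwrapped to PGConnection,
            // which is why the option is limited to drivers offering getCopyAPI().
            CopyManager copyManager = conn.unwrap(PGConnection.class).getCopyAPI();
            String csv = "1,foo\n2,bar\n";
            long rows =
                    copyManager.copyIn(
                            "COPY sink_table (id, name) FROM STDIN WITH CSV",
                            new StringReader(csv));
            System.out.println("rows copied: " + rows);
        }
    }
}
```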

pom.xml (+6)

@@ -104,6 +104,7 @@
         <commons-lang3.version>3.5</commons-lang3.version>
         <commons-io.version>2.11.0</commons-io.version>
         <commons-collections4.version>4.4</commons-collections4.version>
+        <commons-csv.version>1.10.0</commons-csv.version>
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
         <protostuff.version>1.8.0</protostuff.version>
         <spark.scope>provided</spark.scope>
@@ -329,6 +330,11 @@
                 <artifactId>commons-collections4</artifactId>
                 <version>${commons-collections4.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-csv</artifactId>
+                <version>${commons-csv.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>com.beust</groupId>

seatunnel-common/pom.xml (+4 -1)

@@ -44,6 +44,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -63,7 +67,6 @@
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
-
     </dependencies>
 
 </project>

seatunnel-connectors-v2/connector-jdbc/pom.xml (-1)

@@ -192,7 +192,6 @@
     </dependencyManagement>
 
     <dependencies>
-
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-common</artifactId>

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java (+6)

@@ -151,6 +151,12 @@ public interface JdbcOptions {
                     .defaultValue(false)
                     .withDescription("support upsert by insert only");
 
+    Option<Boolean> USE_COPY_STATEMENT =
+            Options.key("use_copy_statement")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("support copy in statement (postgresql)");
+
     /** source config */
     Option<String> PARTITION_COLUMN =
             Options.key("partition_column")

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java (+2)

@@ -43,6 +43,7 @@ public class JdbcSinkConfig implements Serializable {
     private boolean enableUpsert;
     @Builder.Default private boolean isPrimaryKeyUpdated = true;
     private boolean supportUpsertByInsertOnly;
+    private boolean useCopyStatement;
 
     public static JdbcSinkConfig of(ReadonlyConfig config) {
         JdbcSinkConfigBuilder builder = JdbcSinkConfig.builder();
@@ -55,6 +56,7 @@ public static JdbcSinkConfig of(ReadonlyConfig config) {
         builder.isPrimaryKeyUpdated(config.get(IS_PRIMARY_KEY_UPDATED));
         builder.supportUpsertByInsertOnly(config.get(SUPPORT_UPSERT_BY_INSERT_ONLY));
         builder.simpleSql(config.get(JdbcOptions.QUERY));
+        builder.useCopyStatement(config.get(JdbcOptions.USE_COPY_STATEMENT));
         return builder.build();
     }
 }

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java (+2 -1)

@@ -29,7 +29,8 @@ public enum JdbcConnectorErrorCode implements SeaTunnelErrorCode {
             "JDBC-05", "transaction operation failed, such as (commit, rollback) etc.."),
     NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found"),
     DONT_SUPPORT_SINK("JDBC-07", "The jdbc type don't support sink"),
-    KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed");
+    KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed"),
+    NO_SUPPORT_OPERATION_FAILED("JDBC-09", "The jdbc driver not support operation.");
 
     private final String code;
 
     private final String description;

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java (+24 -1)

@@ -27,6 +27,7 @@
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyManagerBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
@@ -63,7 +64,13 @@ public JdbcOutputFormat build() {
                                 jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable()));
 
         final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
-        if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+        if (jdbcSinkConfig.isUseCopyStatement()) {
+            statementExecutorFactory =
+                    () ->
+                            createCopyInBufferStatementExecutor(
+                                    createCopyInBatchStatementExecutor(
+                                            dialect, table, tableSchema));
+        } else if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
             statementExecutorFactory =
                     () ->
                             createSimpleBufferedExecutor(
@@ -185,6 +192,22 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertExecutor(
                 dialect, database, table, tableSchema, pkNames, isPrimaryKeyUpdated);
     }
 
+    private static JdbcBatchStatementExecutor<SeaTunnelRow> createCopyInBufferStatementExecutor(
+            CopyManagerBatchStatementExecutor copyManagerBatchStatementExecutor) {
+        return new BufferedBatchStatementExecutor(
+                copyManagerBatchStatementExecutor, Function.identity());
+    }
+
+    private static CopyManagerBatchStatementExecutor createCopyInBatchStatementExecutor(
+            JdbcDialect dialect, String table, TableSchema tableSchema) {
+        String columns =
+                Arrays.stream(tableSchema.getFieldNames())
+                        .map(dialect::quoteIdentifier)
+                        .collect(Collectors.joining(",", "(", ")"));
+        String copyInSql = String.format("COPY %s %s FROM STDIN WITH CSV", table, columns);
+        return new CopyManagerBatchStatementExecutor(copyInSql, tableSchema);
+    }
+
     private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(
             JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
JdbcDialect dialect, String database, String table, TableSchema tableSchema) {
190213

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java (+195, new file)

@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CopyManagerBatchStatementExecutor implements JdbcBatchStatementExecutor<SeaTunnelRow> {
+
+    private final String copySql;
+    private final TableSchema tableSchema;
+    CopyManagerProxy copyManagerProxy;
+    CSVFormat csvFormat = CSVFormat.POSTGRESQL_CSV;
+    CSVPrinter csvPrinter;
+
+    public CopyManagerBatchStatementExecutor(String copySql, TableSchema tableSchema) {
+        this.copySql = copySql;
+        this.tableSchema = tableSchema;
+    }
+
+    public static void copyManagerProxyChecked(JdbcConnectionProvider connectionProvider) {
+        try (Connection connection = connectionProvider.getConnection()) {
+            new CopyManagerProxy(connection);
+        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
+                    "unable to open CopyManager Operation in this JDBC writer. Please configure option use_copy_statement = false.",
+                    e);
+        } catch (SQLException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to open JDBC writer", e);
+        }
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        try {
+            this.copyManagerProxy = new CopyManagerProxy(connection);
+            this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat);
+        } catch (NoSuchMethodException
+                | IllegalAccessException
+                | InvocationTargetException
+                | IOException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
+                    "unable to open CopyManager Operation in this JDBC writer. Please configure option use_copy_statement = false.",
+                    e);
+        } catch (SQLException e) {
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to open JDBC writer", e);
+        }
+    }
+
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        try {
+            this.csvPrinter.printRecord(toExtract(record));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<Object> toExtract(SeaTunnelRow record) {
+        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
+        List<Object> csvRecord = new ArrayList<>();
+        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
+            SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
+            Object fieldValue = record.getField(fieldIndex);
+            if (fieldValue == null) {
+                csvRecord.add(null);
+                continue;
+            }
+            switch (seaTunnelDataType.getSqlType()) {
+                case STRING:
+                    csvRecord.add((String) record.getField(fieldIndex));
+                    break;
+                case BOOLEAN:
+                    csvRecord.add((Boolean) record.getField(fieldIndex));
+                    break;
+                case TINYINT:
+                    csvRecord.add((Byte) record.getField(fieldIndex));
+                    break;
+                case SMALLINT:
+                    csvRecord.add((Short) record.getField(fieldIndex));
+                    break;
+                case INT:
+                    csvRecord.add((Integer) record.getField(fieldIndex));
+                    break;
+                case BIGINT:
+                    csvRecord.add((Long) record.getField(fieldIndex));
+                    break;
+                case FLOAT:
+                    csvRecord.add((Float) record.getField(fieldIndex));
+                    break;
+                case DOUBLE:
+                    csvRecord.add((Double) record.getField(fieldIndex));
+                    break;
+                case DECIMAL:
+                    csvRecord.add((BigDecimal) record.getField(fieldIndex));
+                    break;
+                case DATE:
+                    LocalDate localDate = (LocalDate) record.getField(fieldIndex);
+                    csvRecord.add((java.sql.Date) java.sql.Date.valueOf(localDate));
+                    break;
+                case TIME:
+                    LocalTime localTime = (LocalTime) record.getField(fieldIndex);
+                    csvRecord.add((java.sql.Time) java.sql.Time.valueOf(localTime));
+                    break;
+                case TIMESTAMP:
+                    LocalDateTime localDateTime = (LocalDateTime) record.getField(fieldIndex);
+                    csvRecord.add((java.sql.Timestamp) java.sql.Timestamp.valueOf(localDateTime));
+                    break;
+                case BYTES:
+                    csvRecord.add(
+                            org.apache.commons.codec.binary.Base64.encodeBase64String(
+                                    (byte[]) record.getField(fieldIndex)));
+                    break;
+                case NULL:
+                    csvRecord.add(null);
+                    break;
+                case MAP:
+                case ARRAY:
+                case ROW:
+                default:
+                    throw new JdbcConnectorException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                            "Unexpected value: " + seaTunnelDataType);
+            }
+        }
+        return csvRecord;
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        try {
+            this.csvPrinter.flush();
+            this.copyManagerProxy.doCopy(
+                    copySql, new StringReader(this.csvPrinter.getOut().toString()));
+        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
+            throw new JdbcConnectorException(
+                    CommonErrorCodeDeprecated.SQL_OPERATION_FAILED, "Sql command: " + copySql);
+        } finally {
+            try {
+                this.csvPrinter.close();
+                this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat);
+            } catch (Exception ignore) {
+            }
+        }
+    }
+
+    @Override
+    public void closeStatements() throws SQLException {
+        this.copyManagerProxy = null;
+        try {
+            this.csvPrinter.close();
+            this.csvPrinter = null;
+        } catch (Exception ignore) {
+        }
+    }
+}
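CopyManagerProxy is referenced throughout this executor but its source is not among the hunks shown here. Purely as a hedged sketch of the shape the executor expects (a constructor taking a Connection and a doCopy(String, Reader) method), and assuming the proxy reaches the driver's getCopyAPI() and CopyManager.copyIn(String, Reader) reflectively, such a wrapper might look roughly like this; the class and member names below are illustrative only:

```java
// Hypothetical sketch only -- the real CopyManagerProxy ships elsewhere in this commit.
// It would reflect over the driver connection so the connector need not compile against
// the PostgreSQL driver: getCopyAPI() yields a CopyManager, copyIn(String, Reader) loads CSV.
import java.io.Reader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;

public class CopyManagerProxySketch {
    private final Object copyManager;
    private final Method copyInMethod;

    public CopyManagerProxySketch(Connection connection)
            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        // Assumes the connection is the PostgreSQL driver's own connection class,
        // which exposes a public getCopyAPI() method.
        Method getCopyAPI = connection.getClass().getMethod("getCopyAPI");
        this.copyManager = getCopyAPI.invoke(connection);
        this.copyInMethod =
                copyManager.getClass().getMethod("copyIn", String.class, Reader.class);
    }

    public long doCopy(String sql, Reader reader)
            throws IllegalAccessException, InvocationTargetException {
        // CopyManager.copyIn returns the number of rows copied.
        return (long) copyInMethod.invoke(copyManager, sql, reader);
    }
}
```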
