diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java index 3ca5ce82..dbfbe67c 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java @@ -328,6 +328,7 @@ String buildInsertStatement( * Build an INSERT statement for multiple rows. * * @param table the identifier of the table; may not be null + * @param tableDefinition the table definition; may be null if unknown * @param records number of rows which will be inserted; must be a positive number * @param keyColumns the identifiers of the columns in the primary/unique key; may not be null * but may be empty @@ -337,6 +338,7 @@ String buildInsertStatement( */ String buildMultiInsertStatement( TableId table, + TableDefinition tableDefinition, int records, Collection keyColumns, Collection nonKeyColumns diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java index a1607aa1..a59b84c7 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -1367,6 +1367,7 @@ public String buildInsertStatement( @Override public String buildMultiInsertStatement(final TableId table, + final TableDefinition tableDefinition, final int records, final Collection keyColumns, final Collection nonKeyColumns) { diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java index ae0c0448..19210d3f 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java @@ -49,6 +49,8 @@ import io.aiven.connect.jdbc.util.TableDefinition; import io.aiven.connect.jdbc.util.TableId; +import static java.util.stream.IntStream.range; + /** * A {@link DatabaseDialect} for PostgreSQL. */ @@ -325,6 +327,44 @@ public String buildInsertStatement(final TableId table, .toString(); } + @Override + public String buildMultiInsertStatement(final TableId table, + final TableDefinition tableDefinition, + final int records, + final Collection keyColumns, + final Collection nonKeyColumns) { + + if (records < 1) { + throw new IllegalArgumentException("number of records must be a positive number, but got: " + records); + } + + final String insertStatement = expressionBuilder() + .append("INSERT INTO ") + .append(table) + .append("(") + .appendList() + .delimitedBy(",") + .transformedBy(ExpressionBuilder.columnNames()) + .of(keyColumns, nonKeyColumns) + .append(") VALUES") + .toString(); + + final String singleRowPlaceholder = expressionBuilder() + .append("(") + .appendList() + .delimitedBy(",") + .transformedBy(transformColumn(tableDefinition)) + .of(keyColumns, nonKeyColumns) + .append(")") + .toString(); + + final String allRowsPlaceholder = range(1, records + 1) + .mapToObj(i -> singleRowPlaceholder) + .collect(Collectors.joining(",")); + + return insertStatement + allRowsPlaceholder; + } + @Override public String buildUpdateStatement(final TableId table, final TableDefinition tableDefinition, diff --git a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java index 305db362..d916d9f3 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java @@ -319,6 +319,7 @@ private String getMultiInsertSql() { try { return dbDialect.buildMultiInsertStatement( tableId, + tableDefinition, records.size(), asColumns(fieldsMetadata.keyFieldNames), asColumns(fieldsMetadata.nonKeyFieldNames) diff --git a/src/test/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/src/test/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index 5db9ff35..f13467c8 100644 --- a/src/test/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/src/test/java/io/aiven/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -225,6 +225,42 @@ public void shouldBuildInsertStatement() { assertQueryEquals(expected, actual); } + @Test + public void shouldBuildMultiInsertStatementMultipleRecords() { + final String expected = readQueryResourceForThisTest("multi_2_insert0"); + final String actual = dialect.buildMultiInsertStatement( + castTypesTableId, + castTypesTableDefinition, + 2, + List.of(castTypesPkColumn), + List.of(columnUuid, columnJson, columnJsonb)); + assertQueryEquals(expected, actual); + } + + @Test + public void shouldBuildMultiInsertStatementSingleRecord() { + final String expected = readQueryResourceForThisTest("multi_1_insert0"); + final String actual = dialect.buildMultiInsertStatement( + castTypesTableId, + castTypesTableDefinition, + 1, + List.of(castTypesPkColumn), + List.of(columnUuid, columnJson, columnJsonb)); + assertQueryEquals(expected, actual); + } + + @Test + public void shouldBuildMultiInsertStatementZeroRecords() { + assertThatThrownBy(() -> dialect.buildMultiInsertStatement( + castTypesTableId, + castTypesTableDefinition, + 0, + List.of(castTypesPkColumn), + List.of(columnUuid, columnJson, columnJsonb))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("number of records must be a positive number, but got: 0"); + } + @Test public void shouldBuildUpdateStatement() { final String expected = readQueryResourceForThisTest("update0"); diff --git a/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_1_insert0-nonquoted.txt b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_1_insert0-nonquoted.txt new file mode 100644 index 00000000..a547db74 --- /dev/null +++ b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_1_insert0-nonquoted.txt @@ -0,0 +1 @@ +INSERT INTO cast_types_table(pk,uuid_col,json_col,jsonb_col) VALUES(?,?::uuid,?::json,?::jsonb) \ No newline at end of file diff --git a/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_1_insert0-quoted.txt b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_1_insert0-quoted.txt new file mode 100644 index 00000000..928e061a --- /dev/null +++ b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_1_insert0-quoted.txt @@ -0,0 +1 @@ +INSERT INTO "cast_types_table"("pk","uuid_col","json_col","jsonb_col") VALUES(?,?::uuid,?::json,?::jsonb) \ No newline at end of file diff --git a/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_2_insert0-nonquoted.txt b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_2_insert0-nonquoted.txt new file mode 100644 index 00000000..66306776 --- /dev/null +++ b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_2_insert0-nonquoted.txt @@ -0,0 +1 @@ +INSERT INTO cast_types_table(pk,uuid_col,json_col,jsonb_col) VALUES(?,?::uuid,?::json,?::jsonb),(?,?::uuid,?::json,?::jsonb) \ No newline at end of file diff --git a/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_2_insert0-quoted.txt b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_2_insert0-quoted.txt new file mode 100644 index 00000000..4edce224 --- /dev/null +++ b/src/test/resources/io.aiven.connect.jdbc.dialect/PostgreSqlDatabaseDialectTest/multi_2_insert0-quoted.txt @@ -0,0 +1 @@ +INSERT INTO "cast_types_table"("pk","uuid_col","json_col","jsonb_col") VALUES(?,?::uuid,?::json,?::jsonb),(?,?::uuid,?::json,?::jsonb) \ No newline at end of file