Skip to content

Commit 2fb5095

Browse files
authored
Merge pull request #105 from avalori/mergeDB
Add a new Database consumer which merges the records to the target DB rather than truncating and inserting as the current DatabaseDataSetConsumer does
2 parents 96c1271 + aa9877e commit 2fb5095

File tree

16 files changed

+1113
-607
lines changed

16 files changed

+1113
-607
lines changed

git

Whitespace-only changes.

morf-core/src/main/java/org/alfasoftware/morf/dataset/TableLoader.java

+39-21
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515

1616
package org.alfasoftware.morf.dataset;
1717

18+
import static java.util.stream.Collectors.toList;
19+
import static org.alfasoftware.morf.sql.SqlUtils.parameter;
20+
import static org.alfasoftware.morf.sql.SqlUtils.tableRef;
21+
1822
import java.sql.Connection;
1923
import java.sql.PreparedStatement;
2024

@@ -25,8 +29,9 @@
2529
import org.alfasoftware.morf.metadata.SchemaUtils;
2630
import org.alfasoftware.morf.metadata.Table;
2731
import org.alfasoftware.morf.sql.InsertStatement;
32+
import org.alfasoftware.morf.sql.MergeStatement;
33+
import org.alfasoftware.morf.sql.SelectStatement;
2834
import org.alfasoftware.morf.sql.element.SqlParameter;
29-
import org.alfasoftware.morf.sql.element.TableReference;
3035
import org.apache.commons.logging.Log;
3136
import org.apache.commons.logging.LogFactory;
3237

@@ -43,12 +48,12 @@ public class TableLoader {
4348
private final Connection connection;
4449
private final SqlDialect sqlDialect;
4550
private final boolean explicitCommit;
51+
private final boolean merge;
4652
private final SqlScriptExecutor sqlExecutor;
4753
private final Table table;
4854
private final boolean insertingWithPresetAutonums;
4955
private final boolean insertingUnderAutonumLimit;
5056
private final boolean truncateBeforeLoad;
51-
private final String insertStatement;
5257
private final int batchSize;
5358

5459

@@ -64,28 +69,26 @@ public static TableLoaderBuilder builder() {
6469
* Constructor.
6570
*/
6671
TableLoader(Connection connection,
67-
SqlScriptExecutor sqlScriptExecutor,
68-
SqlDialect dialect,
69-
boolean explicitCommit,
70-
Table table,
71-
boolean insertingWithPresetAutonums,
72-
boolean insertingUnderAutonumLimit,
73-
boolean truncateBeforeLoad,
74-
int batchSize) {
72+
SqlScriptExecutor sqlScriptExecutor,
73+
SqlDialect dialect,
74+
boolean explicitCommit,
75+
boolean merge,
76+
Table table,
77+
boolean insertingWithPresetAutonums,
78+
boolean insertingUnderAutonumLimit,
79+
boolean truncateBeforeLoad,
80+
int batchSize) {
7581
super();
7682
this.connection = connection;
7783
this.sqlExecutor = sqlScriptExecutor;
7884
this.sqlDialect = dialect;
7985
this.explicitCommit = explicitCommit;
86+
this.merge = merge;
8087
this.table = table;
8188
this.insertingWithPresetAutonums = insertingWithPresetAutonums;
8289
this.insertingUnderAutonumLimit = insertingUnderAutonumLimit;
8390
this.truncateBeforeLoad = truncateBeforeLoad;
8491
this.batchSize = batchSize;
85-
this.insertStatement = sqlDialect.convertStatementToSQL(
86-
new InsertStatement().into(new TableReference(table.getName())),
87-
SchemaUtils.schema(table)
88-
);
8992
}
9093

9194

@@ -100,7 +103,7 @@ public void load(final Iterable<Record> records) {
100103
truncate(table, connection);
101104
}
102105

103-
insertRecords(records);
106+
insertOrMergeRecords(records);
104107
}
105108

106109

@@ -131,13 +134,13 @@ private void truncate(Table table, Connection connection) {
131134
* @param records The records to insert
132135
* @param bulk Indicates if we should call {@link SqlDialect#preInsertStatements(Table)} and {@link SqlDialect#postInsertStatements(Table)}
133136
*/
134-
private void insertRecords(Iterable<Record> records) {
137+
private void insertOrMergeRecords(Iterable<Record> records) {
135138

136139
if (insertingWithPresetAutonums) {
137140
sqlExecutor.execute(sqlDialect.preInsertWithPresetAutonumStatements(table, insertingUnderAutonumLimit), connection);
138141
}
139142

140-
sqlInsertLoad(table, records, connection);
143+
sqlInsertOrMergeLoad(table, records, connection);
141144

142145
if (insertingWithPresetAutonums) {
143146
sqlDialect.postInsertWithPresetAutonumStatements(table, sqlExecutor, connection,insertingUnderAutonumLimit);
@@ -152,11 +155,26 @@ private void insertRecords(Iterable<Record> records) {
152155
* @param records The data to insert.
153156
* @param connection The connection.
154157
*/
155-
private void sqlInsertLoad(Table table, Iterable<Record> records, Connection connection) {
158+
private void sqlInsertOrMergeLoad(Table table, Iterable<Record> records, Connection connection) {
156159
try {
157-
sqlExecutor.executeStatementBatch(insertStatement, SqlParameter.parametersFromColumns(table.columns()), records, connection, explicitCommit, batchSize);
158-
} catch (Exception e) {
159-
throw new RuntimeException(String.format("Failure in batch insert for table [%s]", table.getName()), e);
160+
if (merge && !table.primaryKey().isEmpty()) { // if the table has no primary we don't have a merge ON criterion
161+
SelectStatement selectStatement = SelectStatement.select()
162+
.fields(table.columns().stream().filter(c -> !c.isAutoNumbered()).map(c -> parameter(c)).collect(toList()))
163+
.build();
164+
MergeStatement mergeStatement = MergeStatement.merge()
165+
.from(selectStatement)
166+
.into(tableRef(table.getName()))
167+
.tableUniqueKey(table.primaryKey().stream().map(c -> parameter(c)).collect(toList()))
168+
.build();
169+
String mergeSQL = sqlDialect.convertStatementToSQL(mergeStatement);
170+
sqlExecutor.executeStatementBatch(mergeSQL, SqlParameter.parametersFromColumns(table.columns()), records, connection, explicitCommit, batchSize);
171+
} else {
172+
InsertStatement insertStatement = InsertStatement.insert().into(tableRef(table.getName())).build();
173+
String insertSQL = sqlDialect.convertStatementToSQL(insertStatement, SchemaUtils.schema(table));
174+
sqlExecutor.executeStatementBatch(insertSQL, SqlParameter.parametersFromColumns(table.columns()), records, connection, explicitCommit, batchSize);
175+
}
176+
} catch (Exception exceptionOnBatch) {
177+
throw new RuntimeException(String.format("Failure in batch insert for table [%s]", table.getName()), exceptionOnBatch);
160178
}
161179
}
162180
}

morf-core/src/main/java/org/alfasoftware/morf/dataset/TableLoaderBuilder.java

+27-9
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,16 @@ public interface TableLoaderBuilder {
8282
*/
8383
TableLoaderBuilder explicitCommit(boolean explicitCommit);
8484

85+
86+
/**
87+
* Should we merge records rather than inserting?
88+
*
89+
* <p>Merging will overwrite records matched by primary key with the new data.</p>
90+
*
91+
* <p>Defaults to false if not specified.</p>
92+
*/
93+
TableLoaderBuilder merge(boolean ignoreDuplicateKeys);
94+
8595
/**
8696
* Should the table be truncated before the load?
8797
*
@@ -142,6 +152,7 @@ class TableLoaderBuilderImpl implements TableLoaderBuilder {
142152
private boolean insertingUnderAutonumLimit;
143153
private Provider<SqlDialect> sqlDialect;
144154
private int batchSize = 1000;
155+
private boolean merge;
145156

146157
TableLoaderBuilderImpl() {
147158
super();
@@ -188,6 +199,12 @@ public TableLoaderBuilder explicitCommit(boolean explicitCommit) {
188199
return this;
189200
}
190201

202+
@Override
203+
public TableLoaderBuilder merge(boolean merge) {
204+
this.merge = merge;
205+
return this;
206+
}
207+
191208
@Override
192209
public TableLoaderBuilder truncateBeforeLoad() {
193210
this.truncateBeforeLoad = true;
@@ -222,15 +239,16 @@ public TableLoader forTable(Table table) {
222239
executor = sqlScriptExecutorProvider.get();
223240
}
224241
return new TableLoader(
225-
connection,
226-
executor,
227-
sqlDialect.get(),
228-
explicitCommit,
229-
table,
230-
insertingWithPresetAutonums,
231-
insertingUnderAutonumLimit,
232-
truncateBeforeLoad,
233-
batchSize);
242+
connection,
243+
executor,
244+
sqlDialect.get(),
245+
explicitCommit,
246+
merge,
247+
table,
248+
insertingWithPresetAutonums,
249+
insertingUnderAutonumLimit,
250+
truncateBeforeLoad,
251+
batchSize);
234252
}
235253
}
236254
}

morf-core/src/main/java/org/alfasoftware/morf/jdbc/DatabaseDataSetConsumer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@ public class DatabaseDataSetConsumer implements DataSetConsumer {
4848
/**
4949
* Database connection to which the data is written.
5050
*/
51-
private Connection connection;
51+
protected Connection connection;
5252

5353
/**
5454
* The SQL dialect
5555
*/
56-
private SqlDialect sqlDialect;
56+
protected SqlDialect sqlDialect;
5757

5858
/**
5959
* Used to run SQL statements.
6060
*/
61-
private final SqlScriptExecutor sqlExecutor;
61+
protected final SqlScriptExecutor sqlExecutor;
6262

6363
/**
6464
* DataSource providing database connections.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/* Copyright 2020 Alfa Financial Software
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package org.alfasoftware.morf.jdbc;
17+
18+
import javax.sql.DataSource;
19+
20+
import org.alfasoftware.morf.dataset.DataSetConsumer;
21+
import org.alfasoftware.morf.dataset.Record;
22+
import org.alfasoftware.morf.dataset.TableLoader;
23+
import org.alfasoftware.morf.metadata.Table;
24+
25+
import com.google.inject.Inject;
26+
27+
28+
/**
29+
* Implementation of {@link DataSetConsumer} that pipes records into a JDBC
30+
* database connection without truncating tables and merging duplicate records.
31+
*
32+
*
33+
* @author Copyright (c) Alfa Financial Software 2020
34+
*/
35+
public class MergingDatabaseDataSetConsumer extends DatabaseDataSetConsumer implements DataSetConsumer {
36+
37+
38+
/**
39+
* Creates an instance of the database data set consumer.
40+
*
41+
* @param connectionResources Provides database connection interfaces.
42+
* @param sqlScriptExecutorProvider a provider of {@link SqlScriptExecutor}s.
43+
*/
44+
public MergingDatabaseDataSetConsumer(ConnectionResources connectionResources, SqlScriptExecutorProvider sqlScriptExecutorProvider) {
45+
this(connectionResources, sqlScriptExecutorProvider, connectionResources.getDataSource());
46+
}
47+
48+
49+
/**
50+
* Creates an instance of the database data set consumer.
51+
*
52+
* <p>Can also be injected from Guice.</p>
53+
*
54+
* @param connectionResources Provides database connection interfaces.
55+
* @param sqlScriptExecutorProvider a provider of {@link SqlScriptExecutor}s.
56+
* @param dataSource DataSource providing database connections.
57+
*/
58+
@Inject
59+
public MergingDatabaseDataSetConsumer(ConnectionResources connectionResources, SqlScriptExecutorProvider sqlScriptExecutorProvider, DataSource dataSource) {
60+
super(connectionResources, sqlScriptExecutorProvider, dataSource);
61+
}
62+
63+
64+
/**
65+
* @see DataSetConsumer#table(Table, Iterable)
66+
*/
67+
@Override
68+
public void table(Table table, Iterable<Record> records) {
69+
TableLoader.builder()
70+
.withConnection(connection)
71+
.withSqlScriptExecutor(sqlExecutor)
72+
.withDialect(sqlDialect)
73+
.explicitCommit(true)
74+
.merge(true)
75+
.insertingWithPresetAutonums()
76+
.forTable(table)
77+
.load(records);
78+
}
79+
}

0 commit comments

Comments
 (0)