Skip to content

Commit aa2f77c

Browse files
committed
Add TRANSACTION_ISOLATION_LEVEL config in PostgreSQL plugins
1 parent cde7167 commit aa2f77c

21 files changed

+192
-16
lines changed

database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public abstract class ConnectionConfig extends PluginConfig implements DatabaseC
4545
public static final String CONNECTION_ARGUMENTS = "connectionArguments";
4646
public static final String JDBC_PLUGIN_NAME = "jdbcPluginName";
4747
public static final String JDBC_PLUGIN_TYPE = "jdbc";
48+
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
49+
4850

4951
@Name(JDBC_PLUGIN_NAME)
5052
@Description("Name of the JDBC driver to use. This is the value of the 'jdbcPluginName' key defined in the JSON " +

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

+3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.cdap.plugin.db.TransactionIsolationLevel;
2929
import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
3030
import io.cdap.plugin.db.source.AbstractDBSource;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
import java.io.IOException;
3335
import java.util.Collections;
@@ -49,6 +51,7 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
4951
public static final String DATABASE = "database";
5052
public static final String FETCH_SIZE = "fetchSize";
5153
public static final String DEFAULT_FETCH_SIZE = "1000";
54+
public static final Logger LOG = LoggerFactory.getLogger(AbstractDBSpecificSourceConfig.class);
5255

5356
@Name(Constants.Reference.REFERENCE_NAME)
5457
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)

database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.cdap.plugin.common.KeyValueListParser;
2626
import io.cdap.plugin.common.db.DBConnectorProperties;
2727
import io.cdap.plugin.db.ConnectionConfig;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
import java.util.Collections;
3032
import java.util.HashMap;
@@ -37,6 +39,8 @@
3739
*/
3840
public abstract class AbstractDBConnectorConfig extends PluginConfig implements DBConnectorProperties {
3941

42+
private static final Logger LOG = LoggerFactory.getLogger(AbstractDBConnectorConfig.class);
43+
4044
@Name(ConnectionConfig.JDBC_PLUGIN_NAME)
4145
@Description("Name of the JDBC driver to use. This is the value of the 'jdbcPluginName' key defined in the JSON " +
4246
"file for the JDBC plugin.")

database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.apache.hadoop.mapreduce.MRJobConfig;
3838
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
3939
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
4042

4143
import java.io.IOException;
4244
import java.sql.Connection;
@@ -56,6 +58,7 @@ public abstract class AbstractDBSpecificConnector<T extends DBWritable> extends
5658
implements BatchConnector<LongWritable, T> {
5759

5860
private final AbstractDBConnectorConfig config;
61+
private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSpecificConnector.class);
5962

6063
protected AbstractDBSpecificConnector(AbstractDBConnectorConfig config) {
6164
super(config);
@@ -99,6 +102,7 @@ public InputFormatProvider getInputFormatProvider(ConnectorContext context, Samp
99102
tableQuery, null, false);
100103
connectionConfigAccessor.setConnectionArguments(Maps.fromProperties(config.getConnectionArgumentsProperties()));
101104
connectionConfigAccessor.getConfiguration().setInt(MRJobConfig.NUM_MAPS, 1);
105+
LOG.debug("Moving inside AbstractDBConnectorConfig");
102106
Map<String, String> additionalArguments = config.getAdditionalArguments();
103107
for (Map.Entry<String, String> argument : additionalArguments.entrySet()) {
104108
connectionConfigAccessor.getConfiguration().set(argument.getKey(), argument.getValue());

database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnectorConfig.java

+30
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@
2020
import io.cdap.cdap.api.annotation.Macro;
2121
import io.cdap.cdap.api.annotation.Name;
2222
import io.cdap.plugin.db.ConnectionConfig;
23+
import io.cdap.plugin.db.TransactionIsolationLevel;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2326

2427
import java.util.Collections;
28+
import java.util.HashMap;
2529
import java.util.Map;
2630
import javax.annotation.Nullable;
2731

@@ -30,9 +34,11 @@
3034
*/
3135
public abstract class AbstractDBSpecificConnectorConfig extends AbstractDBConnectorConfig {
3236

37+
private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSpecificConnectorConfig.class);
3338
@Name(ConnectionConfig.HOST)
3439
@Description("Database host")
3540
@Macro
41+
3642
@Nullable
3743
protected String host;
3844

@@ -42,6 +48,12 @@ public abstract class AbstractDBSpecificConnectorConfig extends AbstractDBConnec
4248
@Nullable
4349
protected Integer port;
4450

51+
@Name(ConnectionConfig.TRANSACTION_ISOLATION_LEVEL)
52+
@Description("The transaction isolation level for the database session.")
53+
@Macro
54+
@Nullable
55+
protected String transactionIsolationLevel;
56+
4557
public String getHost() {
4658
return host;
4759
}
@@ -55,4 +67,22 @@ public int getPort() {
5567
public boolean canConnect() {
5668
return super.canConnect() && !containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT);
5769
}
70+
71+
@Override
72+
public Map<String, String> getAdditionalArguments() {
73+
Map<String, String> additonalArguments = new HashMap<>();
74+
LOG.debug("inside get AdditionalArguemnts of AbstractDBSpecificConnectorConfig");
75+
if (getTransactionIsolationLevel() != null) {
76+
additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel());
77+
}
78+
return additonalArguments;
79+
}
80+
81+
public String getTransactionIsolationLevel() {
82+
if (transactionIsolationLevel == null) {
83+
return null;
84+
}
85+
return TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
86+
}
5887
}
88+

mysql-plugin/docs/MySQL-connector.md

+6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ authentication. Optional for databases that do not require authentication.
2222

2323
**Password:** Password to use to connect to the specified database.
2424

25+
**Transaction Isolation Level** The transaction isolation level of the databse connection
26+
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
27+
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
28+
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
29+
- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
30+
2531
**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments
2632
will be passed to the JDBC driver, as connection arguments, for JDBC drivers that may need additional configurations.
2733
This is a semicolon-separated list of key-value pairs, where each pair is separated by a equals '=' and specifies

mysql-plugin/docs/Mysql-batchsink.md

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ You also can use the macro function ${conn(connection-name)}.
3939

4040
**Password:** Password to use to connect to the specified database.
4141

42+
**Transaction Isolation Level** The transaction isolation level of the databse connection
43+
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
44+
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
45+
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
46+
- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
47+
4248
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
4349
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
4450

mysql-plugin/docs/Mysql-batchsource.md

+6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s
4949

5050
**Password:** Password to use to connect to the specified database.
5151

52+
**Transaction Isolation Level** The transaction isolation level of the databse connection
53+
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
54+
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
55+
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
56+
- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.
57+
5258
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
5359
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
5460

mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSource.java

+6
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,15 @@ public MysqlConnectorConfig getConnection() {
197197
return connection;
198198
}
199199

200+
@Override
201+
public String getTransactionIsolationLevel() {
202+
return connection.getTransactionIsolationLevel();
203+
}
204+
200205
@Override
201206
public void validate(FailureCollector collector) {
202207
ConfigUtil.validateConnection(this, useConnection, connection, collector);
208+
connection.getAdditionalArguments();
203209
super.validate(collector);
204210
}
205211

mysql-plugin/widgets/MySQL-connector.json

+14
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,20 @@
3030
"widget-attributes": {
3131
"default": "3306"
3232
}
33+
},
34+
{
35+
"widget-type": "select",
36+
"label": "Transaction Isolation Level",
37+
"name": "transactionIsolationLevel",
38+
"widget-attributes": {
39+
"values": [
40+
"TRANSACTION_READ_UNCOMMITTED",
41+
"TRANSACTION_READ_COMMITTED",
42+
"TRANSACTION_REPEATABLE_READ",
43+
"TRANSACTION_SERIALIZABLE"
44+
],
45+
"default": "TRANSACTION_SERIALIZABLE"
46+
}
3347
}
3448
]
3549
},

mysql-plugin/widgets/Mysql-batchsink.json

+18
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,20 @@
6565
"label": "Password",
6666
"name": "password"
6767
},
68+
{
69+
"widget-type": "select",
70+
"label": "Transaction Isolation Level",
71+
"name": "transactionIsolationLevel",
72+
"widget-attributes": {
73+
"values": [
74+
"TRANSACTION_READ_UNCOMMITTED",
75+
"TRANSACTION_READ_COMMITTED",
76+
"TRANSACTION_REPEATABLE_READ",
77+
"TRANSACTION_SERIALIZABLE"
78+
],
79+
"default": "TRANSACTION_SERIALIZABLE"
80+
}
81+
},
6882
{
6983
"widget-type": "keyvalue",
7084
"label": "Connection Arguments",
@@ -225,6 +239,10 @@
225239
"type": "property",
226240
"name": "password"
227241
},
242+
{
243+
"type": "property",
244+
"name": "transactionIsolationLevel"
245+
},
228246
{
229247
"type": "property",
230248
"name": "host"

mysql-plugin/widgets/Mysql-batchsource.json

+18
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,20 @@
6565
"label": "Password",
6666
"name": "password"
6767
},
68+
{
69+
"widget-type": "select",
70+
"label": "Transaction Isolation Level",
71+
"name": "transactionIsolationLevel",
72+
"widget-attributes": {
73+
"values": [
74+
"TRANSACTION_READ_UNCOMMITTED",
75+
"TRANSACTION_READ_COMMITTED",
76+
"TRANSACTION_REPEATABLE_READ",
77+
"TRANSACTION_SERIALIZABLE"
78+
],
79+
"default": "TRANSACTION_SERIALIZABLE"
80+
}
81+
},
6882
{
6983
"widget-type": "keyvalue",
7084
"label": "Connection Arguments",
@@ -277,6 +291,10 @@
277291
"type": "property",
278292
"name": "password"
279293
},
294+
{
295+
"type": "property",
296+
"name": "transactionIsolationLevel"
297+
},
280298
{
281299
"type": "property",
282300
"name": "host"

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnectorConfig.java

+2-16
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,6 @@ public String getConnectionString() {
8181
@Macro
8282
private String database;
8383

84-
@Name(OracleConstants.TRANSACTION_ISOLATION_LEVEL)
85-
@Description("The transaction isolation level for the database session.")
86-
@Macro
87-
@Nullable
88-
private String transactionIsolationLevel;
89-
9084
@Name(OracleConstants.USE_SSL)
9185
@Description("Turns on SSL encryption. Connection will fail if SSL is not available")
9286
@Nullable
@@ -124,6 +118,7 @@ public Properties getConnectionArgumentsProperties() {
124118
return prop;
125119
}
126120

121+
@Override
127122
public String getTransactionIsolationLevel() {
128123
//if null default to the highest isolation level possible
129124
if (transactionIsolationLevel == null) {
@@ -133,16 +128,7 @@ public String getTransactionIsolationLevel() {
133128
//This ensures that the role is mapped to the right serialization level, even w/ incorrect user input
134129
//if role is SYSDBA or SYSOP it will map to read_committed. else serialized
135130
return (!getRole().equals(ROLE_NORMAL)) ? TransactionIsolationLevel.Level.TRANSACTION_READ_COMMITTED.name() :
136-
TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
137-
}
138-
139-
@Override
140-
public Map<String, String> getAdditionalArguments() {
141-
Map<String, String> additonalArguments = new HashMap<>();
142-
if (getTransactionIsolationLevel() != null) {
143-
additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel());
144-
}
145-
return additonalArguments;
131+
TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
146132
}
147133

148134
@Override

postgresql-plugin/docs/PostgreSQL-connector.md

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ authentication. Optional for databases that do not require authentication.
2222

2323
**Password:** Password to use to connect to the specified database.
2424

25+
**Transaction Isolation Level** The transaction isolation level of the databse connection
26+
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
27+
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
28+
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
29+
2530
**Database:** The name of the database to connect to.
2631

2732
**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments

postgresql-plugin/docs/Postgres-batchsink.md

+5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ You also can use the macro function ${conn(connection-name)}.
3939

4040
**Password:** Password to use to connect to the specified database.
4141

42+
**Transaction Isolation Level** The transaction isolation level of the databse connection
43+
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
44+
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
45+
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
46+
4247
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
4348
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
4449

postgresql-plugin/docs/Postgres-batchsource.md

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s
4949

5050
**Password:** Password to use to connect to the specified database.
5151

52+
**Transaction Isolation Level** The transaction isolation level of the databse connection
53+
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
54+
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
55+
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
56+
5257
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
5358
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
5459

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnectorConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.cdap.cdap.api.annotation.Description;
2020
import io.cdap.cdap.api.annotation.Macro;
2121
import io.cdap.cdap.api.annotation.Name;
22+
import io.cdap.plugin.db.TransactionIsolationLevel;
2223
import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig;
2324

2425
/**

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

+10
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import io.cdap.plugin.db.source.AbstractDBSource;
3737
import io.cdap.plugin.util.DBUtils;
3838
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
3941

4042
import java.util.Map;
4143
import javax.annotation.Nullable;
@@ -50,6 +52,8 @@
5052
@Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = PostgresConnector.NAME)})
5153
public class PostgresSource extends AbstractDBSource<PostgresSource.PostgresSourceConfig> {
5254

55+
private static final Logger LOG = LoggerFactory.getLogger(PostgresSource.class);
56+
5357
private final PostgresSourceConfig postgresSourceConfig;
5458

5559
public PostgresSource(PostgresSourceConfig postgresSourceConfig) {
@@ -143,9 +147,15 @@ protected PostgresConnectorConfig getConnection() {
143147
return connection;
144148
}
145149

150+
@Override
151+
public String getTransactionIsolationLevel() {
152+
return connection.getTransactionIsolationLevel();
153+
}
154+
146155
@Override
147156
public void validate(FailureCollector collector) {
148157
ConfigUtil.validateConnection(this, useConnection, connection, collector);
158+
connection.getAdditionalArguments();
149159
super.validate(collector);
150160
}
151161

0 commit comments

Comments
 (0)