From 296a4e3b4d94798ebdfa916ed6d767b844df8fac Mon Sep 17 00:00:00 2001 From: Sree Raman Date: Mon, 1 Apr 2019 08:28:02 -0700 Subject: [PATCH 1/4] Add Inputformat from Sqoop --- .../main/java/io/cdap/plugin/db/DBRecord.java | 11 ++-- .../db/batch/source/AbstractDBSource.java | 58 ++++++++++--------- .../source/DataDrivenETLDBInputFormat.java | 12 ++-- .../java/io/cdap/plugin/util/DBUtils.java | 6 +- db2-plugin/pom.xml | 5 +- mssql-plugin/pom.xml | 5 +- mysql-plugin/pom.xml | 5 +- netezza-plugin/pom.xml | 5 +- oracle-plugin/pom.xml | 5 +- pom.xml | 16 +++++ postgresql-plugin/pom.xml | 5 +- 11 files changed, 88 insertions(+), 45 deletions(-) diff --git a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java index 755de2a54..074e91664 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java @@ -51,10 +51,11 @@ * @see org.apache.hadoop.mapreduce.lib.db.DBOutputFormat DBOutputFormat * @see DBWritable DBWritable */ -public class DBRecord implements Writable, DBWritable, Configurable { +public class DBRecord implements Writable, org.apache.sqoop.mapreduce.DBWritable, Configurable { protected StructuredRecord record; protected Configuration conf; - + protected List schemaFields; + protected Schema schema; /** * Need to cache {@link ResultSetMetaData} of the record for use during writing to a table. * This is because we cannot rely on JDBC drivers to properly set metadata in the {@link PreparedStatement} @@ -97,8 +98,10 @@ public StructuredRecord getRecord() { */ public void readFields(ResultSet resultSet) throws SQLException { ResultSetMetaData metadata = resultSet.getMetaData(); - List schemaFields = getSchemaReader().getSchemaFields(resultSet, conf.get(DBUtils.OVERRIDE_SCHEMA)); - Schema schema = Schema.recordOf("dbRecord", schemaFields); + if (schemaFields == null) { + schemaFields = DBUtils.getSchemaFields(resultSet, conf.get(DBUtils.OVERRIDE_SCHEMA)); + schema = Schema.recordOf("dbRecord", schemaFields); + } StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); for (int i = 0; i < schemaFields.size(); i++) { Schema.Field field = schemaFields.get(i); diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java index dfcc54624..9587a9894 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java @@ -16,25 +16,25 @@ package io.cdap.plugin.db.batch.source; +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Macro; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.data.batch.Input; +import co.cask.cdap.api.data.format.StructuredRecord; +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.dataset.lib.KeyValue; +import co.cask.cdap.api.plugin.EndpointPluginContext; +import co.cask.cdap.api.plugin.PluginConfig; +import co.cask.cdap.api.plugin.PluginProperties; +import co.cask.cdap.etl.api.Emitter; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.batch.BatchRuntimeContext; +import co.cask.cdap.etl.api.batch.BatchSourceContext; +import co.cask.hydrator.common.LineageRecorder; +import co.cask.hydrator.common.ReferenceBatchSource; +import co.cask.hydrator.common.ReferencePluginConfig; 
+import co.cask.hydrator.common.SourceInputFormatProvider; import com.google.common.base.Strings; -import io.cdap.cdap.api.annotation.Description; -import io.cdap.cdap.api.annotation.Macro; -import io.cdap.cdap.api.annotation.Name; -import io.cdap.cdap.api.data.batch.Input; -import io.cdap.cdap.api.data.format.StructuredRecord; -import io.cdap.cdap.api.data.schema.Schema; -import io.cdap.cdap.api.dataset.lib.KeyValue; -import io.cdap.cdap.api.plugin.EndpointPluginContext; -import io.cdap.cdap.api.plugin.PluginConfig; -import io.cdap.cdap.api.plugin.PluginProperties; -import io.cdap.cdap.etl.api.Emitter; -import io.cdap.cdap.etl.api.PipelineConfigurer; -import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; -import io.cdap.cdap.etl.api.batch.BatchSourceContext; -import io.cdap.plugin.common.LineageRecorder; -import io.cdap.plugin.common.ReferenceBatchSource; -import io.cdap.plugin.common.ReferencePluginConfig; -import io.cdap.plugin.common.SourceInputFormatProvider; import io.cdap.plugin.db.CommonSchemaReader; import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.DBConfig; @@ -43,11 +43,11 @@ import io.cdap.plugin.db.batch.TransactionIsolationLevel; import io.cdap.plugin.util.DBUtils; import io.cdap.plugin.util.DriverCleanup; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.db.DBConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +72,7 @@ public abstract class AbstractDBSource extends ReferenceBatchSource driverClass; + protected FieldCase fieldCase; public AbstractDBSource(DBSourceConfig sourceConfig) { super(new ReferencePluginConfig(sourceConfig.referenceName)); @@ -115,7 +116,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { @Path("getSchema") public Schema getSchema(GetSchemaRequest request, EndpointPluginContext pluginContext) throws IllegalAccessException, - SQLException, InstantiationException { + SQLException, InstantiationException, ClassNotFoundException { DriverCleanup driverCleanup; try { @@ -144,7 +145,7 @@ protected SchemaReader getSchemaReader() { private DriverCleanup loadPluginClassAndGetDriver(GetSchemaRequest request, EndpointPluginContext pluginContext) - throws IllegalAccessException, InstantiationException, SQLException { + throws IllegalAccessException, InstantiationException, SQLException, ClassNotFoundException { Class driverClass = pluginContext.loadPluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, @@ -196,16 +197,18 @@ public void prepareRun(BatchSourceContext context) throws Exception { ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.jdbcPluginName, connectionString, sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery()); - Configuration hConf = new Configuration(); + JobConf hConf = new JobConf(); hConf.clear(); + int fetchSize = 1000; // Load the plugin class to make sure it is available. 
Class<? extends Driver> driverClass = context.loadPluginClass(getJDBCPluginId()); if (sourceConfig.user == null && sourceConfig.password == null) { - DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString); + DBConfiguration.configureDB(hConf, driverClass.getName(), sourceConfig.connectionString, fetchSize); } else { DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString, - sourceConfig.user, sourceConfig.password); + sourceConfig.user, sourceConfig.password, fetchSize); + hConf.set("co.cask.cdap.jdbc.passwd", sourceConfig.password); } DataDrivenETLDBInputFormat.setInput(hConf, getDBRecordType(), @@ -247,12 +250,13 @@ protected Class getDBRecordType() { @Override public void initialize(BatchRuntimeContext context) throws Exception { super.initialize(context); - driverClass = context.loadPluginClass(getJDBCPluginId()); + driverClass = context.loadPluginClass(getJDBCPluginId()); + fieldCase = FieldCase.toFieldCase(sourceConfig.columnNameCase); } @Override public void transform(KeyValue<LongWritable, DBRecord> input, Emitter<StructuredRecord> emitter) throws Exception { - emitter.emit(input.getValue().getRecord()); + emitter.emit(StructuredRecordUtils.convertCase(input.getValue().getRecord(), fieldCase)); } @Override diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java index 48f412e69..497af8649 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java @@ -20,7 +20,6 @@ import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.JDBCDriverShim; import io.cdap.plugin.db.batch.NoOpCommitConnection; -import io.cdap.plugin.db.batch.TransactionIsolationLevel; import io.cdap.plugin.util.DBUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; @@ -49,6 +48,7 @@ public class DataDrivenETLDBInputFormat extends DataDrivenDBInputFormat { private static final Logger LOG = LoggerFactory.getLogger(DataDrivenETLDBInputFormat.class); private Driver driver; private JDBCDriverShim driverShim; + private Connection connection; static void setInput(Configuration conf, Class inputClass, @@ -92,7 +92,7 @@ public Connection getConnection() { Properties properties = ConnectionConfig.getConnectionArguments(conf.get(DBUtils.CONNECTION_ARGUMENTS), conf.get(DBConfiguration.USERNAME_PROPERTY), - conf.get(DBConfiguration.PASSWORD_PROPERTY)); + conf.get("co.cask.cdap.jdbc.passwd")); connection = DriverManager.getConnection(url, properties); @@ -103,9 +103,11 @@ public Connection getConnection() { } else { this.connection.setAutoCommit(false); } - String level = conf.get(TransactionIsolationLevel.CONF_KEY); - LOG.debug("Transaction isolation level: {}", level); - connection.setTransactionIsolation(TransactionIsolationLevel.getLevel(level)); + this.connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); +// +// String level = conf.get(TransactionIsolationLevel.CONF_KEY); +// LOG.debug("Transaction isolation level: {}", level); +// connection.setTransactionIsolation(TransactionIsolationLevel.getLevel(level)); } catch (Exception e) { throw Throwables.propagate(e); } diff --git a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java index 12794dbfa..8e9e7f09d 100644 --- 
a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java +++ b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java @@ -16,14 +16,14 @@ package io.cdap.plugin.util; +import co.cask.cdap.api.plugin.PluginProperties; +import co.cask.cdap.etl.api.PipelineConfigurer; +import co.cask.cdap.etl.api.validation.InvalidConfigPropertyException; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.data.schema.UnsupportedTypeException; -import io.cdap.cdap.api.plugin.PluginProperties; -import io.cdap.cdap.etl.api.PipelineConfigurer; -import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.JDBCDriverShim; import org.slf4j.Logger; diff --git a/db2-plugin/pom.xml b/db2-plugin/pom.xml index cd16e1389..972162c5b 100644 --- a/db2-plugin/pom.xml +++ b/db2-plugin/pom.xml @@ -93,7 +93,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/mssql-plugin/pom.xml b/mssql-plugin/pom.xml index 25f7f40ef..9a0c01348 100644 --- a/mssql-plugin/pom.xml +++ b/mssql-plugin/pom.xml @@ -93,7 +93,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/mysql-plugin/pom.xml b/mysql-plugin/pom.xml index b706b557d..32843e315 100644 --- a/mysql-plugin/pom.xml +++ b/mysql-plugin/pom.xml @@ -93,7 +93,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/netezza-plugin/pom.xml b/netezza-plugin/pom.xml index 407fc94da..e3d54f997 100644 --- a/netezza-plugin/pom.xml +++ b/netezza-plugin/pom.xml @@ -93,7 +93,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/oracle-plugin/pom.xml b/oracle-plugin/pom.xml index 9b32e92f6..92fc09d17 100644 --- a/oracle-plugin/pom.xml +++ b/oracle-plugin/pom.xml @@ -98,7 +98,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* *;inline=false;scope=compile true diff --git a/pom.xml b/pom.xml index 36003755f..f2beecd33 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ 0.9.0 + 1.4.7 @@ -230,6 +231,21 @@ + + org.apache.sqoop + sqoop + hadoop260 + ${sqoop.version} + + + org.slf4j + jcl-over-slf4j + ${slf4j.version} + + + org.slf4j + log4j-over-slf4j + org.apache.hadoop hadoop-mapreduce-client-core diff --git a/postgresql-plugin/pom.xml b/postgresql-plugin/pom.xml index bee71bafa..3451374e8 100644 --- a/postgresql-plugin/pom.xml +++ b/postgresql-plugin/pom.xml @@ -99,7 +99,10 @@ io.cdap.plugin.*; org.apache.commons.lang; org.apache.commons.logging.*; - org.codehaus.jackson.* + org.codehaus.jackson.*; + org.apache.sqoop.*; + com.cloudera.sqoop.*; + org.apache.log4j.* 
*;inline=false;scope=compile true From 689d6b06f7c3a4644675777b0e49697ada3e583f Mon Sep 17 00:00:00 2001 From: Sree Raman Date: Mon, 1 Apr 2019 14:31:04 -0700 Subject: [PATCH 2/4] Updates --- database-commons/pom.xml | 5 +++++ .../plugin/db/batch/source/AbstractDBSource.java | 16 ++++++++++++++-- .../batch/source/DataDrivenETLDBInputFormat.java | 11 +++++------ .../main/java/io/cdap/plugin/db2/Db2Source.java | 10 +++++----- .../io/cdap/plugin/netezza/NetezzaSource.java | 11 +++++------ .../java/io/cdap/plugin/oracle/OracleSource.java | 8 ++++---- 6 files changed, 38 insertions(+), 23 deletions(-) diff --git a/database-commons/pom.xml b/database-commons/pom.xml index 79cc52ba4..43b889bc7 100644 --- a/database-commons/pom.xml +++ b/database-commons/pom.xml @@ -59,6 +59,11 @@ cdap-api compile + + org.apache.sqoop + sqoop + hadoop260 + diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java index 9587a9894..067a8aed6 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java @@ -204,7 +204,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { // Load the plugin class to make sure it is available. Class<? extends Driver> driverClass = context.loadPluginClass(getJDBCPluginId()); if (sourceConfig.user == null && sourceConfig.password == null) { - DBConfiguration.configureDB(hConf, driverClass.getName(), sourceConfig.connectionString, fetchSize); + DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString, fetchSize); } else { DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString, sourceConfig.user, sourceConfig.password, fetchSize); @@ -243,7 +243,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { new SourceInputFormatProvider(DataDrivenETLDBInputFormat.class, hConf))); } - protected Class getDBRecordType() { + protected Class getDBRecordType() { return DBRecord.class; } @@ -280,6 +280,7 @@ public abstract static class DBSourceConfig extends DBConfig { public static final String NUM_SPLITS = "numSplits"; public static final String SCHEMA = "schema"; public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel"; + public static final String COLUMN_NAME_CASE = "columnCase"; @Name(IMPORT_QUERY) @Description("The SELECT query to use to import data from the specified table. " + @@ -319,6 +320,17 @@ public abstract static class DBSourceConfig extends DBConfig { "is not correctly getting marked as nullable.") public String schema; + + @Name(COLUMN_NAME_CASE) + @Description("Sets the case of the column names returned from the query. " + + "Possible options are upper or lower. By default or for any other input, the column names are not modified and " + + "the names returned from the database are used as-is. 
Note that setting this property provides predictability " + + "of column name cases across different databases but might result in column name conflicts if multiple column " + + "names are the same when the case is ignored.") + @Nullable + public String columnNameCase; + + private String getImportQuery() { return cleanQuery(importQuery); } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java index 497af8649..e58ce8003 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java @@ -25,10 +25,9 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; -import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; -import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat; +import org.apache.sqoop.mapreduce.DBWritable; +import org.apache.sqoop.mapreduce.db.DBConfiguration; +import org.apache.sqoop.mapreduce.db.DBInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,8 +41,8 @@ /** * Class that extends {@link DBInputFormat} to load the database driver class correctly. */ -public class DataDrivenETLDBInputFormat extends DataDrivenDBInputFormat { - public static final String AUTO_COMMIT_ENABLED = "io.cdap.plugin.db.autocommit.enabled"; +public class DataDrivenETLDBInputFormat extends org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat { + public static final String AUTO_COMMIT_ENABLED = "co.cask.hydrator.db.autocommit.enabled"; private static final Logger LOG = LoggerFactory.getLogger(DataDrivenETLDBInputFormat.class); private Driver driver; diff --git a/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java b/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java index cb5cc8088..45748918a 100644 --- a/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java +++ b/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java @@ -16,13 +16,13 @@ package io.cdap.plugin.db2; -import io.cdap.cdap.api.annotation.Description; -import io.cdap.cdap.api.annotation.Name; -import io.cdap.cdap.api.annotation.Plugin; -import io.cdap.cdap.etl.api.batch.BatchSource; +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.etl.api.batch.BatchSource; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; /** diff --git a/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java b/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java index 92235a371..2fe9ec55b 100644 --- a/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java +++ b/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java @@ -16,14 +16,13 @@ package io.cdap.plugin.netezza; -import io.cdap.cdap.api.annotation.Description; -import io.cdap.cdap.api.annotation.Name; -import io.cdap.cdap.api.annotation.Plugin; -import io.cdap.cdap.etl.api.batch.BatchSource; +import 
co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; +import co.cask.cdap.etl.api.batch.BatchSource; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; - +import org.apache.sqoop.mapreduce.DBWritable; /** * Batch source to read from Netezza. diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 518513fdd..2425ebf86 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -16,14 +16,14 @@ package io.cdap.plugin.oracle; +import co.cask.cdap.api.annotation.Description; +import co.cask.cdap.api.annotation.Name; +import co.cask.cdap.api.annotation.Plugin; import com.google.common.collect.ImmutableMap; -import io.cdap.cdap.api.annotation.Description; -import io.cdap.cdap.api.annotation.Name; -import io.cdap.cdap.api.annotation.Plugin; import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; import java.util.Map; import javax.annotation.Nullable; From 4d2362c9efb09743e38209bb3ef7b6440e0796d3 Mon Sep 17 00:00:00 2001 From: Sree Raman Date: Mon, 1 Apr 2019 16:25:14 -0700 Subject: [PATCH 3/4] Add SLF-4j --- db2-plugin/pom.xml | 8 ++++++++ generic-database-plugin/pom.xml | 8 ++++++++ mssql-plugin/pom.xml | 8 ++++++++ mysql-plugin/pom.xml | 8 ++++++++ netezza-plugin/pom.xml | 8 ++++++++ oracle-plugin/pom.xml | 8 ++++++++ pom.xml | 1 + 7 files changed, 49 insertions(+) diff --git a/db2-plugin/pom.xml b/db2-plugin/pom.xml index 972162c5b..70e17ba45 100644 --- a/db2-plugin/pom.xml +++ b/db2-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + diff --git a/generic-database-plugin/pom.xml b/generic-database-plugin/pom.xml index f8524dde7..58b0aa74b 100644 --- a/generic-database-plugin/pom.xml +++ b/generic-database-plugin/pom.xml @@ -83,6 +83,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + diff --git a/mssql-plugin/pom.xml b/mssql-plugin/pom.xml index 9a0c01348..89dbeb466 100644 --- a/mssql-plugin/pom.xml +++ b/mssql-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + diff --git a/mysql-plugin/pom.xml b/mysql-plugin/pom.xml index 32843e315..d4234671c 100644 --- a/mysql-plugin/pom.xml +++ b/mysql-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + diff --git a/netezza-plugin/pom.xml b/netezza-plugin/pom.xml index e3d54f997..cc272e56c 100644 --- a/netezza-plugin/pom.xml +++ b/netezza-plugin/pom.xml @@ -78,6 +78,14 @@ RELEASE test + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + diff --git a/oracle-plugin/pom.xml b/oracle-plugin/pom.xml index 92fc09d17..790bd1fd9 100644 --- a/oracle-plugin/pom.xml +++ b/oracle-plugin/pom.xml @@ -83,6 +83,14 @@ RELEASE compile + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + diff --git a/pom.xml b/pom.xml index f2beecd33..c389feadf 100644 --- a/pom.xml +++ b/pom.xml @@ -245,6 +245,7 @@ org.slf4j log4j-over-slf4j + 
${slf4j.version} org.apache.hadoop From c1fc53928610b4f14361a308c1e9d1b23417d648 Mon Sep 17 00:00:00 2001 From: Terence Yim Date: Thu, 11 Apr 2019 16:07:25 -0700 Subject: [PATCH 4/4] Rebase on develop and rename packages --- .../postgres/AuroraPostgresSource.java | 2 +- .../java/io/cdap/plugin/db/FieldCase.java | 40 ++++++++++ .../cdap/plugin/db/StructuredRecordUtils.java | 79 +++++++++++++++++++ .../db/batch/source/AbstractDBSource.java | 41 +++++----- .../source/DataDrivenETLDBInputFormat.java | 4 +- .../java/io/cdap/plugin/util/DBUtils.java | 6 +- .../plugin/db/StructuredRecordUtilsTest.java | 69 ++++++++++++++++ .../java/io/cdap/plugin/db2/Db2Source.java | 8 +- .../io/cdap/plugin/netezza/NetezzaSource.java | 8 +- .../io/cdap/plugin/oracle/OracleSource.java | 6 +- .../cdap/plugin/postgres/PostgresSource.java | 2 +- 11 files changed, 227 insertions(+), 38 deletions(-) create mode 100644 database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java create mode 100644 database-commons/src/main/java/io/cdap/plugin/db/StructuredRecordUtils.java create mode 100644 database-commons/src/test/java/io/cdap/plugin/db/StructuredRecordUtilsTest.java diff --git a/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java b/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java index 365df0ccc..3639f1495 100644 --- a/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java +++ b/aurora-postgresql-plugin/src/main/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSource.java @@ -24,7 +24,7 @@ import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; import java.util.Map; import javax.annotation.Nullable; diff --git a/database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java b/database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java new file mode 100644 index 000000000..b65980d40 --- /dev/null +++ b/database-commons/src/main/java/io/cdap/plugin/db/FieldCase.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db; + +import com.google.common.base.Strings; + +/** + * Enum to denote case of Structured Record field. 
+ */ +public enum FieldCase { + LOWER, + UPPER, + NONE; + + public static FieldCase toFieldCase(String fieldCase) { + if (Strings.isNullOrEmpty(fieldCase)) { + return FieldCase.NONE; + } + + try { + return FieldCase.valueOf(fieldCase.toUpperCase()); + } catch (IllegalArgumentException e) { + return FieldCase.NONE; + } + } +} diff --git a/database-commons/src/main/java/io/cdap/plugin/db/StructuredRecordUtils.java b/database-commons/src/main/java/io/cdap/plugin/db/StructuredRecordUtils.java new file mode 100644 index 000000000..6334a3f9b --- /dev/null +++ b/database-commons/src/main/java/io/cdap/plugin/db/StructuredRecordUtils.java @@ -0,0 +1,79 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utils class that contains StructuredRecord related transformations. + */ +public class StructuredRecordUtils { + + /** + * Converts the field names in the input {@link StructuredRecord} to a desired case + * + * @param input {@link StructuredRecord} + * @param fieldCase {@link FieldCase} + * @return {@link StructuredRecord} which contains field names conforming to the {@link FieldCase} passed in + * @throws Exception if there is a conflict in the field names while converting the case + */ + public static StructuredRecord convertCase(StructuredRecord input, FieldCase fieldCase) throws Exception { + if (fieldCase.equals(FieldCase.NONE)) { + return input; + } + + Schema oldSchema = input.getSchema(); + Map<String, String> fieldNameMap = new HashMap<>(); + List<Schema.Field> newFields = new ArrayList<>(); + for (Schema.Field field : oldSchema.getFields()) { + String newName = changeName(field.getName(), fieldCase); + if (fieldNameMap.containsValue(newName)) { + // field name used already. indication of field names conflict. can't do anything. + throw new IllegalStateException(String.format( + "Duplicate field/column name %s found when trying to conform to the chosen case option %s. 
" + + "Check Database Table schema.", field.getName(), fieldCase)); + } + fieldNameMap.put(field.getName(), newName); + newFields.add(Schema.Field.of(newName, field.getSchema())); + } + StructuredRecord.Builder recordBuilder = StructuredRecord.builder(Schema.recordOf("dbRecord", newFields)); + for (Map.Entry nameMap : fieldNameMap.entrySet()) { + recordBuilder.set(nameMap.getValue(), input.get(nameMap.getKey())); + } + return recordBuilder.build(); + } + + private StructuredRecordUtils() { + } + + private static String changeName(String oldName, FieldCase fieldCase) { + switch (fieldCase) { + case LOWER: + return oldName.toLowerCase(); + case UPPER: + return oldName.toUpperCase(); + default: + return oldName; + } + } +} diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java index 067a8aed6..fd3230766 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java @@ -16,37 +16,38 @@ package io.cdap.plugin.db.batch.source; -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Macro; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.data.batch.Input; -import co.cask.cdap.api.data.format.StructuredRecord; -import co.cask.cdap.api.data.schema.Schema; -import co.cask.cdap.api.dataset.lib.KeyValue; -import co.cask.cdap.api.plugin.EndpointPluginContext; -import co.cask.cdap.api.plugin.PluginConfig; -import co.cask.cdap.api.plugin.PluginProperties; -import co.cask.cdap.etl.api.Emitter; -import co.cask.cdap.etl.api.PipelineConfigurer; -import co.cask.cdap.etl.api.batch.BatchRuntimeContext; -import co.cask.cdap.etl.api.batch.BatchSourceContext; -import co.cask.hydrator.common.LineageRecorder; -import co.cask.hydrator.common.ReferenceBatchSource; -import co.cask.hydrator.common.ReferencePluginConfig; -import co.cask.hydrator.common.SourceInputFormatProvider; import com.google.common.base.Strings; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.batch.Input; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.plugin.EndpointPluginContext; +import io.cdap.cdap.api.plugin.PluginConfig; +import io.cdap.cdap.api.plugin.PluginProperties; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; +import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.ReferenceBatchSource; +import io.cdap.plugin.common.ReferencePluginConfig; +import io.cdap.plugin.common.SourceInputFormatProvider; import io.cdap.plugin.db.CommonSchemaReader; import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.DBConfig; import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.FieldCase; import io.cdap.plugin.db.SchemaReader; +import io.cdap.plugin.db.StructuredRecordUtils; import io.cdap.plugin.db.batch.TransactionIsolationLevel; import io.cdap.plugin.util.DBUtils; import io.cdap.plugin.util.DriverCleanup; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; 
-import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.sqoop.mapreduce.db.DBConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,7 +209,7 @@ public void prepareRun(BatchSourceContext context) throws Exception { } else { DBConfiguration.configureDB(hConf, driverClass.getName(), connectionString, sourceConfig.user, sourceConfig.password, fetchSize); - hConf.set("co.cask.cdap.jdbc.passwd", sourceConfig.password); + hConf.set("io.cdap.cdap.jdbc.passwd", sourceConfig.password); } DataDrivenETLDBInputFormat.setInput(hConf, getDBRecordType(), diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java index e58ce8003..24ce4b665 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/source/DataDrivenETLDBInputFormat.java @@ -42,7 +42,7 @@ * Class that extends {@link DBInputFormat} to load the database driver class correctly. */ public class DataDrivenETLDBInputFormat extends org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat { - public static final String AUTO_COMMIT_ENABLED = "co.cask.hydrator.db.autocommit.enabled"; + public static final String AUTO_COMMIT_ENABLED = "io.cdap.plugin.db.autocommit.enabled"; private static final Logger LOG = LoggerFactory.getLogger(DataDrivenETLDBInputFormat.class); private Driver driver; @@ -91,7 +91,7 @@ public Connection getConnection() { Properties properties = ConnectionConfig.getConnectionArguments(conf.get(DBUtils.CONNECTION_ARGUMENTS), conf.get(DBConfiguration.USERNAME_PROPERTY), - conf.get("co.cask.cdap.jdbc.passwd")); + conf.get("io.cdap.cdap.jdbc.passwd")); connection = DriverManager.getConnection(url, properties); diff --git a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java index 8e9e7f09d..12794dbfa 100644 --- a/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java +++ b/database-commons/src/main/java/io/cdap/plugin/util/DBUtils.java @@ -16,14 +16,14 @@ package io.cdap.plugin.util; -import co.cask.cdap.api.plugin.PluginProperties; -import co.cask.cdap.etl.api.PipelineConfigurer; -import co.cask.cdap.etl.api.validation.InvalidConfigPropertyException; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.data.schema.UnsupportedTypeException; +import io.cdap.cdap.api.plugin.PluginProperties; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException; import io.cdap.plugin.db.ConnectionConfig; import io.cdap.plugin.db.JDBCDriverShim; import org.slf4j.Logger; diff --git a/database-commons/src/test/java/io/cdap/plugin/db/StructuredRecordUtilsTest.java b/database-commons/src/test/java/io/cdap/plugin/db/StructuredRecordUtilsTest.java new file mode 100644 index 000000000..1d731da58 --- /dev/null +++ b/database-commons/src/test/java/io/cdap/plugin/db/StructuredRecordUtilsTest.java @@ -0,0 +1,69 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit Tests for {@link StructuredRecordUtils}. + */ +public class StructuredRecordUtilsTest { + + @Test + public void testLowerAndUpperCaseTransformation() throws Exception { + StructuredRecord record = StructuredRecord.builder( + Schema.recordOf("dbrecord", + Schema.Field.of("Name", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("Age", Schema.of(Schema.Type.INT)))).set("Name", "Abcd").set("Age", 10).build(); + StructuredRecord upperCaseRecord = StructuredRecordUtils.convertCase( + record, FieldCase.toFieldCase("upPer")); + Assert.assertEquals("Abcd", upperCaseRecord.get("NAME")); + Assert.assertEquals(10, upperCaseRecord.<Integer>get("AGE").intValue()); + Assert.assertNull(upperCaseRecord.get("Age")); + Assert.assertNull(upperCaseRecord.get("Name")); + + StructuredRecord lowerCaseRecord = StructuredRecordUtils.convertCase( + record, FieldCase.toFieldCase("lowEr")); + Assert.assertEquals("Abcd", lowerCaseRecord.get("name")); + Assert.assertEquals(10, lowerCaseRecord.<Integer>get("age").intValue()); + Assert.assertNull(lowerCaseRecord.get("Age")); + Assert.assertNull(lowerCaseRecord.get("Name")); + + StructuredRecord noChangeRecord = StructuredRecordUtils.convertCase( + record, FieldCase.toFieldCase("no change")); + Assert.assertEquals("Abcd", noChangeRecord.get("Name")); + Assert.assertEquals(10, noChangeRecord.<Integer>get("Age").intValue()); + } + + @Test + public void testInvalidTransformation() throws Exception { + StructuredRecord record = StructuredRecord.builder( + Schema.recordOf("dbrecord", + Schema.Field.of("age", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("Age", Schema.of(Schema.Type.INT)))).set("age", "10").set("Age", 10).build(); + + try { + StructuredRecordUtils.convertCase(record, FieldCase.toFieldCase("lower")); + Assert.fail(); + } catch (Exception e) { + //expected + } + } +} diff --git a/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java b/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java index 45748918a..75bed821b 100644 --- a/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java +++ b/db2-plugin/src/main/java/io/cdap/plugin/db2/Db2Source.java @@ -16,10 +16,10 @@ package io.cdap.plugin.db2; -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; -import co.cask.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; import org.apache.sqoop.mapreduce.DBWritable; diff --git a/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java b/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java index 92235a371..2fe9ec55b 100644 --- 
a/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java +++ b/netezza-plugin/src/main/java/io/cdap/plugin/netezza/NetezzaSource.java @@ -16,10 +16,10 @@ package io.cdap.plugin.netezza; -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; -import co.cask.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; import org.apache.sqoop.mapreduce.DBWritable; diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 2425ebf86..5f70053fe 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -16,10 +16,10 @@ package io.cdap.plugin.oracle; -import co.cask.cdap.api.annotation.Description; -import co.cask.cdap.api.annotation.Name; -import co.cask.cdap.api.annotation.Plugin; import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java index 1550f2f19..75d61e17e 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java @@ -24,7 +24,7 @@ import io.cdap.plugin.db.SchemaReader; import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; import io.cdap.plugin.db.batch.source.AbstractDBSource; -import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.sqoop.mapreduce.DBWritable; import java.util.Map; import javax.annotation.Nullable;
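
---

For orientation, the following is a minimal, self-contained sketch (not part of the patch series) of the Sqoop-style job wiring that patch 1 introduces in AbstractDBSource.prepareRun. The driver class, JDBC URL, and credentials are placeholder values; the configureDB overload with a trailing fetch-size argument and the password key are the ones the patches themselves use (the key is renamed to io.cdap.cdap.jdbc.passwd in patch 4).

import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.db.DBConfiguration;

public class SqoopWiringSketch {
  public static void main(String[] args) {
    JobConf hConf = new JobConf();
    hConf.clear();

    // Placeholder values; the plugin derives these from its source config.
    String driverClass = "com.mysql.jdbc.Driver";
    String connectionString = "jdbc:mysql://localhost:3306/demo";
    int fetchSize = 1000; // mirrors the hard-coded fetch size introduced in patch 1

    // The Sqoop variant of configureDB accepts a fetch size, which the
    // org.apache.hadoop.mapreduce.lib.db version being replaced does not.
    DBConfiguration.configureDB(hConf, driverClass, connectionString, "user", "secret", fetchSize);

    // Patch 1 also stashes the password under a plugin-private key so that
    // DataDrivenETLDBInputFormat.getConnection() can read it back later.
    hConf.set("co.cask.cdap.jdbc.passwd", "secret");
  }
}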
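And a short usage sketch of the column-name-case API the series adds (FieldCase and StructuredRecordUtils, introduced across patches 1 and 4). The schema and field names below are illustrative only; the conversion behavior matches the unit test added in patch 4.

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.db.FieldCase;
import io.cdap.plugin.db.StructuredRecordUtils;

public class ColumnCaseSketch {
  public static void main(String[] args) throws Exception {
    // A record shaped like one the DB source would emit, with mixed-case column names.
    Schema schema = Schema.recordOf("dbRecord",
        Schema.Field.of("EmpId", Schema.of(Schema.Type.INT)),
        Schema.Field.of("EmpName", Schema.of(Schema.Type.STRING)));
    StructuredRecord record = StructuredRecord.builder(schema)
        .set("EmpId", 7)
        .set("EmpName", "Ada")
        .build();

    // "upper" and "lower" map to FieldCase.UPPER/LOWER; any other value
    // (including null) falls back to FieldCase.NONE, which leaves names as-is.
    StructuredRecord lowered =
        StructuredRecordUtils.convertCase(record, FieldCase.toFieldCase("lower"));

    String name = lowered.get("empname"); // field names are now all lower case
    System.out.println(name);             // prints "Ada"
  }
}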