Hi,
In the code below I am trying to read data from one MySQL server and load it into a table on another MySQL server.
The job completes successfully, but the target table is not loaded with any records.
Instead of loading the data into the target table on the target server (ip2), it loads the data back into the source server (ip1).
It looks like a single JobConf is created for the single MR job, so only one connection is configured instead of two (as per the code in cascading.jdbc.db.DBConfiguration). If I add a GroupBy pipe between the source tap and the target tap it works fine, because the flow is then planned as two MR jobs, each with its own JobConf (see the sketch after the code below).
Currently I am using Hadoop2MR1FlowConnector(), as we are not using Tez for this project. The same code works fine with Hadoop2TezFlowConnector().
```java
import java.io.IOException;
import java.lang.reflect.Type;
import java.sql.SQLException;

import cascading.flow.Flow;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.jdbc.JDBCScheme;
import cascading.jdbc.JDBCTap;
import cascading.jdbc.TableDesc;
import cascading.jdbc.db.DBInputFormat;
import cascading.jdbc.db.MySqlDBInputFormat;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tuple.Fields;

public class TableToTable {
  public static void main(String[] args) throws SQLException, ClassNotFoundException, IOException {
    String jdbcurl1 = "jdbc:mysql://ip2:3306/cascading_jdbc?user=root&password=mysql";
    String jdbcurl2 = "jdbc:mysql://ip1:3306/cascading_jdbc?user=root&password=mysql";
    String driverName = "com.mysql.jdbc.Driver";
    Class<? extends DBInputFormat> inputFormatClass = MySqlDBInputFormat.class;

    String TESTING_TABLE_NAME_SOURCE = "testingtable12";
    String TESTING_TABLE_NAME_TARGET = "testingtable13";

    Fields fields = new Fields( new Comparable[]{"num", "lwr", "upr"}, new Type[]{int.class, String.class, String.class} );
    Pipe parsePipe = new Pipe( "insert" );

    String[] columnNames = {"num", "lwr", "upr"};
    String[] columnDefs = {"INT NOT NULL", "VARCHAR(100) NOT NULL", "VARCHAR(100) NOT NULL"};
    String[] primaryKeys = null;

    // source tap and table description
    TableDesc tableDescS = new TableDesc( TESTING_TABLE_NAME_SOURCE, columnNames, columnDefs, primaryKeys );
    JDBCScheme schemeS = new JDBCScheme( inputFormatClass, fields, columnNames );
    JDBCTap sourceTap = new JDBCTap( jdbcurl1, driverName, tableDescS, schemeS, SinkMode.REPLACE );
    sourceTap.setBatchSize( 1 );

    // target tap and table description
    TableDesc tableDescT = new TableDesc( TESTING_TABLE_NAME_TARGET, columnNames, columnDefs, primaryKeys );
    JDBCScheme schemeT = new JDBCScheme( inputFormatClass, fields, columnNames );
    JDBCTap targetTapT = new JDBCTap( jdbcurl2, driverName, tableDescT, schemeT, SinkMode.REPLACE );
    targetTapT.setBatchSize( 1 );

    // single-step flow: source -> sink with no intermediate operations
    Flow<?> parseFlow = new Hadoop2MR1FlowConnector().connect( sourceTap, targetTapT, parsePipe );
    parseFlow.complete();
  }
}
```
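
For reference, this is roughly what the GroupBy workaround mentioned above looks like. It is a minimal sketch that reuses the taps and fields from the code above; grouping on "num" is just an arbitrary key choice to force a second MR step.

```java
import cascading.pipe.GroupBy;
// ...all other imports and the tap/field setup exactly as in the snippet above...

Pipe parsePipe = new Pipe( "insert" );

// Grouping on an arbitrary key forces the planner to split the flow into two
// MR jobs, so the source and sink JDBC settings end up in separate JobConfs.
Pipe grouped = new GroupBy( parsePipe, new Fields( "num" ) );

Flow<?> parseFlow = new Hadoop2MR1FlowConnector().connect( sourceTap, targetTapT, grouped );
parseFlow.complete();
```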
Related cascading-user thread: https://groups.google.com/forum/#!topic/cascading-user/eBiV9vaomQo