diff --git a/pom.xml b/pom.xml index 37bdfcfa3..617fa1c81 100644 --- a/pom.xml +++ b/pom.xml @@ -106,6 +106,7 @@ 2.12 3.4.4 1.20.0 + 1.39.0 17 17 diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java index 46df64ca8..0e56e36a6 100755 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java @@ -17,6 +17,9 @@ package org.apache.wayang.api.sql.context; +import org.apache.commons.lang3.StringUtils; + + import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.RelNode; @@ -25,6 +28,8 @@ import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.tools.RuleSet; import org.apache.calcite.tools.RuleSets; + +import org.apache.wayang.api.sql.calcite.utils.ModelParser; import org.apache.wayang.api.sql.calcite.convention.WayangConvention; import org.apache.wayang.api.sql.calcite.optimizer.Optimizer; import org.apache.wayang.api.sql.calcite.rules.WayangRules; @@ -39,6 +44,13 @@ import org.apache.wayang.postgres.Postgres; import org.apache.wayang.spark.Spark; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; @@ -72,10 +84,103 @@ public SqlContext(final Configuration configuration, final List plugins) for (final Plugin plugin : plugins) { this.withPlugin(plugin); } - + calciteSchema = SchemaUtils.getSchema(configuration); } + /** + * Entry point for executing SQL statements while providing arguments. + * You need to provide at least a JDBC source. + * + * @param args args[0] = SQL statement path, args[1] = JDBC driver, args[2] = + * JDBC URL, args[3] = JDBC user, + *              args[4] = JDBC password, args[5] = outputPath, + * args[6...] = platforms + */ + public static void main(final String[] args) throws Exception { + if (args.length < 5) + throw new IllegalArgumentException( + "Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext [platforms...]"); + + final String queryPath = args[0]; + final String jdbcDriver = args[1]; + final String jdbcUrl = args[2]; + final String jdbcUser = args[3]; + final String jdbcPassword = args[4]; + final String outputPath = args[5]; + + final String query = StringUtils.chop( + Files.readString(Paths.get(queryPath)) + .stripTrailing()); + + final String driverPlatform = jdbcDriver.split("\\.")[0]; + + final String calciteModel = String.format( + "{\r\n" + + "\"calcite\": {\r\n" + + " \"version\": \"1.0\",\n" + + " \"defaultSchema\": \"wayang\",\n" + + " \"schemas\": [\n" + + " {\n" + + " \"name\": \"postgres\",\n" + + " \"type\": \"custom\",\n" + + " \"factory\": \"org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory\",\n" + + " \"operand\": {\n" + + " \"jdbcDriver\": \"%s\",\n" + + " \"jdbcUrl\": \"%s\",\n" + + " \"jdbcUser\": \"%s\",\n" + + " \"jdbcPassword\": \"%s\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}\r\n" + + "}", + jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword); + + final Configuration configuration = new Configuration(); + + configuration.setProperty("wayang.calcite.model", calciteModel); + configuration.setProperty(String.format("wayang.%s.jdbc.url", driverPlatform), jdbcUrl); + configuration.setProperty(String.format("wayang.%s.jdbc.user", driverPlatform), jdbcUser); + configuration.setProperty(String.format("wayang.%s.jdbc.password", driverPlatform), jdbcPassword); + + final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel); + + final Configuration parseModel = new ModelParser(configuration, calciteModelJSON).setProperties(); + + final SqlContext context = new SqlContext(parseModel, + List.of(Java.channelConversionPlugin(), Postgres.conversionPlugin())); + + for (int i = 6; i < args.length; i++) { + final String platform = args[i]; + + switch (platform.toLowerCase()) { + case "spark": + context.withPlugin(Spark.basicPlugin()); + break; + case "java": + context.withPlugin(Java.basicPlugin()); + break; + case "postgres": + context.withPlugin(Postgres.plugin()); + break; + default: + throw new IllegalArgumentException("platform not supported " + platform); + } + } + + final Collection result = context.executeSql(query); + + try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(outputPath))) { + for (final Record record : result) { + writer.write(record.toString()); + writer.newLine(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + public Collection executeSql(final String sql) throws SqlParseException { final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); @@ -98,7 +203,7 @@ public Collection executeSql(final String sql) throws SqlParseException WayangRules.WAYANG_JOIN_RULE, WayangRules.WAYANG_AGGREGATE_RULE, WayangRules.WAYANG_SORT_RULE); - + final RelNode wayangRel = optimizer.optimize( relNode, relNode.getTraitSet().plus(WayangConvention.INSTANCE), diff --git a/wayang-assembly/pom.xml b/wayang-assembly/pom.xml index b819f6a16..72a0dd696 100644 --- a/wayang-assembly/pom.xml +++ b/wayang-assembly/pom.xml @@ -35,6 +35,7 @@ none package wayang-${project.version} + 1.35.0 @@ -113,7 +114,11 @@ gson 2.10.1 - + + org.apache.calcite + calcite-core + ${calcite.version} +