1717
1818package org .apache .wayang .api .sql .context ;
1919
20+ import org .apache .commons .lang3 .StringUtils ;
21+
22+
2023import org .apache .calcite .jdbc .CalciteSchema ;
2124import org .apache .calcite .jdbc .JavaTypeFactoryImpl ;
2225import org .apache .calcite .rel .RelNode ;
2528import org .apache .calcite .sql .parser .SqlParseException ;
2629import org .apache .calcite .tools .RuleSet ;
2730import org .apache .calcite .tools .RuleSets ;
31+
32+ import org .apache .wayang .api .sql .calcite .utils .ModelParser ;
2833import org .apache .wayang .api .sql .calcite .convention .WayangConvention ;
2934import org .apache .wayang .api .sql .calcite .optimizer .Optimizer ;
3035import org .apache .wayang .api .sql .calcite .rules .WayangRules ;
3944import org .apache .wayang .postgres .Postgres ;
4045import org .apache .wayang .spark .Spark ;
4146
47+ import org .json .simple .JSONObject ;
48+ import org .json .simple .parser .JSONParser ;
49+
50+ import java .io .BufferedWriter ;
51+ import java .io .IOException ;
52+ import java .nio .file .Files ;
53+ import java .nio .file .Paths ;
4254import java .sql .SQLException ;
4355import java .util .ArrayList ;
4456import java .util .Collection ;
@@ -72,10 +84,103 @@ public SqlContext(final Configuration configuration, final List<Plugin> plugins)
7284 for (final Plugin plugin : plugins ) {
7385 this .withPlugin (plugin );
7486 }
75-
87+
7688 calciteSchema = SchemaUtils .getSchema (configuration );
7789 }
7890
91+ /**
92+ * Entry point for executing SQL statements while providing arguments.
93+ * You need to provide at least a JDBC source.
94+ *
95+ * @param args args[0] = SQL statement path, args[1] = JDBC driver, args[2] =
96+ * JDBC URL, args[3] = JDBC user,
97+ * args[4] = JDBC password, args[5] = outputPath,
98+ * args[6...] = platforms
99+ */
100+ public static void main (final String [] args ) throws Exception {
101+ if (args .length < 5 )
102+ throw new IllegalArgumentException (
103+ "Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <SQL statement path> <JDBC driver> <JDBC URL> <JDBC user> <JDBC password> <Result output path> [platforms...]" );
104+
105+ final String queryPath = args [0 ];
106+ final String jdbcDriver = args [1 ];
107+ final String jdbcUrl = args [2 ];
108+ final String jdbcUser = args [3 ];
109+ final String jdbcPassword = args [4 ];
110+ final String outputPath = args [5 ];
111+
112+ final String query = StringUtils .chop (
113+ Files .readString (Paths .get (queryPath ))
114+ .stripTrailing ());
115+
116+ final String driverPlatform = jdbcDriver .split ("\\ ." )[0 ];
117+
118+ final String calciteModel = String .format (
119+ "{\r \n " +
120+ "\" calcite\" : {\r \n " +
121+ " \" version\" : \" 1.0\" ,\n " +
122+ " \" defaultSchema\" : \" wayang\" ,\n " +
123+ " \" schemas\" : [\n " +
124+ " {\n " +
125+ " \" name\" : \" postgres\" ,\n " +
126+ " \" type\" : \" custom\" ,\n " +
127+ " \" factory\" : \" org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory\" ,\n " +
128+ " \" operand\" : {\n " +
129+ " \" jdbcDriver\" : \" %s\" ,\n " +
130+ " \" jdbcUrl\" : \" %s\" ,\n " +
131+ " \" jdbcUser\" : \" %s\" ,\n " +
132+ " \" jdbcPassword\" : \" %s\" \n " +
133+ " }\n " +
134+ " }\n " +
135+ " ]\n " +
136+ "}\r \n " +
137+ "}" ,
138+ jdbcDriver , jdbcUrl , jdbcUser , jdbcPassword );
139+
140+ final Configuration configuration = new Configuration ();
141+
142+ configuration .setProperty ("wayang.calcite.model" , calciteModel );
143+ configuration .setProperty (String .format ("wayang.%s.jdbc.url" , driverPlatform ), jdbcUrl );
144+ configuration .setProperty (String .format ("wayang.%s.jdbc.user" , driverPlatform ), jdbcUser );
145+ configuration .setProperty (String .format ("wayang.%s.jdbc.password" , driverPlatform ), jdbcPassword );
146+
147+ final JSONObject calciteModelJSON = (JSONObject ) new JSONParser ().parse (calciteModel );
148+
149+ final Configuration parseModel = new ModelParser (configuration , calciteModelJSON ).setProperties ();
150+
151+ final SqlContext context = new SqlContext (parseModel ,
152+ List .of (Java .channelConversionPlugin (), Postgres .conversionPlugin ()));
153+
154+ for (int i = 6 ; i < args .length ; i ++) {
155+ final String platform = args [i ];
156+
157+ switch (platform .toLowerCase ()) {
158+ case "spark" :
159+ context .withPlugin (Spark .basicPlugin ());
160+ break ;
161+ case "java" :
162+ context .withPlugin (Java .basicPlugin ());
163+ break ;
164+ case "postgres" :
165+ context .withPlugin (Postgres .plugin ());
166+ break ;
167+ default :
168+ throw new IllegalArgumentException ("platform not supported " + platform );
169+ }
170+ }
171+
172+ final Collection <Record > result = context .executeSql (query );
173+
174+ try (BufferedWriter writer = Files .newBufferedWriter (Paths .get (outputPath ))) {
175+ for (final Record record : result ) {
176+ writer .write (record .toString ());
177+ writer .newLine ();
178+ }
179+ } catch (IOException e ) {
180+ e .printStackTrace ();
181+ }
182+ }
183+
79184 public Collection <Record > executeSql (final String sql ) throws SqlParseException {
80185
81186 final Properties configProperties = Optimizer .ConfigProperties .getDefaults ();
@@ -98,7 +203,7 @@ public Collection<Record> executeSql(final String sql) throws SqlParseException
98203 WayangRules .WAYANG_JOIN_RULE ,
99204 WayangRules .WAYANG_AGGREGATE_RULE ,
100205 WayangRules .WAYANG_SORT_RULE );
101-
206+
102207 final RelNode wayangRel = optimizer .optimize (
103208 relNode ,
104209 relNode .getTraitSet ().plus (WayangConvention .INSTANCE ),
0 commit comments