Skip to content

Commit b2a59c4

Browse files
committed
introduce abstract TableSink operator with Java and Spark platform implementation and tests
1 parent 260cb19 commit b2a59c4

10 files changed

Lines changed: 1475 additions & 0 deletions

File tree

wayang-commons/wayang-basic/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@
120120
<version>20231013</version>
121121
</dependency>
122122

123+
<dependency>
124+
<groupId>org.apache.calcite</groupId>
125+
<artifactId>calcite-core</artifactId>
126+
<version>${calcite.version}</version>
127+
</dependency>
123128
<dependency>
124129
<groupId>com.azure</groupId>
125130
<artifactId>azure-storage-blob</artifactId>
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.operators;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.core.plan.wayangplan.UnarySink;
23+
import org.apache.wayang.core.types.DataSetType;
24+
25+
import java.util.Properties;
26+
27+
/**
28+
* {@link UnarySink} that writes Records to a database table.
29+
*/
30+
31+
public class TableSink<T> extends UnarySink<T> {
32+
private final String tableName;
33+
34+
private String[] columnNames;
35+
36+
private final Properties props;
37+
38+
private String mode;
39+
40+
/**
41+
* Creates a new instance.
42+
*
43+
* @param props database connection properties
44+
* @param mode write mode
45+
* @param tableName name of the table to be written
46+
* @param columnNames names of the columns in the tables
47+
*/
48+
public TableSink(Properties props, String mode, String tableName, String... columnNames) {
49+
this(props, mode, tableName, columnNames, (DataSetType<T>) DataSetType.createDefault(Record.class));
50+
}
51+
52+
public TableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) {
53+
super(type);
54+
this.tableName = tableName;
55+
this.columnNames = columnNames;
56+
this.props = props;
57+
this.mode = mode;
58+
}
59+
60+
/**
61+
* Copies an instance (exclusive of broadcasts).
62+
*
63+
* @param that that should be copied
64+
*/
65+
public TableSink(TableSink<T> that) {
66+
super(that);
67+
this.tableName = that.getTableName();
68+
this.columnNames = that.getColumnNames();
69+
this.props = that.getProperties();
70+
this.mode = that.getMode();
71+
}
72+
73+
public String getTableName() {
74+
return this.tableName;
75+
}
76+
77+
protected void setColumnNames(String[] columnNames) {
78+
this.columnNames = columnNames;
79+
}
80+
81+
public String[] getColumnNames() {
82+
return this.columnNames;
83+
}
84+
85+
public Properties getProperties() {
86+
return this.props;
87+
}
88+
89+
public String getMode() {
90+
return mode;
91+
}
92+
93+
public void setMode(String mode) {
94+
this.mode = mode;
95+
}
96+
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.util;
20+
21+
import org.apache.calcite.sql.SqlDialect;
22+
import org.apache.wayang.basic.data.Record;
23+
24+
import java.lang.reflect.Field;
25+
import java.sql.Date;
26+
import java.sql.Timestamp;
27+
import java.time.LocalDate;
28+
import java.time.LocalDateTime;
29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
/**
35+
* Utility for mapping Java types to SQL types across different dialects.
36+
*/
37+
public class SqlTypeUtils {
38+
39+
private static final Map<SqlDialect.DatabaseProduct, Map<Class<?>, String>> dialectTypeMaps = new HashMap<>();
40+
41+
static {
42+
// Default mappings (Standard SQL)
43+
Map<Class<?>, String> defaultMap = new HashMap<>();
44+
defaultMap.put(Integer.class, "INT");
45+
defaultMap.put(int.class, "INT");
46+
defaultMap.put(Long.class, "BIGINT");
47+
defaultMap.put(long.class, "BIGINT");
48+
defaultMap.put(Double.class, "DOUBLE");
49+
defaultMap.put(double.class, "DOUBLE");
50+
defaultMap.put(Float.class, "FLOAT");
51+
defaultMap.put(float.class, "FLOAT");
52+
defaultMap.put(Boolean.class, "BOOLEAN");
53+
defaultMap.put(boolean.class, "BOOLEAN");
54+
defaultMap.put(String.class, "VARCHAR(255)");
55+
defaultMap.put(Date.class, "DATE");
56+
defaultMap.put(LocalDate.class, "DATE");
57+
defaultMap.put(Timestamp.class, "TIMESTAMP");
58+
defaultMap.put(LocalDateTime.class, "TIMESTAMP");
59+
60+
dialectTypeMaps.put(SqlDialect.DatabaseProduct.UNKNOWN, defaultMap);
61+
62+
// PostgreSQL Overrides
63+
Map<Class<?>, String> pgMap = new HashMap<>(defaultMap);
64+
pgMap.put(Double.class, "DOUBLE PRECISION");
65+
pgMap.put(double.class, "DOUBLE PRECISION");
66+
dialectTypeMaps.put(SqlDialect.DatabaseProduct.POSTGRESQL, pgMap);
67+
68+
// Add more dialects here as needed (MySQL, Oracle, etc.)
69+
}
70+
71+
/**
72+
* Detects the database product from a JDBC URL.
73+
*
74+
* @param url JDBC URL
75+
* @return detected DatabaseProduct
76+
*/
77+
public static SqlDialect.DatabaseProduct detectProduct(String url) {
78+
if (url == null)
79+
return SqlDialect.DatabaseProduct.UNKNOWN;
80+
String lowerUrl = url.toLowerCase();
81+
if (lowerUrl.contains("postgresql") || lowerUrl.contains("postgres"))
82+
return SqlDialect.DatabaseProduct.POSTGRESQL;
83+
if (lowerUrl.contains("mysql"))
84+
return SqlDialect.DatabaseProduct.MYSQL;
85+
if (lowerUrl.contains("oracle"))
86+
return SqlDialect.DatabaseProduct.ORACLE;
87+
if (lowerUrl.contains("sqlite")) {
88+
try {
89+
return SqlDialect.DatabaseProduct.valueOf("SQLITE");
90+
} catch (Exception e) {
91+
return SqlDialect.DatabaseProduct.UNKNOWN;
92+
}
93+
}
94+
if (lowerUrl.contains("h2"))
95+
return SqlDialect.DatabaseProduct.H2;
96+
if (lowerUrl.contains("derby"))
97+
return SqlDialect.DatabaseProduct.DERBY;
98+
if (lowerUrl.contains("mssql") || lowerUrl.contains("sqlserver"))
99+
return SqlDialect.DatabaseProduct.MSSQL;
100+
return SqlDialect.DatabaseProduct.UNKNOWN;
101+
}
102+
103+
/**
104+
* Returns the SQL type for a given Java class and database product.
105+
*
106+
* @param cls Java class
107+
* @param product database product
108+
* @return SQL type string
109+
*/
110+
public static String getSqlType(Class<?> cls, SqlDialect.DatabaseProduct product) {
111+
Map<Class<?>, String> typeMap = dialectTypeMaps.getOrDefault(product,
112+
dialectTypeMaps.get(SqlDialect.DatabaseProduct.UNKNOWN));
113+
return typeMap.getOrDefault(cls, "VARCHAR(255)");
114+
}
115+
116+
/**
117+
* Extracts schema information from a POJO class or a Record.
118+
*
119+
* @param cls POJO class
120+
* @param product database product
121+
* @return a list of schema fields
122+
*/
123+
public static List<SchemaField> getSchema(Class<?> cls, SqlDialect.DatabaseProduct product) {
124+
List<SchemaField> schema = new ArrayList<>();
125+
if (cls == Record.class) {
126+
// For Record.class without an instance, we can't derive names/types easily
127+
// Users should use the instance-based getSchema or provide columnNames
128+
return schema;
129+
}
130+
131+
for (Field field : cls.getDeclaredFields()) {
132+
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
133+
continue;
134+
}
135+
schema.add(new SchemaField(field.getName(), field.getType(), getSqlType(field.getType(), product)));
136+
}
137+
return schema;
138+
}
139+
140+
/**
141+
* Extracts schema information from a Record instance by inspecting its fields.
142+
*
143+
* @param record representative record
144+
* @param product database product
145+
* @param userNames optional user-provided column names
146+
* @return a list of schema fields
147+
*/
148+
public static List<SchemaField> getSchema(Record record, SqlDialect.DatabaseProduct product, String[] userNames) {
149+
List<SchemaField> schema = new ArrayList<>();
150+
if (record == null)
151+
return schema;
152+
153+
int size = record.size();
154+
for (int i = 0; i < size; i++) {
155+
String name = (userNames != null && i < userNames.length) ? userNames[i] : "c_" + i;
156+
Object val = record.getField(i);
157+
Class<?> typeClass = val != null ? val.getClass() : String.class;
158+
String type = getSqlType(typeClass, product);
159+
schema.add(new SchemaField(name, typeClass, type));
160+
}
161+
return schema;
162+
}
163+
164+
public static class SchemaField {
165+
private final String name;
166+
private final Class<?> javaClass;
167+
private final String sqlType;
168+
169+
public SchemaField(String name, Class<?> javaClass, String sqlType) {
170+
this.name = name;
171+
this.javaClass = javaClass;
172+
this.sqlType = sqlType;
173+
}
174+
175+
public String getName() {
176+
return name;
177+
}
178+
179+
public Class<?> getJavaClass() {
180+
return javaClass;
181+
}
182+
183+
public String getSqlType() {
184+
return sqlType;
185+
}
186+
}
187+
}

0 commit comments

Comments
 (0)