Skip to content

Commit 92cff92

Browse files
authored
Merge pull request #665 from harrygav/introduce_postgresql_sink
Add TableSink operator with Java/Spark implementations
2 parents de50095 + 2d453ed commit 92cff92

11 files changed

Lines changed: 1547 additions & 3 deletions

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.operators;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.core.plan.wayangplan.UnarySink;
23+
import org.apache.wayang.core.types.DataSetType;
24+
25+
import java.util.Properties;
26+
27+
/**
28+
* {@link UnarySink} that writes Records to a database table.
29+
*/
30+
31+
public class TableSink<T> extends UnarySink<T> {
32+
private final String tableName;
33+
34+
private String[] columnNames;
35+
36+
private final Properties props;
37+
38+
private String mode;
39+
40+
/**
41+
* Creates a new instance.
42+
*
43+
* @param props database connection properties
44+
* @param mode write mode
45+
* @param tableName name of the table to be written
46+
* @param columnNames names of the columns in the tables
47+
*/
48+
public TableSink(Properties props, String mode, String tableName, String... columnNames) {
49+
this(props, mode, tableName, columnNames, (DataSetType<T>) DataSetType.createDefault(Record.class));
50+
}
51+
52+
public TableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) {
53+
super(type);
54+
this.tableName = tableName;
55+
this.columnNames = columnNames == null ? null : java.util.Arrays.copyOf(columnNames, columnNames.length);
56+
this.props = new Properties();
57+
if (props != null) {
58+
this.props.putAll(props);
59+
}
60+
this.mode = mode;
61+
}
62+
63+
/**
64+
* Copies an instance (exclusive of broadcasts).
65+
*
66+
* @param that that should be copied
67+
*/
68+
public TableSink(TableSink<T> that) {
69+
super(that);
70+
this.tableName = that.getTableName();
71+
this.columnNames = that.getColumnNames();
72+
this.props = that.getProperties();
73+
this.mode = that.getMode();
74+
}
75+
76+
public String getTableName() {
77+
return this.tableName;
78+
}
79+
80+
protected void setColumnNames(String[] columnNames) {
81+
this.columnNames = columnNames == null ? null : java.util.Arrays.copyOf(columnNames, columnNames.length);
82+
}
83+
84+
public String[] getColumnNames() {
85+
return this.columnNames == null ? null : java.util.Arrays.copyOf(this.columnNames, this.columnNames.length);
86+
}
87+
88+
public Properties getProperties() {
89+
Properties copy = new Properties();
90+
copy.putAll(this.props);
91+
return copy;
92+
}
93+
94+
public String getMode() {
95+
return mode;
96+
}
97+
98+
public void setMode(String mode) {
99+
this.mode = mode;
100+
}
101+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.util;
20+
21+
/**
22+
* Internal representation of database products to avoid external dependencies
23+
* in wayang-basic.
24+
*/
25+
public enum DatabaseProduct {
26+
POSTGRESQL,
27+
MYSQL,
28+
ORACLE,
29+
SQLITE,
30+
H2,
31+
DERBY,
32+
MSSQL,
33+
UNKNOWN
34+
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.util;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import java.sql.Date;
23+
import java.sql.Timestamp;
24+
import java.time.LocalDate;
25+
import java.time.LocalDateTime;
26+
import java.util.ArrayList;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
/**
32+
* Utility for mapping Java types to SQL types across different dialects.
33+
*/
34+
public class SqlTypeUtils {
35+
36+
private static final Map<DatabaseProduct, Map<Class<?>, String>> dialectTypeMaps = new HashMap<>();
37+
38+
static {
39+
// Default mappings (Standard SQL)
40+
Map<Class<?>, String> defaultMap = new HashMap<>();
41+
defaultMap.put(Integer.class, "INT");
42+
defaultMap.put(int.class, "INT");
43+
defaultMap.put(Long.class, "BIGINT");
44+
defaultMap.put(long.class, "BIGINT");
45+
defaultMap.put(Double.class, "DOUBLE");
46+
defaultMap.put(double.class, "DOUBLE");
47+
defaultMap.put(Float.class, "FLOAT");
48+
defaultMap.put(float.class, "FLOAT");
49+
defaultMap.put(Boolean.class, "BOOLEAN");
50+
defaultMap.put(boolean.class, "BOOLEAN");
51+
defaultMap.put(String.class, "VARCHAR(255)");
52+
defaultMap.put(Date.class, "DATE");
53+
defaultMap.put(LocalDate.class, "DATE");
54+
defaultMap.put(Timestamp.class, "TIMESTAMP");
55+
defaultMap.put(LocalDateTime.class, "TIMESTAMP");
56+
57+
dialectTypeMaps.put(DatabaseProduct.UNKNOWN, defaultMap);
58+
59+
// PostgreSQL Overrides
60+
Map<Class<?>, String> pgMap = new HashMap<>(defaultMap);
61+
pgMap.put(Double.class, "DOUBLE PRECISION");
62+
pgMap.put(double.class, "DOUBLE PRECISION");
63+
dialectTypeMaps.put(DatabaseProduct.POSTGRESQL, pgMap);
64+
65+
// Add more dialects here as needed (MySQL, Oracle, etc.)
66+
}
67+
68+
/**
69+
* Detects the database product from a JDBC URL.
70+
*
71+
* @param url JDBC URL
72+
* @return detected DatabaseProduct
73+
*/
74+
public static DatabaseProduct detectProduct(String url) {
75+
if (url == null)
76+
return DatabaseProduct.UNKNOWN;
77+
String lowerUrl = url.toLowerCase();
78+
if (lowerUrl.contains("postgresql") || lowerUrl.contains("postgres"))
79+
return DatabaseProduct.POSTGRESQL;
80+
if (lowerUrl.contains("mysql"))
81+
return DatabaseProduct.MYSQL;
82+
if (lowerUrl.contains("oracle"))
83+
return DatabaseProduct.ORACLE;
84+
if (lowerUrl.contains("sqlite")) {
85+
return DatabaseProduct.SQLITE;
86+
}
87+
if (lowerUrl.contains("h2"))
88+
return DatabaseProduct.H2;
89+
if (lowerUrl.contains("derby"))
90+
return DatabaseProduct.DERBY;
91+
if (lowerUrl.contains("mssql") || lowerUrl.contains("sqlserver"))
92+
return DatabaseProduct.MSSQL;
93+
return DatabaseProduct.UNKNOWN;
94+
}
95+
96+
/**
97+
* Returns the SQL type for a given Java class and database product.
98+
*
99+
* @param cls Java class
100+
* @param product database product
101+
* @return SQL type string
102+
*/
103+
public static String getSqlType(Class<?> cls, DatabaseProduct product) {
104+
Map<Class<?>, String> typeMap = dialectTypeMaps.getOrDefault(product,
105+
dialectTypeMaps.get(DatabaseProduct.UNKNOWN));
106+
return typeMap.getOrDefault(cls, "VARCHAR(255)");
107+
}
108+
109+
/**
110+
* Extracts schema information from a POJO class or a Record.
111+
*
112+
* @param cls POJO class
113+
* @param product database product
114+
* @return a list of schema fields
115+
*/
116+
public static List<SchemaField> getSchema(Class<?> cls, DatabaseProduct product) {
117+
List<SchemaField> schema = new ArrayList<>();
118+
if (cls == Record.class) {
119+
// For Record.class without an instance, we can't derive names/types easily
120+
// Users should use the instance-based getSchema or provide columnNames
121+
return schema;
122+
}
123+
124+
for (java.lang.reflect.Method method : cls.getMethods()) {
125+
if (java.lang.reflect.Modifier.isStatic(method.getModifiers()) ||
126+
method.getParameterCount() > 0 ||
127+
method.getReturnType() == void.class ||
128+
method.getName().equals("getClass")) {
129+
continue;
130+
}
131+
132+
String name = method.getName();
133+
String propertyName = null;
134+
if (name.startsWith("get") && name.length() > 3) {
135+
propertyName = Character.toLowerCase(name.charAt(3)) + name.substring(4);
136+
} else if (name.startsWith("is") && name.length() > 2
137+
&& (method.getReturnType() == boolean.class || method.getReturnType() == Boolean.class)) {
138+
propertyName = Character.toLowerCase(name.charAt(2)) + name.substring(3);
139+
}
140+
141+
if (propertyName != null) {
142+
schema.add(new SchemaField(propertyName, method.getReturnType(),
143+
getSqlType(method.getReturnType(), product)));
144+
}
145+
}
146+
schema.sort(java.util.Comparator.comparing(SchemaField::getName));
147+
return schema;
148+
}
149+
150+
/**
151+
* Extracts schema information from a Record instance by inspecting its fields.
152+
*
153+
* @param record representative record
154+
* @param product database product
155+
* @param userNames optional user-provided column names
156+
* @return a list of schema fields
157+
*/
158+
public static List<SchemaField> getSchema(Record record, DatabaseProduct product, String[] userNames) {
159+
List<SchemaField> schema = new ArrayList<>();
160+
if (record == null)
161+
return schema;
162+
163+
int size = record.size();
164+
for (int i = 0; i < size; i++) {
165+
String name = (userNames != null && i < userNames.length) ? userNames[i] : "c_" + i;
166+
Object val = record.getField(i);
167+
Class<?> typeClass = val != null ? val.getClass() : String.class;
168+
String type = getSqlType(typeClass, product);
169+
schema.add(new SchemaField(name, typeClass, type));
170+
}
171+
return schema;
172+
}
173+
174+
public static class SchemaField {
175+
private final String name;
176+
private final Class<?> javaClass;
177+
private final String sqlType;
178+
179+
public SchemaField(String name, Class<?> javaClass, String sqlType) {
180+
this.name = name;
181+
this.javaClass = javaClass;
182+
this.sqlType = sqlType;
183+
}
184+
185+
public String getName() {
186+
return name;
187+
}
188+
189+
public Class<?> getJavaClass() {
190+
return javaClass;
191+
}
192+
193+
public String getSqlType() {
194+
return sqlType;
195+
}
196+
}
197+
}

0 commit comments

Comments
 (0)