Skip to content

Commit 2d453ed

Browse files
committed
hardening table sink abstraction, implementation and testing
1 parent b2a59c4 commit 2d453ed

12 files changed

Lines changed: 280 additions & 211 deletions

File tree

wayang-commons/wayang-basic/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,6 @@
120120
<version>20231013</version>
121121
</dependency>
122122

123-
<dependency>
124-
<groupId>org.apache.calcite</groupId>
125-
<artifactId>calcite-core</artifactId>
126-
<version>${calcite.version}</version>
127-
</dependency>
128123
<dependency>
129124
<groupId>com.azure</groupId>
130125
<artifactId>azure-storage-blob</artifactId>

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/TableSink.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,11 @@ public TableSink(Properties props, String mode, String tableName, String... colu
5252
public TableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) {
5353
super(type);
5454
this.tableName = tableName;
55-
this.columnNames = columnNames;
56-
this.props = props;
55+
this.columnNames = columnNames == null ? null : java.util.Arrays.copyOf(columnNames, columnNames.length);
56+
this.props = new Properties();
57+
if (props != null) {
58+
this.props.putAll(props);
59+
}
5760
this.mode = mode;
5861
}
5962

@@ -75,15 +78,17 @@ public String getTableName() {
7578
}
7679

7780
protected void setColumnNames(String[] columnNames) {
78-
this.columnNames = columnNames;
81+
this.columnNames = columnNames == null ? null : java.util.Arrays.copyOf(columnNames, columnNames.length);
7982
}
8083

8184
public String[] getColumnNames() {
82-
return this.columnNames;
85+
return this.columnNames == null ? null : java.util.Arrays.copyOf(this.columnNames, this.columnNames.length);
8386
}
8487

8588
public Properties getProperties() {
86-
return this.props;
89+
Properties copy = new Properties();
90+
copy.putAll(this.props);
91+
return copy;
8792
}
8893

8994
public String getMode() {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.util;
20+
21+
/**
22+
* Internal representation of database products to avoid external dependencies
23+
* in wayang-basic.
24+
*/
25+
public enum DatabaseProduct {
26+
POSTGRESQL,
27+
MYSQL,
28+
ORACLE,
29+
SQLITE,
30+
H2,
31+
DERBY,
32+
MSSQL,
33+
UNKNOWN
34+
}

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/util/SqlTypeUtils.java

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818

1919
package org.apache.wayang.basic.util;
2020

21-
import org.apache.calcite.sql.SqlDialect;
2221
import org.apache.wayang.basic.data.Record;
23-
24-
import java.lang.reflect.Field;
2522
import java.sql.Date;
2623
import java.sql.Timestamp;
2724
import java.time.LocalDate;
@@ -36,7 +33,7 @@
3633
*/
3734
public class SqlTypeUtils {
3835

39-
private static final Map<SqlDialect.DatabaseProduct, Map<Class<?>, String>> dialectTypeMaps = new HashMap<>();
36+
private static final Map<DatabaseProduct, Map<Class<?>, String>> dialectTypeMaps = new HashMap<>();
4037

4138
static {
4239
// Default mappings (Standard SQL)
@@ -57,13 +54,13 @@ public class SqlTypeUtils {
5754
defaultMap.put(Timestamp.class, "TIMESTAMP");
5855
defaultMap.put(LocalDateTime.class, "TIMESTAMP");
5956

60-
dialectTypeMaps.put(SqlDialect.DatabaseProduct.UNKNOWN, defaultMap);
57+
dialectTypeMaps.put(DatabaseProduct.UNKNOWN, defaultMap);
6158

6259
// PostgreSQL Overrides
6360
Map<Class<?>, String> pgMap = new HashMap<>(defaultMap);
6461
pgMap.put(Double.class, "DOUBLE PRECISION");
6562
pgMap.put(double.class, "DOUBLE PRECISION");
66-
dialectTypeMaps.put(SqlDialect.DatabaseProduct.POSTGRESQL, pgMap);
63+
dialectTypeMaps.put(DatabaseProduct.POSTGRESQL, pgMap);
6764

6865
// Add more dialects here as needed (MySQL, Oracle, etc.)
6966
}
@@ -74,30 +71,26 @@ public class SqlTypeUtils {
7471
* @param url JDBC URL
7572
* @return detected DatabaseProduct
7673
*/
77-
public static SqlDialect.DatabaseProduct detectProduct(String url) {
74+
public static DatabaseProduct detectProduct(String url) {
7875
if (url == null)
79-
return SqlDialect.DatabaseProduct.UNKNOWN;
76+
return DatabaseProduct.UNKNOWN;
8077
String lowerUrl = url.toLowerCase();
8178
if (lowerUrl.contains("postgresql") || lowerUrl.contains("postgres"))
82-
return SqlDialect.DatabaseProduct.POSTGRESQL;
79+
return DatabaseProduct.POSTGRESQL;
8380
if (lowerUrl.contains("mysql"))
84-
return SqlDialect.DatabaseProduct.MYSQL;
81+
return DatabaseProduct.MYSQL;
8582
if (lowerUrl.contains("oracle"))
86-
return SqlDialect.DatabaseProduct.ORACLE;
83+
return DatabaseProduct.ORACLE;
8784
if (lowerUrl.contains("sqlite")) {
88-
try {
89-
return SqlDialect.DatabaseProduct.valueOf("SQLITE");
90-
} catch (Exception e) {
91-
return SqlDialect.DatabaseProduct.UNKNOWN;
92-
}
85+
return DatabaseProduct.SQLITE;
9386
}
9487
if (lowerUrl.contains("h2"))
95-
return SqlDialect.DatabaseProduct.H2;
88+
return DatabaseProduct.H2;
9689
if (lowerUrl.contains("derby"))
97-
return SqlDialect.DatabaseProduct.DERBY;
90+
return DatabaseProduct.DERBY;
9891
if (lowerUrl.contains("mssql") || lowerUrl.contains("sqlserver"))
99-
return SqlDialect.DatabaseProduct.MSSQL;
100-
return SqlDialect.DatabaseProduct.UNKNOWN;
92+
return DatabaseProduct.MSSQL;
93+
return DatabaseProduct.UNKNOWN;
10194
}
10295

10396
/**
@@ -107,9 +100,9 @@ public static SqlDialect.DatabaseProduct detectProduct(String url) {
107100
* @param product database product
108101
* @return SQL type string
109102
*/
110-
public static String getSqlType(Class<?> cls, SqlDialect.DatabaseProduct product) {
103+
public static String getSqlType(Class<?> cls, DatabaseProduct product) {
111104
Map<Class<?>, String> typeMap = dialectTypeMaps.getOrDefault(product,
112-
dialectTypeMaps.get(SqlDialect.DatabaseProduct.UNKNOWN));
105+
dialectTypeMaps.get(DatabaseProduct.UNKNOWN));
113106
return typeMap.getOrDefault(cls, "VARCHAR(255)");
114107
}
115108

@@ -120,20 +113,37 @@ public static String getSqlType(Class<?> cls, SqlDialect.DatabaseProduct product
120113
* @param product database product
121114
* @return a list of schema fields
122115
*/
123-
public static List<SchemaField> getSchema(Class<?> cls, SqlDialect.DatabaseProduct product) {
116+
public static List<SchemaField> getSchema(Class<?> cls, DatabaseProduct product) {
124117
List<SchemaField> schema = new ArrayList<>();
125118
if (cls == Record.class) {
126119
// For Record.class without an instance, we can't derive names/types easily
127120
// Users should use the instance-based getSchema or provide columnNames
128121
return schema;
129122
}
130123

131-
for (Field field : cls.getDeclaredFields()) {
132-
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
124+
for (java.lang.reflect.Method method : cls.getMethods()) {
125+
if (java.lang.reflect.Modifier.isStatic(method.getModifiers()) ||
126+
method.getParameterCount() > 0 ||
127+
method.getReturnType() == void.class ||
128+
method.getName().equals("getClass")) {
133129
continue;
134130
}
135-
schema.add(new SchemaField(field.getName(), field.getType(), getSqlType(field.getType(), product)));
131+
132+
String name = method.getName();
133+
String propertyName = null;
134+
if (name.startsWith("get") && name.length() > 3) {
135+
propertyName = Character.toLowerCase(name.charAt(3)) + name.substring(4);
136+
} else if (name.startsWith("is") && name.length() > 2
137+
&& (method.getReturnType() == boolean.class || method.getReturnType() == Boolean.class)) {
138+
propertyName = Character.toLowerCase(name.charAt(2)) + name.substring(3);
139+
}
140+
141+
if (propertyName != null) {
142+
schema.add(new SchemaField(propertyName, method.getReturnType(),
143+
getSqlType(method.getReturnType(), product)));
144+
}
136145
}
146+
schema.sort(java.util.Comparator.comparing(SchemaField::getName));
137147
return schema;
138148
}
139149

@@ -145,7 +155,7 @@ public static List<SchemaField> getSchema(Class<?> cls, SqlDialect.DatabaseProdu
145155
* @param userNames optional user-provided column names
146156
* @return a list of schema fields
147157
*/
148-
public static List<SchemaField> getSchema(Record record, SqlDialect.DatabaseProduct product, String[] userNames) {
158+
public static List<SchemaField> getSchema(Record record, DatabaseProduct product, String[] userNames) {
149159
List<SchemaField> schema = new ArrayList<>();
150160
if (record == null)
151161
return schema;

wayang-commons/wayang-basic/src/test/java/org/apache/wayang/basic/util/SqlTypeUtilsTest.java

Lines changed: 60 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,9 @@
1818

1919
package org.apache.wayang.basic.util;
2020

21-
import org.apache.calcite.sql.SqlDialect;
2221
import org.apache.wayang.basic.data.Record;
2322
import org.junit.jupiter.api.Test;
2423

25-
import java.sql.Date;
26-
import java.sql.Timestamp;
2724
import java.util.List;
2825

2926
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -32,60 +29,68 @@ public class SqlTypeUtilsTest {
3229

3330
@Test
3431
public void testDetectProduct() {
35-
assertEquals(SqlDialect.DatabaseProduct.POSTGRESQL,
32+
assertEquals(DatabaseProduct.POSTGRESQL,
3633
SqlTypeUtils.detectProduct("jdbc:postgresql://localhost:5432/db"));
37-
assertEquals(SqlDialect.DatabaseProduct.MYSQL, SqlTypeUtils.detectProduct("jdbc:mysql://localhost:3306/db"));
38-
assertEquals(SqlDialect.DatabaseProduct.ORACLE,
34+
assertEquals(DatabaseProduct.MYSQL,
35+
SqlTypeUtils.detectProduct("jdbc:mysql://localhost:3306/db"));
36+
assertEquals(DatabaseProduct.ORACLE,
3937
SqlTypeUtils.detectProduct("jdbc:oracle:thin:@localhost:1521:xe"));
40-
assertEquals(SqlDialect.DatabaseProduct.H2, SqlTypeUtils.detectProduct("jdbc:h2:mem:test"));
41-
assertEquals(SqlDialect.DatabaseProduct.DERBY,
38+
assertEquals(DatabaseProduct.SQLITE,
39+
SqlTypeUtils.detectProduct("jdbc:sqlite:test.db"));
40+
assertEquals(DatabaseProduct.H2,
41+
SqlTypeUtils.detectProduct("jdbc:h2:mem:test"));
42+
assertEquals(DatabaseProduct.DERBY,
4243
SqlTypeUtils.detectProduct("jdbc:derby:memory:test;create=true"));
43-
assertEquals(SqlDialect.DatabaseProduct.MSSQL,
44+
assertEquals(DatabaseProduct.MSSQL,
4445
SqlTypeUtils.detectProduct("jdbc:sqlserver://localhost:1433;databaseName=db"));
45-
assertEquals(SqlDialect.DatabaseProduct.UNKNOWN, SqlTypeUtils.detectProduct("jdbc:unknown:db"));
46+
assertEquals(DatabaseProduct.UNKNOWN, SqlTypeUtils.detectProduct("jdbc:unknown:db"));
4647
}
4748

4849
@Test
4950
public void testGetSqlTypeDefault() {
50-
SqlDialect.DatabaseProduct product = SqlDialect.DatabaseProduct.UNKNOWN;
51-
assertEquals("INT", SqlTypeUtils.getSqlType(Integer.class, product));
52-
assertEquals("INT", SqlTypeUtils.getSqlType(int.class, product));
53-
assertEquals("BIGINT", SqlTypeUtils.getSqlType(Long.class, product));
54-
assertEquals("DOUBLE", SqlTypeUtils.getSqlType(Double.class, product));
55-
assertEquals("VARCHAR(255)", SqlTypeUtils.getSqlType(String.class, product));
56-
assertEquals("DATE", SqlTypeUtils.getSqlType(Date.class, product));
57-
assertEquals("TIMESTAMP", SqlTypeUtils.getSqlType(Timestamp.class, product));
51+
assertEquals("INT", SqlTypeUtils.getSqlType(Integer.class, DatabaseProduct.UNKNOWN));
52+
assertEquals("INT", SqlTypeUtils.getSqlType(int.class, DatabaseProduct.UNKNOWN));
53+
assertEquals("BIGINT", SqlTypeUtils.getSqlType(Long.class, DatabaseProduct.UNKNOWN));
54+
assertEquals("DOUBLE", SqlTypeUtils.getSqlType(Double.class, DatabaseProduct.UNKNOWN));
55+
assertEquals("VARCHAR(255)", SqlTypeUtils.getSqlType(String.class, DatabaseProduct.UNKNOWN));
56+
assertEquals("DATE", SqlTypeUtils.getSqlType(java.sql.Date.class, DatabaseProduct.UNKNOWN));
57+
assertEquals("TIMESTAMP", SqlTypeUtils.getSqlType(java.sql.Timestamp.class, DatabaseProduct.UNKNOWN));
5858
}
5959

6060
@Test
61-
public void testGetSqlTypePostgres() {
62-
SqlDialect.DatabaseProduct product = SqlDialect.DatabaseProduct.POSTGRESQL;
63-
assertEquals("INT", SqlTypeUtils.getSqlType(Integer.class, product));
64-
assertEquals("DOUBLE PRECISION", SqlTypeUtils.getSqlType(Double.class, product));
65-
assertEquals("DOUBLE PRECISION", SqlTypeUtils.getSqlType(double.class, product));
66-
assertEquals("VARCHAR(255)", SqlTypeUtils.getSqlType(String.class, product));
61+
public void testPostgresqlOverrides() {
62+
assertEquals("INT", SqlTypeUtils.getSqlType(Integer.class, DatabaseProduct.POSTGRESQL));
63+
assertEquals("DOUBLE PRECISION", SqlTypeUtils.getSqlType(Double.class, DatabaseProduct.POSTGRESQL));
64+
assertEquals("DOUBLE PRECISION", SqlTypeUtils.getSqlType(double.class, DatabaseProduct.POSTGRESQL));
65+
assertEquals("VARCHAR(255)", SqlTypeUtils.getSqlType(String.class, DatabaseProduct.POSTGRESQL));
6766
}
6867

6968
@Test
7069
public void testGetSchema() {
7170
List<SqlTypeUtils.SchemaField> schema = SqlTypeUtils.getSchema(TestPojo.class,
72-
SqlDialect.DatabaseProduct.POSTGRESQL);
73-
assertEquals(3, schema.size());
71+
DatabaseProduct.POSTGRESQL);
72+
// id, name, value, active (from getter/is)
73+
assertEquals(4, schema.size());
7474

75-
assertEquals("id", schema.get(0).getName());
76-
assertEquals("INT", schema.get(0).getSqlType());
75+
schema.sort((f1, f2) -> f1.getName().compareTo(f2.getName()));
7776

78-
assertEquals("name", schema.get(1).getName());
79-
assertEquals("VARCHAR(255)", schema.get(1).getSqlType());
77+
assertEquals("active", schema.get(0).getName());
78+
assertEquals("BOOLEAN", schema.get(0).getSqlType());
8079

81-
assertEquals("value", schema.get(2).getName());
82-
assertEquals("DOUBLE PRECISION", schema.get(2).getSqlType());
80+
assertEquals("id", schema.get(1).getName());
81+
assertEquals("INT", schema.get(1).getSqlType());
82+
83+
assertEquals("name", schema.get(2).getName());
84+
assertEquals("VARCHAR(255)", schema.get(2).getSqlType());
85+
86+
assertEquals("value", schema.get(3).getName());
87+
assertEquals("DOUBLE PRECISION", schema.get(3).getSqlType());
8388
}
8489

8590
@Test
8691
public void testGetSchemaRecord() {
8792
Record record = new Record(1, "test", 1.5);
88-
List<SqlTypeUtils.SchemaField> schema = SqlTypeUtils.getSchema(record, SqlDialect.DatabaseProduct.POSTGRESQL,
93+
List<SqlTypeUtils.SchemaField> schema = SqlTypeUtils.getSchema(record, DatabaseProduct.POSTGRESQL,
8994
null);
9095

9196
assertEquals(3, schema.size());
@@ -106,7 +111,7 @@ public void testGetSchemaRecord() {
106111
public void testGetSchemaRecordWithNames() {
107112
Record record = new Record(1, "test");
108113
String[] names = { "id", "description" };
109-
List<SqlTypeUtils.SchemaField> schema = SqlTypeUtils.getSchema(record, SqlDialect.DatabaseProduct.POSTGRESQL,
114+
List<SqlTypeUtils.SchemaField> schema = SqlTypeUtils.getSchema(record, DatabaseProduct.POSTGRESQL,
110115
names);
111116

112117
assertEquals(2, schema.size());
@@ -115,8 +120,26 @@ public void testGetSchemaRecordWithNames() {
115120
}
116121

117122
public static class TestPojo {
118-
public int id;
119-
public String name;
120-
public Double value;
123+
private int id;
124+
private String name;
125+
private Double value;
126+
private boolean active;
127+
private String hidden;
128+
129+
public int getId() {
130+
return id;
131+
}
132+
133+
public String getName() {
134+
return name;
135+
}
136+
137+
public Double getValue() {
138+
return value;
139+
}
140+
141+
public boolean isActive() {
142+
return active;
143+
}
121144
}
122145
}

wayang-platforms/wayang-java/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,6 @@
7878
<artifactId>log4j-slf4j-impl</artifactId>
7979
<version>2.20.0</version>
8080
</dependency>
81-
<dependency>
82-
<groupId>org.postgresql</groupId>
83-
<artifactId>postgresql</artifactId>
84-
<version>42.7.2</version>
85-
<scope>test</scope>
86-
</dependency>
8781
<!-- Mockito for mocking -->
8882
<dependency>
8983
<groupId>com.h2database</groupId>

0 commit comments

Comments
 (0)