Skip to content

Commit ef76d4d

Browse files
yijiacui-dbtdas
authored and committed
[SC-69796][Delta] Fix JavaDeltaTableBuilderSuite and JavaDeltaTableSuite in OSS
## What changes were proposed in this pull request? Fix JavaDeltaTableBuilderSuite and JavaDeltaTableSuite in OSS ## How was this patch tested? Unit test only Author: Yijia Cui <[email protected]> #22428 is resolved by yijiacui-db/SC-69796-java-suite. GitOrigin-RevId: 26053a0d4e517e7d3f472175b25910fa03b04948
1 parent e36dc6b commit ef76d4d

File tree

4 files changed

+608
-19
lines changed

4 files changed

+608
-19
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.tables;
18+
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.spark.sql.delta.DeltaLog;
25+
import org.apache.hadoop.fs.Path;
26+
import org.apache.spark.sql.*;
27+
28+
import org.apache.spark.util.Utils;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.apache.spark.sql.delta.DeltaSQLCommandJavaTest;
33+
34+
import static org.apache.spark.sql.types.DataTypes.*;
35+
36+
public class JavaDeltaTableBuilderSuite implements DeltaSQLCommandJavaTest {
37+
38+
private transient SparkSession spark;
39+
private transient String input;
40+
41+
42+
@Before
43+
public void setUp() {
44+
// Trigger static initializer of TestData
45+
spark = buildSparkSession();
46+
}
47+
48+
@After
49+
public void tearDown() {
50+
if (spark != null) {
51+
spark.stop();
52+
spark = null;
53+
}
54+
}
55+
56+
private DeltaTable buildTable(DeltaTableBuilder builder) {
57+
return builder.addColumn("c1", "int")
58+
.addColumn("c2", IntegerType)
59+
.addColumn("c3", "string", false)
60+
.addColumn("c4", StringType, true)
61+
.addColumn(DeltaTable.columnBuilder(spark, "c5")
62+
.dataType("bigint")
63+
.comment("foo")
64+
.nullable(false)
65+
.build()
66+
)
67+
.addColumn(DeltaTable.columnBuilder(spark, "c6")
68+
.dataType(LongType)
69+
.generatedAlwaysAs("c5 + 10")
70+
.build()
71+
).execute();
72+
}
73+
74+
private DeltaTable createTable(boolean ifNotExists, String tableName) {
75+
DeltaTableBuilder builder;
76+
if (ifNotExists) {
77+
builder = DeltaTable.createIfNotExists();
78+
} else {
79+
builder = DeltaTable.create();
80+
}
81+
if (tableName.startsWith("delta.`")) {
82+
tableName = tableName.substring("delta.`".length());
83+
String location = tableName.substring(0, tableName.length() - 1);
84+
builder = builder.location(location);
85+
DeltaLog.forTable(spark, location).clearCache();
86+
} else {
87+
builder = builder.tableName(tableName);
88+
DeltaLog.forTable(spark, new Path(tableName)).clearCache();
89+
}
90+
return buildTable(builder);
91+
}
92+
93+
private DeltaTable replaceTable(boolean orCreate, String tableName) {
94+
DeltaTableBuilder builder;
95+
if (orCreate) {
96+
builder = DeltaTable.createOrReplace();
97+
} else {
98+
builder = DeltaTable.replace();
99+
}
100+
if (tableName.startsWith("delta.`")) {
101+
tableName = tableName.substring("delta.`".length());
102+
String location = tableName.substring(0, tableName.length() - 1);
103+
builder = builder.location(location);
104+
} else {
105+
builder = builder.tableName(tableName);
106+
}
107+
return buildTable(builder);
108+
}
109+
110+
private void verifyGeneratedColumn(String tableName, DeltaTable deltaTable) {
111+
String cmd = String.format("INSERT INTO %s (c1, c2, c3, c4, c5, c6) %s", tableName,
112+
"VALUES (1, 2, 'a', 'c', 1, 11)");
113+
spark.sql(cmd);
114+
Map<String, String> set = new HashMap<String, String>() {{
115+
put("c5", "10");
116+
}};
117+
deltaTable.updateExpr("c6 = 11", set);
118+
assert(deltaTable.toDF().select("c6").collectAsList().get(0).getLong(0) == 20);
119+
}
120+
121+
@Test
122+
public void testCreateTable() {
123+
try {
124+
// Test creating DeltaTable by name
125+
DeltaTable table = createTable(false, "deltaTable");
126+
verifyGeneratedColumn("deltaTable", table);
127+
} finally {
128+
spark.sql("DROP TABLE IF EXISTS deltaTable");
129+
}
130+
// Test creating DeltaTable by path.
131+
String input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input")
132+
.toString();
133+
DeltaTable table2 = createTable(false, String.format("delta.`%s`", input));
134+
verifyGeneratedColumn(String.format("delta.`%s`", input), table2);
135+
}
136+
137+
@Test
138+
public void testCreateTableIfNotExists() {
139+
// Ignore table creation if already exsits.
140+
List<String> data = Arrays.asList("hello", "world");
141+
Dataset<Row> dataDF = spark.createDataset(data, Encoders.STRING()).toDF();
142+
try {
143+
// Test creating DeltaTable by name - not exists.
144+
DeltaTable table = createTable(true, "deltaTable");
145+
verifyGeneratedColumn("deltaTable", table);
146+
147+
dataDF.write().format("delta").mode("overwrite").saveAsTable("deltaTable2");
148+
149+
// Table 2 should be the old table saved by path.
150+
DeltaTable table2 = DeltaTable.createIfNotExists().tableName("deltaTable2")
151+
.addColumn("value", "string")
152+
.execute();
153+
QueryTest$.MODULE$.checkAnswer(table2.toDF(), dataDF.collectAsList());
154+
} finally {
155+
spark.sql("DROP TABLE IF EXISTS deltaTable");
156+
spark.sql("DROP TABLE IF EXISTS deltaTable2");
157+
}
158+
// Test creating DeltaTable by path.
159+
String input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input")
160+
.toString();
161+
dataDF.write().format("delta").mode("overwrite").save(input);
162+
DeltaTable table = createTable(true, String.format("delta.`%s`", input));
163+
QueryTest$.MODULE$.checkAnswer(table.toDF(), dataDF.collectAsList());
164+
}
165+
166+
@Test
167+
public void testCreateTableWithExistingSchema() {
168+
try {
169+
// Test create table with an existing schema.
170+
List<String> data = Arrays.asList("hello", "world");
171+
Dataset<Row> dataDF = spark.createDataset(data, Encoders.STRING()).toDF();
172+
173+
DeltaLog.forTable(spark, new Path("deltaTable")).clearCache();
174+
DeltaTable table = DeltaTable.create().tableName("deltaTable")
175+
.addColumns(dataDF.schema())
176+
.execute();
177+
dataDF.write().format("delta").mode("append").saveAsTable("deltaTable");
178+
179+
QueryTest$.MODULE$.checkAnswer(table.toDF(), dataDF.collectAsList());
180+
} finally {
181+
spark.sql("DROP TABLE IF EXISTS deltaTable");
182+
}
183+
}
184+
185+
@Test
186+
public void testReplaceTable() {
187+
try {
188+
// create a table first
189+
spark.sql("CREATE TABLE deltaTable (col1 int) USING delta");
190+
// Test replacing DeltaTable by name
191+
DeltaTable table = replaceTable(false, "deltaTable");
192+
verifyGeneratedColumn("deltaTable", table);
193+
} finally {
194+
spark.sql("DROP TABLE IF EXISTS deltaTable");
195+
}
196+
String input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input")
197+
.toString();
198+
List<String> data = Arrays.asList("hello", "world");
199+
Dataset<Row> dataDF = spark.createDataset(data, Encoders.STRING()).toDF();
200+
dataDF.write().format("delta").mode("overwrite").save(input);
201+
DeltaTable table = replaceTable(false, String.format("delta.`%s`", input));
202+
verifyGeneratedColumn(String.format("delta.`%s`", input), table);
203+
}
204+
205+
@Test
206+
public void testCreateOrReplaceTable() {
207+
try {
208+
// Test creating DeltaTable by name if table to be replaced does not exist.
209+
DeltaTable table = replaceTable(true, "deltaTable");
210+
verifyGeneratedColumn("deltaTable", table);
211+
} finally {
212+
spark.sql("DROP TABLE IF EXISTS deltaTable");
213+
}
214+
}
215+
}

core/src/test/java/io/delta/tables/JavaDeltaTableSuite.java

+32-18
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,18 @@
2727
import org.junit.Assert;
2828
import org.junit.Before;
2929
import org.junit.Test;
30+
import org.apache.spark.sql.delta.DeltaSQLCommandJavaTest;
3031

31-
public class JavaDeltaTableSuite {
32+
public class JavaDeltaTableSuite implements DeltaSQLCommandJavaTest {
3233

33-
private transient TestSparkSession spark;
34+
private transient SparkSession spark;
3435
private transient String input;
3536

3637

3738
@Before
3839
public void setUp() {
3940
// Trigger static initializer of TestData
40-
spark = new TestSparkSession();
41+
spark = buildSparkSession();
4142
}
4243

4344
@After
@@ -50,25 +51,38 @@ public void tearDown() {
5051

5152
@Test
5253
public void testAPI() {
53-
String input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString();
54-
List<String> data = Arrays.asList("hello", "world");
55-
Dataset<Row> dataDF = spark.createDataset(data, Encoders.STRING()).toDF();
56-
List<Row> dataRows = dataDF.collectAsList();
57-
dataDF.write().format("delta").mode("overwrite").save(input);
54+
try {
55+
String input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString();
56+
List<String> data = Arrays.asList("hello", "world");
57+
Dataset<Row> dataDF = spark.createDataset(data, Encoders.STRING()).toDF();
58+
List<Row> dataRows = dataDF.collectAsList();
59+
dataDF.write().format("delta").mode("overwrite").save(input);
5860

59-
// Test creating DeltaTable by path
60-
DeltaTable table1 = DeltaTable.forPath(spark, input);
61-
QueryTest$.MODULE$.checkAnswer(table1.toDF(), dataRows);
61+
// Test creating DeltaTable by path
62+
DeltaTable table1 = DeltaTable.forPath(spark, input);
63+
QueryTest$.MODULE$.checkAnswer(table1.toDF(), dataRows);
6264

63-
// Test creating DeltaTable by path picks up active SparkSession
64-
DeltaTable table2 = DeltaTable.forPath(input);
65-
QueryTest$.MODULE$.checkAnswer(table2.toDF(), dataRows);
65+
// Test creating DeltaTable by path picks up active SparkSession
66+
DeltaTable table2 = DeltaTable.forPath(input);
67+
QueryTest$.MODULE$.checkAnswer(table2.toDF(), dataRows);
6668

69+
dataDF.write().format("delta").mode("overwrite").saveAsTable("deltaTable");
6770

68-
// Test DeltaTable.as() creates subquery alias
69-
QueryTest$.MODULE$.checkAnswer(table2.as("tbl").toDF().select("tbl.value"), dataRows);
71+
// Test creating DeltaTable by name
72+
DeltaTable table3 = DeltaTable.forName(spark, "deltaTable");
73+
QueryTest$.MODULE$.checkAnswer(table3.toDF(), dataRows);
7074

71-
// Test DeltaTable.isDeltaTable() is true for a Delta file path.
72-
Assert.assertTrue(DeltaTable.isDeltaTable(input));
75+
// Test creating DeltaTable by name
76+
DeltaTable table4 = DeltaTable.forName("deltaTable");
77+
QueryTest$.MODULE$.checkAnswer(table4.toDF(), dataRows);
78+
79+
// Test DeltaTable.as() creates subquery alias
80+
QueryTest$.MODULE$.checkAnswer(table2.as("tbl").toDF().select("tbl.value"), dataRows);
81+
82+
// Test DeltaTable.isDeltaTable() is true for a Delta file path.
83+
Assert.assertTrue(DeltaTable.isDeltaTable(input));
84+
} finally {
85+
spark.sql("DROP TABLE IF EXISTS deltaTable");
86+
}
7387
}
7488
}

core/src/test/java/org/apache/spark/sql/delta/DeltaSQLCommandJavaTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import org.apache.spark.sql.SparkSession;
2020

21-
interface DeltaSQLCommandJavaTest {
21+
public interface DeltaSQLCommandJavaTest {
2222
default SparkSession buildSparkSession() {
2323
// Set the configurations as DeltaSQLCommandTest
2424
return SparkSession.builder()
2525
.appName("JavaDeltaSparkSessionExtensionSuiteUsingSQLConf")
2626
.master("local[2]")
2727
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
28+
.config("spark.sql.catalog.spark_catalog",
29+
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
2830
.getOrCreate();
2931
}
3032
}

0 commit comments

Comments
 (0)