Skip to content

Commit 8428e88

Browse files
[VARIANT] Add variant integration tests (#3220)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description adds an integration test for variant to create and insert variant values using sql and scala API ## How was this patch tested? test only change. Haven't been able to run this in the integration testing framework yet. However, manually tested this in spark shell using instructions [here](https://docs.google.com/document/d/1gpxcKZitSGf0K217kJa8eOaq5y-BDnDKpcuQ3JCjNCE/edit#heading=h.ybzt6s7hn9ff) ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> no --------- Co-authored-by: Allison Portis <[email protected]>
1 parent 3e2ef32 commit 8428e88

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package example
17+
18+
import io.delta.tables.DeltaTable
19+
20+
import org.apache.spark.sql.SparkSession
21+
22+
/**
 * Integration test for Delta Lake's VARIANT data type.
 *
 * Exercises three paths end to end against a local SparkSession:
 *   1. SQL DDL/DML: create a Delta table with a VARIANT column, insert
 *      variant values, read them back with `variant_get`, and DELETE by a
 *      variant field.
 *   2. `CONVERT TO DELTA` on a Parquet table that already contains variants.
 *   3. The DeltaTable Scala API: `create()` with a VARIANT column and a
 *      MERGE keyed on a variant-extracted value.
 *
 * Each section drops the table in a `finally` so a failure in one section
 * does not leave state behind for the next.
 */
object Variant {

  def main(args: Array[String]): Unit = {
    val tableName = "tbl"

    // Local session with the Delta SQL extension and catalog wired in,
    // as required for Delta SQL commands (CONVERT TO DELTA, MERGE, ...).
    val spark = SparkSession
      .builder()
      .appName("Variant-Delta")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // 1. Create and insert variant values via SQL.
    try {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      spark.sql(s"CREATE TABLE $tableName(v VARIANT) USING DELTA")
      // One scalar variant plus ten object variants {"k": 0} .. {"k": 9}.
      spark.sql(s"INSERT INTO $tableName VALUES (parse_json('1'))")
      spark.sql(s"""INSERT INTO $tableName SELECT parse_json(format_string('{\"k\": %s}', id))
        FROM range(0, 10)""")
      // The '$.k' path is kept out of the interpolated part so the literal
      // `$` does not need escaping. The WHERE clause filters out the scalar
      // variant, which has no 'k' field.
      val ids = spark.sql("SELECT variant_get(v, '$.k', 'INT') out " +
          s"""FROM $tableName WHERE contains(schema_of_variant(v), 'k')
          ORDER BY out""")
        .collect().map { r => r.getInt(0) }.toSeq
      val expected = (0 until 10).toSeq
      assert(ids == expected)

      // Fix: the table name was previously hard-coded as "tbl" here, which
      // would silently break if `tableName` changed.
      spark.sql(s"DELETE FROM $tableName WHERE " + "variant_get(v, '$.k', 'INT') = 0")
      val idsWithDelete = spark.sql("SELECT variant_get(v, '$.k', 'INT') out " +
          s"""FROM $tableName WHERE contains(schema_of_variant(v), 'k')
          ORDER BY out""")
        .collect().map { r => r.getInt(0) }.toSeq
      val expectedWithDelete = (1 until 10).toSeq
      assert(idsWithDelete == expectedWithDelete)
    } finally {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
    }

    // 2. Convert a Parquet table with variant values to Delta.
    try {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      // Fix: the table name was previously hard-coded as "tbl" here.
      spark.sql(s"""CREATE TABLE $tableName USING PARQUET AS (
        SELECT parse_json(format_string('%s', id)) v FROM range(0, 10))""")
      spark.sql(s"CONVERT TO DELTA $tableName")
      val convertToDeltaIds = spark.sql(s"SELECT v::int v FROM $tableName ORDER BY v")
        .collect()
        .map { r => r.getInt(0) }
        .toSeq
      val convertToDeltaExpected = (0 until 10).toSeq
      assert(convertToDeltaIds == convertToDeltaExpected)
    } finally {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
    }

    // 3. DeltaTable create with variant via the Scala API.
    try {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      // Use the imported DeltaTable rather than the fully-qualified name.
      val table = DeltaTable.create()
        .tableName(tableName)
        .addColumn("v", "VARIANT")
        .execute()

      // MERGE into the empty table: every source row is not-matched, so all
      // ten variants are inserted.
      table
        .as("tgt")
        .merge(
          spark.sql("select parse_json(format_string('%s', id)) v from range(0, 10)").as("source"),
          "source.v::int == tgt.v::int"
        )
        .whenMatched()
        .updateAll()
        .whenNotMatched()
        .insertAll()
        .execute()
      val insertedVals = spark.sql(s"SELECT v::int v FROM $tableName ORDER BY v")
        .collect()
        .map { r => r.getInt(0) }
        .toSeq
      val expected = (0 until 10).toSeq
      assert(insertedVals == expected)
    } finally {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
      // Last section: also shut the session down.
      spark.stop()
    }
  }
}

0 commit comments

Comments
 (0)