Skip to content

Commit d343539

Browse files
committed
test: add 2PC tests for native writer
1 parent abd8ef7 commit d343539

1 file changed

Lines changed: 267 additions & 0 deletions

File tree

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet
21+
22+
import java.io.File
23+
24+
import org.apache.spark.sql.CometTestBase
25+
import org.apache.spark.sql.execution.command.DataWritingCommandExec
26+
import org.apache.spark.sql.functions._
27+
28+
import org.apache.comet.CometConf
29+
30+
class CometParquetWriterCommitSuite extends CometTestBase {
31+
32+
// SQL conf overrides applied by every test in this suite via withSQLConf: the native
// Parquet write flag, the Comet exec flag, and the allow-incompat key for
// DataWritingCommandExec, each set to "true".
private val nativeWriteConf = Seq(
  CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key,
  CometConf.COMET_EXEC_ENABLED.key,
  CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]))
  .map(confKey => confKey -> "true")
36+
37+
/**
 * Reports whether `dir` directly contains at least one entry whose name starts with
 * "part-" and ends with ".parquet".
 *
 * `File.listFiles()` returns null when the path does not exist, is not a directory, or
 * cannot be read; all of those cases are treated as "no data files" instead of failing
 * with a NullPointerException.
 *
 * @param dir
 *   directory to inspect (only its immediate children are examined)
 * @return
 *   true if a matching entry exists, false otherwise
 */
private def hasDataFiles(dir: File): Boolean =
  Option(dir.listFiles()).exists { entries =>
    entries.exists { entry =>
      val name = entry.getName
      name.startsWith("part-") && name.endsWith(".parquet")
    }
  }
41+
42+
// Runs a 4-partition write on a background thread so the output directory can be observed
// while the job is in flight, then verifies that the commit removed _temporary and left
// readable data files behind.
test("_temporary folder is created during write and cleaned up after commit") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output").getAbsolutePath

    val df = spark
      .range(0, 100000, 1, 4)
      .selectExpr("id", "id * 2 as value")

    withSQLConf(nativeWriteConf: _*) {
      @volatile var writeStarted = false
      // Failures on the writer thread are stashed here and rethrown on the test thread.
      @volatile var writeException: Option[Throwable] = None
      val writeThread = new Thread(() => {
        try {
          writeStarted = true
          df.write.parquet(outputPath)
        } catch {
          case e: Throwable => writeException = Some(e)
        }
      })
      writeThread.start()

      CometWriteTestHelpers.waitForCondition(writeStarted, timeoutMs = 5000)

      // Best effort: the write may finish before the folder is ever observed, in which case
      // the mid-write checks are skipped.
      val tempExists = CometWriteTestHelpers.waitForCondition(
        CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        timeoutMs = 10000)

      if (tempExists) {
        // The poll above already established that _temporary existed. Re-checking it here
        // would only add a window in which a fast commit makes the test fail spuriously.
        // NOTE(review): this is a single sample taken mid-write, so it is still
        // timing-dependent - confirm what countTemporaryFiles counts.
        val tempFileCount = CometWriteTestHelpers.countTemporaryFiles(outputPath)
        assert(tempFileCount > 0, s"Expected temp files during write, found $tempFileCount")
      }

      writeThread.join(30000)
      assert(!writeThread.isAlive, "Write should complete within 30 seconds")

      writeException.foreach(throw _)

      assert(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        "_temporary folder should be cleaned up after successful commit")

      val outputDir = new File(outputPath)
      assert(hasDataFiles(outputDir), "Final data files should exist")

      val readDf = spark.read.parquet(outputPath)
      assert(readDf.count() == 100000, "All rows should be committed")
    }
  }
}
95+
96+
// Forces a task to fail mid-write and verifies that neither _temporary nor any data files
// are left in the output location.
test("_temporary folder is cleaned up on task failure") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output").getAbsolutePath

    // Long division by zero when id == 100, which lies inside the generated range.
    val divideByZero = udf((x: Long) => { x / (x - 100) })
    val df = spark
      .range(0, 1000, 1, 1) // single partition to avoid race conditions
      .select(divideByZero(col("id")).as("value"))

    withSQLConf(nativeWriteConf: _*) {
      intercept[Exception] {
        df.write.parquet(outputPath)
      }

      // Poll for cleanup instead of sleeping for a fixed interval: this returns as soon as
      // the folder is gone and tolerates slower cleanup on a loaded machine.
      CometWriteTestHelpers.waitForCondition(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        timeoutMs = 5000)
      assert(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        "_temporary folder should be cleaned up after task failure")

      // hasDataFiles already reports false for a missing directory, so no existence guard
      // is needed around this assertion.
      assert(!hasDataFiles(new File(outputPath)), "No data files should exist after failure")
    }
  }
}
123+
124+
// Writes a 10-partition dataset on a background thread, looks for subfolders under
// _temporary while the job is in flight, and verifies the committed result afterwards.
test("_temporary folder handles concurrent tasks correctly") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output").getAbsolutePath

    val df = spark
      .range(0, 50000, 1, 10)
      .selectExpr("id", "id * 2 as value")

    withSQLConf(nativeWriteConf: _*) {
      @volatile var writeStarted = false
      // Failures on the writer thread are stashed here and rethrown on the test thread.
      @volatile var writeException: Option[Throwable] = None
      val writeThread = new Thread(() => {
        try {
          writeStarted = true
          df.write.parquet(outputPath)
        } catch {
          case e: Throwable => writeException = Some(e)
        }
      })
      writeThread.start()

      CometWriteTestHelpers.waitForCondition(writeStarted, timeoutMs = 5000)

      // Best effort: the write may finish before the folder is ever observed.
      val tempAppeared = CometWriteTestHelpers.waitForCondition(
        CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        timeoutMs = 10000)

      if (tempAppeared) {
        val subfolders = CometWriteTestHelpers.getTemporarySubfolders(outputPath)
        assert(subfolders.nonEmpty, "Should have job tracking folders")
      }

      writeThread.join(30000)
      // join(timeout) returns silently on timeout; fail here rather than letting the
      // assertions below run against a write that is still in progress.
      assert(!writeThread.isAlive, "Write should complete within 30 seconds")

      writeException.foreach(throw _)

      assert(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        "_temporary should be cleaned up after commit")

      val outputDir = new File(outputPath)
      assert(hasDataFiles(outputDir), "Data files should exist")

      val readDf = spark.read.parquet(outputPath)
      assert(readDf.count() == 50000, "All rows should be committed")
    }
  }
}
172+
173+
// Writes 1000 rows, overwrites them with 500 rows, and checks after each step that no
// _temporary folder remains and that the row count matches the latest write.
test("_temporary folder is cleaned up on overwrite") {
  withTempPath { root =>
    val target = new File(root, "output").getAbsolutePath

    // Re-evaluated on every use so each assertion sees the current state on disk.
    def tempLeftBehind = CometWriteTestHelpers.hasTemporaryFolder(target)
    def rowsOnDisk = spark.read.parquet(target).count()

    withSQLConf(nativeWriteConf: _*) {
      spark.range(1000).write.parquet(target)
      assert(!tempLeftBehind, "_temporary should be cleaned up after first write")
      assert(rowsOnDisk == 1000, "First write should have 1000 rows")

      spark.range(500).write.mode("overwrite").parquet(target)
      assert(!tempLeftBehind, "_temporary should be cleaned up after overwrite")
      assert(rowsOnDisk == 500, "Overwrite should result in 500 rows")
    }
  }
}
196+
197+
// A 10-row write is only inspected after it completes: no _temporary folder may remain,
// data files must be present, and all 10 rows must be readable.
test("small writes may not create visible _temporary folder") {
  withTempPath { root =>
    val target = new File(root, "output").getAbsolutePath

    withSQLConf(nativeWriteConf: _*) {
      spark.range(10).write.parquet(target)

      val tempLeftBehind = CometWriteTestHelpers.hasTemporaryFolder(target)
      assert(!tempLeftBehind, "_temporary should not exist after completion")

      assert(hasDataFiles(new File(target)), "Data files should exist")

      val rowCount = spark.read.parquet(target).count()
      assert(rowCount == 10, "Should have 10 rows")
    }
  }
}
216+
217+
// Runs two writes to separate output paths on separate threads and verifies that each one
// commits its own 10000 rows and leaves no _temporary folder behind.
test("multiple concurrent writes to different paths are isolated") {
  withTempPath { dir1 =>
    withTempPath { dir2 =>
      val outputPath1 = new File(dir1, "output1").getAbsolutePath
      val outputPath2 = new File(dir2, "output2").getAbsolutePath

      withSQLConf(nativeWriteConf: _*) {
        val df1 = spark.range(0, 10000, 1, 4)
        val df2 = spark.range(10000, 20000, 1, 4)

        // An exception thrown on a worker thread never reaches the test thread by itself,
        // so collect failures here and rethrow the first one after joining.
        val failures = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
        def writerThread(write: => Unit): Thread = new Thread(() => {
          try {
            write
          } catch {
            case e: Throwable => failures.add(e)
          }
        })

        val writers = Seq(
          writerThread(df1.write.parquet(outputPath1)),
          writerThread(df2.write.parquet(outputPath2)))

        writers.foreach(_.start())
        writers.foreach(_.join(30000))

        // join(timeout) returns silently on timeout; fail here rather than letting the
        // assertions below run against a write that is still in progress.
        assert(
          writers.forall(writer => !writer.isAlive),
          "Both writes should complete within 30 seconds each")

        Option(failures.peek()).foreach(throw _)

        assert(!CometWriteTestHelpers.hasTemporaryFolder(outputPath1))
        assert(!CometWriteTestHelpers.hasTemporaryFolder(outputPath2))

        assert(spark.read.parquet(outputPath1).count() == 10000)
        assert(spark.read.parquet(outputPath2).count() == 10000)
      }
    }
  }
}
245+
246+
// Performs one initial write followed by two overwrites of the same path, checking after
// every write that no _temporary folder remains, then confirms the final row count and
// that no directory under the output path has a name starting with "_temporary".
test("no stale _temporary folders from previous operations") {
  withTempPath { root =>
    val target = new File(root, "output").getAbsolutePath

    withSQLConf(nativeWriteConf: _*) {
      // The first write goes to a fresh path with the default save mode.
      spark.range(100).write.parquet(target)
      assert(!CometWriteTestHelpers.hasTemporaryFolder(target))

      // Each subsequent write replaces the previous output.
      Seq(200L, 300L).foreach { rows =>
        spark.range(rows).write.mode("overwrite").parquet(target)
        assert(!CometWriteTestHelpers.hasTemporaryFolder(target))
      }

      assert(spark.read.parquet(target).count() == 300)

      val remainingDirs = CometWriteTestHelpers.listDirectories(target)
      assert(
        remainingDirs.forall(name => !name.startsWith("_temporary")),
        "No _temporary folders should exist")
    }
  }
}
267+
}

0 commit comments

Comments
 (0)