Skip to content

Commit 9532302

Browse files
committed
test: add 2PC tests for native writer
1 parent 3fe2ca7 commit 9532302

1 file changed

Lines changed: 335 additions & 0 deletions

File tree

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet
21+
22+
import java.io.File
23+
24+
import org.apache.spark.sql.CometTestBase
25+
import org.apache.spark.sql.execution.command.DataWritingCommandExec
26+
import org.apache.spark.sql.functions._
27+
28+
import org.apache.comet.CometConf
29+
30+
/**
31+
* Test suite for Comet Native Parquet Writer commit protocol and _temporary folder handling.
32+
*
33+
* These tests verify that Comet properly integrates with Spark's FileCommitProtocol for reliable
34+
* writes with proper 2-phase commit semantics.
35+
*/
36+
class CometParquetWriterCommitSuite extends CometTestBase {
37+
38+
// SQL conf overrides that every test in this suite applies through withSQLConf: the native
// Parquet write flag, the Comet exec flag, and the allow-incompat key for
// DataWritingCommandExec are all set to "true".
private val nativeWriteConf = Seq(
  (CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key, "true"),
  (CometConf.COMET_EXEC_ENABLED.key, "true"),
  (CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]), "true"))
42+
43+
/**
 * Checks whether `dir` directly contains at least one Parquet part file, i.e. an entry whose
 * name starts with "part-" and ends with ".parquet". Sub-directories are not searched.
 *
 * @param dir
 *   the directory to inspect
 * @return
 *   true if a matching entry exists; false if `dir` does not exist, cannot be listed as a
 *   directory, or holds no matching entry
 */
private def hasDataFiles(dir: File): Boolean = {
  // File.listFiles() returns null (not an empty array) when the path is missing, is a regular
  // file, or cannot be read, so go through Option rather than dereferencing the result.
  Option(dir.listFiles()).exists(_.exists { entry =>
    val name = entry.getName
    name.startsWith("part-") && name.endsWith(".parquet")
  })
}
48+
49+
// ==========================================================================
50+
// Test 1: _temporary folder lifecycle - exists during write, cleaned after commit
51+
// ==========================================================================
52+
test("_temporary folder is created during write and cleaned up after commit") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output").getAbsolutePath

    // 100k rows over 4 partitions, intended to make the write last long enough to be observed.
    val source = spark.range(0, 100000, 1, 4).selectExpr("id", "id * 2 as value")

    withSQLConf(nativeWriteConf: _*) {
      // The write runs on its own thread so this thread can look at the output directory while
      // the job is in progress. Both variables are written by the writer thread and read here,
      // hence @volatile.
      @volatile var started = false
      @volatile var failure: Option[Throwable] = None
      val writer = new Thread(new Runnable {
        override def run(): Unit = {
          try {
            started = true
            source.write.parquet(outputPath)
          } catch {
            // Stash anything thrown so the test thread can rethrow it after join().
            case t: Throwable => failure = Some(t)
          }
        }
      })
      writer.start()

      // Wait for the writer thread to begin (the helper's result is not needed here).
      CometWriteTestHelpers.waitForCondition(started, timeoutMs = 5000)

      // Wait for _temporary to show up; it may never be observed if the write is very quick.
      val sawTemporary = CometWriteTestHelpers.waitForCondition(
        CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        timeoutMs = 10000)

      // The mid-write checks only apply when the folder was actually seen.
      if (sawTemporary) {
        assert(
          CometWriteTestHelpers.hasTemporaryFolder(outputPath),
          "_temporary folder should be created during write")

        val inFlightFiles = CometWriteTestHelpers.countTemporaryFiles(outputPath)
        assert(inFlightFiles > 0, s"Expected temp files during write, found $inFlightFiles")
      }

      // join(millis) returns silently on timeout, so completion is asserted explicitly.
      writer.join(30000)
      assert(!writer.isAlive, "Write should complete within 30 seconds")

      // Surface any failure from the writer thread on the test thread.
      failure.foreach(t => throw t)

      // Post-commit state: no _temporary folder, data files present, every row readable.
      assert(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        "_temporary folder should be cleaned up after successful commit")

      assert(hasDataFiles(new File(outputPath)), "Final data files should exist")

      val committedRows = spark.read.parquet(outputPath).count()
      assert(committedRows == 100000, "All rows should be committed")
    }
  }
}
116+
117+
// ==========================================================================
118+
// Test 2: _temporary cleanup on task failure
119+
// ==========================================================================
120+
test("_temporary folder is cleaned up on task failure") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output").getAbsolutePath

    // x / (x - 100) is a Long division by zero for id == 100, so the UDF throws partway
    // through the 1000-row range and the write fails.
    val divideByZero = udf((x: Long) => { x / (x - 100) })
    val df = spark
      .range(0, 1000, 1, 1) // Single partition to avoid race conditions
      .select(divideByZero(col("id")).as("value"))

    withSQLConf(nativeWriteConf: _*) {
      // The write is expected to fail; the exception itself is not inspected.
      intercept[Exception] {
        df.write.parquet(outputPath)
      }

      // Give cleanup a bounded amount of time instead of a fixed sleep: the suite's
      // waitForCondition helper is used here the same way the other tests use it to wait on
      // hasTemporaryFolder. The assertion below re-checks the folder directly, so the verdict
      // does not depend on the helper's return value.
      CometWriteTestHelpers.waitForCondition(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        timeoutMs = 5000)
      assert(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        "_temporary folder should be cleaned up after task failure")

      // hasDataFiles already returns false for a directory that does not exist, so no separate
      // exists() guard is needed.
      assert(!hasDataFiles(new File(outputPath)), "No data files should exist after failure")
    }
  }
}
151+
152+
// ==========================================================================
153+
// Test 3: _temporary with multiple concurrent tasks
154+
// ==========================================================================
155+
test("_temporary folder handles concurrent tasks correctly") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output").getAbsolutePath

    val df = spark
      .range(0, 50000, 1, 10) // 10 partitions
      .selectExpr("id", "id * 2 as value")

    withSQLConf(nativeWriteConf: _*) {
      // Run the write on a background thread so the output directory can be inspected while
      // the job is in progress. Both variables are written by that thread and read here,
      // hence @volatile.
      @volatile var writeStarted = false
      @volatile var writeException: Option[Throwable] = None
      val writeThread = new Thread(() => {
        try {
          writeStarted = true
          df.write.parquet(outputPath)
        } catch {
          // Stash anything thrown so the test thread can rethrow it after join().
          case e: Throwable => writeException = Some(e)
        }
      })
      writeThread.start()

      // Wait for write to start
      CometWriteTestHelpers.waitForCondition(writeStarted, timeoutMs = 5000)

      // Wait for _temporary to exist (if it does during write)
      val tempAppeared = CometWriteTestHelpers.waitForCondition(
        CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        timeoutMs = 10000)

      // If _temporary appeared, check structure
      if (tempAppeared) {
        val subfolders = CometWriteTestHelpers.getTemporarySubfolders(outputPath)
        // Should have job-level folder structure
        assert(subfolders.nonEmpty, "Should have job tracking folders")
      }

      // Thread.join(millis) returns silently when the timeout elapses. Without this check a
      // still-running write would fall through to the post-commit assertions below and fail
      // them with a misleading message.
      writeThread.join(30000)
      assert(!writeThread.isAlive, "Write should complete within 30 seconds")

      // Check if write failed
      writeException.foreach(throw _)

      // After commit, everything cleaned up
      assert(
        !CometWriteTestHelpers.hasTemporaryFolder(outputPath),
        "_temporary should be cleaned up after commit")

      // Verify all partitions wrote successfully
      val outputDir = new File(outputPath)
      assert(hasDataFiles(outputDir), "Data files should exist")

      // Verify data integrity
      val readDf = spark.read.parquet(outputPath)
      assert(readDf.count() == 50000, "All rows should be committed")
    }
  }
}
212+
213+
// ==========================================================================
214+
// Test 4: _temporary with overwrite mode
215+
// ==========================================================================
216+
test("_temporary folder is cleaned up on overwrite") {
  withTempPath { dir =>
    val target = new File(dir, "output").getAbsolutePath

    withSQLConf(nativeWriteConf: _*) {
      // Initial write of 1000 rows into a fresh path.
      spark.range(1000).write.parquet(target)
      val tempAfterFirst = CometWriteTestHelpers.hasTemporaryFolder(target)
      assert(!tempAfterFirst, "_temporary should be cleaned up after first write")

      val initialRows = spark.read.parquet(target).count()
      assert(initialRows == 1000, "First write should have 1000 rows")

      // Replace the existing output with 500 rows.
      spark.range(500).write.mode("overwrite").parquet(target)
      val tempAfterOverwrite = CometWriteTestHelpers.hasTemporaryFolder(target)
      assert(!tempAfterOverwrite, "_temporary should be cleaned up after overwrite")

      // Only the overwritten data should remain readable.
      val finalRows = spark.read.parquet(target).count()
      assert(finalRows == 500, "Overwrite should result in 500 rows")
    }
  }
}
243+
244+
// ==========================================================================
245+
// Test 5: No _temporary folder when write completes quickly
246+
// ==========================================================================
247+
test("small writes may not create visible _temporary folder") {
  withTempPath { dir =>
    val target = new File(dir, "output").getAbsolutePath

    withSQLConf(nativeWriteConf: _*) {
      // A ten-row write, run synchronously on the test thread.
      spark.range(10).write.parquet(target)

      // Once the write call has returned, no _temporary folder may remain.
      val tempLeftBehind = CometWriteTestHelpers.hasTemporaryFolder(target)
      assert(!tempLeftBehind, "_temporary should not exist after completion")

      // The committed part files must be present in the output directory.
      assert(hasDataFiles(new File(target)), "Data files should exist")

      // Reading the output back must return every row.
      val rowCount = spark.read.parquet(target).count()
      assert(rowCount == 10, "Should have 10 rows")
    }
  }
}
270+
271+
// ==========================================================================
272+
// Test 6: Multiple writes to different paths don't interfere
273+
// ==========================================================================
274+
test("multiple concurrent writes to different paths are isolated") {
  withTempPath { dir1 =>
    withTempPath { dir2 =>
      val outputPath1 = new File(dir1, "output1").getAbsolutePath
      val outputPath2 = new File(dir2, "output2").getAbsolutePath

      withSQLConf(nativeWriteConf: _*) {
        val df1 = spark.range(0, 10000, 1, 4)
        val df2 = spark.range(10000, 20000, 1, 4)

        // An exception thrown inside a Thread does not reach the test thread on its own, so
        // each writer records its failure here to be rethrown after join(). Written by the
        // writer threads and read by the test thread, hence @volatile.
        @volatile var failure1: Option[Throwable] = None
        @volatile var failure2: Option[Throwable] = None

        val thread1 = new Thread(() => {
          try {
            df1.write.parquet(outputPath1)
          } catch {
            case e: Throwable => failure1 = Some(e)
          }
        })
        val thread2 = new Thread(() => {
          try {
            df2.write.parquet(outputPath2)
          } catch {
            case e: Throwable => failure2 = Some(e)
          }
        })

        // Start both writes so they run concurrently.
        thread1.start()
        thread2.start()

        // Thread.join(millis) returns silently on timeout, so completion is asserted
        // explicitly before any post-commit state is inspected.
        thread1.join(30000)
        thread2.join(30000)
        assert(!thread1.isAlive, "First write did not finish within the join timeout")
        assert(!thread2.isAlive, "Second write did not finish within the join timeout")

        // Fail with the real cause if either write threw.
        failure1.foreach(throw _)
        failure2.foreach(throw _)

        // Both should complete successfully
        assert(!CometWriteTestHelpers.hasTemporaryFolder(outputPath1))
        assert(!CometWriteTestHelpers.hasTemporaryFolder(outputPath2))

        // Verify both outputs
        assert(spark.read.parquet(outputPath1).count() == 10000)
        assert(spark.read.parquet(outputPath2).count() == 10000)
      }
    }
  }
}
305+
306+
// ==========================================================================
307+
// Test 7: Verify no leftover _temporary folders from previous runs
308+
// ==========================================================================
309+
test("no stale _temporary folders from previous operations") {
  withTempPath { dir =>
    val target = new File(dir, "output").getAbsolutePath

    withSQLConf(nativeWriteConf: _*) {
      // Initial write into a fresh path.
      spark.range(100).write.parquet(target)
      assert(!CometWriteTestHelpers.hasTemporaryFolder(target))

      // Two successive overwrites of the same path, each checked for a leftover _temporary.
      Seq(200L, 300L).foreach { rows =>
        spark.range(rows).write.mode("overwrite").parquet(target)
        assert(!CometWriteTestHelpers.hasTemporaryFolder(target))
      }

      // Only the data from the last overwrite should be readable.
      assert(spark.read.parquet(target).count() == 300)

      // No directory under the output path may carry the _temporary prefix.
      val remaining = CometWriteTestHelpers.listDirectories(target)
      assert(
        remaining.forall(name => !name.startsWith("_temporary")),
        "No _temporary folders should exist")
    }
  }
}
335+
}

0 commit comments

Comments
 (0)