Skip to content

Commit 3fe2ca7

Browse files
committed
feat: add test helpers for write commit protocol
1 parent 4ad285e commit 3fe2ca7

1 file changed

Lines changed: 174 additions & 0 deletions

File tree

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet
21+
22+
import org.apache.hadoop.fs.Path
23+
import org.apache.spark.sql.SparkSession
24+
25+
object CometWriteTestHelpers {
26+
27+
/**
28+
* Check if _temporary folder exists in the given path.
29+
*
30+
* @param basePath
31+
* The base output path to check
32+
* @param spark
33+
* SparkSession for accessing Hadoop configuration
34+
* @return
35+
* true if _temporary folder exists, false otherwise
36+
*/
37+
def hasTemporaryFolder(basePath: String)(implicit spark: SparkSession): Boolean = {
38+
try {
39+
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
40+
fs.exists(new Path(basePath, "_temporary"))
41+
} catch {
42+
case _: Exception => false
43+
}
44+
}
45+
46+
/**
47+
* Get all subfolders in _temporary directory.
48+
*
49+
* @param basePath
50+
* The base output path
51+
* @param spark
52+
* SparkSession for accessing Hadoop configuration
53+
* @return
54+
* Sequence of subfolder names in _temporary
55+
*/
56+
def getTemporarySubfolders(basePath: String)(implicit spark: SparkSession): Seq[String] = {
57+
try {
58+
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
59+
val tempPath = new Path(basePath, "_temporary")
60+
if (!fs.exists(tempPath)) return Seq.empty
61+
62+
fs.listStatus(tempPath).map(_.getPath.getName).toSeq
63+
} catch {
64+
case _: Exception => Seq.empty
65+
}
66+
}
67+
68+
/**
69+
* Count files in _temporary folder recursively.
70+
*
71+
* @param basePath
72+
* The base output path
73+
* @param spark
74+
* SparkSession for accessing Hadoop configuration
75+
* @return
76+
* Number of files in _temporary folder
77+
*/
78+
def countTemporaryFiles(basePath: String)(implicit spark: SparkSession): Int = {
79+
try {
80+
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
81+
val tempPath = new Path(basePath, "_temporary")
82+
if (!fs.exists(tempPath)) return 0
83+
84+
def countRecursive(path: Path): Int = {
85+
val status = fs.listStatus(path)
86+
status.map { fileStatus =>
87+
if (fileStatus.isDirectory) {
88+
countRecursive(fileStatus.getPath)
89+
} else {
90+
1
91+
}
92+
}.sum
93+
}
94+
95+
countRecursive(tempPath)
96+
} catch {
97+
case _: Exception => 0
98+
}
99+
}
100+
101+
/**
102+
* Wait for condition with timeout.
103+
*
104+
* @param condition
105+
* The condition to wait for
106+
* @param timeoutMs
107+
* Timeout in milliseconds (default: 5000)
108+
* @param intervalMs
109+
* Check interval in milliseconds (default: 100)
110+
* @return
111+
* true if condition met within timeout, false otherwise
112+
*/
113+
def waitForCondition(
114+
condition: => Boolean,
115+
timeoutMs: Long = 5000,
116+
intervalMs: Long = 100): Boolean = {
117+
val deadline = System.currentTimeMillis() + timeoutMs
118+
while (System.currentTimeMillis() < deadline) {
119+
if (condition) return true
120+
Thread.sleep(intervalMs)
121+
}
122+
false
123+
}
124+
125+
/**
126+
* Get all files in a directory (non-recursive).
127+
*
128+
* @param basePath
129+
* The directory path
130+
* @param spark
131+
* SparkSession for accessing Hadoop configuration
132+
* @return
133+
* Sequence of file names
134+
*/
135+
def listFiles(basePath: String)(implicit spark: SparkSession): Seq[String] = {
136+
try {
137+
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
138+
val path = new Path(basePath)
139+
if (!fs.exists(path)) return Seq.empty
140+
141+
fs.listStatus(path)
142+
.filter(_.isFile)
143+
.map(_.getPath.getName)
144+
.toSeq
145+
} catch {
146+
case _: Exception => Seq.empty
147+
}
148+
}
149+
150+
/**
151+
* Get all directories in a path (non-recursive).
152+
*
153+
* @param basePath
154+
* The directory path
155+
* @param spark
156+
* SparkSession for accessing Hadoop configuration
157+
* @return
158+
* Sequence of directory names
159+
*/
160+
def listDirectories(basePath: String)(implicit spark: SparkSession): Seq[String] = {
161+
try {
162+
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
163+
val path = new Path(basePath)
164+
if (!fs.exists(path)) return Seq.empty
165+
166+
fs.listStatus(path)
167+
.filter(_.isDirectory)
168+
.map(_.getPath.getName)
169+
.toSeq
170+
} catch {
171+
case _: Exception => Seq.empty
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)