@@ -22,9 +22,7 @@ import org.apache.celeborn.tests.spark.fetch.failure.{FetchFailureDiskCleanBase,
2222
2323class CelebornFetchFailureDiskCleanSuite extends FetchFailureDiskCleanBase {
2424
25- // 1. for single level 1-1 lineage, the old disk space is cleaned before the spark application
26- // finish
27- test(" celeborn spark integration test - (1-1 dep with, single level lineage) the failed shuffle file is cleaned up correctly" ) {
25+ test(" celeborn spark integration test - the failed shuffle file is cleaned up correctly" ) {
2826 if (Spark3OrNewer ) {
2927 val sparkSession = createSparkSession(enableFailedShuffleCleaner = true )
3028 val celebornConf = SparkUtils .fromSparkConf(sparkSession.sparkContext.getConf)
@@ -47,90 +45,4 @@ class CelebornFetchFailureDiskCleanSuite extends FetchFailureDiskCleanBase {
4745 sparkSession.stop()
4846 }
4947 }
50-
51- // 2. for multiple level 1-1 lineage, the old disk space is cleaned one by one
52- test(" celeborn spark integration test - (1-1 dep with, multi-level lineage) the failed shuffle file is cleaned up correctly" ) {
53- if (Spark3OrNewer ) {
54- val sparkSession = createSparkSession(enableFailedShuffleCleaner = true )
55- val celebornConf = SparkUtils .fromSparkConf(sparkSession.sparkContext.getConf)
56- val hook =
57- new FileDeletionShuffleReaderGetHook (
58- celebornConf,
59- workerDirs,
60- shuffleIdToBeDeleted = Seq (0 , 1 ),
61- triggerStageId = Some (2 ))
62- TestCelebornShuffleManager .registerReaderGetHook(hook)
63- val checkingThread = triggerStorageCheckThread(
64- Seq (0 , 1 ),
65- Seq (2 , 3 , 4 ),
66- sparkSession,
67- forStableStatusChecking = false )
68- val tuples = sparkSession.sparkContext.parallelize(1 to 10000 , 2 )
69- .map { i => (i, i) }.groupByKey(16 ).map {
70- case (k, elements) => (k, elements.map(i => i))
71- }.groupByKey(4 ).groupByKey(2 ).collect()
72- checkStorageValidation(checkingThread)
73- // verify result
74- assert(hook.executed.get())
75- assert(tuples.length == 10000 )
76- for (elem <- tuples) {
77- elem._2.flatten.flatten.foreach(s => s.equals(elem._1))
78- }
79- sparkSession.stop()
80- }
81- }
82-
83- // 3. for single level M-1 lineage, the single failed disk space is cleaned
84- test(
85- " celeborn spark integration test - (M-1 dep with single level lineage) the single failed shuffle file is cleaned up correctly" ) {
86- if (Spark3OrNewer ) {
87- val sparkSession = createSparkSession(enableFailedShuffleCleaner = true )
88- val celebornConf = SparkUtils .fromSparkConf(sparkSession.sparkContext.getConf)
89- val hook = new FileDeletionShuffleReaderGetHook (
90- celebornConf,
91- workerDirs,
92- shuffleIdToBeDeleted = Seq (0 ))
93- TestCelebornShuffleManager .registerReaderGetHook(hook)
94- val checkingThread =
95- triggerStorageCheckThread(Seq (0 ), Seq (1 , 2 ), sparkSession, forStableStatusChecking = false )
96- import sparkSession .implicits ._
97- val df1 = Seq ((1 , " a" ), (2 , " b" )).toDF(" id" , " data" ).groupBy(" id" ).count()
98- val df2 = Seq ((2 , " c" ), (2 , " d" )).toDF(" id" , " data" ).groupBy(" id" ).count()
99- val tuples = df1.hint(" merge" ).join(df2, " id" ).select(" *" ).collect()
100- checkStorageValidation(checkingThread)
101- // verify result
102- assert(hook.executed.get())
103- val expect = " [2,1,2]"
104- assert(tuples.head.toString().equals(expect))
105- sparkSession.stop()
106- }
107- }
108-
109- // 4. for single level M-1 lineage, all failed disk spaces are cleaned
110- test(" celeborn spark integration test - (M-1 dep with single-level lineage) all failed disk spaces are cleaned" ) {
111- if (Spark3OrNewer ) {
112- val sparkSession = createSparkSession(enableFailedShuffleCleaner = true )
113- val celebornConf = SparkUtils .fromSparkConf(sparkSession.sparkContext.getConf)
114- val hook = new FileDeletionShuffleReaderGetHook (
115- celebornConf,
116- workerDirs,
117- shuffleIdToBeDeleted = Seq (0 , 1 ))
118- TestCelebornShuffleManager .registerReaderGetHook(hook)
119- val checkingThread = triggerStorageCheckThread(
120- Seq (0 , 1 ),
121- Seq (2 , 3 ),
122- sparkSession,
123- forStableStatusChecking = false )
124- import sparkSession .implicits ._
125- val df1 = Seq ((1 , " a" ), (2 , " b" )).toDF(" id" , " data" ).groupBy(" id" ).count()
126- val df2 = Seq ((2 , " c" ), (3 , " d" )).toDF(" id" , " data" ).groupBy(" id" ).count()
127- val tuples = df1.hint(" merge" ).join(df2, " id" ).select(" *" ).collect()
128- checkStorageValidation(checkingThread)
129- // verify result
130- assert(hook.executed.get())
131- val expect = " [2,1,1]"
132- assert(tuples.head.toString().equals(expect))
133- sparkSession.stop()
134- }
135- }
13648}
0 commit comments