1818package org .apache .kyuubi .util
1919
2020import java .io .{ByteArrayOutputStream , PrintStream }
21+ import java .util .concurrent .{CountDownLatch , TimeUnit }
2122
2223import org .apache .kyuubi .KyuubiFunSuite
2324
@@ -30,12 +31,6 @@ class ThreadDumpUtilsSuite extends KyuubiFunSuite {
3031 assert(dump.contains(" Thread Dump End" ))
3132 }
3233
33- test(" dumpToString should respect stackDepth config" ) {
34- val config = ThreadDumpUtils .DumpConfig (stackDepth = 1 )
35- val dump = ThreadDumpUtils .dumpToString(config)
36- assert(dump.linesIterator.exists(_.trim.startsWith(" at " )))
37- }
38-
3934 test(" dumpToConsole should print to provided stream" ) {
4035 val baos = new ByteArrayOutputStream ()
4136 val ps = new PrintStream (baos)
@@ -59,4 +54,141 @@ class ThreadDumpUtilsSuite extends KyuubiFunSuite {
5954 assert(dump.contains(" Thread Dump End" ))
6055 assert(! dump.contains(" ====================== Daemon Threads ======================" ))
6156 }
57+
58+ /**
59+ * Helper to find a specific thread's section in the dump.
60+ * This is useful for targeted assertions on a single thread's output.
61+ */
62+ private def findThreadSection (dump : String , threadName : String ): Option [String ] = {
63+ val pattern = s """ (?s)Thread: " $threadName".*?(?= \\ n \\ nThread:| \\ n \\ n================) """ .r
64+ pattern.findFirstIn(dump)
65+ }
66+
67+ test(" dumpToString should correctly limit stack depth and show truncation message" ) {
68+ val readyLatch = new CountDownLatch (1 )
69+ var deepStackThread : Thread = null
70+ val stackDepth = 100
71+ val limit = 10
72+
73+ // Recursive function to create a deep stack
74+ def deepStack (n : Int ): Unit = {
75+ if (n > 0 ) {
76+ deepStack(n - 1 )
77+ } else {
78+ readyLatch.countDown() // Signal that the thread is ready
79+ try Thread .sleep(5000 ) // Keep thread alive for dumping
80+ catch { case _ : InterruptedException => Thread .currentThread().interrupt() }
81+ }
82+ }
83+
84+ try {
85+ deepStackThread = new Thread (() => deepStack(stackDepth), " deep-stack-test-thread" )
86+ deepStackThread.setDaemon(true )
87+ deepStackThread.start()
88+
89+ // Wait for the thread to reach its deepest point
90+ assert(readyLatch.await(5 , TimeUnit .SECONDS ), " Test thread did not initialize in time" )
91+
92+ val config = ThreadDumpUtils .DumpConfig (stackDepth = limit)
93+ val dump = ThreadDumpUtils .dumpToString(config)
94+ val section = findThreadSection(dump, " deep-stack-test-thread" )
95+
96+ assert(section.isDefined, " Thread 'deep-stack-test-thread' not found in dump" )
97+
98+ // Verify that the stack trace is truncated and the message is correct
99+ // Total frames = stackDepth + 1 (for the call to deepStack(0)),
100+ // so we expect (101 - 10) more frames
101+ val expectedMoreFrames = stackDepth + 1 - limit
102+ assert(section.get.contains(s " ... ( ${expectedMoreFrames} more stack frames) " ))
103+
104+ // Verify the number of 'at' lines matches the limit
105+ val stackTraceLines = section.get.linesIterator.count(_.trim.startsWith(" at " ))
106+ assert(stackTraceLines == limit)
107+
108+ } finally {
109+ if (deepStackThread != null ) deepStackThread.interrupt()
110+ }
111+ }
112+
113+ test(" dumpToString should sort threads by ID when configured" ) {
114+ val config = ThreadDumpUtils .DumpConfig (sortThreadsBy = ThreadDumpUtils .ThreadSortBy .Id )
115+ val dump = ThreadDumpUtils .dumpToString(config)
116+
117+ // Extract the summary table for easier parsing
118+ val summarySection =
119+ dump.substring(dump.indexOf(" ======================== Summary ========================" ))
120+
121+ // Regex to extract thread IDs from the summary table lines
122+ val idPattern = """ ^\s*(\d+)\s*\|.*""" .r
123+ val ids = summarySection.linesIterator.flatMap { line =>
124+ idPattern.findFirstMatchIn(line).map(_.group(1 ).toLong)
125+ }.toList
126+
127+ assert(ids.nonEmpty, " No thread IDs found in the summary" )
128+ // Verify that the list of IDs is sorted, which proves the sorting logic worked
129+ assert(ids == ids.sorted, s " Thread IDs are not sorted: $ids" )
130+ }
131+
132+ test(" dumpToString should detect and report deadlocks" ) {
133+ val lock1 = new Object ()
134+ val lock2 = new Object ()
135+ // Latch to ensure both threads are in a deadlock state before we take the dump
136+ val deadlockSetupLatch = new CountDownLatch (2 )
137+
138+ val thread1 = new Thread (
139+ () => {
140+ lock1.synchronized {
141+ deadlockSetupLatch.countDown()
142+ Thread .sleep(200 ) // Wait for thread2 to acquire lock2
143+ lock2.synchronized {
144+ // This line will never be reached
145+ }
146+ }
147+ },
148+ " kyuubi-deadlock-thread-1" )
149+
150+ val thread2 = new Thread (
151+ () => {
152+ lock2.synchronized {
153+ deadlockSetupLatch.countDown()
154+ Thread .sleep(200 ) // Wait for thread1 to acquire lock1
155+ lock1.synchronized {
156+ // This line will never be reached
157+ }
158+ }
159+ },
160+ " kyuubi-deadlock-thread-2" )
161+
162+ // Use daemon threads so they don't block JVM exit if the test fails
163+ thread1.setDaemon(true )
164+ thread2.setDaemon(true )
165+
166+ try {
167+ thread1.start()
168+ thread2.start()
169+
170+ // Wait for both threads to acquire their first lock
171+ assert(
172+ deadlockSetupLatch.await(5 , TimeUnit .SECONDS ),
173+ " Deadlock condition was not met in time" )
174+
175+ // Give the JVM time to officially recognize the deadlock state
176+ Thread .sleep(500 )
177+
178+ val dump = ThreadDumpUtils .dumpToString()
179+
180+ assert(dump.contains(" *** DEADLOCK DETECTED ***" ))
181+ // Check that both threads involved in the deadlock are mentioned
182+ assert(dump.contains(""" "kyuubi-deadlock-thread-1"""" ))
183+ assert(dump.contains(""" "kyuubi-deadlock-thread-2"""" ))
184+ // Check for lock details which are crucial for debugging deadlocks
185+ assert(dump.contains(" Waiting on:" ))
186+ assert(dump.contains(" Lock owned by" ))
187+
188+ } finally {
189+ // Clean up the threads
190+ thread1.interrupt()
191+ thread2.interrupt()
192+ }
193+ }
62194}
0 commit comments