Skip to content

Commit 6a2586f

Browse files
authored
[server] Add null safety for TimerTaskEntry removal (#1872)
1 parent 684a43d commit 6a2586f

File tree

2 files changed

+134
-3
lines changed

2 files changed

+134
-3
lines changed

fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimerTaskEntry.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,14 @@ boolean isCancelled() {
5757
}
5858

5959
void remove() {
60-
TimerTaskList currentList = list;
60+
TimerTaskList currentList;
6161
// If remove is called when another thread is moving the entry from a task
6262
// entry list to another, this may fail to remove the entry due to the change of value
6363
// of list. Thus, we retry until the list becomes null. In a rare case, this thread
6464
// sees null and exits the loop, but the other thread inserts the entry into another list
6565
// later.
66-
while (list != null) {
66+
while ((currentList = list) != null) {
6767
currentList.remove(this);
68-
currentList = list;
6968
}
7069
}
7170

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.utils.timer;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
28+
/** Tests for {@link org.apache.fluss.server.utils.timer.TimerTaskEntry}. */
29+
public class TimerTaskEntryTest {
30+
31+
@Test
32+
void testRemoveEnsuresCurrentListNullSafety() throws InterruptedException {
33+
// Create two lists to reproduce the values that we are working
34+
// with being added/removed. We will oscillate between adding
35+
// and removing these elements until we encounter a NPE
36+
AtomicInteger sharedTaskCounter = new AtomicInteger(0);
37+
TimerTaskList primaryList = new TimerTaskList(sharedTaskCounter);
38+
TimerTaskList secondaryList = new TimerTaskList(sharedTaskCounter);
39+
40+
// Set up our initial task that will handle coordinating this
41+
// reproduction behavior
42+
TestTask task = new TestTask(0L);
43+
TimerTaskEntry entry = new TimerTaskEntry(task, 10L);
44+
primaryList.add(entry);
45+
46+
// Container for any NullPointerException caught during remove()
47+
AtomicReference<NullPointerException> thrownException = new AtomicReference<>();
48+
49+
// Latch to handle coordinating addition/removal threads
50+
CountDownLatch latch = new CountDownLatch(1);
51+
52+
// Create a thread responsible for continually removing entries, which
53+
// will be responsible for triggering the exception
54+
Thread removalThread =
55+
new Thread(
56+
() -> {
57+
try {
58+
latch.await();
59+
// Continually remove elements from the task (forward-oscillation)
60+
for (int i = 0; i < 10000; i++) {
61+
entry.remove();
62+
}
63+
} catch (NullPointerException e) {
64+
thrownException.set(e);
65+
} catch (InterruptedException e) {
66+
Thread.currentThread().interrupt();
67+
}
68+
});
69+
70+
// Create a separate thread for adding the entry while the removal thread is
71+
// still executing which results in the expected null reference
72+
Thread additionThread =
73+
new Thread(
74+
() -> {
75+
try {
76+
// Wait for the main thread to release the latch
77+
latch.await();
78+
// Add the entry to our separate list while the removal thread is
79+
// still verifying the condition (resulting in our null list within
80+
// the internal removal call, and our exception)
81+
for (int i = 0; i < 10000; i++) {
82+
// Determine which list to add the entry to
83+
// (backwards-oscillation)
84+
TimerTaskList currentList = entry.list;
85+
if (currentList == null || currentList == primaryList) {
86+
// If the entry is not in any list or in the primary list,
87+
// move it to the secondary list
88+
secondaryList.add(entry);
89+
} else if (currentList == secondaryList) {
90+
// If the entry is in the secondary list, move it to the
91+
// primary list
92+
primaryList.add(entry);
93+
}
94+
Thread.yield();
95+
}
96+
} catch (InterruptedException e) {
97+
Thread.currentThread().interrupt();
98+
}
99+
});
100+
101+
// Start both threads
102+
removalThread.start();
103+
additionThread.start();
104+
105+
// Release both threads to trigger our race condition
106+
latch.countDown();
107+
108+
// Wait for threads to complete
109+
removalThread.join();
110+
additionThread.join();
111+
112+
// Attempt to remove the last entry (to ensure empty list)
113+
entry.remove();
114+
115+
// Verify the list is empty after entry removal and ensure
116+
// counter reflects the correct state
117+
assertThat(entry.list).isNull();
118+
assertThat(sharedTaskCounter.get()).isEqualTo(0);
119+
120+
// Assert that no NullPointerException was thrown by the removal thread
121+
assertThat(thrownException.get()).isNull();
122+
}
123+
124+
private static class TestTask extends TimerTask {
125+
public TestTask(long delayMs) {
126+
super(delayMs);
127+
}
128+
129+
@Override
130+
public void run() {}
131+
}
132+
}

0 commit comments

Comments
 (0)