Skip to content

Commit 18d27a3

Browse files
luozongle01 authored and cnauroth committed
ZOOKEEPER-4909: Fix exceeded request timeout in case of spurious wakeup
Closes #2237. Co-authored-by: Kezhu Wang <[email protected]>. Signed-off-by: Kezhu Wang <[email protected]>. Signed-off-by: Chris Nauroth <[email protected]>. (cherry picked from commit 9cc3043) (cherry picked from commit c676112)
1 parent 236d254 commit 18d27a3

File tree

2 files changed

+141
-8
lines changed

2 files changed

+141
-8
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -1615,14 +1615,16 @@ public ReplyHeader submitRequest(
16151615
* Wait for request completion with timeout.
16161616
*/
16171617
private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException {
1618-
long waitStartTime = Time.currentElapsedTime();
1619-
while (!packet.finished) {
1620-
packet.wait(requestTimeout);
1621-
if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) {
1622-
LOG.error("Timeout error occurred for the packet '{}'.", packet);
1623-
r.setErr(Code.REQUESTTIMEOUT.intValue());
1624-
break;
1625-
}
1618+
long remainingTime = requestTimeout;
1619+
while (!packet.finished && remainingTime > 0) {
1620+
long waitStartTime = Time.currentElapsedTime();
1621+
packet.wait(remainingTime);
1622+
remainingTime -= (Time.currentElapsedTime() - waitStartTime);
1623+
}
1624+
1625+
if (!packet.finished) {
1626+
LOG.error("Timeout error occurred for the packet '{}'.", packet);
1627+
r.setErr(Code.REQUESTTIMEOUT.intValue());
16261628
}
16271629
}
16281630

zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java

+131
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,21 @@
1919
package org.apache.zookeeper;
2020

2121
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
24+
import static org.hamcrest.Matchers.lessThan;
2225
import static org.junit.jupiter.api.Assertions.assertEquals;
2326
import static org.junit.jupiter.api.Assertions.assertTrue;
2427
import static org.junit.jupiter.api.Assertions.fail;
2528
import java.io.IOException;
29+
import java.util.concurrent.TimeUnit;
30+
import org.apache.jute.Record;
2631
import org.apache.zookeeper.ZooDefs.Ids;
2732
import org.apache.zookeeper.client.HostProvider;
2833
import org.apache.zookeeper.client.ZKClientConfig;
34+
import org.apache.zookeeper.common.Time;
35+
import org.apache.zookeeper.proto.ReplyHeader;
36+
import org.apache.zookeeper.proto.RequestHeader;
2937
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
3038
import org.apache.zookeeper.test.ClientBase;
3139
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
@@ -37,6 +45,9 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
3745
private static final int SERVER_COUNT = 3;
3846
private boolean dropPacket = false;
3947
private int dropPacketType = ZooDefs.OpCode.create;
48+
private boolean capturePacket = false;
49+
private int capturePacketType = ZooDefs.OpCode.create;
50+
private ClientCnxn.Packet capturedPacket = null;
4051

4152
@Test
4253
@Timeout(value = 120)
@@ -94,6 +105,105 @@ public void testClientRequestTimeout() throws Exception {
94105
}
95106
}
96107

108+
@Test
109+
void testClientRequestTimeoutTime() throws Exception {
110+
long requestTimeout = TimeUnit.SECONDS.toMillis(5);
111+
System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout));
112+
113+
CustomZooKeeper zk = null;
114+
int clientPort = PortAssignment.unique();
115+
MainThread mainThread = new MainThread(0, clientPort, "", false);
116+
mainThread.start();
117+
try {
118+
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT),
119+
"waiting for server 0 being up");
120+
121+
CountdownWatcher watch = new CountdownWatcher();
122+
zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch);
123+
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
124+
125+
dropPacket = true;
126+
dropPacketType = ZooDefs.OpCode.create;
127+
128+
String data = "originalData";
129+
long startTime = Time.currentElapsedTime();
130+
try {
131+
zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
132+
fail("KeeperException is expected.");
133+
} catch (KeeperException exception) {
134+
long cost = Time.currentElapsedTime() - startTime;
135+
assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code());
136+
LOG.info("testClientRequestTimeoutTime cost:{}", cost);
137+
assertThat(cost, greaterThanOrEqualTo(requestTimeout));
138+
assertThat(cost, lessThan(requestTimeout + 500));
139+
}
140+
} finally {
141+
mainThread.shutdown();
142+
if (zk != null) {
143+
zk.close();
144+
}
145+
}
146+
}
147+
148+
149+
@Test
150+
void testClientRequestTimeoutTimeSimulatingSpuriousWakeup() throws Exception {
151+
long requestTimeout = TimeUnit.SECONDS.toMillis(5);
152+
System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout));
153+
154+
CustomZooKeeper zk = null;
155+
int clientPort = PortAssignment.unique();
156+
MainThread mainThread = new MainThread(0, clientPort, "", false);
157+
mainThread.start();
158+
try {
159+
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT),
160+
"waiting for server 0 being up");
161+
162+
CountdownWatcher watch = new CountdownWatcher();
163+
zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch);
164+
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
165+
166+
dropPacket = true;
167+
dropPacketType = ZooDefs.OpCode.create;
168+
capturePacket = true;
169+
capturePacketType = ZooDefs.OpCode.create;
170+
171+
// Simulating spurious wakeup
172+
new Thread(() -> {
173+
try {
174+
TimeUnit.MILLISECONDS.sleep(requestTimeout / 2);
175+
if (capturedPacket != null) {
176+
synchronized (capturedPacket) {
177+
capturedPacket.notifyAll();
178+
}
179+
}
180+
} catch (InterruptedException e) {
181+
throw new RuntimeException(e);
182+
}
183+
}).start();
184+
185+
String data = "originalData";
186+
long startTime = Time.currentElapsedTime();
187+
try {
188+
zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
189+
fail("KeeperException is expected.");
190+
} catch (KeeperException exception) {
191+
long cost = Time.currentElapsedTime() - startTime;
192+
assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code());
193+
LOG.info("testClientRequestTimeoutTimeSimulatingSpuriousWakeup cost:{}", cost);
194+
assertThat(cost, greaterThanOrEqualTo(requestTimeout));
195+
assertThat(cost, lessThan(requestTimeout + 500));
196+
}
197+
} finally {
198+
capturePacket = false;
199+
capturedPacket = null;
200+
mainThread.shutdown();
201+
if (zk != null) {
202+
zk.close();
203+
}
204+
}
205+
}
206+
97207
/**
98208
* @return connection string in the form of
99209
* 127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3
@@ -141,6 +251,27 @@ public void finishPacket(Packet p) {
141251
super.finishPacket(p);
142252
}
143253

254+
@Override
255+
public Packet queuePacket(
256+
RequestHeader h,
257+
ReplyHeader r,
258+
Record request,
259+
Record response,
260+
AsyncCallback cb,
261+
String clientPath,
262+
String serverPath,
263+
Object ctx,
264+
ZooKeeper.WatchRegistration watchRegistration,
265+
WatchDeregistration watchDeregistration) {
266+
Packet packet = super.queuePacket(h, r, request, response, cb, clientPath, serverPath,
267+
ctx, watchRegistration, watchDeregistration);
268+
269+
if (capturePacket && h != null && h.getType() == capturePacketType) {
270+
capturedPacket = packet;
271+
}
272+
return packet;
273+
}
274+
144275
}
145276

146277
class CustomZooKeeper extends ZooKeeper {

0 commit comments

Comments
 (0)