19
19
package org .apache .zookeeper ;
20
20
21
21
import static org .apache .zookeeper .test .ClientBase .CONNECTION_TIMEOUT ;
22
+ import static org .hamcrest .MatcherAssert .assertThat ;
23
+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
24
+ import static org .hamcrest .Matchers .lessThan ;
22
25
import static org .junit .jupiter .api .Assertions .assertEquals ;
23
26
import static org .junit .jupiter .api .Assertions .assertTrue ;
24
27
import static org .junit .jupiter .api .Assertions .fail ;
25
28
import java .io .IOException ;
29
+ import java .util .concurrent .TimeUnit ;
30
+ import org .apache .jute .Record ;
26
31
import org .apache .zookeeper .ZooDefs .Ids ;
27
32
import org .apache .zookeeper .client .HostProvider ;
28
33
import org .apache .zookeeper .client .ZKClientConfig ;
34
+ import org .apache .zookeeper .common .Time ;
35
+ import org .apache .zookeeper .proto .ReplyHeader ;
36
+ import org .apache .zookeeper .proto .RequestHeader ;
29
37
import org .apache .zookeeper .server .quorum .QuorumPeerTestBase ;
30
38
import org .apache .zookeeper .test .ClientBase ;
31
39
import org .apache .zookeeper .test .ClientBase .CountdownWatcher ;
@@ -37,6 +45,9 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
37
45
private static final int SERVER_COUNT = 3 ;
38
46
private boolean dropPacket = false ;
39
47
private int dropPacketType = ZooDefs .OpCode .create ;
48
+ private boolean capturePacket = false ;
49
+ private int capturePacketType = ZooDefs .OpCode .create ;
50
+ private ClientCnxn .Packet capturedPacket = null ;
40
51
41
52
@ Test
42
53
@ Timeout (value = 120 )
@@ -94,6 +105,105 @@ public void testClientRequestTimeout() throws Exception {
94
105
}
95
106
}
96
107
108
+ @ Test
109
+ void testClientRequestTimeoutTime () throws Exception {
110
+ long requestTimeout = TimeUnit .SECONDS .toMillis (5 );
111
+ System .setProperty ("zookeeper.request.timeout" , Long .toString (requestTimeout ));
112
+
113
+ CustomZooKeeper zk = null ;
114
+ int clientPort = PortAssignment .unique ();
115
+ MainThread mainThread = new MainThread (0 , clientPort , "" , false );
116
+ mainThread .start ();
117
+ try {
118
+ assertTrue (ClientBase .waitForServerUp ("127.0.0.1:" + clientPort , CONNECTION_TIMEOUT ),
119
+ "waiting for server 0 being up" );
120
+
121
+ CountdownWatcher watch = new CountdownWatcher ();
122
+ zk = new CustomZooKeeper (getCxnString (new int []{clientPort }), ClientBase .CONNECTION_TIMEOUT , watch );
123
+ watch .waitForConnected (ClientBase .CONNECTION_TIMEOUT );
124
+
125
+ dropPacket = true ;
126
+ dropPacketType = ZooDefs .OpCode .create ;
127
+
128
+ String data = "originalData" ;
129
+ long startTime = Time .currentElapsedTime ();
130
+ try {
131
+ zk .create ("/testClientRequestTimeout" , data .getBytes (), Ids .OPEN_ACL_UNSAFE , CreateMode .PERSISTENT_SEQUENTIAL );
132
+ fail ("KeeperException is expected." );
133
+ } catch (KeeperException exception ) {
134
+ long cost = Time .currentElapsedTime () - startTime ;
135
+ assertEquals (KeeperException .Code .REQUESTTIMEOUT , exception .code ());
136
+ LOG .info ("testClientRequestTimeoutTime cost:{}" , cost );
137
+ assertThat (cost , greaterThanOrEqualTo (requestTimeout ));
138
+ assertThat (cost , lessThan (requestTimeout + 500 ));
139
+ }
140
+ } finally {
141
+ mainThread .shutdown ();
142
+ if (zk != null ) {
143
+ zk .close ();
144
+ }
145
+ }
146
+ }
147
+
148
+
149
+ @ Test
150
+ void testClientRequestTimeoutTimeSimulatingSpuriousWakeup () throws Exception {
151
+ long requestTimeout = TimeUnit .SECONDS .toMillis (5 );
152
+ System .setProperty ("zookeeper.request.timeout" , Long .toString (requestTimeout ));
153
+
154
+ CustomZooKeeper zk = null ;
155
+ int clientPort = PortAssignment .unique ();
156
+ MainThread mainThread = new MainThread (0 , clientPort , "" , false );
157
+ mainThread .start ();
158
+ try {
159
+ assertTrue (ClientBase .waitForServerUp ("127.0.0.1:" + clientPort , CONNECTION_TIMEOUT ),
160
+ "waiting for server 0 being up" );
161
+
162
+ CountdownWatcher watch = new CountdownWatcher ();
163
+ zk = new CustomZooKeeper (getCxnString (new int []{clientPort }), ClientBase .CONNECTION_TIMEOUT , watch );
164
+ watch .waitForConnected (ClientBase .CONNECTION_TIMEOUT );
165
+
166
+ dropPacket = true ;
167
+ dropPacketType = ZooDefs .OpCode .create ;
168
+ capturePacket = true ;
169
+ capturePacketType = ZooDefs .OpCode .create ;
170
+
171
+ // Simulating spurious wakeup
172
+ new Thread (() -> {
173
+ try {
174
+ TimeUnit .MILLISECONDS .sleep (requestTimeout / 2 );
175
+ if (capturedPacket != null ) {
176
+ synchronized (capturedPacket ) {
177
+ capturedPacket .notifyAll ();
178
+ }
179
+ }
180
+ } catch (InterruptedException e ) {
181
+ throw new RuntimeException (e );
182
+ }
183
+ }).start ();
184
+
185
+ String data = "originalData" ;
186
+ long startTime = Time .currentElapsedTime ();
187
+ try {
188
+ zk .create ("/testClientRequestTimeout" , data .getBytes (), Ids .OPEN_ACL_UNSAFE , CreateMode .PERSISTENT_SEQUENTIAL );
189
+ fail ("KeeperException is expected." );
190
+ } catch (KeeperException exception ) {
191
+ long cost = Time .currentElapsedTime () - startTime ;
192
+ assertEquals (KeeperException .Code .REQUESTTIMEOUT , exception .code ());
193
+ LOG .info ("testClientRequestTimeoutTimeSimulatingSpuriousWakeup cost:{}" , cost );
194
+ assertThat (cost , greaterThanOrEqualTo (requestTimeout ));
195
+ assertThat (cost , lessThan (requestTimeout + 500 ));
196
+ }
197
+ } finally {
198
+ capturePacket = false ;
199
+ capturedPacket = null ;
200
+ mainThread .shutdown ();
201
+ if (zk != null ) {
202
+ zk .close ();
203
+ }
204
+ }
205
+ }
206
+
97
207
/**
98
208
* @return connection string in the form of
99
209
* 127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3
@@ -141,6 +251,27 @@ public void finishPacket(Packet p) {
141
251
super .finishPacket (p );
142
252
}
143
253
254
+ @ Override
255
+ public Packet queuePacket (
256
+ RequestHeader h ,
257
+ ReplyHeader r ,
258
+ Record request ,
259
+ Record response ,
260
+ AsyncCallback cb ,
261
+ String clientPath ,
262
+ String serverPath ,
263
+ Object ctx ,
264
+ ZooKeeper .WatchRegistration watchRegistration ,
265
+ WatchDeregistration watchDeregistration ) {
266
+ Packet packet = super .queuePacket (h , r , request , response , cb , clientPath , serverPath ,
267
+ ctx , watchRegistration , watchDeregistration );
268
+
269
+ if (capturePacket && h != null && h .getType () == capturePacketType ) {
270
+ capturedPacket = packet ;
271
+ }
272
+ return packet ;
273
+ }
274
+
144
275
}
145
276
146
277
class CustomZooKeeper extends ZooKeeper {
0 commit comments