Skip to content

Commit 8186592

Browse files
authored
Merge pull request #1176 from ably/ECO-5626/fix-reconnect
fix: reconnect breaks async requests
2 parents 6d66f58 + 6e8a5a6 commit 8186592

File tree

5 files changed

+68
-5
lines changed

5 files changed

+68
-5
lines changed

lib/src/main/java/io/ably/lib/http/AsyncHttpScheduler.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,23 @@ public AsyncHttpScheduler exchangeHttpCore(HttpCore httpCore) {
3333
return new AsyncHttpScheduler(httpCore, this.executor);
3434
}
3535

36+
public void connect() {
37+
((CloseableThreadPoolExecutor) executor).connect();
38+
}
39+
3640
private static class CloseableThreadPoolExecutor implements CloseableExecutor {
37-
private final ThreadPoolExecutor executor;
41+
// can be accessed by multiple threads, so needs to be volatile
42+
private volatile ThreadPoolExecutor executor;
43+
private final ClientOptions options;
3844

3945
CloseableThreadPoolExecutor(final ClientOptions options) {
46+
this.options = options;
4047
executor = new ThreadPoolExecutor(
4148
options.asyncHttpThreadpoolSize,
4249
options.asyncHttpThreadpoolSize,
4350
KEEP_ALIVE_TIME,
4451
TimeUnit.MILLISECONDS,
45-
new LinkedBlockingQueue<Runnable>()
52+
new LinkedBlockingQueue<>()
4653
);
4754
}
4855

@@ -59,9 +66,16 @@ public void close() throws Exception {
5966
}
6067
}
6168

62-
@Override
63-
protected void finalize() throws Throwable {
64-
close();
69+
public void connect() {
70+
if (executor.isShutdown()) {
71+
executor = new ThreadPoolExecutor(
72+
options.asyncHttpThreadpoolSize,
73+
options.asyncHttpThreadpoolSize,
74+
KEEP_ALIVE_TIME,
75+
TimeUnit.MILLISECONDS,
76+
new LinkedBlockingQueue<>()
77+
);
78+
}
6579
}
6680
}
6781
}

lib/src/main/java/io/ably/lib/http/Http.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ public void close() throws Exception {
2121
asyncHttp.close();
2222
}
2323

24+
public void connect() {
25+
asyncHttp.connect();
26+
}
27+
2428
/**
2529
* [Internal Method]
2630
* <p>

lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ public AblyRealtime(ClientOptions options) throws AblyException {
9696
* <p>
9797
* Spec: RTN11
9898
*/
99+
@Override
99100
public void connect() {
101+
super.connect(); // resets thread pool for async requests
100102
connection.connect();
101103
}
102104

lib/src/main/java/io/ably/lib/rest/AblyBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ public void close() throws Exception {
138138
http.close();
139139
}
140140

141+
/**
142+
* Internal method, used for `RealtimeClient` only
143+
*/
144+
protected void connect() {
145+
http.connect();
146+
}
147+
141148
/**
142149
* A collection of Channels associated with an Ably instance.
143150
*/

lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
import static org.junit.Assert.assertTrue;
77
import static org.junit.Assert.fail;
88

9+
import io.ably.lib.test.common.Helpers;
10+
import io.ably.lib.types.AsyncHttpPaginatedResponse;
11+
import io.ably.lib.types.ErrorInfo;
912
import org.junit.Ignore;
1013
import org.junit.Test;
1114

@@ -23,6 +26,8 @@
2326
import io.ably.lib.types.ProtocolMessage;
2427
import org.junit.rules.Timeout;
2528

29+
import java.util.concurrent.atomic.AtomicLong;
30+
2631
public class RealtimeConnectTest extends ParameterizedTest {
2732

2833
public Timeout testTimeout = Timeout.seconds(30);
@@ -248,4 +253,35 @@ public void close_when_connecting() {
248253
}
249254
}
250255

256+
@Test
257+
public void reopened_connection_rest_works() throws Exception {
258+
try (AblyRealtime realtimeClient = new AblyRealtime(createOptions(testVars.keys[0].keyStr))) {
259+
realtimeClient.close();
260+
realtimeClient.connect();
261+
262+
long timestamp = System.currentTimeMillis();
263+
AtomicLong ablyTime = new AtomicLong(0);
264+
Helpers.CompletionWaiter waiter = new Helpers.CompletionWaiter();
265+
266+
realtimeClient.requestAsync("GET", "/time", null, null, null, new AsyncHttpPaginatedResponse.Callback() {
267+
@Override
268+
public void onResponse(AsyncHttpPaginatedResponse response) {
269+
ablyTime.set(response.items()[0].getAsLong());
270+
waiter.onSuccess();
271+
}
272+
273+
@Override
274+
public void onError(ErrorInfo reason) {
275+
waiter.onError(reason);
276+
}
277+
});
278+
279+
waiter.waitFor();
280+
281+
long thirtySeconds = 30_000L;
282+
283+
assertTrue(ablyTime.get() > timestamp - thirtySeconds && ablyTime.get() < timestamp + thirtySeconds);
284+
}
285+
}
286+
251287
}

0 commit comments

Comments
 (0)