Skip to content

Commit 92afb9b

Browse files
committed
fix: Address PR review comments (round 4)
- Close allocator in SpiceClient constructor if buildFlightClient() or initRetryers() throws, preventing off-heap memory leaks on failed client construction. - Replace raw Thread usage in concurrent tests with ExecutorService and proper shutdownNow() cleanup, preventing thread leaks and JVM hangs on test timeout. - Refactor integration tests to probe server availability in setUp() and gate with a boolean flag (matching TpchIntegrationTest pattern) instead of brittle exception message substring matching. - Clarify README example: add comment explaining isTransportFailure() is application-defined and suggest which exception types to check.
1 parent d8d736a commit 92afb9b

4 files changed

Lines changed: 114 additions & 75 deletions

File tree

.commitmsg

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
fix: Address PR review comments (round 4)
2+
3+
- Close allocator in SpiceClient constructor if buildFlightClient() or
4+
initRetryers() throws, preventing off-heap memory leaks on failed
5+
client construction.
6+
7+
- Replace raw Thread usage in concurrent tests with ExecutorService
8+
and proper shutdownNow() cleanup, preventing thread leaks and JVM
9+
hangs on test timeout.
10+
11+
- Refactor integration tests to probe server availability in setUp()
12+
and gate with a boolean flag (matching TpchIntegrationTest pattern)
13+
instead of brittle exception message substring matching.
14+
15+
- Clarify README example: add comment explaining isTransportFailure()
16+
is application-defined and suggest which exception types to check.

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,10 @@ SpiceClient client = SpiceClient.builder()
225225
.withSpiceCloud()
226226
.build();
227227

228-
// Long-lived usage with transport recovery
228+
// Long-lived usage with transport recovery.
229+
// isTransportFailure() is application-defined; check for
230+
// io.grpc.StatusRuntimeException with Status.UNAVAILABLE,
231+
// SSLHandshakeException, or similar transport-level errors.
229232
try {
230233
try (FlightStream stream = client.query(sql)) {
231234
// process results...

src/main/java/ai/spice/SpiceClient.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,20 @@ public SpiceClient(String appId, String apiKey, URI flightAddress, URI httpAddre
223223
: memoryLimitMB * BYTES_PER_MB;
224224
this.allocator = new RootAllocator(memoryLimitBytes);
225225

226-
// Build the Flight client (channel + auth handshake)
227-
buildFlightClient();
226+
try {
227+
// Build the Flight client (channel + auth handshake)
228+
buildFlightClient();
228229

229-
// Initialize cached retryers (immutable, built once)
230-
initRetryers();
230+
// Initialize cached retryers (immutable, built once)
231+
initRetryers();
232+
} catch (RuntimeException | Error e) {
233+
try {
234+
this.allocator.close();
235+
} catch (Exception closeEx) {
236+
e.addSuppressed(closeEx);
237+
}
238+
throw e;
239+
}
231240

232241
logger.debug("SpiceClient initialized - flightAddress={}, appId={}", this.flightAddress, this.appId);
233242
}

src/test/java/ai/spice/ResetTest.java

Lines changed: 81 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ of this software and associated documentation files (the "Software"), to deal
2626
import java.util.ArrayList;
2727
import java.util.List;
2828
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
2932
import java.util.concurrent.atomic.AtomicInteger;
3033

3134
import org.apache.arrow.flight.FlightStream;
@@ -39,6 +42,21 @@ of this software and associated documentation files (the "Software"), to deal
3942
*/
4043
public class ResetTest extends TestCase {
4144

45+
private boolean serverAvailable = false;
46+
47+
@Override
48+
protected void setUp() throws Exception {
49+
super.setUp();
50+
try (SpiceClient probe = SpiceClient.builder().build()) {
51+
try (FlightStream stream = probe.query("SELECT 1")) {
52+
stream.next();
53+
}
54+
serverAvailable = true;
55+
} catch (Exception e) {
56+
serverAvailable = false;
57+
}
58+
}
59+
4260
// ==================== reset() Happy Path ====================
4361

4462
/**
@@ -204,28 +222,31 @@ public void testConcurrentResetDoesNotThrow() throws Exception {
204222
final SpiceClient client = SpiceClient.builder().build();
205223
final int threadCount = 8;
206224
final CountDownLatch startLatch = new CountDownLatch(1);
207-
final CountDownLatch doneLatch = new CountDownLatch(threadCount);
208225
final AtomicInteger errors = new AtomicInteger(0);
226+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
209227

210-
for (int i = 0; i < threadCount; i++) {
211-
new Thread(() -> {
212-
try {
213-
startLatch.await(); // all threads start at the same time
214-
client.reset();
215-
} catch (Exception e) {
216-
errors.incrementAndGet();
217-
} finally {
218-
doneLatch.countDown();
219-
}
220-
}).start();
221-
}
228+
try {
229+
for (int i = 0; i < threadCount; i++) {
230+
executor.submit(() -> {
231+
try {
232+
startLatch.await();
233+
client.reset();
234+
} catch (Exception e) {
235+
errors.incrementAndGet();
236+
}
237+
});
238+
}
222239

223-
startLatch.countDown(); // release all threads
224-
assertTrue("Concurrent reset should complete within 30s",
225-
doneLatch.await(30, java.util.concurrent.TimeUnit.SECONDS));
240+
startLatch.countDown(); // release all threads
241+
executor.shutdown();
242+
assertTrue("Concurrent reset should complete within 30s",
243+
executor.awaitTermination(30, TimeUnit.SECONDS));
226244

227-
assertEquals("No threads should have encountered errors", 0, errors.get());
228-
client.close();
245+
assertEquals("No threads should have encountered errors", 0, errors.get());
246+
} finally {
247+
executor.shutdownNow();
248+
client.close();
249+
}
229250
}
230251

231252
/**
@@ -236,51 +257,52 @@ public void testConcurrentResetAndQuery() throws Exception {
236257
final SpiceClient client = SpiceClient.builder().build();
237258
final int iterations = 5;
238259
final CountDownLatch startLatch = new CountDownLatch(1);
239-
final CountDownLatch doneLatch = new CountDownLatch(2);
240260
final List<Throwable> unexpectedErrors = new ArrayList<>();
261+
ExecutorService executor = Executors.newFixedThreadPool(2);
241262

242-
// Thread 1: repeated resets
243-
new Thread(() -> {
244-
try {
245-
startLatch.await();
246-
for (int i = 0; i < iterations; i++) {
247-
client.reset();
248-
Thread.sleep(10);
263+
try {
264+
// Thread 1: repeated resets
265+
executor.submit(() -> {
266+
try {
267+
startLatch.await();
268+
for (int i = 0; i < iterations; i++) {
269+
client.reset();
270+
Thread.sleep(10);
271+
}
272+
} catch (InterruptedException ignored) {
249273
}
250-
} catch (InterruptedException ignored) {
251-
} finally {
252-
doneLatch.countDown();
253-
}
254-
}).start();
274+
});
255275

256-
// Thread 2: repeated queries
257-
new Thread(() -> {
258-
try {
259-
startLatch.await();
260-
for (int i = 0; i < iterations; i++) {
261-
try {
262-
FlightStream stream = client.query("SELECT 1");
263-
stream.close();
264-
} catch (NullPointerException e) {
265-
unexpectedErrors.add(e);
266-
} catch (Exception e) {
267-
// Connection errors are expected
276+
// Thread 2: repeated queries
277+
executor.submit(() -> {
278+
try {
279+
startLatch.await();
280+
for (int i = 0; i < iterations; i++) {
281+
try {
282+
FlightStream stream = client.query("SELECT 1");
283+
stream.close();
284+
} catch (NullPointerException e) {
285+
unexpectedErrors.add(e);
286+
} catch (Exception e) {
287+
// Connection errors are expected
288+
}
289+
Thread.sleep(10);
268290
}
269-
Thread.sleep(10);
291+
} catch (InterruptedException ignored) {
270292
}
271-
} catch (InterruptedException ignored) {
272-
} finally {
273-
doneLatch.countDown();
274-
}
275-
}).start();
293+
});
276294

277-
startLatch.countDown();
278-
assertTrue("Concurrent reset+query should complete within 30s",
279-
doneLatch.await(30, java.util.concurrent.TimeUnit.SECONDS));
295+
startLatch.countDown();
296+
executor.shutdown();
297+
assertTrue("Concurrent reset+query should complete within 30s",
298+
executor.awaitTermination(30, TimeUnit.SECONDS));
280299

281-
assertTrue("Should not get NullPointerException during concurrent reset+query: " + unexpectedErrors,
282-
unexpectedErrors.isEmpty());
283-
client.close();
300+
assertTrue("Should not get NullPointerException during concurrent reset+query: " + unexpectedErrors,
301+
unexpectedErrors.isEmpty());
302+
} finally {
303+
executor.shutdownNow();
304+
client.close();
305+
}
284306
}
285307

286308
// ==================== Construction / DNS / Keep-alive ====================
@@ -391,6 +413,8 @@ public void testResetWithCustomConfig() throws Exception {
391413
* reset() followed by query() actually returns data.
392414
*/
393415
public void testResetThenQueryIntegration() throws Exception {
416+
if (!serverAvailable) return;
417+
394418
try (SpiceClient client = SpiceClient.builder().build()) {
395419
// First query (establishes connection)
396420
try (FlightStream stream1 = client.query(
@@ -414,14 +438,6 @@ public void testResetThenQueryIntegration() throws Exception {
414438
}
415439
assertEquals("Second query after reset should return 2 rows", 2, rows2);
416440
}
417-
} catch (Exception e) {
418-
// Skip if no local runtime or TPC-H data available
419-
String msg = e.getMessage() != null ? e.getMessage().toLowerCase() : "";
420-
if (msg.contains("unavailable") || msg.contains("connection refused")
421-
|| msg.contains("not found") || msg.contains("io exception")) {
422-
return;
423-
}
424-
fail("Unexpected error: " + e.getMessage());
425441
}
426442
}
427443

@@ -430,6 +446,8 @@ public void testResetThenQueryIntegration() throws Exception {
430446
* reset() followed by queryWithParams() actually returns data.
431447
*/
432448
public void testResetThenQueryWithParamsIntegration() throws Exception {
449+
if (!serverAvailable) return;
450+
433451
try (SpiceClient client = SpiceClient.builder().build()) {
434452
// First query
435453
try (ArrowReader reader1 = client.queryWithParams(
@@ -455,13 +473,6 @@ public void testResetThenQueryWithParamsIntegration() throws Exception {
455473
}
456474
assertTrue("Second parameterized query after reset should return rows", rows2 > 0);
457475
}
458-
} catch (Exception e) {
459-
String msg = e.getMessage() != null ? e.getMessage().toLowerCase() : "";
460-
if (msg.contains("unavailable") || msg.contains("connection refused")
461-
|| msg.contains("not found") || msg.contains("io exception")) {
462-
return;
463-
}
464-
fail("Unexpected error: " + e.getMessage());
465476
}
466477
}
467478
}

0 commit comments

Comments
 (0)