Skip to content

Commit 72a28ba

Browse files
committed
feat: Add reset() method, gRPC keep-alive, and transport resilience
- Add public reset() method to SpiceClient for recovering from unrecoverable transport failures (SSL cert mismatch, persistent UNAVAILABLE errors, stale connections pinned to wrong backend IPs) - Extract buildFlightClient() for reuse during construction and reset - Configure HTTP/2 keep-alive (30s interval, 10s timeout) on the gRPC channel to detect dead/idle connections behind load balancers - Add ensureFlightClient() guard in queryInternal() for safety - Handle null flightClient in close() after reset - Add ResetTest.java with 20 unit tests covering happy path, edge cases, concurrency, construction variants, and integration - Document transport resilience and reset() usage in README
1 parent 042afe0 commit 72a28ba

3 files changed

Lines changed: 587 additions & 14 deletions

File tree

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,38 @@ SpiceClient client = SpiceClient.builder()
213213
.build();
214214
```
215215

216+
### Long-lived Clients and Transport Resilience
217+
218+
The `SpiceClient` is designed for long-lived reuse. The underlying gRPC channel uses `dns:///` resolution, which periodically re-resolves hostnames so clients automatically recover from load-balancer IP rotation (e.g. AWS NLB). HTTP/2 keep-alive is enabled by default (30s interval, 10s timeout) to detect dead connections quickly.
219+
220+
For the rare case where the transport becomes permanently stuck (e.g. TLS handshake to a wrong backend, persistent `UNAVAILABLE` after retries), use `reset()` to discard the bad connection and immediately establish a fresh one:
221+
222+
```java
223+
SpiceClient client = SpiceClient.builder()
224+
.withApiKey(API_KEY)
225+
.withSpiceCloud()
226+
.build();
227+
228+
// Long-lived usage with transport recovery
229+
try {
230+
FlightStream stream = client.query(sql);
231+
// process results...
232+
} catch (ExecutionException e) {
233+
if (isTransportFailure(e.getCause())) {
234+
client.reset(); // discard bad transport, reconnect immediately
235+
FlightStream stream = client.query(sql); // no connection overhead
236+
} else {
237+
throw e;
238+
}
239+
}
240+
```
241+
242+
**DNS cache TTL:** The gRPC `DnsNameResolver` respects the JVM's DNS cache TTL. For more aggressive DNS refresh (recommended for cloud-deployed clients), set the JVM property:
243+
244+
```bash
245+
-Dnetworkaddress.cache.ttl=30
246+
```
247+
216248
### Iterating Through Results
217249

218250
For more control over query results, you can iterate through rows and access individual field values:

src/main/java/ai/spice/SpiceClient.java

Lines changed: 97 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ of this software and associated documentation files (the "Software"), to deal
3939
import java.util.List;
4040
import java.util.Map;
4141
import java.util.concurrent.ExecutionException;
42+
import java.util.concurrent.TimeUnit;
4243

4344
import org.apache.arrow.adbc.core.AdbcConnection;
4445
import org.apache.arrow.adbc.core.AdbcDatabase;
@@ -215,6 +216,26 @@ public SpiceClient(String appId, String apiKey, URI flightAddress, URI httpAddre
215216
: memoryLimitMB * BYTES_PER_MB;
216217
this.allocator = new RootAllocator(memoryLimitBytes);
217218

219+
// Build the Flight client (channel + auth handshake)
220+
buildFlightClient();
221+
222+
// Initialize cached retryers (immutable, built once)
223+
initRetryers();
224+
225+
logger.debug("SpiceClient initialized - flightAddress={}, appId={}", this.flightAddress, this.appId);
226+
}
227+
228+
/**
229+
* Builds or rebuilds the Flight client, including the gRPC channel and auth handshake.
230+
* This method is called during construction and after {@link #reset()}.
231+
*
232+
* <p>The gRPC channel is configured with:</p>
233+
* <ul>
234+
* <li>{@code dns:///} target scheme for periodic DNS re-resolution behind load balancers</li>
235+
* <li>HTTP/2 keep-alive (30s interval, 10s timeout) to detect dead connections quickly</li>
236+
* </ul>
237+
*/
238+
private synchronized void buildFlightClient() {
218239
// Build a gRPC channel using forTarget() with the "dns:///" scheme so that
219240
// gRPC's DnsNameResolver periodically re-resolves the hostname. This is critical
220241
// for long-lived clients connecting to load-balanced endpoints (e.g. AWS ALBs)
@@ -246,15 +267,18 @@ public SpiceClient(String appId, String apiKey, URI flightAddress, URI httpAddre
246267
channelBuilder.usePlaintext();
247268
}
248269
channelBuilder
270+
// HTTP/2 keep-alive to detect dead/idle connections behind load balancers
271+
.keepAliveTime(30, TimeUnit.SECONDS)
272+
.keepAliveTimeout(10, TimeUnit.SECONDS)
273+
.keepAliveWithoutCalls(true)
249274
.maxInboundMessageSize(Integer.MAX_VALUE)
250275
.maxInboundMetadataSize(Integer.MAX_VALUE);
251276
ManagedChannel channel = channelBuilder.build();
252277

253278
if (Strings.isNullOrEmpty(apiKey)) {
254279
FlightClient client = FlightGrpcUtils.createFlightClient(allocator, channel);
255280
this.flightClient = new FlightSqlClient(client);
256-
initRetryers();
257-
logger.debug("SpiceClient initialized (unauthenticated) - flightAddress={}, target={}", this.flightAddress, target);
281+
logger.debug("Flight client built (unauthenticated) - target={}", target);
258282
return;
259283
}
260284

@@ -282,13 +306,69 @@ public SpiceClient(String appId, String apiKey, URI flightAddress, URI httpAddre
282306
client.handshake(new CredentialCallOption(new BasicAuthCredentialWriter(this.appId, this.apiKey)));
283307
this.authCallOptions = authFactory.getCredentialCallOption();
284308
this.flightClient = new FlightSqlClient(client);
285-
286-
// Initialize cached retryers (immutable, built once)
287-
initRetryers();
288-
289-
logger.debug("SpiceClient initialized (authenticated) - flightAddress={}, appId={}, target={}", this.flightAddress, this.appId, target);
309+
310+
logger.debug("Flight client built (authenticated) - target={}, appId={}", target, this.appId);
290311
}
291-
312+
313+
/**
314+
* Ensures the Flight client is connected, rebuilding it if necessary
315+
* (e.g. after a {@link #reset()} call).
316+
*/
317+
private synchronized void ensureFlightClient() {
318+
if (this.flightClient == null) {
319+
buildFlightClient();
320+
}
321+
}
322+
323+
/**
324+
* Resets the underlying gRPC transport by closing the current Flight client and ADBC connections,
325+
* then immediately establishes a fresh connection with a new DNS lookup and TLS handshake.
326+
* This ensures the next {@link #query(String)} or {@link #queryWithParams(String, Object...)}
327+
* call does not incur connection setup overhead.
328+
*
329+
* <p>Use this method to recover from unrecoverable transport failures such as:</p>
330+
* <ul>
331+
* <li>SSLHandshakeException with mismatched certificates (e.g. load-balancer routing to wrong backend)</li>
332+
* <li>Persistent UNAVAILABLE errors after exhausting retries</li>
333+
* <li>Stale connections pinned to decommissioned backend IPs</li>
334+
* </ul>
335+
*
336+
* <p>Example usage for long-lived clients:</p>
337+
* <pre>{@code
338+
* try {
339+
* return client.query(sql);
340+
* } catch (ExecutionException e) {
341+
* if (isTransportFailure(e.getCause())) {
342+
* client.reset();
343+
* return client.query(sql); // retry with fresh connection
344+
* }
345+
* throw e;
346+
* }
347+
* }</pre>
348+
*/
349+
public synchronized void reset() {
350+
logger.info("Resetting SpiceClient transport");
351+
352+
// Close ADBC resources (they maintain a separate Flight connection)
353+
closeADBC();
354+
355+
// Close Flight client (this also shuts down the underlying gRPC channel)
356+
if (this.flightClient != null) {
357+
try {
358+
this.flightClient.close();
359+
} catch (Exception e) {
360+
logger.warn("Error closing Flight client during reset: {}", e.getMessage());
361+
}
362+
this.flightClient = null;
363+
}
364+
this.authCallOptions = null;
365+
366+
// Eagerly re-establish the connection so the next query has no setup overhead
367+
buildFlightClient();
368+
369+
logger.info("SpiceClient transport reset and reconnected.");
370+
}
371+
292372
/**
293373
* Initializes the cached retryer instances.
294374
* Called from constructor and must be called after maxRetries is set.
@@ -907,6 +987,7 @@ public void refreshDataset(String dataset, RefreshOptions refreshOptions) throws
907987
}
908988

909989
private FlightStream queryInternal(String sql) {
990+
ensureFlightClient();
910991
FlightInfo flightInfo = this.flightClient.execute(sql, authCallOptions);
911992
Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
912993
return this.flightClient.getStream(ticket, authCallOptions);
@@ -942,12 +1023,14 @@ public void close() throws Exception {
9421023
}
9431024

9441025
// Close Flight client
945-
try {
946-
this.flightClient.close();
947-
logger.debug("Flight client closed");
948-
} catch (Exception e) {
949-
logger.warn("Error closing Flight client: {}", e.getMessage());
950-
exceptions.add(e);
1026+
if (this.flightClient != null) {
1027+
try {
1028+
this.flightClient.close();
1029+
logger.debug("Flight client closed");
1030+
} catch (Exception e) {
1031+
logger.warn("Error closing Flight client: {}", e.getMessage());
1032+
exceptions.add(e);
1033+
}
9511034
}
9521035

9531036
// Close allocator

0 commit comments

Comments
 (0)