Skip to content

Commit 70fb804

Browse files
authored
Add reconnect method to Client interface (#322)
Context: Occasionally a client can experience a `stream not found` error when turso-server is restarted with an existing ongoing stateful `stream`, that has a baton referencing a different `ConnectionManager` instance. There is currently no way of resetting the connection on the client side, without calling `close`. After calling `close` a new Client instance must be created because there is no way of re-opening the connection. This adds a `reconnect` method to the Client interface to allow for this.
2 parents 2014f81 + 76dd3ef commit 70fb804

File tree

5 files changed

+98
-4
lines changed

5 files changed

+98
-4
lines changed

packages/libsql-client-wasm/src/wasm.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,22 @@ export class Sqlite3Client implements Client {
219219
);
220220
}
221221

222+
async reconnect(): Promise<void> {
223+
try {
224+
if (!this.closed && this.#db !== null) {
225+
this.#db.close();
226+
}
227+
} finally {
228+
this.#db = new this.#sqlite3.oo1.DB(this.#path, "c");
229+
this.closed = false;
230+
}
231+
}
232+
222233
close(): void {
223234
this.closed = true;
224235
if (this.#db !== null) {
225236
this.#db.close();
237+
this.#db = null;
226238
}
227239
}
228240

packages/libsql-client/src/http.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ const sqlCacheCapacity = 30;
7474
export class HttpClient implements Client {
7575
#client: hrana.HttpClient;
7676
protocol: "http";
77+
#url: URL;
78+
#intMode: IntMode;
79+
#customFetch: Function | undefined;
80+
#concurrency: number;
7781
#authToken: string | undefined;
7882
#promiseLimitFunction: ReturnType<typeof promiseLimit<any>>;
7983

@@ -85,11 +89,20 @@ export class HttpClient implements Client {
8589
customFetch: Function | undefined,
8690
concurrency: number,
8791
) {
88-
this.#client = hrana.openHttp(url, authToken, customFetch);
89-
this.#client.intMode = intMode;
90-
this.protocol = "http";
92+
this.#url = url;
9193
this.#authToken = authToken;
92-
this.#promiseLimitFunction = promiseLimit<any>(concurrency);
94+
this.#intMode = intMode;
95+
this.#customFetch = customFetch;
96+
this.#concurrency = concurrency;
97+
98+
this.#client = hrana.openHttp(
99+
this.#url,
100+
this.#authToken,
101+
this.#customFetch,
102+
);
103+
this.#client.intMode = this.#intMode;
104+
this.protocol = "http";
105+
this.#promiseLimitFunction = promiseLimit<any>(this.#concurrency);
93106
}
94107

95108
private async limit<T>(fn: () => Promise<T>): Promise<T> {
@@ -267,6 +280,23 @@ export class HttpClient implements Client {
267280
this.#client.close();
268281
}
269282

283+
async reconnect(): Promise<void> {
284+
try {
285+
if (!this.closed) {
286+
// Abort in-flight ops and free resources
287+
this.#client.close();
288+
}
289+
} finally {
290+
// Recreate the underlying hrana client
291+
this.#client = hrana.openHttp(
292+
this.#url,
293+
this.#authToken,
294+
this.#customFetch,
295+
);
296+
this.#client.intMode = this.#intMode;
297+
}
298+
}
299+
270300
get closed(): boolean {
271301
return this.#client.closed;
272302
}

packages/libsql-client/src/sqlite3.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,22 @@ export class Sqlite3Client implements Client {
222222
} as Replicated;
223223
}
224224

225+
async reconnect(): Promise<void> {
226+
try {
227+
if (!this.closed && this.#db !== null) {
228+
this.#db.close();
229+
}
230+
} finally {
231+
this.#db = new Database(this.#path, this.#options);
232+
this.closed = false;
233+
}
234+
}
235+
225236
close(): void {
226237
this.closed = true;
227238
if (this.#db !== null) {
228239
this.#db.close();
240+
this.#db = null;
229241
}
230242
}
231243

packages/libsql-client/src/ws.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,35 @@ export class WsClient implements Client {
403403
}
404404
}
405405

406+
async reconnect(): Promise<void> {
407+
try {
408+
for (const st of Array.from(this.#connState.streamStates)) {
409+
try {
410+
st.stream.close();
411+
} catch {}
412+
}
413+
this.#connState.client.close();
414+
} catch {}
415+
416+
if (this.#futureConnState) {
417+
try {
418+
this.#futureConnState.client.close();
419+
} catch {}
420+
this.#futureConnState = undefined;
421+
}
422+
423+
const next = this.#openConn();
424+
const version = await next.client.getVersion();
425+
426+
next.useSqlCache = version >= 2;
427+
if (next.useSqlCache) {
428+
next.sqlCache.capacity = sqlCacheCapacity;
429+
}
430+
431+
this.#connState = next;
432+
this.closed = false;
433+
}
434+
406435
_closeStream(streamState: StreamState): void {
407436
streamState.stream.close();
408437

@@ -421,6 +450,13 @@ export class WsClient implements Client {
421450
close(): void {
422451
this.#connState.client.close();
423452
this.closed = true;
453+
if (this.#futureConnState) {
454+
try {
455+
this.#futureConnState.client.close();
456+
} catch {}
457+
this.#futureConnState = undefined;
458+
}
459+
this.closed = true;
424460
}
425461
}
426462

packages/libsql-core/src/api.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,10 @@ export interface Client {
240240
*/
241241
close(): void;
242242

243+
/** Reconnect after the client has been closed.
244+
*/
245+
reconnect(): void;
246+
243247
/** Is the client closed?
244248
*
245249
* This is set to `true` after a call to {@link close} or if the client encounters an unrecoverable

0 commit comments

Comments
 (0)