Skip to content

Commit ec1699c

Browse files
committed
fix: address PR#60 review — refCount, shutdown path, dead code, SHA-256 keys
Must-fix #1: Remove dead ensureHealthy() + reconnectCount - ensureHealthy() was defined but never called (acquire() used direct socket.readyState check instead). Removed entirely along with the reconnectCount field from PooledConnection. Must-fix #2: Replace isActive boolean with refCount for concurrent safety - When maxConcurrentTasks > 1, multiple dispatches share the same WS connection. The old isActive boolean meant release() from dispatch A would mark the connection idle and start eviction timer, killing the connection for dispatch B still in progress. - Now: acquire() increments refCount, release() decrements. Heartbeat and eviction only start when refCount drops to 0. - Added concurrent acquire test that verifies: 2 acquires → 1 release → connection stays alive through eviction timeout → final release → connection properly evicted. Must-fix #3: Add OpenClawAgentExecutor.close() + wire into index.ts stop() - The WS pool's destroy() was never called during plugin shutdown, leaking connections and timers on hot-reload/test scenarios. - Added close() method to OpenClawAgentExecutor, called from the plugin's stop() lifecycle hook. Nice-to-have #4: Replace conn["socket"]?.readyState with conn.isOpen getter - Added public isOpen getter to GatewayRpcConnection for type-safe socket state checking. Pool code now uses conn.isOpen instead of bypassing TypeScript's private access control. Nice-to-have #5: SHA-256 buildKey instead of plaintext in Map keys - gatewayPassword was stored as plaintext in Map keys. Now uses crypto.createHash('sha256') to produce a one-way hash, reducing credential surface in heap dumps.
1 parent 1845fd4 commit ec1699c

3 files changed

Lines changed: 82 additions & 54 deletions

File tree

index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,9 @@ const plugin = {
330330
const pushStore = new PushNotificationStore();
331331
const client = new A2AClient();
332332
const taskStore = new FileTaskStore(config.storage.tasksDir);
333+
const agentExecutor = new OpenClawAgentExecutor(api, config);
333334
const executor = new QueueingAgentExecutor(
334-
new OpenClawAgentExecutor(api, config),
335+
agentExecutor,
335336
telemetry,
336337
config.limits,
337338
config.routing.defaultAgentId,
@@ -981,6 +982,7 @@ const plugin = {
981982
healthManager?.stop();
982983
auditLogger.close();
983984
client.destroy();
985+
agentExecutor.close();
984986

985987
// Stop task cleanup timer
986988
if (cleanupTimer) {

src/executor.ts

Lines changed: 34 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,11 @@ export class GatewayRpcConnection {
760760
this.rejectAllPending(error);
761761
}
762762

763+
/** Whether the underlying WebSocket socket is currently open (readyState === OPEN). */
764+
get isOpen(): boolean {
765+
return this.socket?.readyState === 1;
766+
}
767+
763768
private awaitConnectChallenge(): Promise<void> {
764769
if (this.connectChallengeRejecter) {
765770
this.connectChallengeRejecter(new Error("gateway connect challenge wait superseded"));
@@ -990,10 +995,11 @@ interface PooledConnection {
990995
conn: GatewayRpcConnection;
991996
key: string;
992997
lastUsedAt: number;
993-
isActive: boolean;
998+
/** Reference count: how many concurrent acquire() calls are using this connection.
999+
* When refCount > 0 the connection is active; when refCount === 0 it is idle. */
1000+
refCount: number;
9941001
heartbeatTimer: ReturnType<typeof setTimeout> | null;
9951002
evictionTimer: ReturnType<typeof setTimeout> | null;
996-
reconnectCount: number;
9971003
}
9981004

9991005
/**
@@ -1048,11 +1054,12 @@ export class GatewayRpcConnectionPool {
10481054
const existing = this.pool.get(key);
10491055

10501056
// Hot path: reuse healthy connection
1051-
if (existing && existing.conn && existing.conn["socket"]?.readyState === 1) {
1052-
existing.isActive = true;
1057+
if (existing && existing.conn && existing.conn.isOpen) {
1058+
existing.refCount++;
10531059
existing.lastUsedAt = Date.now();
10541060
this.stopHeartbeat(existing);
1055-
this.resetEviction(existing);
1061+
// Eviction timer is already stopped while refCount > 0 (set in release),
1062+
// but we reset it here for housekeeping so the clock starts fresh on next release.
10561063
this.totalReuses++;
10571064
return existing.conn;
10581065
}
@@ -1067,19 +1074,18 @@ export class GatewayRpcConnectionPool {
10671074
conn,
10681075
key,
10691076
lastUsedAt: Date.now(),
1070-
isActive: true,
1077+
refCount: 1,
10711078
heartbeatTimer: null,
10721079
evictionTimer: null,
1073-
reconnectCount: 0,
10741080
};
10751081
this.pool.set(key, entry);
1076-
this.resetEviction(entry);
10771082
return conn;
10781083
}
10791084

10801085
/**
1081-
* Return a connection to the pool. After release, the connection may be
1082-
* reused by the next acquire() call for the same key.
1086+
* Return a connection to the pool. Decrements the reference count; only
1087+
* when all acquire() calls have been released (refCount === 0) does the
1088+
* connection transition to idle state with heartbeat + eviction timer.
10831089
*/
10841090
release(config: GatewayRuntimeConfig): void {
10851091
if (this.destroyed) return;
@@ -1088,17 +1094,21 @@ export class GatewayRpcConnectionPool {
10881094
const entry = this.pool.get(key);
10891095
if (!entry) return;
10901096

1091-
entry.isActive = false;
1097+
entry.refCount = Math.max(0, entry.refCount - 1);
10921098
entry.lastUsedAt = Date.now();
1093-
this.startHeartbeat(entry);
1094-
this.resetEviction(entry);
1099+
1100+
if (entry.refCount === 0) {
1101+
// Connection is now idle — start keep-alive and eviction timer
1102+
this.startHeartbeat(entry);
1103+
this.resetEviction(entry);
1104+
}
10951105
}
10961106

10971107
getStats(): WsPoolStats {
10981108
let idle = 0;
10991109
let active = 0;
11001110
for (const entry of this.pool.values()) {
1101-
if (entry.isActive) active++;
1111+
if (entry.refCount > 0) active++;
11021112
else idle++;
11031113
}
11041114
return {
@@ -1131,7 +1141,9 @@ export class GatewayRpcConnectionPool {
11311141
// -- Internal helpers -------------------------------------------------------
11321142

11331143
private buildKey(config: GatewayRuntimeConfig): string {
1134-
return `${config.wsUrl}::${config.gatewayToken}::${config.gatewayPassword}`;
1144+
const h = crypto.createHash("sha256");
1145+
h.update(config.wsUrl + "\0" + config.gatewayToken + "\0" + config.gatewayPassword);
1146+
return h.digest("hex");
11351147
}
11361148

11371149
private async createAndConnect(config: GatewayRuntimeConfig, attempt: number): Promise<GatewayRpcConnection> {
@@ -1151,40 +1163,6 @@ export class GatewayRpcConnectionPool {
11511163
}
11521164
}
11531165

1154-
/**
1155-
* Check connection health. If the socket is dead, attempt a transparent reconnect.
1156-
* Returns true if the connection is healthy (or was successfully reconnected).
1157-
*/
1158-
private async ensureHealthy(entry: PooledConnection, config: GatewayRuntimeConfig): Promise<boolean> {
1159-
if (entry.conn["socket"]?.readyState === 1) {
1160-
return true;
1161-
}
1162-
1163-
// Connection is dead — reconnect
1164-
if (entry.reconnectCount >= this.maxReconnectAttempts) {
1165-
return false;
1166-
}
1167-
1168-
try {
1169-
entry.conn.close();
1170-
} catch {
1171-
// ignore close errors on dead socket
1172-
}
1173-
1174-
entry.reconnectCount++;
1175-
this.totalReconnects++;
1176-
1177-
try {
1178-
const conn = await this.createAndConnect(config, 0);
1179-
entry.conn = conn;
1180-
entry.lastUsedAt = Date.now();
1181-
entry.reconnectCount = 0; // reset on successful reconnect
1182-
return true;
1183-
} catch {
1184-
return false;
1185-
}
1186-
}
1187-
11881166
private evictConnection(entry: PooledConnection): void {
11891167
this.stopHeartbeat(entry);
11901168
this.stopEviction(entry);
@@ -1202,7 +1180,7 @@ export class GatewayRpcConnectionPool {
12021180
// Send a lightweight ping to keep the connection alive.
12031181
// We use the `request` method with a minimal method — if the socket
12041182
// is dead, the pending request will timeout and we'll learn about it.
1205-
if (entry.conn["socket"]?.readyState !== 1) {
1183+
if (!entry.conn.isOpen) {
12061184
// Socket is not open — stop heartbeat, let eviction or next acquire handle it
12071185
this.stopHeartbeat(entry);
12081186
return;
@@ -1231,7 +1209,7 @@ export class GatewayRpcConnectionPool {
12311209
private resetEviction(entry: PooledConnection): void {
12321210
this.stopEviction(entry);
12331211
entry.evictionTimer = setTimeout(() => {
1234-
if (!entry.isActive) {
1212+
if (entry.refCount === 0) {
12351213
this.evictConnection(entry);
12361214
}
12371215
}, this.idleTimeoutMs);
@@ -1275,6 +1253,11 @@ export class OpenClawAgentExecutor implements AgentExecutor {
12751253
this.wsPool = new GatewayRpcConnectionPool();
12761254
}
12771255

1256+
/** Clean up the WS connection pool. Call this during plugin shutdown. */
1257+
close(): void {
1258+
this.wsPool.destroy();
1259+
}
1260+
12781261
async execute(requestContext: RequestContext, eventBus: ExecutionEventBus): Promise<void> {
12791262
const agentId = pickAgentId(requestContext, this.defaultAgentId);
12801263
const taskId = requestContext.taskId;

tests/ws-pool.test.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,13 +460,13 @@ describe("GatewayRpcConnectionPool", () => {
460460
const pool = new GatewayRpcConnectionPool({ idleTimeoutMs: 5_000 });
461461
const config = makeConfig();
462462

463-
// Acquire — active
463+
// Acquire — active (refCount = 1)
464464
await pool.acquire(config);
465465
let stats = pool.getStats();
466466
assert.equal(stats.activeConnections, 1);
467467
assert.equal(stats.idleConnections, 0);
468468

469-
// Release — idle
469+
// Release — idle (refCount = 0)
470470
pool.release(config);
471471
stats = pool.getStats();
472472
assert.equal(stats.activeConnections, 0);
@@ -475,6 +475,49 @@ describe("GatewayRpcConnectionPool", () => {
475475
pool.destroy();
476476
});
477477

478+
it("concurrent acquire: connection is not evicted while still in use", async () => {
479+
const pool = new GatewayRpcConnectionPool({
480+
idleTimeoutMs: 200, // short timeout for test
481+
heartbeatIntervalMs: 999_999,
482+
});
483+
const config = makeConfig();
484+
485+
// Two concurrent acquires — both get the same connection
486+
const conn1 = await pool.acquire(config);
487+
const conn2 = await pool.acquire(config);
488+
489+
// Same underlying connection object
490+
assert.strictEqual(conn1, conn2, "Concurrent acquires should return the same connection");
491+
492+
let stats = pool.getStats();
493+
assert.equal(stats.activeConnections, 1, "One active connection with refCount=2");
494+
assert.equal(stats.idleConnections, 0);
495+
496+
// Release one — connection should still be active (refCount = 1)
497+
pool.release(config);
498+
stats = pool.getStats();
499+
assert.equal(stats.activeConnections, 1, "Still active after partial release");
500+
assert.equal(stats.idleConnections, 0);
501+
502+
// Wait past the idle timeout — should NOT be evicted because refCount > 0
503+
await new Promise((r) => setTimeout(r, 350));
504+
stats = pool.getStats();
505+
assert.equal(stats.connections, 1, "Connection not evicted while still in use");
506+
507+
// Release the remaining one — now idle (refCount = 0)
508+
pool.release(config);
509+
stats = pool.getStats();
510+
assert.equal(stats.activeConnections, 0);
511+
assert.equal(stats.idleConnections, 1, "Now idle after final release");
512+
513+
// Wait for eviction
514+
await new Promise((r) => setTimeout(r, 350));
515+
stats = pool.getStats();
516+
assert.equal(stats.connections, 0, "Connection evicted after becoming idle");
517+
518+
pool.destroy();
519+
});
520+
478521
it("reuses connection for many sequential dispatches", async () => {
479522
const pool = new GatewayRpcConnectionPool({ idleTimeoutMs: 5_000 });
480523
const config = makeConfig();

0 commit comments

Comments
 (0)