Skip to content

Commit c131d38

Browse files
authored
fix: harden registry model sync timeouts (#2407)
* fix: harden registry model sync timeouts * fix: reconnect after ambiguous registry add
1 parent 92ee4f0 commit c131d38

5 files changed

Lines changed: 213 additions & 4 deletions

File tree

.github/workflows/pr-models-validation-registry-server.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ jobs:
256256
working-directory: ${{ env.PKG_DIR }}
257257
shell: bash
258258
run: node scripts/sync-models.js --file=./data/models.prod.json
259-
timeout-minutes: 30
259+
timeout-minutes: 70
260260

261261
smoke-test:
262262
needs:

packages/registry-server/lib/registry-service.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,13 @@ class RegistryService extends ReadyResource {
210210
const peerKey = peerInfo?.publicKey ? IdEnc.normalize(peerInfo.publicKey) : null
211211

212212
this.logger.info({ peer: peerKey || 'unknown' }, 'Swarm connection opened')
213+
conn.on('error', err => {
214+
this.logger.warn({
215+
peer: peerKey || 'unknown',
216+
error: err.message,
217+
code: err.code
218+
}, 'Swarm connection error')
219+
})
213220
conn.on('close', () => {
214221
this.logger.info({ peer: peerKey || 'unknown' }, 'Swarm connection closed')
215222
})

packages/registry-server/scripts/sync-models.js

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ const { connectToRegistry } = require('./utils/rpc-client')
99
const { parseCanonicalSource } = require('../lib/source-helpers')
1010
const QVACRegistryClient = require('../client/lib/client')
1111

12+
const ADD_MODEL_RPC_TIMEOUT_MS = 60 * 60 * 1000
13+
const ADD_MODEL_POLL_INTERVAL_MS = 10 * 1000
14+
1215
async function syncModels () {
1316
const args = process.argv.slice(2)
1417
const fileArg = args.find(arg => arg.startsWith('--file='))
@@ -30,7 +33,7 @@ async function syncModels () {
3033
const client = new QVACRegistryClient({ registryCoreKey, logger })
3134
await client.ready()
3235

33-
const connection = await connectToRegistry({ config, logger })
36+
let connection = await connectToRegistry({ config, logger })
3437

3538
try {
3639
const configModels = JSON.parse(await fs.readFile(path.resolve(filePath), 'utf8'))
@@ -95,7 +98,30 @@ async function syncModels () {
9598
modelRequest.deprecationReason = entry.deprecationReason
9699
}
97100

98-
await connection.rpc.request('add-model', modelRequest)
101+
try {
102+
await connection.rpc.request('add-model', modelRequest, { timeout: ADD_MODEL_RPC_TIMEOUT_MS })
103+
} catch (err) {
104+
if (!isAmbiguousRpcError(err)) throw err
105+
106+
logger.warn({
107+
path: sourceInfo.path,
108+
source: sourceInfo.protocol,
109+
error: err.message,
110+
code: err.code
111+
}, 'add-model RPC ended ambiguously; polling registry for completed ingest')
112+
113+
const recovery = await recoverAfterAmbiguousAdd({
114+
client,
115+
sourceInfo,
116+
logger,
117+
connection,
118+
reconnect: () => connectToRegistry({ config, logger })
119+
})
120+
connection = recovery.connection
121+
122+
if (recovery.error) throw recovery.error
123+
dbByKey.set(key, recovery.model)
124+
}
99125
} else {
100126
logger.info(`[DRY RUN] Would add: ${sourceInfo.path}`)
101127
}
@@ -185,6 +211,88 @@ async function syncModels () {
185211
}
186212
}
187213

214+
/**
 * Recover from an add-model RPC whose outcome is unknown.
 *
 * Waits (via `waitForModel`) for the model to show up in the registry, then
 * always discards the old connection and opens a new one, whether or not the
 * wait succeeded.
 *
 * @param {object} opts
 * @param {object} opts.client - Registry client, forwarded to `waitForModel`.
 * @param {object} opts.sourceInfo - Source descriptor (`path`, `protocol`), forwarded to `waitForModel`.
 * @param {object} opts.logger - Logger exposing `warn(fields, message)`.
 * @param {object} opts.connection - Stale connection; its `cleanup()` is invoked best-effort.
 * @param {Function} opts.reconnect - Opens and returns a fresh connection (may be async).
 * @param {Function} [opts.waitForModel] - Polling strategy; defaults to `waitForModelAfterAmbiguousAdd`.
 * @returns {Promise<{connection: object, error: (Error|null), model: (object|null)}>}
 *   The fresh connection, plus either the model that appeared or the error
 *   raised while waiting for it (the wait error is returned, not thrown).
 * @throws Whatever `reconnect()` rejects with.
 */
async function recoverAfterAmbiguousAdd ({
  client,
  sourceInfo,
  logger,
  connection,
  reconnect,
  waitForModel = waitForModelAfterAmbiguousAdd
}) {
  let model = null
  let error = null

  try {
    model = await waitForModel({
      client,
      sourceInfo,
      logger
    })
  } catch (err) {
    // Keep the failure for the caller; the connection swap below must still run.
    error = err
  }

  // Best-effort cleanup. Awaiting inside try/catch (instead of chaining
  // `.catch` on the return value) also covers a cleanup() that throws
  // synchronously or does not return a promise.
  try {
    await connection.cleanup()
  } catch (cleanupErr) {
    logger.warn({ error: cleanupErr?.message }, 'Failed to clean up stale RPC connection')
  }

  let freshConnection
  try {
    freshConnection = await reconnect()
  } catch (reconnectErr) {
    // The reconnect failure is what propagates; log the earlier polling
    // failure so it is not silently lost.
    if (error) {
      logger.warn({
        path: sourceInfo?.path,
        source: sourceInfo?.protocol,
        error: error.message,
        code: error.code
      }, 'Reconnect failed after polling for model had already failed')
    }
    throw reconnectErr
  }

  return {
    connection: freshConnection,
    error,
    model
  }
}
245+
246+
/**
 * Decide whether an RPC failure leaves the request outcome unknown.
 *
 * An error counts as ambiguous when its `code` (or, if that is falsy, the
 * `code` of its `cause`) is a timeout / channel-closed / channel-destroyed
 * code, or when its message contains one of the matching phrases
 * (case-insensitive).
 *
 * NOTE(review): confirm that the error raised by the RPC layer for a
 * request-level `timeout` is covered by one of these codes or phrases.
 *
 * @param {*} err - Value caught from the RPC call.
 * @returns {boolean} `true` when the request may still have completed remotely.
 */
function isAmbiguousRpcError (err) {
  if (!err) return false

  const ambiguousCodes = ['ETIMEDOUT', 'CHANNEL_CLOSED', 'CHANNEL_DESTROYED']
  const code = err.code || err.cause?.code
  if (ambiguousCodes.includes(code)) return true

  // Fall back to message sniffing for errors that carry no recognised code.
  const ambiguousPhrases = ['connection timed out', 'channel closed', 'channel destroyed']
  const text = String(err.message || '').toLowerCase()
  return ambiguousPhrases.some(phrase => text.includes(phrase))
}
261+
262+
/**
 * Poll the registry until the given model becomes visible or the time budget
 * runs out.
 *
 * Each round optionally refreshes the local view (`client.db.core.update()`
 * when present) and then looks the model up with `client.getModel`. At least
 * one lookup is always performed. The pause between rounds is clamped to the
 * remaining budget, so the function never sleeps past the deadline and makes
 * its last lookup at the deadline rather than giving up after an overshoot.
 *
 * @param {object} opts
 * @param {object} opts.client - Registry client exposing `getModel(path, protocol)`.
 * @param {object} opts.sourceInfo - Source descriptor with `path` and `protocol`.
 * @param {number} [opts.timeoutMs] - Total time budget in milliseconds.
 * @param {number} [opts.pollIntervalMs] - Maximum pause between lookups in milliseconds.
 * @param {object} [opts.logger] - Logger exposing `info(fields, message)`.
 * @param {Function} [opts.sleep] - Async delay function taking milliseconds (injectable for tests).
 * @returns {Promise<object>} The model record once it is found.
 * @throws {Error} When the model has not appeared within `timeoutMs`; errors
 *   from the client calls propagate unchanged.
 */
async function waitForModelAfterAmbiguousAdd ({
  client,
  sourceInfo,
  timeoutMs = ADD_MODEL_RPC_TIMEOUT_MS,
  pollIntervalMs = ADD_MODEL_POLL_INTERVAL_MS,
  logger = console,
  sleep = defaultSleep
}) {
  const deadline = Date.now() + timeoutMs

  while (true) {
    // Refresh the local view first when the client exposes an update hook.
    if (client.db?.core?.update) {
      await client.db.core.update()
    }

    const model = await client.getModel(sourceInfo.path, sourceInfo.protocol)
    if (model) {
      logger.info({
        path: sourceInfo.path,
        source: sourceInfo.protocol
      }, 'Model appeared after ambiguous add-model RPC')
      return model
    }

    const remainingMs = deadline - Date.now()
    if (remainingMs <= 0) break

    // Never sleep beyond the deadline.
    await sleep(Math.min(pollIntervalMs, remainingMs))
  }

  throw new Error(`Timed out waiting for model after ambiguous add-model RPC: ${sourceInfo.path}`)
}
291+
292+
/**
 * Timer-backed delay used when no custom `sleep` is injected.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>} Resolves (with no value) once the timer fires.
 */
function defaultSleep (ms) {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms)
  })
}
295+
188296
function needsMetadataUpdate (config, existing, sourceInfo) {
189297
return (
190298
config.engine !== existing.engine ||
@@ -274,4 +382,10 @@ if (require.main === module) {
274382
})
275383
}
276384

277-
module.exports = { syncModels }
385+
module.exports = {
386+
ADD_MODEL_RPC_TIMEOUT_MS,
387+
recoverAfterAmbiguousAdd,
388+
isAmbiguousRpcError,
389+
syncModels,
390+
waitForModelAfterAmbiguousAdd
391+
}

packages/registry-server/scripts/utils/rpc-client.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ async function connectToRegistry ({ config, logger = console, storage = './temp-
9797

9898
logger.info('RPC Client: Connected to server', { peer: peerKey })
9999

100+
conn.on('error', err => {
101+
logger.warn({
102+
peer: peerKey,
103+
error: err.message,
104+
code: err.code
105+
}, 'RPC Client: connection error')
106+
})
107+
100108
const rpc = new ProtomuxRPC(conn, {
101109
protocol: 'qvac-registry-rpc',
102110
valueEncoding: cenc.json
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
'use strict'
2+
3+
const test = require('brittle')
4+
const {
5+
ADD_MODEL_RPC_TIMEOUT_MS,
6+
recoverAfterAmbiguousAdd,
7+
isAmbiguousRpcError,
8+
waitForModelAfterAmbiguousAdd
9+
} = require('../../scripts/sync-models')
10+
11+
// Pins the exported add-model RPC budget so an accidental change is caught.
test('add-model RPC timeout is one hour', function (t) {
  const oneHourMs = 1000 * 60 * 60
  t.is(ADD_MODEL_RPC_TIMEOUT_MS, oneHourMs)
})
14+
15+
// Covers every branch of the classifier: the three recognised codes, the
// `cause.code` fallback, the three message phrases (case-insensitive), and
// the negative cases (unrelated errors and falsy input).
test('isAmbiguousRpcError identifies transport timeouts and channel closes', t => {
  // Recognised error codes
  t.ok(isAmbiguousRpcError(Object.assign(new Error('connection timed out'), { code: 'ETIMEDOUT' })))
  t.ok(isAmbiguousRpcError(Object.assign(new Error('closed'), { code: 'CHANNEL_CLOSED' })))
  t.ok(isAmbiguousRpcError(Object.assign(new Error('destroyed'), { code: 'CHANNEL_DESTROYED' })))

  // Code carried by the wrapped cause when the outer error has none
  t.ok(isAmbiguousRpcError(Object.assign(new Error('wrapped'), { cause: { code: 'ETIMEDOUT' } })))

  // Message-based detection, regardless of letter case
  t.ok(isAmbiguousRpcError(new Error('Channel closed')))
  t.ok(isAmbiguousRpcError(new Error('Channel destroyed')))
  t.ok(isAmbiguousRpcError(new Error('Connection timed out')))

  // Definitive failures and empty input are not ambiguous
  t.absent(isAmbiguousRpcError(new Error('License not found')))
  t.absent(isAmbiguousRpcError(Object.assign(new Error('missing'), { code: 'ENOENT' })))
  t.absent(isAmbiguousRpcError(null))
  t.absent(isAmbiguousRpcError(undefined))
})
21+
22+
// The stub client misses on the first lookup and returns the model on the
// second, so the helper must poll exactly twice with the same arguments.
test('waitForModelAfterAmbiguousAdd polls until the model appears', async t => {
  const sourceInfo = { path: 'repo/model.gguf', protocol: 'hf' }
  const expected = { path: sourceInfo.path, source: sourceInfo.protocol }
  const lookups = []

  const client = {
    async getModel (modelPath, source) {
      lookups.push([modelPath, source])
      if (lookups.length === 2) return expected
      return null
    }
  }
  const silentLogger = { info () {} }
  const instantSleep = async () => {}

  const result = await waitForModelAfterAmbiguousAdd({
    client,
    sourceInfo,
    timeoutMs: 10,
    pollIntervalMs: 5,
    logger: silentLogger,
    sleep: instantSleep
  })

  const oneLookup = [expected.path, expected.source]
  t.alike(result, expected)
  t.alike(lookups, [oneLookup, oneLookup])
})
48+
49+
// Even when the polling step rejects, the helper must clean up the stale
// connection, reconnect exactly once, and hand back the polling error.
test('recoverAfterAmbiguousAdd reconnects even when polling times out', async t => {
  t.plan(4)

  let cleanedUp = false
  let reconnectCount = 0
  const pollError = new Error('poll timed out')
  const freshConnection = {}
  const staleConnection = {
    async cleanup () {
      cleanedUp = true
    }
  }

  const failingWait = async () => {
    throw pollError
  }
  const countingReconnect = async () => {
    reconnectCount += 1
    return freshConnection
  }

  const result = await recoverAfterAmbiguousAdd({
    client: {},
    sourceInfo: { path: 'repo/model.gguf', protocol: 'hf' },
    logger: { info () {}, warn () {} },
    connection: staleConnection,
    reconnect: countingReconnect,
    waitForModel: failingWait
  })

  t.is(result.error, pollError)
  t.is(result.connection, freshConnection)
  t.is(reconnectCount, 1)
  t.ok(cleanedUp)
})

0 commit comments

Comments
 (0)