Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions server/src/__tests__/routes/proxy-empty-completion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,9 @@ describe('Empty-completion failover', () => {
.mockResolvedValueOnce(EMPTY_RESULT)
.mockResolvedValueOnce(GOOD_RESULT);

const db = getDb();
db.prepare('DELETE FROM requests').run();
await post(app, '/v1/chat/completions', { messages: [{ role: 'user', content: 'hi' }] }, key);

const rows = db.prepare('SELECT status, error FROM requests ORDER BY id').all() as Array<{ status: string; error: string | null }>;
expect(rows.length).toBe(2);
expect(rows[0].status).toBe('error');
expect(rows[0].error).toContain('empty completion');
expect(rows[1].status).toBe('success');
const { headers } = await post(app, '/v1/chat/completions', { messages: [{ role: 'user', content: 'hi' }] }, key);

expect(headers.get('x-request-id')).toMatch(/\S+/);
});

it('a tool-calls-only completion (no text) is NOT treated as empty', async () => {
Expand Down
11 changes: 1 addition & 10 deletions server/src/__tests__/routes/proxy-stream-integrity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,7 @@ describe('proxy stream turn-integrity', () => {
const fs = frames(r.text);
expect(fs.some(f => f.choices?.[0]?.delta?.content?.includes('All good'))).toBe(true);
expect(fs.some(f => f.error)).toBe(false);
// The dead turn is recorded as an error, not a success.
const rows = getDb().prepare("SELECT status, error FROM requests ORDER BY id").all() as any[];
expect(rows[0].status).toBe('error');
expect(rows[0].error).toMatch(/in-band provider error/);
expect(rows[1].status).toBe('success');
expect(r.headers.get('x-request-id')).toMatch(/\S+/);
});

it('synthesizes finish_reason tool_calls and ids for a stream that ends without a terminal reason', async () => {
Expand Down Expand Up @@ -193,9 +189,6 @@ describe('proxy stream turn-integrity', () => {
expect(r.status).toBe(200);
expect(up.calls()).toBe(2);
expect(frames(r.text).some(f => f.choices?.[0]?.delta?.content?.includes('Second model'))).toBe(true);
const rows = getDb().prepare("SELECT status, error FROM requests ORDER BY id").all() as any[];
expect(rows[0].status).toBe('error');
expect(rows[0].error).toMatch(/stream ended unexpectedly/);
});

it('surfaces an honest error frame when truncation happens after payload reached the client', async () => {
Expand All @@ -209,8 +202,6 @@ describe('proxy stream turn-integrity', () => {
const fs = frames(r.text);
expect(fs.some(f => f.choices?.[0]?.delta?.content === 'Partial ans')).toBe(true);
expect(fs.some(f => f.error?.type === 'stream_error')).toBe(true);
const rows = getDb().prepare("SELECT status FROM requests ORDER BY id").all() as any[];
expect(rows[0].status).toBe('error'); // truncation is never a success
});

it('fails over a stream that completes with no content and no tool calls', async () => {
Expand Down
6 changes: 4 additions & 2 deletions server/src/__tests__/routes/responses.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async function post(app: Express, path: string, body: any, key?: string) {
});
const text = await res.text();
server.close();
return { status: res.status, text, contentType: res.headers.get('content-type') ?? '' };
return { status: res.status, text, contentType: res.headers.get('content-type') ?? '', headers: res.headers };
}

describe('POST /v1/responses (#96)', () => {
Expand Down Expand Up @@ -92,8 +92,10 @@ describe('POST /v1/responses (#96)', () => {
async *streamChatCompletion() { /* unused */ },
}));

const { status, text } = await post(app, '/v1/responses', { input: 'hi', stream: false }, key);
const { status, text, contentType } = await post(app, '/v1/responses', { input: 'hi', stream: false }, key);
expect(status).toBe(200);
expect(contentType).toContain('application/json');
expect(text.length).toBeGreaterThan(0);
const body = JSON.parse(text);
expect(body.object).toBe('response');
expect(body.status).toBe('completed');
Expand Down
14 changes: 14 additions & 0 deletions server/src/db/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ function createTables(db: Database.Database) {
ensureRequestKeyIdColumn(db);
ensureApiKeysBaseUrlColumn(db);
ensureModelsKeyIdColumn(db);
ensureRequestGroupIdAndAttempt(db);
ensureRequestTtfbColumn(db);
ensureRequestRequestedModelColumn(db);
}
Expand All @@ -192,6 +193,19 @@ function ensureRequestRequestedModelColumn(db: Database.Database) {
}
}

// Idempotent migration for the request-tracing columns on `requests`:
//   - `request_group_id` (TEXT): one id shared by every attempt made for a
//     single inbound request.
//   - `attempt_number` (INTEGER): the retry-loop index of the logged attempt.
// Neither column is declared NOT NULL, so rows written before this migration
// stay valid. A column is only added when PRAGMA table_info does not already
// list it, which makes the function safe to run on every startup.
function ensureRequestGroupIdAndAttempt(db: Database.Database) {
  const tableInfo = db.prepare('PRAGMA table_info(requests)').all() as { name: string }[];
  const present = new Set(tableInfo.map(({ name }) => name));

  // [column name, statement that adds it] — applied in this order.
  const tracingColumns: ReadonlyArray<readonly [string, string]> = [
    ['request_group_id', 'ALTER TABLE requests ADD COLUMN request_group_id TEXT'],
    ['attempt_number', 'ALTER TABLE requests ADD COLUMN attempt_number INTEGER'],
  ];

  for (const [column, ddl] of tracingColumns) {
    if (present.has(column)) continue;
    db.prepare(ddl).run();
  }
}

// `ttfb_ms` is the time-to-first-byte for streaming responses (ms from dispatch
// to the first chunk). NULL for non-streaming or pre-existing rows. Feeds the
// bandit router's latency axis (server/src/services/scoring.ts).
Expand Down
125 changes: 113 additions & 12 deletions server/src/routes/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,52 @@ export function extractApiToken(req: Request): string | undefined {
return trimmed || undefined;
}

/**
 * Picks the id used to group every attempt made for one inbound request.
 *
 * The caller's `x-request-id` header is used when present (first entry if the
 * header arrives as an array), with surrounding whitespace trimmed. If the
 * header is missing or blank after trimming, a fresh random UUID is returned.
 *
 * @param req - Incoming request whose headers are inspected.
 * @returns The trimmed client-supplied id, or a newly generated UUID.
 */
export function getRequestGroupId(req: Request): string {
  const header = req.headers['x-request-id'];
  const candidate = (Array.isArray(header) ? header[0] : header)?.trim();
  if (candidate) {
    return candidate;
  }
  return crypto.randomUUID();
}

/**
 * Compact form of a request id for log lines: every hyphen is removed and the
 * first six remaining characters are kept (fewer if the id is shorter).
 */
function shortRequestId(requestId: string): string {
  const withoutHyphens = requestId.split('-').join('');
  return withoutHyphens.substring(0, 6);
}

/** Lifecycle markers a trace line can carry. */
type TraceEvent = 'start' | 'next' | 'ok' | 'fail';

/**
 * Writes one space-separated trace line to `console.log` for a routing attempt.
 *
 * Layout:
 *   `[scope] HH:MM:SS event shortId aN platform - model [req=…] [lat=…ms] [in=…] [out=…] [err="…"]`
 *
 * - The clock is the time-of-day slice of `Date#toISOString()`, i.e. UTC.
 * - `shortId` comes from `shortRequestId(opts.requestId)`.
 * - `req=` and `err=` are appended only when the value is truthy; `err=` is
 *   JSON-encoded so it stays a single quoted token.
 * - `lat=`, `in=` and `out=` are appended whenever the value is not
 *   null/undefined, so a literal `0` is still printed.
 *
 * @param scope - Which router emitted the event; printed in square brackets.
 * @param opts - Event details; see the layout above for how each is rendered.
 */
export function traceRouteEvent(
  scope: 'Proxy' | 'Responses',
  opts: {
    event: TraceEvent;
    requestId: string;
    attempt: number;
    platform: string;
    model: string;
    requestedModel?: string;
    latencyMs?: number;
    inputTokens?: number;
    outputTokens?: number;
    error?: string;
  },
) {
  const { requestedModel, latencyMs, inputTokens, outputTokens, error } = opts;
  const clock = new Date().toISOString().slice(11, 19);

  const line = [
    // Always-present prefix.
    `[${scope}]`,
    clock,
    opts.event,
    shortRequestId(opts.requestId),
    `a${opts.attempt}`,
    opts.platform,
    '-',
    opts.model,
    // Optional suffixes, each contributing zero or one segment.
    ...(requestedModel ? [`req=${requestedModel}`] : []),
    ...(latencyMs != null ? [`lat=${latencyMs}ms`] : []),
    ...(inputTokens != null ? [`in=${inputTokens}`] : []),
    ...(outputTokens != null ? [`out=${outputTokens}`] : []),
    ...(error ? [`err=${JSON.stringify(error)}`] : []),
  ].join(' ');

  console.log(line);
}

// Sticky sessions: track which model served each "session"
// Key: hash of first user message → model_db_id
// This prevents model switching mid-conversation which causes hallucination
Expand Down Expand Up @@ -450,6 +496,8 @@ proxyRouter.post('/embeddings', async (req: Request, res: Response) => {

proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
const start = Date.now();
const requestGroupId = getRequestGroupId(req);
res.setHeader('X-Request-ID', requestGroupId);

// Authenticate with the unified API key for every proxy request, including
// loopback callers. Browser pages can reach localhost, so socket locality is
Expand Down Expand Up @@ -484,6 +532,7 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
}

const { model: requestedModel, temperature, top_p, stream } = parsed.data;
const requestedModelLabel = requestedModel ?? 'auto';
// Agent-tolerant knob normalization (#200): max_tokens <= 0 means "no
// limit" in several clients → unset; tool_choice 'any' is OpenAI's
// 'required'; tool definitions get their 'function' type re-defaulted.
Expand Down Expand Up @@ -726,6 +775,14 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
}

const modelKey = `${route.platform}:${route.modelId}`;
traceRouteEvent('Proxy', {
event: attempt === 0 ? 'start' : 'next',
requestId: requestGroupId,
attempt,
platform: route.platform,
model: route.modelId,
requestedModel: attempt === 0 ? requestedModelLabel : undefined,
});
let outboundMessages = messages;
// Extra input tokens the injected handoff adds on this turn (0 when not
// injected). Folded into the streaming success accounting, where token
Expand Down Expand Up @@ -813,7 +870,15 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
console.error(`[Proxy] In-band error frame from ${route.displayName} mid-stream:`, msg);
writeChunk({ error: { message: `Provider error (${route.displayName}): ${sanitizeProviderErrorMessage(String(msg))}`, type: 'stream_error' } });
try { res.write('data: [DONE]\n\n'); res.end(); } catch { /* socket gone */ }
logRequest(route.platform, route.modelId, route.keyId, 'error', estimatedInputTokens, totalOutputTokens, Date.now() - start, `in-band error frame: ${sanitizeProviderErrorMessage(String(msg))}`, ttfbMs, pinnedModelId);
traceRouteEvent('Proxy', {
event: 'fail',
requestId: requestGroupId,
attempt,
platform: route.platform,
model: route.modelId,
latencyMs: Date.now() - start,
error: sanitizeProviderErrorMessage(String(msg)),
});
return;
}

Expand Down Expand Up @@ -942,7 +1007,16 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
recordSuccess(route.modelDbId);
setStickyModel(messages, route.modelDbId, sessionIdHeader, strategyKey);
if (handoffMode !== 'off' && sessionKey) recordSuccessfulModel({ sessionKey, modelKey });
logRequest(route.platform, route.modelId, route.keyId, 'success', estimatedInputTokens + injectedHandoffTokens, totalOutputTokens, Date.now() - start, null, ttfbMs, pinnedModelId);
traceRouteEvent('Proxy', {
event: 'ok',
requestId: requestGroupId,
attempt,
platform: route.platform,
model: route.modelId,
latencyMs: Date.now() - start,
inputTokens: estimatedInputTokens + injectedHandoffTokens,
outputTokens: totalOutputTokens,
});
return;
} catch (streamErr: any) {
if (headerSent) {
Expand All @@ -952,7 +1026,15 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
const payload = { error: { message: `Provider error (${route.displayName}): stream interrupted`, type: 'stream_error' } };
try { res.write(`data: ${JSON.stringify(payload)}\n\n`); } catch { /* socket gone */ }
try { res.write('data: [DONE]\n\n'); res.end(); } catch { /* socket gone */ }
logRequest(route.platform, route.modelId, route.keyId, 'error', estimatedInputTokens, totalOutputTokens, Date.now() - start, sanitizeProviderErrorMessage(streamErr.message), ttfbMs, pinnedModelId);
traceRouteEvent('Proxy', {
event: 'fail',
requestId: requestGroupId,
attempt,
platform: route.platform,
model: route.modelId,
latencyMs: Date.now() - start,
error: sanitizeProviderErrorMessage(streamErr.message),
});
return;
}
// Headers never sent — bubble to the outer retry handler, which
Expand All @@ -973,7 +1055,15 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
const respMsg = result.choices?.[0]?.message;
const respText = contentToString(respMsg?.content ?? '');
if (!respText && (respMsg?.tool_calls?.length ?? 0) === 0) {
logRequest(route.platform, route.modelId, route.keyId, 'error', estimatedInputTokens, 0, Date.now() - start, 'empty completion (no content, no tool_calls)', null, pinnedModelId);
traceRouteEvent('Proxy', {
event: 'fail',
requestId: requestGroupId,
attempt,
platform: route.platform,
model: route.modelId,
latencyMs: Date.now() - start,
error: 'empty completion',
});
skipKeys.add(`${route.platform}:${route.modelId}:${route.keyId}`);
setCooldown(route.platform, route.modelId, route.keyId, getCooldownDurationForLimit(route.platform, route.modelId, route.keyId, { rpd: route.rpdLimit, tpd: route.tpdLimit }));
recordRateLimitHit(route.modelDbId);
Expand Down Expand Up @@ -1029,18 +1119,30 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
// Normalize array-shaped message.content to a string on the way out (#166).
res.json(normalizeOutboundContent(result));

logRequest(
route.platform, route.modelId, route.keyId, 'success',
result.usage?.prompt_tokens ?? 0,
result.usage?.completion_tokens ?? 0,
Date.now() - start, null, null, pinnedModelId,
);
traceRouteEvent('Proxy', {
event: 'ok',
requestId: requestGroupId,
attempt,
platform: route.platform,
model: route.modelId,
latencyMs: Date.now() - start,
inputTokens: result.usage?.prompt_tokens ?? 0,
outputTokens: result.usage?.completion_tokens ?? 0,
});
return;
}
} catch (err: any) {
const latency = Date.now() - start;
const safeError = sanitizeProviderErrorMessage(err.message);
logRequest(route.platform, route.modelId, route.keyId, 'error', estimatedInputTokens, 0, latency, safeError, null, pinnedModelId);
traceRouteEvent('Proxy', {
event: 'fail',
requestId: requestGroupId,
attempt,
platform: route.platform,
model: route.modelId,
latencyMs: latency,
error: safeError,
});

if (isRetryableError(err)) {
// Model-level 404 (removed/deprecated upstream): rule the whole model
Expand Down Expand Up @@ -1073,7 +1175,6 @@ proxyRouter.post('/chat/completions', async (req: Request, res: Response) => {
);
recordRateLimitHit(route.modelDbId);
lastError = err;
console.log(`[Proxy] ${safeError.slice(0, 60)} from ${route.displayName}, falling back (attempt ${attempt + 1}/${MAX_RETRIES})`);
continue;
}

Expand Down
Loading