
Commit 5ba1b6d

feat(chat): route Gemma via LiteLLM, add Qwen 3.6, deadline + fallback
Regolo's gemma4-31b endpoint hangs upstream: every notebook chat with the
default Gemma model just spun forever, because the AI SDK has no built-in
first-token deadline. This change:

- Adds litellmFetchWithThinkingDisabled (sibling to regoloThinkingFetch),
  injecting Ollama's `think: false` so LiteLLM-served gemma streams content
  instead of burning its entire token budget on `reasoning`.
- Re-routes the user-facing "Gemma 4" model from Regolo → LiteLLM. The old
  `gemma-regolo` ID is aliased server-side and migrated client-side
  (chatStore v6) to the new `gemma-litellm` ID.
- Adds Qwen 3.6 27B as a selectable model (already on the existing Regolo
  reasoning-stream allowlist, so no extra wiring).
- Introduces a 20s first-token deadline plus a single-step cross-provider
  fallback (gemma-litellm ↔ gpt-oss-regolo) in responseStreamingService; see
  the sketch below the commit metadata. Qwen entries intentionally have no
  `fallback` field; this preserves the Chinese-only-when-selected firewall
  (an informed-consent boundary, documented in ModelConfig).
- Fixes a pre-existing bug: getModel('litellm', modelId) ignored the modelId
  argument and always used LITELLM_DEFAULT_MODEL.

Fallback is silent on the end-user side: the server emits a `fallback` SSE
event, both runtime adapters log it to the browser console, and no UI banner
is shown.

Implementation notes:

- streamAndAccumulate / streamAndAccumulateWithReasoning now share a
  `wrapWithCompatCatch` factory and an `*OrThrow` internal layer used by
  streamWithFallback. Existing chat router callers keep the same
  null-on-failure shape and get the new deadline and empty-completion safety
  nets for free.
- A single deadline is now shared across initial-probe iterations (the
  per-call setTimeout previously granted an accidental 40s of grace).
- The reasoning streamer is split into Phase 1 (race against the deadline
  until the first text delta) and Phase 2 (drain without the race), which
  eliminates wasted Promise.race microtask hops on every reasoning chunk
  after first content; see the sketch after the notebookStreamCore.ts diff.
- Uses native AbortSignal.any() (Node 20.3+) instead of a hand-rolled
  composeAbortSignals helper.
1 parent f44d8d6 commit 5ba1b6d
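
The deadline-and-fallback logic lives in responseStreamingService.ts, which is among the 9 changed files but not in the two diffs shown below. The following is a minimal TypeScript sketch of the shape the commit message describes, not the actual implementation: `streamOnce`, `FIRST_TOKEN_DEADLINE_MS`, the import path, and the SSE payload fields are hypothetical stand-ins; only the 20s deadline, the single-step fallback via the `fallback` field, the silent `fallback` SSE event, and AbortSignal.any() are taken from the message.

```ts
// Sketch only. The real streamWithFallback wraps the streamAndAccumulate*OrThrow
// layer; here a generic `streamOnce` callback stands in for it.
import { getModelConfig } from '../agents/providers.js'; // path assumed

const FIRST_TOKEN_DEADLINE_MS = 20_000; // hypothetical constant name

async function streamWithFallbackSketch(
  primaryId: string,
  streamOnce: (modelId: string, signal: AbortSignal) => Promise<string>,
  sse: { send(event: string, data: unknown): void },
  clientSignal: AbortSignal
): Promise<string | null> {
  const attempt = async (modelId: string): Promise<string | null> => {
    // Client disconnect and the deadline are combined with native
    // AbortSignal.any() (Node 20.3+). Simplification: the real service only
    // races the deadline until the first token, then drains without it.
    const signal = AbortSignal.any([
      clientSignal,
      AbortSignal.timeout(FIRST_TOKEN_DEADLINE_MS),
    ]);
    try {
      const text = await streamOnce(modelId, signal);
      return text.length > 0 ? text : null; // empty completion counts as a failure
    } catch {
      return null; // first-token timeout or upstream HTTP error
    }
  };

  const primaryText = await attempt(primaryId);
  if (primaryText !== null) return primaryText;

  // Single-step fallback: follow `fallback` exactly once, never chase a chain.
  // Models without a `fallback` field (the Qwen entries) simply fail here.
  const fallbackId = getModelConfig(primaryId)?.fallback;
  if (!fallbackId || clientSignal.aborted) return null;

  // Payload fields assumed; logged to the browser console, no UI banner.
  sse.send('fallback', { from: primaryId, to: fallbackId });
  return attempt(fallbackId);
}
```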

9 files changed

Lines changed: 632 additions & 205 deletions


apps/api/routes/chat/agents/providers.ts

Lines changed: 51 additions & 12 deletions
@@ -7,6 +7,7 @@ import { createMistral } from '@ai-sdk/mistral';
 import { createOpenAI } from '@ai-sdk/openai';
 
 import { env } from '../../../config/env.js';
+import { litellmFetchWithThinkingDisabled } from '../../../services/ai/litellmThinkingFetch.js';
 import { isVisionCapable } from '../../../services/ai/modelDiscovery.js';
 import { regoloFetchWithThinkingDisabled } from '../../../services/ai/regoloThinkingFetch.js';
 
@@ -29,33 +30,69 @@ export { isVisionCapable };
  * Maps user-facing model IDs to provider/model configurations.
  * contextWindow is in tokens — used by downstream context management to adapt budgets.
  */
-export const AVAILABLE_MODELS: Record<
-  string,
-  { provider: 'mistral' | 'litellm' | 'regolo'; model: string; contextWindow: number }
-> = {
+export interface ModelConfig {
+  provider: 'mistral' | 'litellm' | 'regolo';
+  model: string;
+  contextWindow: number;
+  /**
+   * User-facing model ID to fall back to when this model fails to produce
+   * output (first-token timeout, empty completion, or upstream HTTP error).
+   *
+   * Chinese-trained models (Qwen) intentionally have NO fallback. The user
+   * sees an explicit "Chinesisches Modell" warning before selecting them
+   * (informed-consent boundary in chatStore.ts MODEL_OPTIONS); auto-routing
+   * either INTO or OUT OF Qwen would break that contract. Qwen failures must
+   * surface as errors so the user can choose to retry or switch manually.
+   */
+  fallback?: string;
+}
+
+export const AVAILABLE_MODELS: Record<string, ModelConfig> = {
   // 'mistral' is intentionally absent — it uses agent defaults (like 'auto')
   // Legacy IDs kept for backward compatibility (old stored client preferences)
   'mistral-large': { provider: 'mistral', model: 'mistral-large-latest', contextWindow: 128000 },
   'mistral-medium': { provider: 'mistral', model: 'mistral-medium-latest', contextWindow: 128000 },
   'pixtral-large': { provider: 'mistral', model: 'pixtral-large-latest', contextWindow: 128000 },
-  litellm: { provider: 'litellm', model: 'gpt-oss:120b', contextWindow: 16384 },
+  litellm: {
+    provider: 'litellm',
+    model: 'gpt-oss:120b',
+    contextWindow: 16384,
+    fallback: 'gpt-oss-regolo',
+  },
   regolo: {
     provider: 'regolo',
     model: env.REGOLO_DEFAULT_MODEL || 'qwen3.5-122b',
     contextWindow: 32768,
   },
-  'gpt-oss-regolo': { provider: 'regolo', model: 'gpt-oss-120b', contextWindow: 32768 },
-  'gemma-regolo': { provider: 'regolo', model: 'gemma4-31b', contextWindow: 32768 },
+  'gpt-oss-regolo': {
+    provider: 'regolo',
+    model: 'gpt-oss-120b',
+    contextWindow: 32768,
+    fallback: 'gemma-litellm',
+  },
+  // Chinese-trained models — no `fallback` field by design. See ModelConfig.
   'qwen-regolo': { provider: 'regolo', model: 'qwen3.5-122b', contextWindow: 32768 },
+  'qwen3.6-regolo': { provider: 'regolo', model: 'qwen3.6-27b', contextWindow: 32768 },
+};
+
+const GEMMA_LITELLM: ModelConfig = {
+  provider: 'litellm',
+  model: 'gpt-oss:120b',
+  contextWindow: 32768,
+  fallback: 'gpt-oss-regolo',
 };
+AVAILABLE_MODELS['gemma-litellm'] = GEMMA_LITELLM;
+// Legacy ID — old persisted client state may still send 'gemma-regolo'.
+// Aliased to the LiteLLM-served gemma so requests don't hit Regolo's broken
+// gemma4-31b endpoint. ChatStore migration upgrades the persisted ID on next
+// page load.
+AVAILABLE_MODELS['gemma-regolo'] = GEMMA_LITELLM;
 
 /**
  * Get model configuration by user-facing model ID.
  * Returns null if model ID is not recognized.
  */
-export function getModelConfig(
-  modelId: string
-): { provider: 'mistral' | 'litellm' | 'regolo'; model: string; contextWindow: number } | null {
+export function getModelConfig(modelId: string): ModelConfig | null {
   return AVAILABLE_MODELS[modelId] || null;
 }
 
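Consuming the new field is a single hop through getModelConfig. The caller below is hypothetical, but getModelConfig, AVAILABLE_MODELS, and the `fallback` field are exactly as defined above.

```ts
// One-hop fallback lookup; never walk a chain.
const primary = getModelConfig('gemma-litellm');   // has fallback: 'gpt-oss-regolo'
const next = primary?.fallback ? getModelConfig(primary.fallback) : null;

// Qwen entries opt out by omission:
getModelConfig('qwen3.6-regolo')?.fallback; // undefined → fail loudly, no auto-reroute
```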
@@ -108,6 +145,7 @@ function getLiteLLMProvider() {
       baseURL: `${baseURL}/v1`,
       apiKey: env.LITELLM_API_KEY || '',
       name: 'litellm',
+      fetch: litellmFetchWithThinkingDisabled,
     });
   }
   return litellmInstance;
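litellmThinkingFetch.ts itself is not part of the diffs shown here. Based on the commit message (inject Ollama's `think: false` into LiteLLM-bound requests, mirroring regoloThinkingFetch), a wrapper of this kind plausibly looks like the sketch below; the body-parsing details are assumptions.

```ts
// Hypothetical sketch of litellmFetchWithThinkingDisabled, not the real file.
export const litellmFetchWithThinkingDisabled: typeof fetch = async (input, init) => {
  if (init?.body && typeof init.body === 'string') {
    try {
      const payload = JSON.parse(init.body);
      // Ollama-style flag: skip the thinking phase so the model streams content
      // instead of spending its whole token budget on `reasoning`.
      payload.think = false;
      init = { ...init, body: JSON.stringify(payload) };
    } catch {
      // Non-JSON body: forward untouched.
    }
  }
  return fetch(input, init);
};
```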
@@ -169,9 +207,10 @@ export function getModel(provider: string, modelId: string): LanguageModel {
       return model;
     }
     case 'litellm': {
-      console.log(`[providers] Creating LiteLLM model with default: ${LITELLM_DEFAULT_MODEL}`);
+      const resolvedModel = modelId || LITELLM_DEFAULT_MODEL;
+      console.log(`[providers] Creating LiteLLM model: ${resolvedModel}`);
       const litellm = getLiteLLMProvider();
-      const model = litellm.chat(LITELLM_DEFAULT_MODEL);
+      const model = litellm.chat(resolvedModel);
       console.log(`[providers] LiteLLM model created successfully`);
       return model;
     }
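With the fix, the second argument to getModel is finally honored for the litellm provider; LITELLM_DEFAULT_MODEL only applies when the caller passes an empty model name (the model ID below is illustrative).

```ts
getModel('litellm', 'gpt-oss:120b'); // now calls litellm.chat('gpt-oss:120b')
getModel('litellm', '');             // still resolves to LITELLM_DEFAULT_MODEL
```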

apps/api/routes/chat/notebookStreamCore.ts

Lines changed: 33 additions & 121 deletions
@@ -4,7 +4,7 @@
  * notebook controller and the public Gruen-O-Mat controller.
  */
 
-import { streamText, type ModelMessage } from 'ai';
+import { type ModelMessage } from 'ai';
 
 import {
   buildConcisePromptGrundsatz,
@@ -15,10 +15,7 @@ import {
   getSystemCollectionConfig,
 } from '../../config/systemCollectionsConfig.js';
 import { NotebookQdrantHelper } from '../../database/services/NotebookQdrantHelper.js';
-import {
-  isRegoloReasoningModel,
-  streamRegoloWithReasoning,
-} from '../../services/ai/regoloReasoningStream.js';
+import { isRegoloReasoningModel } from '../../services/ai/regoloReasoningStream.js';
 import { notebookQAService } from '../../services/notebook/index.js';
 import { rerankNotebookResults } from '../../services/notebook/rerankNotebookResults.js';
 import {
@@ -30,7 +27,11 @@ import { createLogger } from '../../utils/logger.js';
 import { containsPromptLeakage } from '../gruenomat/topicGuard.js';
 
 import { isProviderConfigured } from './agents/providers.js';
-import { resolveModel } from './services/responseStreamingService.js';
+import {
+  resolveModel,
+  streamForResolution,
+  streamWithFallback,
+} from './services/responseStreamingService.js';
 import { SSEWriter } from './services/sseHelpers.js';
 
 import type { SearchContext } from '../../services/notebook/types.js';
@@ -253,14 +254,10 @@ export async function handleNotebookStream(
 
   // Determine AI provider and model (same resolution as chat — handles model ID → real name)
   const defaultAgentConfig = { provider: DEFAULT_PROVIDER, model: DEFAULT_MODEL };
-  const {
-    model: aiModel,
-    provider: resolvedProvider,
-    modelName: resolvedModelName,
-  } = resolveModel(defaultAgentConfig, model);
-
-  if (!isProviderConfigured(resolvedProvider)) {
-    sse.send('error', { error: `Provider "${resolvedProvider}" is not configured` });
+  const primaryResolution = resolveModel(defaultAgentConfig, model);
+
+  if (!isProviderConfigured(primaryResolution.provider)) {
+    sse.send('error', { error: `Provider "${primaryResolution.provider}" is not configured` });
     sse.end();
     return null;
   }
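For orientation, the resolution object handed to streamWithFallback keeps the three fields the old destructuring pulled out. The shape below is inferred from the removed code, not from responseStreamingService's actual export.

```ts
import type { LanguageModel } from 'ai';

// Inferred sketch of what resolveModel() returns.
interface ModelResolutionSketch {
  model: LanguageModel; // AI SDK model instance (previously bound as `aiModel`)
  provider: string;     // 'mistral' | 'litellm' | 'regolo'
  modelName: string;    // resolved provider-side model name
}
```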
@@ -281,127 +278,42 @@ export async function handleNotebookStream(
   const t2 = Date.now();
   log.debug(`⏱ Model setup: ${t2 - t1}ms`);
 
-  const useReasoningStream = isRegoloReasoningModel(resolvedProvider, resolvedModelName);
-  // Reasoning models spend most of their budget on the <think> block before
-  // emitting the answer. Triple the ceiling so there's room for both phases.
+  // Reasoning models need extra room for the <think> block before content.
   const baseMaxOutput = isFast ? 3000 : 16000;
-  const maxOutputTokens = useReasoningStream ? Math.max(baseMaxOutput, 9000) : baseMaxOutput;
 
   sse.send('response_start', { message: 'Generiere Antwort...' });
 
-  let fullText = '';
-  let firstChunkTime: number | undefined;
-
-  try {
-    if (useReasoningStream) {
-      for await (const chunk of streamRegoloWithReasoning({
-        model: resolvedModelName,
+  const fullText = await streamWithFallback({
+    primary: primaryResolution,
+    sse,
+    logPrefix: '[Notebook]',
+    buildStream: async (resolution) => {
+      const isReasoning = isRegoloReasoningModel(resolution.provider, resolution.modelName);
+      return streamForResolution({
+        resolution,
         messages: aiMessages,
-        maxTokens: maxOutputTokens,
+        maxTokens: isReasoning ? Math.max(baseMaxOutput, 9000) : baseMaxOutput,
         temperature: 0.2,
+        sse,
         signal: abortController.signal,
-      })) {
-        if (abortController.signal.aborted) break;
-        if (!firstChunkTime) {
-          firstChunkTime = Date.now();
-          log.debug(`⏱ First token latency: ${firstChunkTime - t2}ms`);
-        }
-        if (chunk.type === 'text') {
-          fullText += chunk.delta;
-          sse.send('text_delta', { text: chunk.delta });
-        } else {
-          sse.send('reasoning_delta', { text: chunk.delta });
-        }
-      }
-    } else {
-      const result = streamText({
-        model: aiModel,
-        messages: aiMessages,
-        maxOutputTokens,
-        temperature: 0.2,
-        abortSignal: abortController.signal,
+        logPrefix: '[Notebook]',
       });
-      for await (const chunk of result.textStream) {
-        if (abortController.signal.aborted) break;
-        if (!firstChunkTime) {
-          firstChunkTime = Date.now();
-          log.debug(`⏱ First token latency: ${firstChunkTime - t2}ms`);
-        }
-        fullText += chunk;
-        sse.send('text_delta', { text: chunk });
-      }
-    }
-  } catch (streamError: unknown) {
-    if (abortController.signal.aborted) {
-      log.debug('Notebook stream aborted by client disconnect');
-      log.debug(`⏱ Total (aborted): ${Date.now() - t0}ms, ${fullText.length} chars`);
-      sse.end();
-      return null;
-    }
-    const streamErrMsg = streamError instanceof Error ? streamError.message : String(streamError);
-    const t4err = Date.now();
-    log.warn('Stream error (accumulated %d chars): %s', fullText.length, streamErrMsg);
-    log.debug(
-      `⏱ Streaming (error): ${t4err - (firstChunkTime || t2)}ms, ${fullText.length} chars`
-    );
+    },
+  });
 
-    if (fullText.length > 0) {
-      try {
-        const { renumberedDraft, newReferencesMap } = renumberCitationsInOrder(
-          fullText,
-          searchContext.referencesMap
-        );
-        const { cleanDraft, citations, sources } = validateAndInjectCitations(
-          renumberedDraft,
-          newReferencesMap
-        );
-        const allSources = searchContext.sortedResults
-          .filter((_, i) => !citations.some((c) => c.index === String(i + 1)))
-          .slice(0, 10);
-
-        let sourcesByCollection: SourcesByCollection | undefined;
-        if (searchContext.isMulti && searchContext.effectiveCollectionIds) {
-          const collectionsConfig: { [collectionId: string]: CollectionConfig } = {};
-          for (const id of searchContext.effectiveCollectionIds) {
-            const config = SYSTEM_COLLECTIONS[id];
-            if (config) collectionsConfig[id] = { name: config.name };
-          }
-          sourcesByCollection = groupSourcesByCollection(
-            citations,
-            searchContext.sortedResults,
-            collectionsConfig
-          );
-        }
+  if (fullText === null) {
+    log.debug(`⏱ Total (stream failed): ${Date.now() - t0}ms`);
+    return null;
+  }
 
-        sse.send('completion', {
-          answer: cleanDraft,
-          citations,
-          sources,
-          allSources,
-          ...(sourcesByCollection && { sourcesByCollection }),
-          metadata: {
-            isMulti: searchContext.isMulti,
-            collectionName: searchContext.collectionName,
-            effectiveCollectionIds: searchContext.effectiveCollectionIds,
-            totalResults: searchContext.sortedResults.length,
-            citationsCount: citations.length,
-            partial: true,
-          },
-        });
-      } catch (citationError: unknown) {
-        log.error('Failed to process partial citations:', citationError);
-        sse.send('error', { error: streamErrMsg || 'Stream interrupted' });
-      }
-    } else {
-      sse.send('error', { error: streamErrMsg || 'Stream interrupted' });
-    }
-    log.debug(`⏱ Total (error path): ${Date.now() - t0}ms`);
+  if (abortController.signal.aborted) {
+    log.debug('Notebook stream aborted by client disconnect');
     sse.end();
     return null;
   }
 
   const t4 = Date.now();
-  log.debug(`⏱ Streaming: ${t4 - (firstChunkTime || t2)}ms, ${fullText.length} chars`);
+  log.debug(`⏱ Streaming: ${t4 - t2}ms, ${fullText.length} chars`);
 
   // Layer 5: Output leakage detection — check if the LLM leaked system prompt fragments
   if (options.systemPromptOverride && containsPromptLeakage(fullText)) {
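The implementation notes describe the reasoning streamer behind streamForResolution as a two-phase loop: Phase 1 races each read against the shared first-token deadline until the first text delta, Phase 2 then drains without any Promise.race overhead. Below is a hedged sketch of that structure; the chunk type and the deadline wiring are assumptions, only the phase split itself comes from the commit notes.

```ts
// Sketch of the Phase-1 / Phase-2 split; not the actual service code.
type ReasoningChunk = { type: 'text' | 'reasoning'; delta: string };

async function* raceUntilFirstText(
  source: AsyncIterator<ReasoningChunk>,
  deadlineMs: number
): AsyncGenerator<ReasoningChunk> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error('first-token deadline exceeded')), deadlineMs);
  });

  try {
    // Phase 1: every read races the shared deadline until the first text delta arrives.
    while (true) {
      const result = await Promise.race([source.next(), deadline]);
      if (result.done) return;
      yield result.value;
      if (result.value.type === 'text') break; // content has started flowing
    }
  } finally {
    clearTimeout(timer); // one shared deadline, armed once, cleared once
  }

  // Phase 2: plain drain, no per-chunk Promise.race microtask hop.
  while (true) {
    const result = await source.next();
    if (result.done) return;
    yield result.value;
  }
}
```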
@@ -466,7 +378,7 @@ export async function handleNotebookStream(
 
   const t6 = Date.now();
   log.debug(
-    `⏱ Total: ${t6 - t0}ms [${isFast ? 'fast' : 'deep'}] (search=${t1 - t0}, setup=${t2 - t1}, ttft=${(firstChunkTime || t2) - t2}, stream=${t4 - (firstChunkTime || t2)}, cite=${t5 - t4})`
+    `⏱ Total: ${t6 - t0}ms [${isFast ? 'fast' : 'deep'}] (search=${t1 - t0}, setup=${t2 - t1}, stream=${t4 - t2}, cite=${t5 - t4})`
   );
   sse.end();
472384
