From 5d907108eea933d6dc676c4c84502a61d6e980de Mon Sep 17 00:00:00 2001
From: jmal
Date: Thu, 8 May 2025 17:41:17 +0800
Subject: [PATCH] perf: optimize the processing of reasoning content

---
 service/src/chatgpt/index.ts | 354 ++++++++++++++++++++++-------------
 service/src/types.ts         |  17 ++
 2 files changed, 244 insertions(+), 127 deletions(-)

diff --git a/service/src/chatgpt/index.ts b/service/src/chatgpt/index.ts
index e8d8a7b7..c9ddac01 100644
--- a/service/src/chatgpt/index.ts
+++ b/service/src/chatgpt/index.ts
@@ -2,7 +2,8 @@ import * as dotenv from 'dotenv'
 import OpenAI from 'openai'
 import { HttpsProxyAgent } from 'https-proxy-agent'
 import type { AuditConfig, KeyConfig, UserInfo } from '../storage/model'
-import { Status, UsageResponse } from '../storage/model'
+
+import { Status, UsageResponse } from '../storage/model'
 import { convertImageUrl } from '../utils/image'
 import type { TextAuditService } from '../utils/textAudit'
 import { textAuditServices } from '../utils/textAudit'
@@ -38,137 +39,230 @@ export async function initApi(key: KeyConfig) {
     httpAgent = new HttpsProxyAgent(httpsProxy)
   }
 
-  const client = new OpenAI({
+  return new OpenAI({
     baseURL: openaiBaseUrl,
     apiKey: key.key,
     httpAgent,
   })
-  return client
 }
 
 const processThreads: { userId: string; abort: AbortController; messageId: string }[] = []
 
+// --- Helper function to parse <think> tags out of delta.content ---
+function parseDeltaContentForThinkTags(
+  deltaContent: string,
+  currentIsInsideThinkTagState: boolean,
+): { textPart: string; reasoningPart: string; newIsInsideThinkTag: boolean } {
+  let localText = ''
+  let localReasoning = ''
+  let contentToProcess = deltaContent
+  let newIsInsideThinkTag = currentIsInsideThinkTagState
+
+  while (contentToProcess.length > 0) {
+    if (newIsInsideThinkTag) {
+      // Currently inside a <think> block, look for the closing </think> tag
+      const endTagIndex = contentToProcess.indexOf('</think>')
+      if (endTagIndex !== -1) {
+        // Found the closing tag
+        localReasoning += contentToProcess.substring(0, endTagIndex)
+        contentToProcess = contentToProcess.substring(endTagIndex + '</think>'.length)
+        newIsInsideThinkTag = false // Exited the tag
+      }
+      else {
+        // No closing tag found, so the rest of contentToProcess is reasoning
+        localReasoning += contentToProcess
+        contentToProcess = ''
+      }
+    }
+    else {
+      // Currently outside a <think> block, look for the opening <think> tag
+      const startTagIndex = contentToProcess.indexOf('<think>')
+      if (startTagIndex !== -1) {
+        // Found the opening tag
+        localText += contentToProcess.substring(0, startTagIndex)
+        contentToProcess = contentToProcess.substring(startTagIndex + '<think>'.length)
+        newIsInsideThinkTag = true // Entered the tag
+      }
+      else {
+        // No opening tag found, so the rest of contentToProcess is regular text
+        localText += contentToProcess
+        contentToProcess = ''
+      }
+    }
+  }
+  return { textPart: localText, reasoningPart: localReasoning, newIsInsideThinkTag }
+}
+
+// --- Optimized stream processing logic ---
+async function optimizedProcessStreamInternal(
+  stream: AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk>,
+  processCallback: ((chunk: any) => void) | undefined,
+  initialMessageId: string,
+): Promise<{
+  id: string
+  reasoning: string
+  text: string
+  role: OpenAI.Chat.Completions.ChatCompletionRole
+  finish_reason: string | null
+  usage: UsageResponse
+}> {
+  let accumulatedResponseReasoning = ''
+  let accumulatedResponseText = ''
+  let lastResponseId = ''
+  const usage = new UsageResponse()
+
+  let isInsideThinkTagGlobal = false
+  // Track finish_reason as `string | null` for simplicity; the SDK's
+  // finish_reason is a union of string literals, so it assigns cleanly
+  // to this wider type. Initialize to null so it is never read unassigned.
+  let lastFinishReason: string | null = null
+  let finalRole: OpenAI.Chat.Completions.ChatCompletionRole = 'assistant'
+
+  for await (const chunk of stream) {
+    // The delta object is of type OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta | undefined
+    const delta = chunk.choices[0]?.delta as OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta
+
+    if (chunk.id)
+      lastResponseId = chunk.id
+
+    if (!delta || Object.keys(delta).length === 0) {
+      if (chunk.choices?.[0]?.finish_reason)
+        lastFinishReason = chunk.choices[0].finish_reason
+
+      if (chunk.usage?.total_tokens !== undefined) {
+        usage.total_tokens = chunk.usage.total_tokens
+        usage.prompt_tokens = chunk.usage.prompt_tokens
+        usage.completion_tokens = chunk.usage.completion_tokens
+      }
+      if (processCallback && (lastFinishReason || chunk.usage?.total_tokens !== undefined)) {
+        const responseChunkPayload = {
+          id: lastResponseId || initialMessageId,
+          reasoning: accumulatedResponseReasoning,
+          text: accumulatedResponseText,
+          role: finalRole,
+          finish_reason: lastFinishReason,
+        }
+        processCallback(responseChunkPayload)
+      }
+      continue
+    }
+
+    if (delta.role)
+      finalRole = delta.role
+
+    // 1. Extract structured reasoning content.
+    // With the module augmentation adding `reasoning_content` and `reasoning`
+    // to `Delta` (see service/src/types.ts), no `as any` cast is needed.
+    const structuredReasoningFromDelta = delta.reasoning_content || delta.reasoning || ''
+    if (structuredReasoningFromDelta)
+      accumulatedResponseReasoning += structuredReasoningFromDelta
+
+    // 2. Extract and process delta.content
+    const deltaContent = delta.content || ''
+    if (deltaContent) {
+      const { textPart, reasoningPart, newIsInsideThinkTag } = parseDeltaContentForThinkTags(deltaContent, isInsideThinkTagGlobal)
+
+      if (textPart)
+        accumulatedResponseText += textPart
+
+      if (reasoningPart)
+        accumulatedResponseReasoning += reasoningPart
+
+      isInsideThinkTagGlobal = newIsInsideThinkTag
+    }
+
+    const currentFinishReason = chunk.choices[0]?.finish_reason
+    if (currentFinishReason)
+      lastFinishReason = currentFinishReason
+
+    const responseChunkPayload = {
+      id: lastResponseId || initialMessageId,
+      reasoning: accumulatedResponseReasoning,
+      text: accumulatedResponseText,
+      role: delta.role || finalRole,
+      finish_reason: lastFinishReason,
+    }
+
+    processCallback?.(responseChunkPayload)
+
+    if (chunk.usage?.total_tokens !== undefined) {
+      usage.total_tokens = chunk.usage.total_tokens
+      usage.prompt_tokens = chunk.usage.prompt_tokens
+      usage.completion_tokens = chunk.usage.completion_tokens
+    }
+  }
+
+  return {
+    id: lastResponseId || initialMessageId,
+    reasoning: accumulatedResponseReasoning,
+    text: accumulatedResponseText,
+    role: finalRole,
+    finish_reason: lastFinishReason,
+    usage,
+  }
+}
+
 async function chatReplyProcess(options: RequestOptions) {
   const model = options.room.chatModel
   const key = await getRandomApiKey(options.user, model)
   const userId = options.user._id.toString()
   const maxContextCount = options.user.advanced.maxContextCount ?? 20
-  const messageId = options.messageId
-  if (key == null || key === undefined)
+  const messageId = options.messageId // Will be used as initialMessageId
+
+  if (key == null)
     throw new Error('没有对应的apikeys配置。请再试一次 | No available apikeys configuration. Please try again.')
 
-  // Add Chat Record
   updateRoomChatModel(userId, options.room.roomId, model)
 
   const { message, uploadFileKeys, parentMessageId, process, systemMessage, temperature, top_p } = options
 
   try {
-    // Initialize OpenAI client
     const openai = await initApi(key)
-
-    // Create abort controller for cancellation
     const abort = new AbortController()
     processThreads.push({ userId, abort, messageId })
 
-    // Prepare messages array for the chat completion
     const messages: OpenAI.Chat.ChatCompletionMessageParam[] = []
+    await addPreviousMessages(parentMessageId, maxContextCount, messages) // parentMessageId can be undefined
 
-    // Add previous messages from conversation history.
-    await addPreviousMessages(parentMessageId, maxContextCount, messages)
+    if (isNotEmptyString(systemMessage))
+      messages.unshift({ role: 'system', content: systemMessage })
 
-    // Add system message if provided
-    if (isNotEmptyString(systemMessage)) {
-      messages.unshift({
-        role: 'system',
-        content: systemMessage,
-      })
-    }
-
-    // Prepare the user message content (text and images)
-    let content: string | OpenAI.Chat.ChatCompletionContentPart[] = message
-
-    // Handle image uploads if present
+    let contentForApi: string | OpenAI.Chat.ChatCompletionContentPart[] = message
     if (uploadFileKeys && uploadFileKeys.length > 0) {
-      content = [
-        {
-          type: 'text',
-          text: message,
-        },
-      ]
+      contentForApi = [{ type: 'text', text: message }]
       for (const uploadFileKey of uploadFileKeys) {
-        content.push({
+        contentForApi.push({
           type: 'image_url',
-          image_url: {
-            url: await convertImageUrl(uploadFileKey),
-          },
+          image_url: { url: await convertImageUrl(uploadFileKey) },
         })
       }
     }
-
-    // Add the user message
-    messages.push({
-      role: 'user',
-      content,
-    })
-
-    // Create the chat completion with streaming
-    const stream = await openai.chat.completions.create({
-      model,
-      messages,
-      temperature: temperature ?? undefined,
-      top_p: top_p ?? undefined,
-      stream: true,
-      stream_options: {
-        include_usage: true,
+    messages.push({ role: 'user', content: contentForApi })
+
+    const stream = await openai.chat.completions.create(
+      {
+        model,
+        messages,
+        temperature: temperature ?? undefined,
+        top_p: top_p ?? undefined,
+        stream: true,
+        stream_options: { include_usage: true },
       },
-    }, {
-      signal: abort.signal,
-    })
-
-    // Process the stream
-    let responseReasoning = ''
-    let responseText = ''
-    let responseId = ''
-    const usage = new UsageResponse()
-
-    for await (const chunk of stream) {
-      // Extract the content from the chunk
-      // @ts-expect-error For deepseek-reasoner model only. The reasoning contents of the assistant message, before the final answer.
-      const reasoningContent = chunk.choices[0]?.delta?.reasoning_content || ''
-      responseReasoning += reasoningContent
-      const content = chunk.choices[0]?.delta?.content || ''
-      responseText += content
-      responseId = chunk.id
-
-      const finish_reason = chunk.choices[0]?.finish_reason
-
-      // Build response object similar to the original implementation
-      const responseChunk = {
-        id: chunk.id,
-        reasoning: responseReasoning,
-        text: responseText,
-        role: 'assistant',
-        finish_reason,
-      }
-
-      // Call the process callback if provided
-      process?.(responseChunk)
+      { signal: abort.signal },
+    )
 
-      if (chunk?.usage?.total_tokens) {
-        usage.total_tokens = chunk?.usage?.total_tokens
-        usage.prompt_tokens = chunk?.usage?.prompt_tokens
-        usage.completion_tokens = chunk?.usage?.completion_tokens
-      }
-    }
+
+    // Process the stream using the optimized internal function
+    const streamResult = await optimizedProcessStreamInternal(stream, process, messageId)
 
-    // Final response object
+    // Construct the final response object
     const response = {
-      id: responseId || messageId,
-      reasoning: responseReasoning,
-      text: responseText,
-      role: 'assistant',
+      id: streamResult.id,
+      reasoning: streamResult.reasoning,
+      text: streamResult.text,
+      role: streamResult.role,
       detail: {
-        usage,
+        usage: streamResult.usage,
+        finish_reason: streamResult.finish_reason, // Surface finish_reason to callers
       },
     }
 
@@ -214,8 +308,15 @@ async function containsSensitiveWords(audit: AuditConfig, text: string): Promise<boolean> {
   }
   if (audit.enabled) {
     if (!auditService)
-      initAuditService(audit)
-    return await auditService.containsSensitiveWords(text)
+      initAuditService(audit) // Ensure auditService is initialized if enabled
+    // Check that auditService was successfully initialized before calling it
+    if (auditService) {
+      return await auditService.containsSensitiveWords(text)
+    }
+    else {
+      console.warn('Audit service is enabled but not initialized. Skipping audit.')
+      return false
+    }
   }
   return false
 }
@@ -235,31 +336,24 @@ async function getMessageById(id: string): Promise<ChatMessage | undefined> {
   if (chatInfo) {
     const parentMessageId = isPrompt
       ? chatInfo.options.parentMessageId
-      : `prompt_${id}` // parent message is the prompt
+      : `prompt_${id}`
 
-    if (chatInfo.status !== Status.Normal) { // jumps over deleted messages
+    if (chatInfo.status !== Status.Normal) {
       return parentMessageId
         ? getMessageById(parentMessageId)
         : undefined
     }
     else {
-      if (isPrompt) { // prompt
+      if (isPrompt) {
         let content: string | OpenAI.Chat.ChatCompletionContentPart[] = chatInfo.prompt
         if (chatInfo.images && chatInfo.images.length > 0) {
-          content = [
-            {
-              type: 'text',
-              text: chatInfo.prompt,
-            },
-          ]
+          content = [{ type: 'text', text: chatInfo.prompt }]
           for (const image of chatInfo.images) {
-            const imageUrlBase64 = await convertImageUrl(image)
-            if (imageUrlBase64) {
+            const imageUrlBase64 = await convertImageUrl(image) // Ensure this returns the correct URL format
+            if (imageUrlBase64) { // Only push the image if the conversion succeeded
              content.push({
                 type: 'image_url',
-                image_url: {
-                  url: await convertImageUrl(image),
-                },
+                image_url: { url: imageUrlBase64 }, // Reuse the converted URL instead of converting twice
               })
             }
           }
@@ -272,10 +366,10 @@ async function getMessageById(id: string): Promise<ChatMessage | undefined> {
       }
     }
     else {
-      return { // completion
+      return {
         id,
         parentMessageId,
-        role: 'assistant',
+        role: 'assistant', // The stored response is always from the assistant
         content: chatInfo.response,
       }
     }
@@ -293,41 +387,47 @@ async function randomKeyConfig(keys: KeyConfig[]): Promise<KeyConfig | null> {
   let unsedKeys = keys.filter(d => _lockedKeys.filter(l => d.key === l.key).length <= 0)
   const start = Date.now()
   while (unsedKeys.length <= 0) {
-    if (Date.now() - start > 3000)
+    if (Date.now() - start > 3000) // Time out to avoid an infinite loop
       break
-    await new Promise(resolve => setTimeout(resolve, 1000))
+    await new Promise(resolve => setTimeout(resolve, 1000)) // Wait and retry
     unsedKeys = keys.filter(d => _lockedKeys.filter(l => d.key === l.key).length <= 0)
   }
   if (unsedKeys.length <= 0)
     return null
-  const thisKey = unsedKeys[Math.floor(Math.random() * unsedKeys.length)]
-  return thisKey
+  return unsedKeys[Math.floor(Math.random() * unsedKeys.length)]
 }
 
 async function getRandomApiKey(user: UserInfo, chatModel: string): Promise<KeyConfig | null> {
-  const keys = (await getCacheApiKeys()).filter(d => hasAnyRole(d.userRoles, user.roles))
+  const allKeys = await getCacheApiKeys()
+  const eligibleKeys = allKeys
+    .filter(d => hasAnyRole(d.userRoles, user.roles))
     .filter(d => d.chatModels.includes(chatModel))
-  return randomKeyConfig(keys)
+  return randomKeyConfig(eligibleKeys)
 }
 
-// Helper function to add previous messages to the conversation context
-async function addPreviousMessages(parentMessageId: string, maxContextCount: number, messages: OpenAI.Chat.ChatCompletionMessageParam[]): Promise<void> {
-  // Recursively get previous messages
+async function addPreviousMessages(
+  parentMessageId: string | undefined, // Can be undefined for the first message in a room
+  maxContextCount: number,
+  messages: OpenAI.Chat.ChatCompletionMessageParam[],
+): Promise<void> {
   let currentMessageId = parentMessageId
-  while (currentMessageId) {
+  while (currentMessageId && messages.length < maxContextCount) {
     const currentChatMessage: ChatMessage | undefined = await getMessageById(currentMessageId)
+    if (!currentChatMessage) { // If a message in the chain is not found, stop.
+      break
+    }
+
+    // Ensure the role is compatible: OpenAI.Chat.ChatCompletionMessageParam['role']
+    // is "system" | "user" | "assistant" | "tool". ChatMessage.role is assumed to fit.
     messages.unshift({
       content: currentChatMessage.content,
-      role: currentChatMessage.role,
+      role: currentChatMessage.role as OpenAI.Chat.ChatCompletionMessageParam['role'], // Explicit cast to the accepted role union
     } as OpenAI.Chat.ChatCompletionMessage)
 
-    currentMessageId = currentChatMessage?.parentMessageId
-
-    if (messages.length >= maxContextCount)
-      break
+    currentMessageId = currentChatMessage.parentMessageId
   }
 }
diff --git a/service/src/types.ts b/service/src/types.ts
index f0f821fb..82f3442e 100644
--- a/service/src/types.ts
+++ b/service/src/types.ts
@@ -1,5 +1,7 @@
 import type { JwtPayload } from 'jsonwebtoken'
 
+import 'openai'
+
 export interface RequestProps {
   roomId: number
   uuid: number
@@ -51,4 +53,19 @@ export class TwoFAConfig {
     this.secretKey = ''
     this.otpauthUrl = ''
   }
-}
+} // The original module must be imported (via `import 'openai'` above) for the augmentation below
+
+declare module 'openai/resources/chat/completions' {
+  // In OpenAI SDK v4, the delta object's type is ChatCompletionChunk.Choice.Delta.
+  // We need to augment this nested interface.
+  // eslint-disable-next-line @typescript-eslint/no-namespace
+  namespace ChatCompletionChunk {
+    // eslint-disable-next-line @typescript-eslint/no-namespace
+    namespace Choice {
+      interface Delta {
+        reasoning_content?: string | null
+        reasoning?: string | null
+      }
+    }
+  }
+}
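
Note on the <think> parser: parseDeltaContentForThinkTags carries its
inside/outside state between deltas, so reasoning wrapped in <think> tags is
routed correctly even when the opening and closing tags arrive in different
chunks. A minimal sketch of how it behaves (this assumes the helper is
exported from service/src/chatgpt/index.ts; in this patch it is
module-private, so it would need to be exported for a test like this):

  let insideThink = false
  let text = ''
  let reasoning = ''

  // Simulated delta.content chunks. The parser keeps state across chunks,
  // but it does not reassemble a '<think>' literal that is itself split
  // across a chunk boundary -- a known limitation of this approach.
  for (const chunk of ['Hello <think>step 1', ' and step 2</think> world']) {
    const { textPart, reasoningPart, newIsInsideThinkTag }
      = parseDeltaContentForThinkTags(chunk, insideThink)
    text += textPart
    reasoning += reasoningPart
    insideThink = newIsInsideThinkTag
  }

  console.log(text)      // 'Hello  world'
  console.log(reasoning) // 'step 1 and step 2'

The module augmentation in service/src/types.ts plays the complementary role
for providers that return structured reasoning instead of inline tags: it adds
optional reasoning_content/reasoning fields to Delta, which is what lets the
stream loop read delta.reasoning_content without the previous
@ts-expect-error suppression.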