@@ -4,14 +4,17 @@ import {
4
4
createDataStreamResponse ,
5
5
smoothStream ,
6
6
streamText ,
7
+ experimental_createMCPClient ,
7
8
} from 'ai' ;
9
+ import { Experimental_StdioMCPTransport } from 'ai/mcp-stdio' ;
8
10
import { auth } from '@/app/(auth)/auth' ;
9
11
import { systemPrompt } from '@/lib/ai/prompts' ;
10
12
import {
11
13
deleteChatById ,
12
14
getChatById ,
13
15
saveChat ,
14
16
saveMessages ,
17
+ getEnabledMcpServersByUserId ,
15
18
} from '@/lib/db/queries' ;
16
19
import {
17
20
generateUUID ,
@@ -29,6 +32,8 @@ import { myProvider } from '@/lib/ai/providers';
29
32
export const maxDuration = 60 ;
30
33
31
34
export async function POST ( request : Request ) {
35
+ let mcpClientsToClose : Awaited < ReturnType < typeof experimental_createMCPClient > > [ ] = [ ] ;
36
+
32
37
try {
33
38
const {
34
39
id,
@@ -45,6 +50,7 @@ export async function POST(request: Request) {
45
50
if ( ! session || ! session . user || ! session . user . id ) {
46
51
return new Response ( 'Unauthorized' , { status : 401 } ) ;
47
52
}
53
+ const userId = session . user . id ;
48
54
49
55
const userMessage = getMostRecentUserMessage ( messages ) ;
50
56
@@ -59,9 +65,9 @@ export async function POST(request: Request) {
59
65
message : userMessage ,
60
66
} ) ;
61
67
62
- await saveChat ( { id, userId : session . user . id , title } ) ;
68
+ await saveChat ( { id, userId : userId , title } ) ;
63
69
} else {
64
- if ( chat . userId !== session . user . id ) {
70
+ if ( chat . userId !== userId ) {
65
71
return new Response ( 'Unauthorized' , { status : 401 } ) ;
66
72
}
67
73
}
@@ -80,87 +86,146 @@ export async function POST(request: Request) {
80
86
} ) ;
81
87
82
88
return createDataStreamResponse ( {
83
- execute : ( dataStream ) => {
84
- const result = streamText ( {
85
- model : myProvider . languageModel ( selectedChatModel ) ,
86
- system : systemPrompt ( { selectedChatModel } ) ,
87
- messages,
88
- maxSteps : 5 ,
89
- experimental_activeTools :
90
- selectedChatModel === 'chat-model-reasoning'
91
- ? [ ]
92
- : [
93
- 'getWeather' ,
94
- 'createDocument' ,
95
- 'updateDocument' ,
96
- 'requestSuggestions' ,
97
- ] ,
98
- experimental_transform : smoothStream ( { chunking : 'word' } ) ,
99
- experimental_generateMessageId : generateUUID ,
100
- tools : {
89
+ execute : async ( dataStream ) => {
90
+ try {
91
+ const staticTools = {
101
92
getWeather,
102
93
createDocument : createDocument ( { session, dataStream } ) ,
103
94
updateDocument : updateDocument ( { session, dataStream } ) ,
104
95
requestSuggestions : requestSuggestions ( {
105
96
session,
106
97
dataStream,
107
98
} ) ,
108
- } ,
109
- onFinish : async ( { response } ) => {
110
- if ( session . user ?. id ) {
99
+ } ;
100
+ let combinedTools : Record < string , any > = { ...staticTools } ;
101
+
102
+ try {
103
+ const enabledServers = await getEnabledMcpServersByUserId ( { userId } ) ;
104
+
105
+ for ( const server of enabledServers ) {
111
106
try {
112
- const assistantId = getTrailingMessageId ( {
113
- messages : response . messages . filter (
114
- ( message ) => message . role === 'assistant' ,
115
- ) ,
116
- } ) ;
117
-
118
- if ( ! assistantId ) {
119
- throw new Error ( 'No assistant message found!' ) ;
107
+ let transport ;
108
+ const config = server . config as any ;
109
+
110
+ if ( config . transportType === 'sse' ) {
111
+ transport = {
112
+ type : 'sse' as const ,
113
+ url : config . url ,
114
+ } ;
115
+ } else if ( config . transportType === 'stdio' ) {
116
+ if ( isProductionEnvironment ) {
117
+ console . warn ( `SECURITY WARNING: Initializing MCP client with stdio transport in production for server: ${ server . name } (ID: ${ server . id } )` ) ;
118
+ }
119
+ transport = new Experimental_StdioMCPTransport ( {
120
+ command : config . command ,
121
+ args : config . args || [ ] ,
122
+ } ) ;
123
+ } else {
124
+ console . warn ( `Unsupported MCP transport type '${ config . transportType } ' for server ${ server . name } ` ) ;
125
+ continue ;
120
126
}
121
127
122
- const [ , assistantMessage ] = appendResponseMessages ( {
123
- messages : [ userMessage ] ,
124
- responseMessages : response . messages ,
125
- } ) ;
126
-
127
- await saveMessages ( {
128
- messages : [
129
- {
130
- id : assistantId ,
131
- chatId : id ,
132
- role : assistantMessage . role ,
133
- parts : assistantMessage . parts ,
134
- attachments :
135
- assistantMessage . experimental_attachments ?? [ ] ,
136
- createdAt : new Date ( ) ,
137
- } ,
138
- ] ,
139
- } ) ;
140
- } catch ( _ ) {
141
- console . error ( 'Failed to save chat' ) ;
128
+ const mcpClient = await experimental_createMCPClient ( { transport } ) ;
129
+ mcpClientsToClose . push ( mcpClient ) ;
130
+
131
+ const mcpTools = await mcpClient . tools ( ) ;
132
+ combinedTools = { ...combinedTools , ...mcpTools } ;
133
+ console . log ( `Loaded ${ Object . keys ( mcpTools ) . length } tools from MCP server: ${ server . name } ` ) ;
134
+
135
+ } catch ( mcpError ) {
136
+ console . error ( `Failed to initialize or get tools from MCP server ${ server . name } (ID: ${ server . id } ):` , mcpError ) ;
142
137
}
143
138
}
144
- } ,
145
- experimental_telemetry : {
146
- isEnabled : isProductionEnvironment ,
147
- functionId : 'stream-text' ,
148
- } ,
149
- } ) ;
150
-
151
- result . consumeStream ( ) ;
152
-
153
- result . mergeIntoDataStream ( dataStream , {
154
- sendReasoning : true ,
155
- } ) ;
139
+ } catch ( dbError ) {
140
+ console . error ( 'Failed to fetch enabled MCP servers:' , dbError ) ;
141
+ }
142
+
143
+ const activeToolsList = selectedChatModel === 'chat-model-reasoning'
144
+ ? [ ]
145
+ : Object . keys ( combinedTools ) ;
146
+
147
+ const result = streamText ( {
148
+ model : myProvider . languageModel ( selectedChatModel ) ,
149
+ system : systemPrompt ( { selectedChatModel } ) ,
150
+ messages,
151
+ maxSteps : 5 ,
152
+ tools : combinedTools ,
153
+ experimental_activeTools : activeToolsList ,
154
+ experimental_transform : smoothStream ( { chunking : 'word' } ) ,
155
+ experimental_generateMessageId : generateUUID ,
156
+ onFinish : async ( { response } ) => {
157
+ if ( session . user ?. id ) {
158
+ try {
159
+ const assistantId = getTrailingMessageId ( {
160
+ messages : response . messages . filter (
161
+ ( message ) => message . role === 'assistant' ,
162
+ ) ,
163
+ } ) ;
164
+
165
+ if ( ! assistantId ) {
166
+ throw new Error ( 'No assistant message found!' ) ;
167
+ }
168
+
169
+ const [ , assistantMessage ] = appendResponseMessages ( {
170
+ messages : [ userMessage ] ,
171
+ responseMessages : response . messages ,
172
+ } ) ;
173
+
174
+ await saveMessages ( {
175
+ messages : [
176
+ {
177
+ id : assistantId ,
178
+ chatId : id ,
179
+ role : assistantMessage . role ,
180
+ parts : assistantMessage . parts ,
181
+ attachments :
182
+ assistantMessage . experimental_attachments ?? [ ] ,
183
+ createdAt : new Date ( ) ,
184
+ } ,
185
+ ] ,
186
+ } ) ;
187
+ } catch ( _ ) {
188
+ console . error ( 'Failed to save chat messages after stream completion' ) ;
189
+ }
190
+ }
191
+ console . log ( `Closing ${ mcpClientsToClose . length } MCP clients in onFinish...` ) ;
192
+ for ( const client of mcpClientsToClose ) {
193
+ try {
194
+ await client . close ( ) ;
195
+ } catch ( closeError : unknown ) {
196
+ console . error ( 'Error closing MCP client in onFinish:' , closeError ) ;
197
+ }
198
+ }
199
+ mcpClientsToClose = [ ] ;
200
+ } ,
201
+ experimental_telemetry : {
202
+ isEnabled : isProductionEnvironment ,
203
+ functionId : 'stream-text' ,
204
+ } ,
205
+ } ) ;
206
+
207
+ result . consumeStream ( ) ;
208
+ result . mergeIntoDataStream ( dataStream , { sendReasoning : true } ) ;
209
+
210
+ } catch ( streamError ) {
211
+ console . error ( 'Error during streamText execution or MCP setup:' , streamError ) ;
212
+ throw streamError ;
213
+ } finally {
214
+ console . log ( 'Stream execute try/catch finished.' ) ;
215
+ }
156
216
} ,
157
- onError : ( ) => {
158
- return 'Oops, an error occured!' ;
217
+ onError : ( error ) => {
218
+ console . error ( 'Data stream error:' , error ) ;
219
+ return 'Oops, an error occured!' ;
159
220
} ,
160
221
} ) ;
161
222
} catch ( error ) {
223
+ console . error ( 'Error in POST /api/chat route (initial setup):' , error ) ;
224
+ for ( const client of mcpClientsToClose ) {
225
+ client . close ( ) . catch ( ( closeError : unknown ) => console . error ( 'Error closing MCP client during outer catch:' , closeError ) ) ;
226
+ }
162
227
return new Response ( 'An error occurred while processing your request!' , {
163
- status : 404 ,
228
+ status : 500 ,
164
229
} ) ;
165
230
}
166
231
}
@@ -190,6 +255,7 @@ export async function DELETE(request: Request) {
190
255
191
256
return new Response ( 'Chat deleted' , { status : 200 } ) ;
192
257
} catch ( error ) {
258
+ console . error ( 'Error deleting chat:' , error ) ;
193
259
return new Response ( 'An error occurred while processing your request!' , {
194
260
status : 500 ,
195
261
} ) ;
0 commit comments