Skip to content

Commit 75f55c8

Browse files
authored
fix(logging): add preprocessing util shared by all execution paths (#2081)
* fix(logging): add preprocessing util shared by all execution paths * DRY
1 parent 4a0450d commit 75f55c8

File tree

18 files changed

+1233
-799
lines changed

18 files changed

+1233
-799
lines changed

apps/sim/app/api/chat/[identifier]/route.test.ts

Lines changed: 46 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,36 @@
66
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
77
import { createMockRequest } from '@/app/api/__test-utils__/utils'
88

9+
vi.mock('@/lib/execution/preprocessing', () => ({
10+
preprocessExecution: vi.fn().mockResolvedValue({
11+
success: true,
12+
actorUserId: 'test-user-id',
13+
workflowRecord: {
14+
id: 'test-workflow-id',
15+
userId: 'test-user-id',
16+
isDeployed: true,
17+
workspaceId: 'test-workspace-id',
18+
variables: {},
19+
},
20+
userSubscription: {
21+
plan: 'pro',
22+
status: 'active',
23+
},
24+
rateLimitInfo: {
25+
allowed: true,
26+
remaining: 100,
27+
resetAt: new Date(),
28+
},
29+
}),
30+
}))
31+
32+
vi.mock('@/lib/logs/execution/logging-session', () => ({
33+
LoggingSession: vi.fn().mockImplementation(() => ({
34+
safeStart: vi.fn().mockResolvedValue(undefined),
35+
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
36+
})),
37+
}))
38+
939
describe('Chat Identifier API Route', () => {
1040
const createMockStream = () => {
1141
return new ReadableStream({
@@ -307,48 +337,16 @@ describe('Chat Identifier API Route', () => {
307337
})
308338

309339
it('should return 503 when workflow is not available', async () => {
310-
// Override the default workflow result to return non-deployed
311-
vi.doMock('@sim/db', () => {
312-
// Track call count to return different results
313-
let callCount = 0
314-
315-
const mockLimit = vi.fn().mockImplementation(() => {
316-
callCount++
317-
if (callCount === 1) {
318-
// First call - chat query
319-
return [
320-
{
321-
id: 'chat-id',
322-
workflowId: 'unavailable-workflow',
323-
userId: 'user-id',
324-
isActive: true,
325-
authType: 'public',
326-
outputConfigs: [{ blockId: 'block-1', path: 'output' }],
327-
},
328-
]
329-
}
330-
if (callCount === 2) {
331-
// Second call - workflow query
332-
return [
333-
{
334-
isDeployed: false,
335-
},
336-
]
337-
}
338-
return []
339-
})
340-
341-
const mockWhere = vi.fn().mockReturnValue({ limit: mockLimit })
342-
const mockFrom = vi.fn().mockReturnValue({ where: mockWhere })
343-
const mockSelect = vi.fn().mockReturnValue({ from: mockFrom })
344-
345-
return {
346-
db: {
347-
select: mockSelect,
348-
},
349-
chat: {},
350-
workflow: {},
351-
}
340+
const { preprocessExecution } = await import('@/lib/execution/preprocessing')
341+
const originalImplementation = vi.mocked(preprocessExecution).getMockImplementation()
342+
343+
vi.mocked(preprocessExecution).mockResolvedValueOnce({
344+
success: false,
345+
error: {
346+
message: 'Workflow is not deployed',
347+
statusCode: 403,
348+
logCreated: true,
349+
},
352350
})
353351

354352
const req = createMockRequest('POST', { input: 'Hello' })
@@ -358,11 +356,15 @@ describe('Chat Identifier API Route', () => {
358356

359357
const response = await POST(req, { params })
360358

361-
expect(response.status).toBe(503)
359+
expect(response.status).toBe(403)
362360

363361
const data = await response.json()
364362
expect(data).toHaveProperty('error')
365-
expect(data).toHaveProperty('message', 'Chat workflow is not available')
363+
expect(data).toHaveProperty('message', 'Workflow is not deployed')
364+
365+
if (originalImplementation) {
366+
vi.mocked(preprocessExecution).mockImplementation(originalImplementation)
367+
}
366368
})
367369

368370
it('should return streaming response for valid chat messages', async () => {
@@ -378,7 +380,6 @@ describe('Chat Identifier API Route', () => {
378380
expect(response.headers.get('Cache-Control')).toBe('no-cache')
379381
expect(response.headers.get('Connection')).toBe('keep-alive')
380382

381-
// Verify createStreamingResponse was called with correct workflow info
382383
expect(mockCreateStreamingResponse).toHaveBeenCalledWith(
383384
expect.objectContaining({
384385
workflow: expect.objectContaining({
@@ -408,7 +409,6 @@ describe('Chat Identifier API Route', () => {
408409
expect(response.status).toBe(200)
409410
expect(response.body).toBeInstanceOf(ReadableStream)
410411

411-
// Test that we can read from the response stream
412412
if (response.body) {
413413
const reader = response.body.getReader()
414414
const { value, done } = await reader.read()
@@ -447,7 +447,6 @@ describe('Chat Identifier API Route', () => {
447447
})
448448

449449
it('should handle invalid JSON in request body', async () => {
450-
// Create a request with invalid JSON
451450
const req = {
452451
method: 'POST',
453452
headers: new Headers(),

apps/sim/app/api/chat/[identifier]/route.ts

Lines changed: 35 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import { randomUUID } from 'crypto'
12
import { db } from '@sim/db'
2-
import { chat, workflow, workspace } from '@sim/db/schema'
3+
import { chat } from '@sim/db/schema'
34
import { eq } from 'drizzle-orm'
45
import { type NextRequest, NextResponse } from 'next/server'
5-
import { v4 as uuidv4 } from 'uuid'
66
import { z } from 'zod'
7+
import { preprocessExecution } from '@/lib/execution/preprocessing'
78
import { createLogger } from '@/lib/logs/console/logger'
89
import { LoggingSession } from '@/lib/logs/execution/logging-session'
910
import { ChatFiles } from '@/lib/uploads'
@@ -93,7 +94,7 @@ export async function POST(
9394
if (!deployment.isActive) {
9495
logger.warn(`[${requestId}] Chat is not active: ${identifier}`)
9596

96-
const executionId = uuidv4()
97+
const executionId = randomUUID()
9798
const loggingSession = new LoggingSession(
9899
deployment.workflowId,
99100
executionId,
@@ -140,82 +141,35 @@ export async function POST(
140141
return addCorsHeaders(createErrorResponse('No input provided', 400), request)
141142
}
142143

143-
const workflowResult = await db
144-
.select({
145-
isDeployed: workflow.isDeployed,
146-
workspaceId: workflow.workspaceId,
147-
variables: workflow.variables,
148-
})
149-
.from(workflow)
150-
.where(eq(workflow.id, deployment.workflowId))
151-
.limit(1)
144+
const executionId = randomUUID()
152145

153-
if (workflowResult.length === 0 || !workflowResult[0].isDeployed) {
154-
logger.warn(`[${requestId}] Workflow not found or not deployed: ${deployment.workflowId}`)
155-
156-
const executionId = uuidv4()
157-
const loggingSession = new LoggingSession(
158-
deployment.workflowId,
159-
executionId,
160-
'chat',
161-
requestId
162-
)
146+
const loggingSession = new LoggingSession(deployment.workflowId, executionId, 'chat', requestId)
163147

164-
await loggingSession.safeStart({
165-
userId: deployment.userId,
166-
workspaceId: workflowResult[0]?.workspaceId || '',
167-
variables: {},
168-
})
148+
const preprocessResult = await preprocessExecution({
149+
workflowId: deployment.workflowId,
150+
userId: deployment.userId,
151+
triggerType: 'chat',
152+
executionId,
153+
requestId,
154+
checkRateLimit: false, // Chat bypasses rate limits
155+
checkDeployment: true, // Chat requires deployed workflows
156+
loggingSession,
157+
})
169158

170-
await loggingSession.safeCompleteWithError({
171-
error: {
172-
message: 'Chat workflow is not available. The workflow is not deployed.',
173-
stackTrace: undefined,
174-
},
175-
traceSpans: [],
176-
})
177-
178-
return addCorsHeaders(createErrorResponse('Chat workflow is not available', 503), request)
159+
if (!preprocessResult.success) {
160+
logger.warn(`[${requestId}] Preprocessing failed: ${preprocessResult.error?.message}`)
161+
return addCorsHeaders(
162+
createErrorResponse(
163+
preprocessResult.error?.message || 'Failed to process request',
164+
preprocessResult.error?.statusCode || 500
165+
),
166+
request
167+
)
179168
}
180169

181-
let workspaceOwnerId = deployment.userId
182-
if (workflowResult[0].workspaceId) {
183-
const workspaceData = await db
184-
.select({ ownerId: workspace.ownerId })
185-
.from(workspace)
186-
.where(eq(workspace.id, workflowResult[0].workspaceId))
187-
.limit(1)
188-
189-
if (workspaceData.length === 0) {
190-
logger.error(`[${requestId}] Workspace not found for workflow ${deployment.workflowId}`)
191-
192-
const executionId = uuidv4()
193-
const loggingSession = new LoggingSession(
194-
deployment.workflowId,
195-
executionId,
196-
'chat',
197-
requestId
198-
)
199-
200-
await loggingSession.safeStart({
201-
userId: deployment.userId,
202-
workspaceId: workflowResult[0].workspaceId || '',
203-
variables: {},
204-
})
205-
206-
await loggingSession.safeCompleteWithError({
207-
error: {
208-
message: 'Workspace not found. Critical configuration error - please contact support.',
209-
stackTrace: undefined,
210-
},
211-
traceSpans: [],
212-
})
213-
214-
return addCorsHeaders(createErrorResponse('Workspace not found', 500), request)
215-
}
216-
217-
workspaceOwnerId = workspaceData[0].ownerId
218-
}
170+
const { actorUserId, workflowRecord } = preprocessResult
171+
const workspaceOwnerId = actorUserId!
172+
const workspaceId = workflowRecord?.workspaceId || ''
219173

220174
try {
221175
const selectedOutputs: string[] = []
@@ -232,12 +186,10 @@ export async function POST(
232186
const { SSE_HEADERS } = await import('@/lib/utils')
233187
const { createFilteredResult } = await import('@/app/api/workflows/[id]/execute/route')
234188

235-
const executionId = crypto.randomUUID()
236-
237189
const workflowInput: any = { input, conversationId }
238190
if (files && Array.isArray(files) && files.length > 0) {
239191
const executionContext = {
240-
workspaceId: workflowResult[0].workspaceId || '',
192+
workspaceId,
241193
workflowId: deployment.workflowId,
242194
executionId,
243195
}
@@ -257,20 +209,13 @@ export async function POST(
257209
} catch (fileError: any) {
258210
logger.error(`[${requestId}] Failed to process chat files:`, fileError)
259211

260-
const fileLoggingSession = new LoggingSession(
261-
deployment.workflowId,
262-
executionId,
263-
'chat',
264-
requestId
265-
)
266-
267-
await fileLoggingSession.safeStart({
212+
await loggingSession.safeStart({
268213
userId: workspaceOwnerId,
269-
workspaceId: workflowResult[0].workspaceId || '',
214+
workspaceId,
270215
variables: {},
271216
})
272217

273-
await fileLoggingSession.safeCompleteWithError({
218+
await loggingSession.safeCompleteWithError({
274219
error: {
275220
message: `File upload failed: ${fileError.message || 'Unable to process uploaded files'}`,
276221
stackTrace: fileError.stack,
@@ -285,9 +230,9 @@ export async function POST(
285230
const workflowForExecution = {
286231
id: deployment.workflowId,
287232
userId: deployment.userId,
288-
workspaceId: workflowResult[0].workspaceId,
289-
isDeployed: true,
290-
variables: workflowResult[0].variables || {},
233+
workspaceId,
234+
isDeployed: workflowRecord?.isDeployed ?? false,
235+
variables: workflowRecord?.variables || {},
291236
}
292237

293238
const stream = await createStreamingResponse({

0 commit comments

Comments
 (0)