Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/pr-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ jobs:

- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 9

- name: Get pnpm store directory
shell: bash
Expand All @@ -46,4 +44,3 @@ jobs:

- name: Run build
run: pnpm build

64 changes: 51 additions & 13 deletions app/api/tasks/[taskId]/chat/interrupt/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { and, eq, isNull } from 'drizzle-orm'
import { NextResponse } from 'next/server'
import { CodexGatewayApiError, interruptCodexGatewayTurn } from '@/lib/codex-gateway/client'
import { hasActiveTurnCheckpoint, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion'
import {
finalizeActiveTurnInterrupted,
hasActiveTurnCheckpoint,
reconcileIncompleteTurnSafely,
} from '@/lib/codex-gateway/completion'
import { getTaskGatewayContext } from '@/lib/codex-gateway/task'
import { db } from '@/lib/db/client'
import { tasks } from '@/lib/db/schema'
Expand Down Expand Up @@ -44,22 +48,56 @@ export async function POST(_request: Request, { params }: RouteParams) {
}

const { gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(taskId, session.user.id)

// Local fallback for an interrupt: asks `finalizeActiveTurnInterrupted` to
// finalize this task's active turn session and requests that the gateway
// session be cleared as part of that call.
const finalizeInterruptedLocally = async () => {
  const { activeTurnSessionId } = task

  await finalizeActiveTurnInterrupted({
    clearGatewaySession: true,
    sessionId: activeTurnSessionId,
    taskId,
  })
}

if (!gatewayUrl) {
return NextResponse.json({ error: 'Gateway URL is not configured' }, { status: 400 })
await finalizeInterruptedLocally()
return NextResponse.json({
success: true,
data: {
sessionId: task.activeTurnSessionId,
state: null,
},
})
}

const result = await interruptCodexGatewayTurn(gatewayUrl, task.activeTurnSessionId, gatewayAuthToken)
await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => {
console.error('Failed to reconcile interrupted chat turn')
})
try {
const result = await interruptCodexGatewayTurn(gatewayUrl, task.activeTurnSessionId, gatewayAuthToken)
const reconciledTask = await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => {
console.error('Failed to reconcile interrupted chat turn')
return null
})

return NextResponse.json({
success: true,
data: {
sessionId: result.sessionId,
state: result.state,
},
})
if (reconciledTask && hasActiveTurnCheckpoint(reconciledTask)) {
await finalizeInterruptedLocally()
} else if (!reconciledTask) {
await finalizeInterruptedLocally()
}

return NextResponse.json({
success: true,
data: {
sessionId: result.sessionId,
state: result.state,
},
})
} catch {
await finalizeInterruptedLocally()
return NextResponse.json({
success: true,
data: {
sessionId: task.activeTurnSessionId,
state: null,
},
})
}
} catch (error) {
if (error instanceof CodexGatewayApiError) {
return NextResponse.json(
Expand Down
144 changes: 129 additions & 15 deletions app/api/tasks/[taskId]/chat/v2/stream/route.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { NextRequest, NextResponse } from 'next/server'
import { finalizeActiveTurnFailure, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion'
import { getCodexGatewayEventStreamUrl } from '@/lib/codex-gateway/client'
import { diagnoseCodexTurnFailure } from '@/lib/codex-gateway/failure-diagnostics'
import { getTaskGatewayContext } from '@/lib/codex-gateway/task'
import type { CodexGatewayState } from '@/lib/codex-gateway/types'
import { closeTaskStream, getTaskStream, recordTaskEvent, touchTaskStream } from '@/lib/task-events'
import { formatKeyTaskLogMessage, TASK_FLOW_LOGS } from '@/lib/utils/task-flow-logs'
import { getServerSession } from '@/lib/session/get-server-session'

// Module-level route settings exported from this handler file.
// NOTE(review): presumably Next.js route segment config (the file imports from
// 'next/server') — confirm against the framework documentation.
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
// NOTE(review): `maxDuration` is declared twice below. This text looks like a
// diff hunk without +/- markers, so these are presumably the old (300) and new
// (1800) values of a single declaration — confirm against the real file.
export const maxDuration = 300
export const maxDuration = 1800

// Period, in milliseconds, of the timer that enqueues `: ping` heartbeat
// chunks onto the response stream while no partial SSE block is buffered.
const STREAM_HEARTBEAT_INTERVAL_MS = 10_000

interface RouteParams {
params: Promise<{
Expand Down Expand Up @@ -63,6 +67,10 @@ function parseSseBlock(block: string): {
}
}

/**
 * Serializes one SSE block for the outgoing response stream.
 *
 * The block text is terminated with a blank line (`\n\n`) and encoded to
 * bytes with the supplied encoder.
 *
 * @param encoder - encoder used to turn the framed text into bytes
 * @param block - SSE block text without its trailing separator
 * @returns the encoded bytes of `block` followed by `\n\n`
 */
function encodeSseBlock(encoder: TextEncoder, block: string): Uint8Array {
  const framedBlock = block + '\n\n'
  return encoder.encode(framedBlock)
}

async function persistGatewayEvent(input: {
eventName: string
payload: Record<string, unknown> | null
Expand Down Expand Up @@ -150,9 +158,38 @@ async function persistMissingSessionFailure(taskId: string, sessionId: string) {
})
}

/**
 * Logs a diagnostic for a failed chat v2 upstream stream.
 *
 * Runs `diagnoseCodexTurnFailure` with the supplied context, then emits an
 * info line built by `formatKeyTaskLogMessage` and an error line carrying the
 * full diagnostic object.
 *
 * This helper is awaited inside error-handling paths before the turn is
 * finalized and the response stream is closed, so it is best-effort and never
 * rejects: if diagnosing or formatting throws, that failure is logged and the
 * function returns normally so the caller's cleanup still runs.
 *
 * @param input.fallbackError - error text forwarded to the diagnosis call
 * @param input.httpStatus - upstream HTTP status, when one is available
 * @param input.sessionId - gateway session id of the stream, if known
 * @param input.taskId - id of the task the stream belongs to
 * @param input.turnStatus - turn status forwarded to the diagnosis, if known
 */
async function logStreamFailureDiagnostic(input: {
  fallbackError: string
  httpStatus?: number
  sessionId?: string | null
  taskId: string
  turnStatus?: string | null
}): Promise<void> {
  const { taskId, sessionId, fallbackError, httpStatus, turnStatus } = input

  try {
    const diagnostic = await diagnoseCodexTurnFailure({
      taskId,
      sessionId,
      fallbackError,
      httpStatus,
      turnStatus,
    })

    console.info(
      formatKeyTaskLogMessage(TASK_FLOW_LOGS.GATEWAY_STREAM_RECONNECTING, {
        sessionId: sessionId ?? null,
        streamState: 'errored',
        errorSource: diagnostic.source,
        httpStatus,
        turnStatus: turnStatus ?? null,
      }),
    )
    console.error('Chat v2 stream upstream failed', diagnostic)
  } catch (error) {
    // Diagnostics must not block turn finalization or stream shutdown in the
    // callers, so a failure here is reported and then swallowed.
    console.error('Failed to diagnose chat v2 stream failure', error)
  }
}

export async function GET(request: NextRequest, { params }: RouteParams) {
const decoder = new TextDecoder()
const encoder = new TextEncoder()
let streamId: string | null = null
let streamSessionId: string | null = null
let taskId: string | null = null

try {
Expand All @@ -175,6 +212,7 @@ export async function GET(request: NextRequest, { params }: RouteParams) {
if (!stream || stream.taskId !== resolvedTaskId || stream.status !== 'active') {
return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
}
streamSessionId = stream.sessionId

const { task, gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(resolvedTaskId, session.user.id)

Expand All @@ -191,19 +229,45 @@ export async function GET(request: NextRequest, { params }: RouteParams) {
return NextResponse.json({ error: 'Stream session is no longer active' }, { status: 410 })
}

const streamState =
stream.lastEventAt.getTime() > stream.startedAt.getTime() ? ('resumed' as const) : ('connected' as const)
console.info(
formatKeyTaskLogMessage(
streamState === 'resumed' ? TASK_FLOW_LOGS.GATEWAY_STREAM_RESUMED : TASK_FLOW_LOGS.GATEWAY_STREAM_CONNECTED,
{
sessionId: stream.sessionId,
streamState,
},
),
)

const upstream = await fetch(getCodexGatewayEventStreamUrl(gatewayUrl, stream.sessionId, gatewayAuthToken), {
headers: {
accept: 'text/event-stream',
},
cache: 'no-store',
signal: AbortSignal.timeout(15_000),
})

if (!upstream.ok || !upstream.body) {
await closeTaskStream(resolvedStreamId, 'errored')
await logStreamFailureDiagnostic({
taskId: resolvedTaskId,
sessionId: stream.sessionId,
fallbackError: 'Codex gateway turn failed',
httpStatus: upstream.status,
})

if (upstream.status === 404 || upstream.status === 410) {
await persistMissingSessionFailure(resolvedTaskId, stream.sessionId)
} else if (upstream.status >= 500 && upstream.status < 600) {
await finalizeActiveTurnFailure({
taskId: resolvedTaskId,
sessionId: stream.sessionId,
error: 'Codex gateway turn failed',
clearGatewaySession: true,
}).catch(() => {
console.error('Failed to force finalize chat v2 stream connection error')
})
} else {
await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 stream connection error')
Expand All @@ -226,11 +290,12 @@ export async function GET(request: NextRequest, { params }: RouteParams) {

const reader = upstream.body.getReader()
let sseBuffer = ''
const heartbeatChunk = encoder.encode(': ping\n\n')

const handleSseBlock = async (block: string) => {
const parsedBlock = parseSseBlock(block)
if (!parsedBlock || !parsedBlock.dataText) {
return
return encodeSseBlock(encoder, block)
}

try {
Expand All @@ -243,12 +308,16 @@ export async function GET(request: NextRequest, { params }: RouteParams) {
payload,
transcriptCursor,
})
} catch {
console.error('Failed to persist gateway stream event')
} catch (error) {
console.error('Failed to persist gateway stream event', error)
}

return encodeSseBlock(encoder, block)
}

const flushBufferedEvents = async (flushAll: boolean) => {
const encodedBlocks: Uint8Array[] = []

while (true) {
const separatorMatch = sseBuffer.match(/\r?\n\r?\n/)
if (!separatorMatch || separatorMatch.index === undefined) {
Expand All @@ -257,18 +326,32 @@ export async function GET(request: NextRequest, { params }: RouteParams) {

const block = sseBuffer.slice(0, separatorMatch.index)
sseBuffer = sseBuffer.slice(separatorMatch.index + separatorMatch[0].length)
await handleSseBlock(block)
encodedBlocks.push(await handleSseBlock(block))
}

if (flushAll && sseBuffer.trim()) {
const finalBlock = sseBuffer
console.error('Chat v2 stream ended with incomplete SSE block')
sseBuffer = ''
await handleSseBlock(finalBlock)
}

return encodedBlocks
}

const streamResponse = new ReadableStream<Uint8Array>({
async start(controller) {
let closed = false
const heartbeatTimer = setInterval(() => {
if (closed || sseBuffer.trim()) {
return
}

try {
controller.enqueue(heartbeatChunk)
} catch {
// Ignore enqueue errors after stream shutdown.
}
}, STREAM_HEARTBEAT_INTERVAL_MS)

try {
while (true) {
const { done, value } = await reader.read()
Expand All @@ -281,25 +364,44 @@ export async function GET(request: NextRequest, { params }: RouteParams) {
continue
}

controller.enqueue(value)
sseBuffer += decoder.decode(value, { stream: true })
await flushBufferedEvents(false)
const encodedBlocks = await flushBufferedEvents(false)
for (const encodedBlock of encodedBlocks) {
controller.enqueue(encodedBlock)
}
}

sseBuffer += decoder.decode()
await flushBufferedEvents(true)
const encodedBlocks = await flushBufferedEvents(true)
for (const encodedBlock of encodedBlocks) {
controller.enqueue(encodedBlock)
}
closed = true
controller.close()
} catch (error) {
await closeTaskStream(resolvedStreamId, 'errored')
await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 stream reader error')
await logStreamFailureDiagnostic({
taskId: resolvedTaskId,
sessionId: stream.sessionId,
fallbackError: 'Codex gateway turn failed',
})
await finalizeActiveTurnFailure({
taskId: resolvedTaskId,
sessionId: stream.sessionId,
error: 'Codex gateway turn failed',
clearGatewaySession: true,
}).catch(() => {
console.error('Failed to force finalize chat v2 stream reader error')
})
try {
closed = true
controller.close()
} catch {
// Ignore close errors after upstream socket termination.
}
} finally {
closed = true
clearInterval(heartbeatTimer)
reader.releaseLock()
}
},
Expand All @@ -320,8 +422,20 @@ export async function GET(request: NextRequest, { params }: RouteParams) {
}

if (taskId) {
await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 stream proxy error')
await logStreamFailureDiagnostic({
taskId,
sessionId: streamSessionId,
fallbackError: 'Codex gateway turn failed',
}).catch(() => {
console.error('Failed to log chat v2 stream proxy diagnostic')
})
await finalizeActiveTurnFailure({
taskId,
sessionId: streamSessionId,
error: 'Codex gateway turn failed',
clearGatewaySession: true,
}).catch(() => {
console.error('Failed to force finalize chat v2 stream proxy error')
})
}

Expand Down
Loading
Loading