Skip to content

Commit e6e082b

Browse files
committed
fix: harden live updates and user-facing workflow errors
1 parent 5875bdb commit e6e082b

40 files changed

Lines changed: 872 additions & 211 deletions

src/agent-protocol/task-live-update-slack.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ export class SlackTaskLiveUpdateSink implements TaskLiveUpdateSink {
88
private messageTs?: string;
99
/** Serialize all publish calls to prevent concurrent postMessage races */
1010
private publishQueue: Promise<{ ref?: Record<string, unknown> } | null> = Promise.resolve(null);
11+
/** Count consecutive chat.update failures for observability and final-message fallback decisions */
12+
private consecutiveUpdateFailures = 0;
1113

1214
constructor(
1315
private readonly slack: SlackClient,
@@ -65,10 +67,23 @@ export class SlackTaskLiveUpdateSink implements TaskLiveUpdateSink {
6567
ts: this.messageTs,
6668
text: formatSlackText(text),
6769
});
68-
if (updated?.ok) return null;
70+
if (updated?.ok) {
71+
this.consecutiveUpdateFailures = 0;
72+
return null;
73+
}
74+
this.consecutiveUpdateFailures++;
6975
logger.warn(
70-
`[TaskLiveUpdates][Slack] chat.update failed for ts=${this.messageTs} error=${updated?.error || 'unknown_error'}; falling back to chat.postMessage`
76+
`[TaskLiveUpdates][Slack] chat.update failed for ts=${this.messageTs} error=${updated?.error || 'unknown_error'} (failure #${this.consecutiveUpdateFailures})`
7177
);
78+
// Never create a new progress message when an update fails.
79+
// Keeping the existing live-update message in place is less noisy than
80+
// attempting a post+delete recovery that can leave orphaned messages.
81+
if (mode === 'progress') {
82+
logger.warn(
83+
`[TaskLiveUpdates][Slack] Preserving existing live update ts=${this.messageTs}; suppressing progress post fallback to avoid Slack spam`
84+
);
85+
return null;
86+
}
7287
}
7388
logger.info(
7489
`[TaskLiveUpdates][Slack] Posting live update message in channel=${this.channel} thread=${this.threadTs}`
@@ -81,7 +96,8 @@ export class SlackTaskLiveUpdateSink implements TaskLiveUpdateSink {
8196
if (posted?.ok && posted.ts) {
8297
const previousTs = this.messageTs;
8398
this.messageTs = posted.ts;
84-
if (mode === 'final' && previousTs && previousTs !== posted.ts) {
99+
// Clean up the old stale message to avoid orphaned progress messages
100+
if (previousTs && previousTs !== posted.ts) {
85101
const deleted = await this.slack.chat.delete({
86102
channel: this.channel,
87103
ts: previousTs,
@@ -92,7 +108,7 @@ export class SlackTaskLiveUpdateSink implements TaskLiveUpdateSink {
92108
);
93109
} else {
94110
logger.info(
95-
`[TaskLiveUpdates][Slack] Removed stale live update message ts=${previousTs} after final fallback post`
111+
`[TaskLiveUpdates][Slack] Removed stale live update message ts=${previousTs} after fallback post`
96112
);
97113
}
98114
}

src/agent-protocol/task-live-updates.ts

Lines changed: 77 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ export class TaskLiveUpdateManager {
208208
private metadataRefreshTimer?: ReturnType<typeof setInterval>;
209209
private running = false;
210210
private inflightTick?: Promise<void>;
211+
private inflightMetadataRefresh?: Promise<void>;
212+
private readonly inflightPublishes = new Set<Promise<unknown>>();
211213
private started = false;
212214
private completed = false;
213215
private readonly startedAt = new Date();
@@ -259,12 +261,7 @@ export class TaskLiveUpdateManager {
259261
if (this.completed) return;
260262
this.completed = true;
261263
this.stop();
262-
// Wait for any in-flight tick to finish so the sink's publish queue is drained
263-
if (this.running && this.inflightTick) {
264-
try {
265-
await this.inflightTick;
266-
} catch {}
267-
}
264+
await this.waitForInflightSinkPublishes();
268265
try {
269266
logger.info(`[TaskLiveUpdates] Publishing final success update for task ${this.ctx.taskId}`);
270267
const result = await this.ctx.sink.complete(this.decorateText(finalText));
@@ -283,12 +280,7 @@ export class TaskLiveUpdateManager {
283280
if (this.completed) return;
284281
this.completed = true;
285282
this.stop();
286-
// Wait for any in-flight tick to finish so the sink's publish queue is drained
287-
if (this.running && this.inflightTick) {
288-
try {
289-
await this.inflightTick;
290-
} catch {}
291-
}
283+
await this.waitForInflightSinkPublishes();
292284
try {
293285
logger.info(`[TaskLiveUpdates] Publishing final failure update for task ${this.ctx.taskId}`);
294286
const result = await this.ctx.sink.fail(this.decorateText(finalText));
@@ -319,6 +311,17 @@ export class TaskLiveUpdateManager {
319311
}
320312

321313
async tick(): Promise<void> {
314+
if (this.inflightTick) return this.inflightTick;
315+
const promise = this.runTick().finally(() => {
316+
if (this.inflightTick === promise) {
317+
this.inflightTick = undefined;
318+
}
319+
});
320+
this.inflightTick = promise;
321+
return promise;
322+
}
323+
324+
private async runTick(): Promise<void> {
322325
if (this.completed || this.running) return;
323326
const traceState = this.getTraceState();
324327
if (!traceState.traceRef && !traceState.traceId) {
@@ -402,7 +405,13 @@ export class TaskLiveUpdateManager {
402405
},
403406
traceState.traceId
404407
);
405-
const result = await this.ctx.sink.update(message);
408+
if (this.completed) {
409+
logger.debug(
410+
`[TaskLiveUpdates] Aborting progress publish for task ${this.ctx.taskId}: task already completed before sink update`
411+
);
412+
return;
413+
}
414+
const result = await this.trackPublish(this.ctx.sink.update(message));
406415
this.recordSinkRef(result);
407416
this.ctx.appendHistory?.(cleaned, 'progress');
408417
this.lastUpdateText = cleaned;
@@ -424,27 +433,43 @@ export class TaskLiveUpdateManager {
424433
private async runFirstTick(): Promise<void> {
425434
if (this.completed) return;
426435
logger.debug(`[TaskLiveUpdates] Running first scheduled tick for task ${this.ctx.taskId}`);
427-
this.inflightTick = this.tick();
428-
await this.inflightTick;
436+
await this.tick();
429437
if (this.completed) return;
430438
this.timer = setInterval(() => {
431-
this.inflightTick = this.tick();
439+
void this.tick();
432440
}, this.ctx.config.intervalSeconds * 1000);
433441
if (typeof (this.timer as any)?.unref === 'function') {
434442
(this.timer as any).unref();
435443
}
436444
this.metadataRefreshTimer = setInterval(() => {
437-
void this.refreshProgressMetadata();
445+
void this.scheduleMetadataRefresh();
438446
}, DEFAULT_TASK_LIVE_UPDATE_METADATA_REFRESH_SECONDS * 1000);
439447
if (typeof (this.metadataRefreshTimer as any)?.unref === 'function') {
440448
(this.metadataRefreshTimer as any).unref();
441449
}
442450
}
443451

452+
private async waitForInflightSinkPublishes(): Promise<void> {
453+
const pending = [...this.inflightPublishes];
454+
if (pending.length === 0) return;
455+
try {
456+
await Promise.allSettled(pending);
457+
} catch {}
458+
}
459+
444460
private recordSinkRef(result: { ref?: Record<string, unknown> } | null | undefined): void {
445461
if (result?.ref) this.ctx.onPostedRef?.(result.ref);
446462
}
447463

464+
private async trackPublish<T>(publish: Promise<T>): Promise<T> {
465+
this.inflightPublishes.add(publish);
466+
try {
467+
return await publish;
468+
} finally {
469+
this.inflightPublishes.delete(publish);
470+
}
471+
}
472+
448473
private getTraceState(): { traceRef?: string; traceId?: string } {
449474
const resolved = this.ctx.resolveTraceState?.();
450475
return {
@@ -462,7 +487,7 @@ export class TaskLiveUpdateManager {
462487
private decorateProgressText(
463488
text: string,
464489
timing: ProgressTimingMetadata,
465-
traceId?: string
490+
_traceId?: string
466491
): string {
467492
const normalized = normalizeProgressSummary(text);
468493
const taskSummary = formatTaskSummary(timing.taskSummary);
@@ -473,8 +498,21 @@ export class TaskLiveUpdateManager {
473498
taskSummary,
474499
normalized,
475500
formatProgressMetadata(timing),
501+
`\`task_id: ${this.ctx.taskId}\``,
502+
'`live_update: true`',
476503
].filter(Boolean);
477-
return this.decorateText(blocks.join('\n\n'), traceId);
504+
return blocks.join('\n\n');
505+
}
506+
507+
private scheduleMetadataRefresh(): Promise<void> {
508+
if (this.inflightMetadataRefresh) return this.inflightMetadataRefresh;
509+
const promise = this.refreshProgressMetadata().finally(() => {
510+
if (this.inflightMetadataRefresh === promise) {
511+
this.inflightMetadataRefresh = undefined;
512+
}
513+
});
514+
this.inflightMetadataRefresh = promise;
515+
return promise;
478516
}
479517

480518
private async refreshProgressMetadata(): Promise<void> {
@@ -499,7 +537,7 @@ export class TaskLiveUpdateManager {
499537
logger.debug(
500538
`[TaskLiveUpdates] Refreshing metadata-only live update for task ${this.ctx.taskId}`
501539
);
502-
const result = await this.ctx.sink.update(message);
540+
const result = await this.trackPublish(this.ctx.sink.update(message));
503541
this.recordSinkRef(result);
504542
this.lastPostedMessage = message;
505543
} catch (err) {
@@ -559,7 +597,8 @@ export class TaskLiveUpdateManager {
559597
this.lastStallFallbackAt = now;
560598
return;
561599
}
562-
const result = await this.ctx.sink.update(message);
600+
if (this.completed) return;
601+
const result = await this.trackPublish(this.ctx.sink.update(message));
563602
this.recordSinkRef(result);
564603
if (!this.lastUpdateText) {
565604
this.ctx.appendHistory?.(baseText, 'progress');
@@ -698,20 +737,28 @@ function formatTaskLabel(task: { id: string; title: string; status: string }): s
698737
return title || task.id || task.status;
699738
}
700739

740+
function scopeHasRenderableTasks(scope: NonNullable<ProbeTaskSummary['scopes']>[number]): boolean {
741+
if (scope.tasks.some(task => !task.synthetic)) return true;
742+
return scope.children.some(child => scopeHasRenderableTasks(child));
743+
}
744+
701745
function appendTaskScopeLines(
702746
lines: string[],
703747
scope: NonNullable<ProbeTaskSummary['scopes']>[number],
704748
depth = 0
705749
): void {
750+
if (!scopeHasRenderableTasks(scope)) return;
706751
const prefix = ' '.repeat(depth);
707752
if (depth > 0 || scope.label !== 'Main Agent') {
708753
lines.push(`${prefix}${scope.label}`);
709754
}
710-
for (const task of scope.tasks.slice(0, 8)) {
755+
const visibleTasks = scope.tasks.filter(task => !task.synthetic).slice(0, 8);
756+
for (const task of visibleTasks) {
711757
lines.push(`${prefix}${formatTaskStatusMarker(task.status)} ${formatTaskLabel(task)}`);
712758
}
713-
if (scope.tasks.length > 8) {
714-
lines.push(`${prefix}• ... ${scope.tasks.length - 8} more`);
759+
const hiddenCount = scope.tasks.filter(task => !task.synthetic).length - visibleTasks.length;
760+
if (hiddenCount > 0) {
761+
lines.push(`${prefix}• ... ${hiddenCount} more`);
715762
}
716763
for (const child of scope.children) {
717764
appendTaskScopeLines(lines, child, depth + 1);
@@ -724,14 +771,16 @@ function formatTaskSummary(summary?: ProbeTaskSummary): string | undefined {
724771
if (summary.scopes.length > 0) {
725772
for (const scope of summary.scopes) appendTaskScopeLines(lines, scope);
726773
} else {
727-
for (const task of summary.tasks.slice(0, 8)) {
774+
const visibleTasks = summary.tasks.filter(task => !task.synthetic).slice(0, 8);
775+
for (const task of visibleTasks) {
728776
lines.push(`• ${formatTaskStatusMarker(task.status)} ${formatTaskLabel(task)}`);
729777
}
730-
if (summary.tasks.length > 8) {
731-
lines.push(`• ... ${summary.tasks.length - 8} more`);
778+
const hiddenCount = summary.tasks.filter(task => !task.synthetic).length - visibleTasks.length;
779+
if (hiddenCount > 0) {
780+
lines.push(`• ... ${hiddenCount} more`);
732781
}
733782
}
734-
return lines.join('\n');
783+
return lines.length > 1 ? lines.join('\n') : undefined;
735784
}
736785

737786
function formatSkillList(skills: string[]): string {

src/agent-protocol/trace-serializer.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export interface ProbeTaskSummaryItem {
114114
id: string;
115115
title: string;
116116
status: string;
117+
synthetic?: boolean;
117118
}
118119

119120
export interface ProbeTaskScopeSummary {
@@ -1256,7 +1257,7 @@ function buildProbeTaskScopeSummary(
12561257
source: 'events',
12571258
eventCount: 0,
12581259
snapshotCount: 0,
1259-
tasks: [{ id: `__scope_${node.span.spanId}`, title: scopeLabel, status }],
1260+
tasks: [{ id: `__scope_${node.span.spanId}`, title: scopeLabel, status, synthetic: true }],
12601261
children: [],
12611262
};
12621263
}
@@ -1312,6 +1313,7 @@ function buildProbeTaskScopeSummary(
13121313
id: `__scope_${n.span.spanId}`,
13131314
title: scopeLabel,
13141315
status,
1316+
synthetic: true,
13151317
});
13161318
}
13171319
}
@@ -1387,6 +1389,7 @@ function flattenTaskScopes(scopes: ProbeTaskScopeSummary[]): ProbeTaskSummaryIte
13871389
const items: ProbeTaskSummaryItem[] = [];
13881390
const visit = (scope: ProbeTaskScopeSummary) => {
13891391
for (const task of scope.tasks) {
1392+
if (task.synthetic) continue;
13901393
if (seen.has(task.id)) continue;
13911394
seen.add(task.id);
13921395
items.push(task);
@@ -1578,12 +1581,18 @@ function formatTaskLabel(task: { id: string; title: string; status: string }): s
15781581
return title || task.id || task.status;
15791582
}
15801583

1584+
function scopeHasRenderableTasks(scope: ProbeTaskScopeSummary): boolean {
1585+
if (scope.tasks.some(task => !task.synthetic)) return true;
1586+
return scope.children.some(child => scopeHasRenderableTasks(child));
1587+
}
1588+
15811589
function appendTaskScopeLines(lines: string[], scope: ProbeTaskScopeSummary, depth = 0): void {
1590+
if (!scopeHasRenderableTasks(scope)) return;
15821591
const prefix = ' '.repeat(depth);
15831592
if (depth > 0 || scope.label !== ROOT_TASK_SCOPE_LABEL) {
15841593
lines.push(`${prefix}${scope.label}`);
15851594
}
1586-
for (const task of scope.tasks) {
1595+
for (const task of scope.tasks.filter(task => !task.synthetic)) {
15871596
lines.push(`${prefix} ${formatTaskStatus(task.status)} ${formatTaskLabel(task)}`);
15881597
}
15891598
for (const child of scope.children) {
@@ -1596,7 +1605,7 @@ export function buildTraceHeaderLines(
15961605
taskSummary: ProbeTaskSummary | null
15971606
): string[] {
15981607
const lines = [`Trace source: ${traceData.source || 'unknown'}`];
1599-
if (!taskSummary) {
1608+
if (!taskSummary || taskSummary.tasks.length === 0) {
16001609
lines.push('Tasks: no task telemetry found in this trace');
16011610
return lines;
16021611
}
@@ -1883,10 +1892,12 @@ function renderYamlNode(
18831892
const action = attrs['dedup.action'] || '?';
18841893
const reason = attrs['dedup.reason'] || '';
18851894
const rewritten = attrs['dedup.rewritten'] || '';
1895+
const error = attrs['dedup.error'] || '';
18861896
const prevCount = attrs['dedup.previous_count'] || '0';
18871897
let detail = `${action}`;
18881898
if (rewritten) detail += ` → "${truncate(String(rewritten), 60)}"`;
18891899
if (reason) detail += ` (${truncate(String(reason), 80)})`;
1900+
if (error) detail += ` [error: ${truncate(String(error), 120)}]`;
18901901
lines.push(
18911902
`${pad}dedup("${truncate(String(query), 60)}") [${prevCount} prior]: ${detail}${duration}`
18921903
);
@@ -2630,9 +2641,11 @@ function formatSpanLine(
26302641
const action = attrs['dedup.action'] || '?';
26312642
const reason = attrs['dedup.reason'] || '';
26322643
const rewritten = attrs['dedup.rewritten'] || '';
2644+
const error = attrs['dedup.error'] || '';
26332645
let detail = `${action}`;
26342646
if (rewritten) detail += ` → "${truncate(String(rewritten), 50)}"`;
26352647
if (reason) detail += ` — ${truncate(String(reason), 60)}`;
2648+
if (error) detail += ` [error: ${truncate(String(error), 80)}]`;
26362649
return { line: `dedup("${truncate(String(query), 50)}") ${detail} (${duration})` };
26372650
}
26382651

src/agent-protocol/track-execution.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,9 @@ export async function trackExecution<T>(
228228
// best-effort — don't fail the task over metadata
229229
}
230230

231-
// Extract AI response text from the result.
231+
// Extract response text from the result.
232232
// result.reviewSummary.history is keyed by checkId with arrays of outputs.
233-
// We want the LAST check's text output (the final AI response), not the
234-
// first (which is typically the intent router).
233+
// We want the LAST check's text output (the final AI response or workflow text output).
235234
let responseText = 'Execution completed';
236235
try {
237236
const history = (result as any)?.reviewSummary?.history as
@@ -245,7 +244,9 @@ export async function trackExecution<T>(
245244
if (!Array.isArray(outputs)) continue;
246245
// Within a check, look at the last output first too
247246
for (let j = outputs.length - 1; j >= 0; j--) {
248-
const text = (outputs[j] as any)?.text;
247+
const out = outputs[j] as any;
248+
// Direct text field (AI response or workflow text output)
249+
const text = out?.text;
249250
if (typeof text === 'string' && text.trim().length > 0) {
250251
responseText = text.trim();
251252
break;

0 commit comments

Comments
 (0)