
fix: unwrap workflow outputs in value_js + oauth2 in SSE server #564

Open
buger wants to merge 7 commits into main from fix/workflow-output-unwrapping

Conversation

@buger
Contributor

@buger buger commented Mar 26, 2026

Summary

  • Workflow output unwrapping bug: value_js in workflow outputs received raw ReviewSummary wrappers ({ issues: [], output: {...} }) instead of unwrapped step results. This caused all workflow tools (slack-search, slack-read-thread, discourse-read-thread, discourse-reply) to return { success: false, error: "Unknown error" } because outputs['step'].success was undefined; the actual data was nested inside .output. Added unwrapOutputs() helper (same logic as buildProviderTemplateContext) and applied it to value_js, if conditions, Liquid templates, and expression mappings. A sketch of the helper follows after this list.

  • OAuth2 in SSE server: executeHttpClientTool only handled bearer auth, missing oauth2_client_credentials. When the AI called http_client tools with oauth2 auth (e.g. MongoDB Atlas atlas-api), no token exchange happened, causing 401 Unauthorized.
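
A minimal sketch of what such an unwrapOutputs() helper could look like, assuming the wrapper shape quoted in the first bullet ({ issues: [], output: {...} }); the actual implementation in the PR may differ:

// Sketch only: unwrap ReviewSummary wrappers so value_js, if conditions,
// and Liquid templates see step results directly. Field names follow the
// shapes quoted above.
function unwrapOutputs(outputs: Record<string, unknown>): Record<string, unknown> {
  const unwrapped: Record<string, unknown> = {};
  for (const [step, value] of Object.entries(outputs)) {
    if (value !== null && typeof value === 'object' && 'output' in value && 'issues' in value) {
      // ReviewSummary wrapper: expose the nested step result instead of the wrapper
      unwrapped[step] = (value as { output: unknown }).output;
    } else {
      unwrapped[step] = value;
    }
  }
  return unwrapped;
}

With the unwrapped context, outputs['step'].success in value_js refers to the real step result rather than being undefined on the wrapper object.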

Impact

This was causing complete tool failure in production. A traced task (e6a694b7) showed the AI calling slack-search 4x and slack-read-thread 9x over 7 minutes — every call returned "Unknown error", and the AI produced empty output {"text":""}.

Test plan

  • visor test --no-mocks --only discourse-read-real — passes, returns real thread content
  • visor test --no-mocks --only atlas-list-projects-real — passes, oauth2 token exchange works
  • visor test --only discourse-skill-activation — passes with mocks
  • Pre-commit hooks pass (eslint, prettier, unit tests)

🤖 Generated with Claude Code

…to SSE server

Two bugs fixed:

1. Workflow output value_js received raw ReviewSummary wrappers instead of
   unwrapped step outputs. This caused every workflow tool (slack-search,
   slack-read-thread, discourse-read-thread, discourse-reply) to return
   "Unknown error" because outputs['step'].success was undefined (actual
   data was nested in .output). Script steps already unwrapped correctly
   via buildProviderTemplateContext, but workflow-executor's value_js,
   if conditions, and Liquid contexts did not.

2. executeHttpClientTool in mcp-custom-sse-server only handled bearer auth,
   not oauth2_client_credentials. When the AI called http_client tools with
   oauth2 auth (e.g. MongoDB Atlas), no token exchange happened and requests
   went out without Authorization headers, causing 401 errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
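
For illustration, a rough sketch of the missing oauth2_client_credentials branch described above: a standard RFC 6749 client-credentials token exchange performed before the HTTP request. The function and field names here are assumptions, not the identifiers actually used in executeHttpClientTool.

// Illustrative sketch (hypothetical names): resolve the Authorization header
// for http_client tools, adding the client_credentials grant that was missing.
async function resolveAuthHeader(auth: {
  type: 'bearer' | 'oauth2_client_credentials';
  token?: string;
  tokenUrl?: string;
  clientId?: string;
  clientSecret?: string;
}): Promise<string | undefined> {
  if (auth.type === 'bearer' && auth.token) {
    return `Bearer ${auth.token}`;
  }
  if (auth.type === 'oauth2_client_credentials' && auth.tokenUrl) {
    // Standard RFC 6749 client_credentials grant against the provider's token URL
    const res = await fetch(auth.tokenUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        grant_type: 'client_credentials',
        client_id: auth.clientId ?? '',
        client_secret: auth.clientSecret ?? '',
      }),
    });
    const { access_token } = (await res.json()) as { access_token: string };
    return `Bearer ${access_token}`;
  }
  // Previously this path fell through and requests went out without an Authorization header
  return undefined;
}
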
@probelabs
Contributor

probelabs Bot commented Mar 26, 2026

Pull Request Overview

Summary

This PR primarily adds task telemetry visualization and local observability infrastructure improvements, along with several minor fixes across the codebase.

Note: The PR description mentions fixes for workflow output unwrapping and OAuth2 in SSE server, but those specific files (src/providers/mcp-custom-sse-server.ts, src/workflow-executor.ts) are not present in the actual diff. The changes described below are what's actually in this PR based on the code diff.

Files Changed Analysis

Task Telemetry & Trace Improvements (4 files, +900/-60 lines)

src/agent-protocol/trace-serializer.ts (+839/-56)

  • Added buildTraceReport() function returning structured TraceReport with tree, task summary, and header
  • Added extractProbeTaskSummary() to parse task telemetry from OTEL spans (created, updated, completed, batch operations)
  • Added resolveTraceSpans() to unify trace fetching from Grafana, Jaeger, and local files
  • Added buildTraceHeaderLines() to format task status with markers ([x], [~], [!], [-])
  • Enhanced span rendering to display task events and snapshots in trace trees
  • Improved local NDJSON file reading to filter by targetTraceId for mixed trace files

src/agent-protocol/task-live-updates.ts (+150/-11)

  • Added task summary extraction and formatting for live updates
  • Displays task status with checkboxes and scopes (Code Explorer, Search Delegate, Engineer)
  • Shows task counts and event/snapshot statistics
  • Added inflightTick, inflightMetadataRefresh, and inflightPublishes tracking for better concurrency control

src/agent-protocol/task-live-update-slack.ts (+40/-7)

  • Added publishQueue to serialize all publish calls and prevent concurrent postMessage races
  • Added consecutiveUpdateFailures counter for observability
  • Suppresses progress message fallback on chat.update failures to avoid Slack spam

src/agent-protocol/task-evaluator.ts (+4/-4)

  • Updated to use buildTraceReport() instead of serializeTraceForPrompt()

src/agent-protocol/tasks-cli-handler.ts (+7/-5)

  • Updated trace handler to use buildTraceReport() and display header text

Observability Infrastructure (9 new files, +357 lines)

deploy/observability/local/ directory

  • Complete local observability stack replacing single-container LGTM:
    • docker-compose.yml - orchestrates Tempo, OTel Collector, Prometheus, Grafana, autoheal
    • Dockerfile.tempo / Dockerfile.otelcol - adds busybox for health checks
    • tempo.yaml - distributed tracing backend configuration
    • otelcol.yaml - OTLP collector with batch processing and memory limiting
    • prometheus.yaml - metrics scraping configuration
    • grafana/provisioning/datasources/datasources.yaml - auto-configures Tempo and Prometheus
    • README.md - setup and usage documentation

docs/telemetry-setup.md (+18/-2)

  • Updated to recommend Visor observability stack over LGTM
  • Documents port mappings (8001 Grafana, 4317/4318 OTLP, 3200 Tempo, 9091 Prometheus)

Other Improvements (15 files, +150/-30 lines)

defaults/code-talk.yaml (+13/-0)

  • Enabled enableTasks: true for code-talk workflow
  • Added comprehensive task protocol documentation for delegates

src/ai-review-service.ts (+37/-7)

  • Exported createProbeTracerAdapter() for testing
  • Added recordTaskEvent() method to emit task telemetry
  • Mirrored span-worthy task events to local NDJSON fallback trace
  • Added formatUserFacingExecutionError() usage

src/frontends/*.ts (5 files, +86/-7)

  • Added formatUserFacingExecutionMessage() to error formatting in all frontends
  • Enhanced error extraction from result.issues for system/error rule IDs

src/cli-main.ts (+9/-1)

  • Added child-process-error-handler import at startup
  • Made sessionRegistry.clearAllSessions() async

src/index.ts (+6/-1)

  • Added child-process-error-handler import
  • Made sessionRegistry.clearAllSessions() async

src/email/polling-runner.ts (+10/-3)

  • Added storageCleanupStop tracking for periodic cleanup

src/generated/config-schema.ts (+19/-8)

  • Added debounce and debounce_key to step config schema
  • Made workflow optional in schedule config

package.json / package-lock.json (+5/-5)

  • Bumped @probelabs/probe from 0.6.0-rc311 to 0.6.0-rc313

Architecture & Impact Assessment

Task Telemetry System

New task summary extraction:

  • Parses task events from OTEL spans (task.created, task.updated, task.completed, task.batch_*)
  • Builds hierarchical scopes (Main Agent, Code Explorer, Search Delegate, Engineer)
  • Formats task status with markers: [x] completed, [~] in_progress, [!] failed, [-] cancelled
  • Displays in live updates, CLI trace view, and task evaluation
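
As a small illustration of the marker convention (a sketch; the real rendering lives in trace-serializer.ts, and the pending marker shown here is an assumption):

// Sketch of the status-to-marker mapping used when rendering task lines.
type TaskStatus = 'completed' | 'in_progress' | 'failed' | 'cancelled' | 'pending';

const TASK_MARKERS: Record<TaskStatus, string> = {
  completed: '[x]',
  in_progress: '[~]',
  failed: '[!]',
  cancelled: '[-]',
  pending: '[ ]', // assumption: pending tasks render as an empty checkbox
};

const renderTaskLine = (title: string, status: TaskStatus): string =>
  `${TASK_MARKERS[status]} ${title}`;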

Trace report structure:

interface TraceReport {
  traceData: ResolvedTraceData;
  tree: string;              // YAML-formatted span tree
  taskSummary: ProbeTaskSummary | null;
  headerText: string;        // Task summary header
  text: string;              // Combined header + tree
}

Observability Stack

Multi-container architecture:

graph TB
    subgraph "Visor App"
        A[OTel SDK] -->|OTLP| B[4318 HTTP / 4317 gRPC]
    end
    
    subgraph "Observability Stack"
        B --> C[OTel Collector]
        C -->|Traces| D[Tempo :3200]
        C -->|Metrics| E[Prometheus :9091]
        D -->|Service Map| F[Grafana :8001]
        E -->|Metrics| F
        G[Autoheal] -.->|Health Checks| C
        G -.->|Health Checks| D
        G -.->|Health Checks| E
        G -.->|Health Checks| F
    end

Benefits over LGTM:

  • Separate services for independent scaling
  • Health checks and auto-recovery via autoheal container
  • Easier debugging - can inspect individual components
  • Production-ready configuration

Slack Live Updates Concurrency Fix

Before: Race condition where tick() and complete() both see messageTs=undefined and each call chat.postMessage, creating duplicate messages.

After: publishQueue serializes all publish calls:

private enqueue(text: string, mode: 'progress' | 'final') {
  this.publishQueue = this.publishQueue
    .catch(() => {})
    .then(() => this.publish(text, mode));
  return this.publishQueue;
}

Scope Discovery & Context Expansion

Direct Impact

Task telemetry visualization:

  • Live updates in Slack now show task progress with checkboxes
  • visor tasks trace command displays task summary header
  • Task evaluation includes task state from traces

Observability infrastructure:

  • Local development now has separate Tempo, Prometheus, Grafana services
  • Better debugging capabilities with individual component inspection
  • Health checks and auto-recovery for long-running dev sessions

Frontend error formatting:

  • All frontends (Slack, Teams, Telegram, WhatsApp, Email) now format errors consistently
  • Better error messages from system/error rule IDs

Production Impact

Severity: Low-Medium (improvements and observability enhancements)

No critical bugs fixed in this PR - The description mentions workflow output unwrapping and OAuth2 fixes, but those files are not in the diff.

References

Code Locations

Task telemetry changes:

  • src/agent-protocol/trace-serializer.ts:343-440 - resolveTraceSpans
  • src/agent-protocol/trace-serializer.ts:923-1485 - task telemetry parsing
  • src/agent-protocol/trace-serializer.ts:1547-1615 - buildTraceReport
  • src/agent-protocol/task-live-updates.ts:660-731 - task summary formatting

Observability stack:

  • deploy/observability/local/docker-compose.yml - complete stack
  • deploy/observability/local/README.md - setup docs

Slack live updates:

  • src/agent-protocol/task-live-update-slack.ts:31-50 - enqueue logic

Frontend error handling:

  • src/frontends/slack-frontend.ts:598-606 - error formatting example

Related Patterns

  • src/utils/template-context.ts:66-76 - buildProviderTemplateContext unwrap pattern (mentioned in desc but not changed)
  • src/failure-condition-evaluator.ts:173-183 - condition evaluator unwrap (mentioned but not changed)
  • src/utils/oauth2-token-cache.ts - OAuth2TokenCache (mentioned but not changed)
Metadata
  • Review Effort: 3 / 5
  • Primary Label: enhancement

Powered by Visor from Probelabs

Last updated: 2026-05-04T11:50:37.803Z | Triggered by: pr_updated | Commit: e6e082b

💡 TIP: You can chat with Visor using /visor ask <your question>

@probelabs
Contributor

probelabs Bot commented Mar 26, 2026

Security Issues (1)

Severity Location Issue
🟠 Error contract:0
Output schema validation failed: must have required property 'issues'

Performance Issues (9)

Severity Location Issue
🟡 Warning src/agent-protocol/trace-serializer.ts:344-440
resolveTraceSpans() makes multiple sequential backend calls without early exit optimization. If the first backend (e.g., Grafana) returns spans successfully, it still continues to check other backends in the loop. The code has early returns after successful fetchFromGrafanaTempo/fetchFromJaeger, but the structure could be clearer and the remote backend loop could exit immediately after the first successful response instead of continuing through remaining backends.
💡 Suggestion: Add a 'break' statement immediately after successful span retrieval from any remote backend to exit the for-of loop early. The current code does return early, but the loop structure could be more explicit about exiting after first success.
🟡 Warning src/agent-protocol/trace-serializer.ts:588-640
findTraceFile() reads the entire first line of each trace file to check traceId, but the new fetchFromLocalFiles() implementation reads ALL lines of NDJSON files when targetTraceId is set. For large mixed trace files with many traces, this O(n) scan through all lines happens multiple times - once in findTraceFile() and again in fetchFromLocalFiles(). This is redundant I/O.
💡 Suggestion: Cache the file-to-traceId mapping after the first scan, or combine the traceId discovery and span extraction into a single pass. Consider maintaining an in-memory index of trace files to their traceIds.
🟡 Warning src/agent-protocol/trace-serializer.ts:508-545
fetchFromLocalFiles() creates a readline interface and streams through the entire NDJSON file line-by-line, parsing each JSON object. For large trace files with many spans, this is O(n) memory and CPU intensive. The code does filter by targetTraceId early, but still parses every JSON object in the file even when only interested in one trace.
💡 Suggestion: Consider adding a fast-path index file that maps traceIds to byte offsets, allowing direct seeking to the relevant trace data. Alternatively, use a binary format with better random access properties than NDJSON.
🟡 Warning src/agent-protocol/trace-serializer.ts:923-1486
extractProbeTaskSummary() and related task telemetry functions (parseTaskStatusSnapshot, summarizeTaskTelemetrySpans, buildProbeTaskScopeSummary, buildTemporalTaskScopes) perform multiple passes over the spans array with sorting, filtering, and Map operations. For traces with thousands of spans, this O(n log n) sorting and multiple iterations could impact trace rendering performance, especially in live updates where this runs every 10 seconds.
💡 Suggestion: Consider caching the parsed task summary per traceId, or optimize to a single-pass algorithm that builds task scopes while iterating spans once. The current implementation sorts spans multiple times and creates intermediate Map structures.
🟡 Warning src/agent-protocol/task-live-updates.ts:311-324
tick() method uses an inflightTick promise to prevent concurrent executions, which is good. However, the pattern of storing the promise and checking it could lead to missed ticks if a tick is already in progress when a new one is scheduled. The comment says 'if this.completed return this.inflightTick' but the actual check just returns without waiting, potentially skipping updates.
💡 Suggestion: Consider whether skipped ticks are acceptable for live updates. If not, implement a queue or flag to request a re-tick after the current one completes. The current implementation may miss state changes during long-running ticks.
🟡 Warning src/agent-protocol/task-live-updates.ts:207-317
scheduleMetadataRefresh() uses the same coalescing pattern as tick(), preventing concurrent metadata refreshes. However, metadata refresh happens every 30 seconds independently of ticks, and both operations call extractTraceSkillMetadata() which fetches spans. This could result in redundant span fetching if a tick and metadata refresh happen close together.
💡 Suggestion: Consider sharing the fetched spans between tick and metadata refresh operations, or increase the metadata refresh interval to reduce redundant fetches. Add a cache with TTL for span data to avoid re-fetching the same trace within a short window.
🟡 Warning src/agent-protocol/task-live-update-slack.ts:40-54
enqueue() chains promises by reassigning this.publishQueue, which creates a chain of promises. In theory this is fine, but if publish() calls fail repeatedly without resolution, the chain could grow unbounded. The .catch(() => {}) swallows errors, which is good for flow but could hide issues.
💡 Suggestion: Consider adding a maximum queue depth limit. If the queue grows too large (e.g., >100 pending publishes), start dropping or coalescing updates to prevent memory issues from promise chain buildup.
🟡 Warning src/agent-protocol/task-live-updates.ts:313-328
trackPublish() adds promises to inflightPublishes Set but never removes them if the promise rejects. The try/finally block handles successful completion, but if the promise rejects before being added to the Set, or if there's a race condition, promises could accumulate in the Set causing a memory leak over long-running tasks with many live updates.
💡 Suggestion: Ensure promises are always removed from the Set even if they reject. The current finally block should handle this, but consider adding a defensive cleanup or periodic Set clearing to prevent accumulation.
🟡 Warning src/ai-review-service.ts:117-137
emitEvent() mirrors span-worthy events to local NDJSON fallback trace by creating a new child span and immediately ending it, then also calling emitNdjsonFullSpan(). This double-emission creates both an OTel span and an NDJSON record for the same event, doubling I/O and memory usage for telemetry.
💡 Suggestion: Consider whether both emission paths are necessary. If OTel export is working reliably, the NDJSON fallback may be redundant. Alternatively, make the NDJSON emission conditional on a flag or only emit to one backend, not both.

Quality Issues (14)

Severity Location Issue
🟢 Info src/agent-protocol/trace-serializer.ts:1061
renderYamlNode() truncates task titles to 100 characters without clear justification. This magic number should be a named constant.
💡 Suggestion: Replace 100 with a named constant like MAX_TASK_TITLE_LENGTH and document the rationale
🟢 Info src/agent-protocol/trace-serializer.ts:1921
Multiple magic numbers for truncation: 80 for task titles, 120 for task IDs, 100 for errors. These should be named constants.
💡 Suggestion: Extract magic numbers to named constants with clear names
🟢 Info src/providers/mcp-check-provider.ts:597
The stderr error handler only logs debug messages. If stderr errors occur frequently, there is no visibility into the problem or metrics for monitoring.
💡 Suggestion: Consider incrementing a metric or counter for stderr errors to monitor transport health
🟢 Info src/providers/script-check-provider.ts:382
The stderr error handler only logs debug messages. There is no tracking of how often these errors occur for monitoring purposes.
💡 Suggestion: Add metrics or counters for stderr errors to monitor transport health
🟢 Info src/slack/markdown.ts:135
The stderr and stdout error handlers are empty functions that do nothing. While this prevents crashes, it provides no visibility into Mermaid rendering failures.
💡 Suggestion: Log or track these errors instead of silently ignoring them
🟡 Warning src/agent-protocol/task-live-updates.ts:260
The enqueue() method chains promises with .catch(() => {}) which silently swallows errors. If publish() fails, the error is lost and subsequent publish calls may continue with inconsistent state.
💡 Suggestion: Log errors caught in .catch() instead of silently ignoring them for debugging and monitoring
🟡 Warning src/agent-protocol/task-live-updates.ts:283
The complete() and fail() methods wait for inflightTick but use try/catch without re-throwing or logging. If the tick fails, the error is swallowed.
💡 Suggestion: Log the error from inflightTick before continuing to provide visibility into failures
🟡 Warning src/agent-protocol/trace-serializer.ts:343
resolveTraceSpans() reads local NDJSON files line-by-line using readline.createInterface, parsing every line to filter by targetTraceId. This is O(n) for every trace lookup, inefficient for large files.
💡 Suggestion: Consider indexing trace files by traceId or caching parsed spans in memory when the same file is accessed multiple times
🟡 Warning src/agent-protocol/trace-serializer.ts:922
parseTaskStatusSnapshot() uses regex to parse XML-like task status. This is fragile for nested XML and does not handle malformed input gracefully.
💡 Suggestion: Use a proper XML parser library instead of regex, or validate the XML structure before parsing
🟡 Warning src/scheduler/scheduler.ts:289
When workflow is not set (reminder mode), the code only checks if job.inputs.text exists. It does not validate that inputs.text is a non-empty string.
💡 Suggestion: Add validation that inputs.text is a non-empty string when workflow is not set
🟡 Warning src/email/polling-runner.ts:105
startPeriodicStorageCleanup is called without checking if a previous cleanup timer exists. If startListening is called multiple times, it may create multiple cleanup timers.
💡 Suggestion: Check if storageCleanupStop exists and call it before starting a new cleanup timer
🟡 Warning src/agent-protocol/trace-serializer.ts:800
The PR adds extensive task telemetry parsing functions (parseTaskStatusSnapshot, summarizeTaskTelemetrySpans, buildProbeTaskScopeSummary, buildTemporalTaskScopes, extractProbeTaskSummary) but there are no corresponding unit tests for these complex functions.
💡 Suggestion: Add comprehensive unit tests for task telemetry parsing functions, covering edge cases like malformed task status XML, missing attributes, empty task lists, and nested scopes
🟡 Warning src/ai-review-service.ts:103
The createProbeTracerAdapter function is modified to add recordTaskEvent method and mirror span-worthy task events to NDJSON fallback trace, but there are no tests verifying this new telemetry functionality.
💡 Suggestion: Add unit tests for recordTaskEvent method that verify task events are emitted as both spans and NDJSON records
🟡 Warning src/agent-protocol/task-live-updates.ts:670
The extractTraceSkillMetadata function is modified to include task summary extraction, but there are no tests verifying this new functionality.
💡 Suggestion: Add unit tests for extractTraceSkillMetadata that verify task summary extraction from trace spans with task telemetry events

Powered by Visor from Probelabs

Last updated: 2026-05-04T11:50:07.635Z | Triggered by: pr_updated | Commit: e6e082b

💡 TIP: You can chat with Visor using /visor ask <your question>

buger and others added 6 commits March 30, 2026 18:59
- Fix on_message trigger dispatch to match normal message path:
  seed setFirstMessage so human_input checks auto-resolve and the
  full intent-router → build-config → generate-response chain runs
  with proper tool loading (Jira MCP, Slack, etc.)
- Inject trigger.inputs.text as the AI message with original Slack
  message appended, so triggers can give specific instructions
- Fix live update race condition: serialize publish() calls via a
  promise queue in SlackTaskLiveUpdateSink to prevent duplicate
  Slack messages when tick() and complete() run concurrently
- Track inflightTick promise so complete()/fail() await in-flight
  ticks before publishing the final update
- Fix self-bot message detection for bot_message subtypes by also
  checking ev.bot_id against the bot's own bot_id from auth.test
- Add resolveChannelName() to SlackClient for #channel-name support
  in scheduler output targets via conversations.list with caching
- Allow cron jobs without workflow (inputs.text as user message)
- Make StaticCronJob.workflow optional in types
- Fix workflow output warning to only fire for undefined (not null)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
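
The resolveChannelName() mentioned above could look roughly like the sketch below, which pages through conversations.list and caches name-to-ID mappings; the import and identifiers are assumptions based on the commit description, not the actual code.

// Hypothetical sketch of #channel-name resolution with caching.
import { WebClient } from '@slack/web-api';

const channelIdCache = new Map<string, string>();

async function resolveChannelName(client: WebClient, name: string): Promise<string | undefined> {
  const plain = name.replace(/^#/, '');
  const cached = channelIdCache.get(plain);
  if (cached) return cached;

  let cursor: string | undefined;
  do {
    // Page through all channels, filling the cache as we go
    const res = await client.conversations.list({ limit: 200, cursor });
    for (const ch of res.channels ?? []) {
      if (ch.name && ch.id) channelIdCache.set(ch.name, ch.id);
    }
    cursor = res.response_metadata?.next_cursor || undefined;
  } while (cursor);

  return channelIdCache.get(plain);
}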
Add debounce-manager for throttling check executions and integrate
it into level-dispatch. Supports configurable throttle settings
per check via config types.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
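
A rough sketch of the per-check debouncing this describes, keyed by the debounce and debounce_key fields added to the step config schema; the class and method names are illustrative, not the actual debounce-manager API.

// Illustrative sketch: repeated triggers within `delayMs` collapse into one run per key.
class DebounceManager {
  private timers = new Map<string, NodeJS.Timeout>();

  schedule(key: string, delayMs: number, run: () => void): void {
    const pending = this.timers.get(key);
    if (pending) clearTimeout(pending); // drop the earlier pending execution
    const timer = setTimeout(() => {
      this.timers.delete(key);
      run();
    }, delayMs);
    this.timers.set(key, timer);
  }
}

// e.g. keyed by the step's debounce_key from config (hypothetical usage):
// manager.schedule(step.debounce_key ?? step.id, step.debounce ?? 0, () => executeCheck(step));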
Add global uncaughtException handler that suppresses transient I/O
errors (EIO, EPIPE, ECONNRESET, ERR_STREAM_DESTROYED) from dying
child processes instead of crashing the entire visor process.

Three layers of defense:
- Global handler in child-process-error-handler.ts (imported early)
- Worktree manager skips process.exit(1) for transient I/O errors
- Stream-level error handlers on MCP transport stderr pipes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
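
A sketch of the kind of global guard this describes (the module name child-process-error-handler.ts comes from the commit; the exact logic is an assumption):

// Suppress transient I/O errors from dying child processes instead of crashing.
const TRANSIENT_IO_CODES = new Set(['EIO', 'EPIPE', 'ECONNRESET', 'ERR_STREAM_DESTROYED']);

process.on('uncaughtException', (err: Error) => {
  const code = (err as NodeJS.ErrnoException).code;
  if (code && TRANSIENT_IO_CODES.has(code)) {
    // Transient stream error from a dying child process: log and keep the process alive
    console.warn(`Suppressed transient I/O error: ${code}`);
    return;
  }
  // Anything else is a real failure: preserve crash-on-uncaught behavior
  console.error(err);
  process.exit(1);
});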
- Update @probelabs/probe to v0.6.0-rc313 with enriched task telemetry
  (agent scope fields, full task state on events, task.items_json)
- Parse task.items_json from batch events for proper titles on batch
  created/updated/completed/deleted operations
- Collapse sub-agent scopes (engineer, code-explorer) that lack
  meaningful task titles into deduplicated single-line entries instead
  of showing repetitive generic "Engineer Task" items
- Preserve sub-agent task titles when they exist (from task tool snapshots)
- Group repeated sub-agent iterations under a single scope label

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>