Skip to content

Commit e3fe61f

Browse files
AlexsJonesclaude
andcommitted
feat!: real-time workflow canvas updates via WebSocket
BREAKING CHANGE: Ensemble CRD replaces PersonaPack (see commit 432355b). The workflow canvas now updates in real-time when agent runs change phase (Running → Succeeded/Failed). Previously, status was stale until a manual page refresh because useNodesState captured initial values and never synced with polling data. Two fixes: 1. useEffect in EnsembleCanvas syncs runPhaseMap changes into existing nodes without resetting user-dragged positions 2. useRunEventInvalidation() hook watches WebSocket for agent.run.* events and immediately invalidates the runs query — giving near-instant updates instead of waiting for the 5s poll interval Applied to all three canvas variants: - EnsembleCanvas (per-pack detail page, editable) - GlobalEnsembleCanvas (ensemble list page, read-only) - DashboardEnsembleCanvas (dashboard widget) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c5b9e45 commit e3fe61f

1 file changed

Lines changed: 54 additions & 2 deletions

File tree

web/src/components/ensemble-canvas.tsx

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { useMemo, useCallback, useState } from "react";
1+
import { useMemo, useCallback, useState, useEffect, useRef } from "react";
22
import {
33
ReactFlow,
44
Background,
@@ -23,6 +23,8 @@ import {
2323
useEnsembles,
2424
usePatchEnsembleRelationships,
2525
} from "@/hooks/use-api";
26+
import { useWebSocket } from "@/hooks/use-websocket";
27+
import { useQueryClient } from "@tanstack/react-query";
2628
import { Save, Plus, Trash2, Database } from "lucide-react";
2729
import type {
2830
Ensemble,
@@ -31,6 +33,33 @@ import type {
3133
AgentRun,
3234
} from "@/lib/api";
3335

36+
// ── Real-time run status updates via WebSocket ─────────────────────────────
37+
38+
/** Invalidates the runs query when a run lifecycle event arrives over the
39+
* WebSocket, giving the canvas near-instant status updates. */
40+
function useRunEventInvalidation() {
41+
const { events } = useWebSocket();
42+
const qc = useQueryClient();
43+
const lastSeenRef = useRef(0);
44+
45+
useEffect(() => {
46+
if (events.length <= lastSeenRef.current) return;
47+
const newEvents = events.slice(lastSeenRef.current);
48+
lastSeenRef.current = events.length;
49+
50+
const hasRunEvent = newEvents.some(
51+
(e) =>
52+
e.topic === "agent.run.completed" ||
53+
e.topic === "agent.run.failed" ||
54+
e.topic === "agent.run.started" ||
55+
e.topic === "agent.run.requested",
56+
);
57+
if (hasRunEvent) {
58+
qc.invalidateQueries({ queryKey: ["runs"] });
59+
}
60+
}, [events, qc]);
61+
}
62+
3463
// ── Shared node data ────────────────────────────────────────────────────────
3564

3665
export interface PersonaNodeData {
@@ -406,6 +435,7 @@ interface EnsembleCanvasProps {
406435
}
407436

408437
export function EnsembleCanvas({ pack }: EnsembleCanvasProps) {
438+
useRunEventInvalidation();
409439
const { data: runs } = useRuns();
410440
const patchMutation = usePatchEnsembleRelationships();
411441
const relationships = pack.spec.relationships || [];
@@ -442,9 +472,29 @@ export function EnsembleCanvas({ pack }: EnsembleCanvasProps) {
442472

443473
const initialEdges = useMemo(() => buildEdges(relationships), [relationships]);
444474

445-
const [nodes, , onNodesChange] = useNodesState(initialNodes);
475+
const [nodes, setNodes, onNodesChange] = useNodesState(initialNodes);
446476
const [edges, setEdges, onEdgesChange] = useEdgesState(initialEdges);
447477

478+
// Sync run status into nodes when polling data changes — preserves
479+
// user-dragged positions while updating phase/task indicators.
480+
useEffect(() => {
481+
setNodes((prev) =>
482+
prev.map((node) => {
483+
const personaName = node.id;
484+
const status = runPhaseMap.get(personaName);
485+
const newPhase = status?.phase;
486+
const newTask = status?.task;
487+
if (node.data.runPhase === newPhase && node.data.runTask === newTask) {
488+
return node; // no change — keep reference stable
489+
}
490+
return {
491+
...node,
492+
data: { ...node.data, runPhase: newPhase, runTask: newTask },
493+
};
494+
}),
495+
);
496+
}, [runPhaseMap, setNodes]);
497+
448498
const onConnect = useCallback(
449499
(connection: Connection) => {
450500
if (!connection.source || !connection.target) return;
@@ -551,6 +601,7 @@ export function EnsembleCanvas({ pack }: EnsembleCanvasProps) {
551601
// ══════════════════════════════════════════════════════════════════════════════
552602

553603
export function GlobalEnsembleCanvas() {
604+
useRunEventInvalidation();
554605
const { data: packs } = useEnsembles();
555606
const { data: runs } = useRuns();
556607

@@ -654,6 +705,7 @@ export function GlobalEnsembleCanvas() {
654705
// ══════════════════════════════════════════════════════════════════════════════
655706

656707
export function DashboardEnsembleCanvas() {
708+
useRunEventInvalidation();
657709
const { data: packs } = useEnsembles();
658710
const { data: runs } = useRuns();
659711
const [selectedPack, setSelectedPack] = useState<string>("__all__");

0 commit comments

Comments
 (0)