feat(dashboard): SQLite workflowstore for ML jobs and OpenClaw; health/events APIs; AMD deploy test fix #1656

mkoushni wants to merge 1 commit into vllm-project:main

Signed-off-by: mkoushni <mkoushni@redhat.com>
Pull request overview
This PR migrates dashboard workflow/control-plane state (ML pipeline jobs + OpenClaw collaboration entities) from in-memory / JSON-file persistence to a shared, durable SQLite-backed workflowstore, and wires new APIs/health reporting around it.
Changes:
- Introduces the `dashboard/backend/workflowstore` SQLite store with schema + legacy OpenClaw JSON import.
- Updates the ML pipeline runner/handlers to persist jobs + typed progress events, and adds `/api/ml-pipeline/jobs/{id}/events`.
- Updates OpenClaw handlers/tests to store containers/teams/rooms/messages in SQLite and adds `/api/workflows/health`.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| docs/agent/state-taxonomy-and-inventory.md | Updates state inventory to reflect SQLite-backed durability for ML pipeline + OpenClaw. |
| dashboard/frontend/src/hooks/useConversationStorage.ts | Documents that localStorage chat is demo-only; points to OpenClaw rooms for durable history. |
| dashboard/backend/workflowstore/store.go | Adds SQLite store open/init + schema for ML pipeline + OpenClaw entities. |
| dashboard/backend/workflowstore/mlpipeline.go | Adds persisted ML job + progress event CRUD and recovery logic. |
| dashboard/backend/workflowstore/openclaw.go | Adds OpenClaw entity/message CRUD against SQLite. |
| dashboard/backend/workflowstore/legacy_import.go | Adds one-time import path from legacy OpenClaw JSON files into SQLite. |
| dashboard/backend/workflowstore/store_test.go | Adds restart/reopen and incremental message append tests for the store. |
| dashboard/backend/router/router.go | Opens workflow store, registers workflow health endpoint, injects store into OpenClaw/ML pipeline. |
| dashboard/backend/router/core_routes.go | Updates ML pipeline route wiring to create runner with store + recover running jobs. |
| dashboard/backend/router/openclaw_routes.go | Updates OpenClaw handler construction to require workflow store. |
| dashboard/backend/router/mcp_routes_test.go | Sets WorkflowDBPath for router integration tests. |
| dashboard/backend/mlpipeline/runner*.go | Replaces in-memory job map with persisted jobs + typed progress events in store. |
| dashboard/backend/handlers/workflow_health.go | Adds /api/workflows/health snapshot endpoint backed by store counts. |
| dashboard/backend/handlers/openclaw*.go + tests | Switches OpenClaw registry/rooms/messages persistence to store and updates tests to use temp SQLite. |
| dashboard/backend/handlers/mlpipeline.go | Adds /events sub-route for durable typed ML progress history. |
| dashboard/backend/handlers/deploy_test.go | Fixes AMD config fixture path to use the documented recipe. |
| dashboard/backend/config/config.go | Adds --workflow-db / env default path config for workflow SQLite. |
```go
	return nil, fmt.Errorf("workflowstore: create dir: %w", err)
}

db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
```
SQLite foreign-key constraints (including ON DELETE CASCADE) are not enforced unless PRAGMA foreign_keys=ON is enabled for the connection. As written, progress events won't cascade-delete with their job rows. Fix by enabling foreign keys on open (e.g., add _foreign_keys=1/_fk=1 to the DSN, or execute PRAGMA foreign_keys = ON after opening).
Suggested change:

```diff
-db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
+db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_busy_timeout=5000&_foreign_keys=1")
```
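The default-off behavior is easy to confirm interactively. A minimal sketch in the sqlite3 shell, using this PR's table names and a hypothetical job id:

```sql
PRAGMA foreign_keys;      -- 0: enforcement is off by default on every new connection
PRAGMA foreign_keys = ON; -- must be set per connection (or via the DSN)

-- With enforcement on, deleting a job cascades to its progress events:
DELETE FROM ml_pipeline_jobs WHERE id = 'ml-train-123';
SELECT COUNT(*) FROM ml_pipeline_progress_events WHERE job_id = 'ml-train-123';
```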
```sql
CREATE TABLE IF NOT EXISTS ml_pipeline_progress_events (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id      TEXT NOT NULL,
    step        TEXT NOT NULL DEFAULT '',
    percent     INTEGER NOT NULL DEFAULT 0,
    message     TEXT NOT NULL DEFAULT '',
    recorded_at TEXT NOT NULL,
    FOREIGN KEY (job_id) REFERENCES ml_pipeline_jobs(id) ON DELETE CASCADE
);
```

The same foreign-key enforcement caveat applies to this `ON DELETE CASCADE` clause: it is a no-op unless `PRAGMA foreign_keys` is enabled on the connection.
```go
s.mu.Lock()
defer s.mu.Unlock()
_, err := s.db.Exec(
	`INSERT OR REPLACE INTO openclaw_room_message (room_id, message_id, json) VALUES (?, ?, ?)`,
```
INSERT OR REPLACE is not append-only: on a duplicate (room_id, message_id) it deletes and reinserts the row, which changes seq and can reorder history. This can also overwrite previously stored message JSON unexpectedly. Prefer idempotent insertion (INSERT OR IGNORE / ON CONFLICT(room_id, message_id) DO NOTHING) or an explicit DO UPDATE that does not touch seq (requires storing seq separately).
Suggested change:

```diff
-	`INSERT OR REPLACE INTO openclaw_room_message (room_id, message_id, json) VALUES (?, ?, ?)`,
+	`INSERT OR IGNORE INTO openclaw_room_message (room_id, message_id, json) VALUES (?, ?, ?)`,
```
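For reference, the explicit conflict-target form behaves the same as `INSERT OR IGNORE` but makes the intent clearer. A sketch against this PR's table; it assumes a UNIQUE constraint exists on `(room_id, message_id)`:

```sql
INSERT INTO openclaw_room_message (room_id, message_id, json)
VALUES (?, ?, ?)
ON CONFLICT(room_id, message_id) DO NOTHING;
```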
```go
	_ = r.wf.UpdateMLJobProgress(jobID, percent, step)
	_ = r.wf.AppendMLProgressEvent(jobID, step, percent, message)
}

func (r *Runner) persistJob(j *Job) error {
	rec := jobToRecord(j)
	return r.wf.PutMLJob(rec)
}

func (r *Runner) setJobRunning(j *Job) {
	j.Status = StatusRunning
	_ = r.persistJob(j)
}

func (r *Runner) createJob(jobType string) *Job {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := fmt.Sprintf("ml-%s-%d", jobType, time.Now().UnixMilli())
	job := &Job{
		ID:        id,
		Type:      jobType,
		Status:    StatusPending,
		CreatedAt: time.Now(),
	}
	r.jobs[id] = job
	_ = r.persistJob(job)
	return job
}
```
Errors from workflowstore writes are being dropped (_ = ...). If the SQLite file becomes read-only, locked, or unavailable, the system will silently lose durability/typed history while still reporting progress over SSE. Consider at minimum logging these errors (ideally rate-limited) and/or propagating them to the job state so callers can see that persistence degraded.
```go
j.CreatedAt, _ = parseRFC3339Any(created.String)
if completed.Valid && completed.String != "" {
	j.CompletedAt, _ = parseRFC3339Any(completed.String)
}
```

```go
if outJSON.Valid && outJSON.String != "" {
	_ = json.Unmarshal([]byte(outJSON.String), &j.OutputFiles)
}
```

Time/JSON parse errors are discarded, which can silently turn CreatedAt/CompletedAt into zero values or drop OutputFiles if the stored data is malformed. Since these values drive UI ordering and job status display, it's safer to return an error when parsing fails (or at least log and fail the read) to avoid presenting corrupted data as valid.
```go
func NewOpenClawHandler(dataDir string, readOnly bool, wf *workflowstore.Store) *OpenClawHandler {
	if wf == nil {
		panic("openclaw: workflow store is required")
	}
	return &OpenClawHandler{dataDir: dataDir, readOnly: readOnly, wf: wf}
```
Panicking in a constructor makes misconfiguration crash the whole process and complicates testing/embedding. Prefer returning (*OpenClawHandler, error) (or handling nil by returning a disabled handler) so caller(s) can decide whether to fail fast (log.Fatalf) or degrade gracefully.
Suggested change:

```diff
-func NewOpenClawHandler(dataDir string, readOnly bool, wf *workflowstore.Store) *OpenClawHandler {
-	if wf == nil {
-		panic("openclaw: workflow store is required")
-	}
-	return &OpenClawHandler{dataDir: dataDir, readOnly: readOnly, wf: wf}
+func NewOpenClawHandler(dataDir string, readOnly bool, wf *workflowstore.Store) (*OpenClawHandler, error) {
+	if wf == nil {
+		return nil, fmt.Errorf("openclaw: workflow store is required")
+	}
+	return &OpenClawHandler{dataDir: dataDir, readOnly: readOnly, wf: wf}, nil
```
```go
// UpdateMLJobProgress updates percent and current_step (and ensures row exists).
func (s *Store) UpdateMLJobProgress(jobID string, percent int, currentStep string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, err := s.db.Exec(
		`UPDATE ml_pipeline_jobs SET progress = ?, current_step = ? WHERE id = ?`,
		percent, currentStep, jobID)
	return err
}
```
The comment says this call 'ensures row exists', but the SQL is a plain UPDATE and will no-op if the job row is missing. Either adjust the comment to match behavior, or change the implementation to an upsert/insert-if-missing if callers rely on the guarantee.
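If callers do rely on the row-creation guarantee, SQLite's upsert syntax can provide it. A sketch only: it assumes the remaining `ml_pipeline_jobs` columns have defaults (otherwise they would need to be supplied in the column list):

```sql
INSERT INTO ml_pipeline_jobs (id, progress, current_step)
VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    progress     = excluded.progress,
    current_step = excluded.current_step;
```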
```go
// ListOpenClawContainerJSON returns name -> json payloads ordered by name.
func (s *Store) ListOpenClawContainerJSON() ([]string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	rows, err := s.db.Query(`SELECT json FROM openclaw_container ORDER BY name`)
```
All store operations (including read-only queries and row scanning) are serialized behind a single sync.Mutex, which can become a throughput bottleneck once multiple SSE/WS clients and API calls hit the store concurrently. Consider switching to sync.RWMutex (read-lock for list/get) or relying on *sql.DB concurrency and using narrower locks only around multi-statement transactions if you need store-level mutual exclusion.
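The read-lock variant is a small change. A minimal sketch of the pattern; this `Store` is a stand-in for the PR's `workflowstore.Store`, with a map in place of the SQLite handle:

```go
package main

import (
	"fmt"
	"sync"
)

// Store demonstrates sync.RWMutex: read-only queries take a shared
// read lock, so concurrent SSE/API readers no longer serialize
// behind one another; only writers are exclusive.
type Store struct {
	mu   sync.RWMutex
	rows map[string]string
}

// Get takes only a read lock; many Gets may run concurrently.
func (s *Store) Get(id string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.rows[id]
	return v, ok
}

// Put takes the write lock, excluding readers and other writers.
func (s *Store) Put(id, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows[id] = v
}

func main() {
	s := &Store{rows: map[string]string{}}
	s.Put("job-1", "running")
	v, _ := s.Get("job-1")
	fmt.Println(v)
}
```

Note that `*sql.DB` is itself safe for concurrent use, so store-level locking may only be needed around multi-statement transactions in the first place.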
|
Can we reuse the Postgres instance we set up in #1683 and migrate all the SQLite storage into one unified store?
Summary
Make dashboard workflow state restart-safe and server-owned for ML pipeline jobs and OpenClaw collaboration entities, instead of relying on in-memory maps, workspace-local JSON files, or browser-only state.
What changed
- **New `workflowstore` package** (`dashboard/backend/workflowstore/`): a shared SQLite database (`./data/workflow.sqlite` by default; override via `DASHBOARD_WORKFLOW_DB_PATH` or the `--workflow-db` flag) that owns durable state for ML pipeline jobs, typed progress events, and OpenClaw containers, teams, rooms, and room messages.
- **ML pipeline jobs** (`dashboard/backend/mlpipeline/runner.go`): jobs persist in the `ml_pipeline_jobs` and `ml_pipeline_progress_events` tables instead of an in-memory `map[string]*Job`. On startup, `RecoverInterruptedMLJobs` marks any `running` jobs as `failed` with a clear message (same pattern as the evaluation subsystem). `GET /api/ml-pipeline/jobs/{id}/events` returns typed durable progress history (not log-derived).
- **OpenClaw entities** (`dashboard/backend/handlers/openclaw*.go`): persistence moves from JSON files (`containers.json`, `teams.json`, `rooms.json`, `room-messages/*.json`) to SQLite tables. New messages append as `INSERT` rows instead of rewriting the entire JSON file on every message.
- **Workflow health API**: `GET /api/workflows/health` returns a typed JSON snapshot with store connectivity status, ML job counts, and OpenClaw entity counts; no log scraping.
- **Frontend annotation**: `useConversationStorage.ts` now documents that browser `localStorage` chat history is demo/playground-only; the OpenClaw room APIs are the supported server-owned collaboration history path.
- **State inventory update**: the `docs/agent/state-taxonomy-and-inventory.md` rows for ML pipeline and OpenClaw now reflect the new persistence contract.
- **Test fix**: `TestMergeDeployPayload_RoundTripsMaintainedAMDConfig` pointed at a non-existent `deploy/amd/config.yaml`; corrected to use `deploy/recipes/balance.yaml` (the actual AMD reference recipe documented in `deploy/amd/README.md`).

Files changed

- `dashboard/backend/workflowstore/{store,mlpipeline,openclaw,legacy_import,store_test}.go`
- `dashboard/backend/handlers/workflow_health.go`
- `dashboard/backend/handlers/openclaw_test_helpers_test.go`
- `dashboard/backend/mlpipeline/runner{,_subprocess,_http,_config}.go`
- `dashboard/backend/handlers/openclaw{,_rooms}.go`
- `dashboard/backend/router/{router,core_routes,openclaw_routes}.go`
- `dashboard/backend/config/config.go`
- Tests: `openclaw_test.go`, `openclaw_image_test.go`, `openclaw_mcp_test.go`, `openclaw_room_readonly_test.go`, `openclaw_room_context_test.go`, `openclaw_worker_chat_test.go`, `mcp_routes_test.go`, `deploy_test.go`
- `dashboard/frontend/src/hooks/useConversationStorage.ts`
- `docs/agent/state-taxonomy-and-inventory.md`

Design decisions

- **SQLite over Postgres**: matches `auth.db` and `evaluations.db` already in the dashboard. Keeps local-dev simple; the `Store` interface is a natural seam for a future Postgres adapter when HA is needed.
- **Bulk saves**: containers/teams/rooms use `DELETE + INSERT ALL` in a transaction (same semantics as the JSON-file writes they replace). Room messages use append-only `INSERT`.
- **Legacy import**: runs only when the `openclaw_container` table is empty and `LegacyOpenClawDir` is set. No migration tooling needed; existing JSON data is preserved on disk.

Related
Test plan
- `go test ./dashboard/backend/workflowstore/` — restart survival, recovery, incremental messages
- `go test ./dashboard/backend/handlers/` — all OpenClaw handler tests use `newTestOpenClawHandler` with temp SQLite
- `go test ./dashboard/backend/router/` — MCP integration test with `WorkflowDBPath` set
- `TestMergeDeployPayload_RoundTripsMaintainedAMDConfig` passes with the corrected recipe path
- Manual: `GET /api/workflows/health` returns entity counts and `"store":"ok"`
- Manual: interrupted `running` jobs are marked `failed` with the recovery message, and progress events are queryable via `/api/ml-pipeline/jobs/{id}/events`
- Manual: with legacy `containers.json`/`teams.json`/`rooms.json` present, verify the legacy import populates SQLite; subsequent restarts do not re-import

Resolves #1609