Skip to content

Conversation

@thedotmack
Copy link
Owner

Summary

Extracts the queue monitoring system from PR #335, excluding the Bun→Node runtime switch and Windows PowerShell fixes.

This PR focuses solely on the queue infrastructure that provides visibility and recovery for stuck messages.

What's Included

Queue Infrastructure

  • PendingMessageStore - SQLite-based persistent message queue with retry logic
  • pending_messages table - Database migration (v12) with status tracking
  • WatchdogService - 30-second polling for automatic stuck message recovery
  • Queue status states: pendingprocessingprocessed/failed

Queue UI

  • QueueButton - Header button with live badge showing queue count
  • QueueDrawer - Slide-out panel for queue management
  • QueueMessageCard - Message cards with retry/abort actions
  • useQueue hook - Real-time updates via Server-Sent Events
  • useNotifications hook - Toast notifications for queue events

API Endpoints (QueueRoutes)

  • GET /api/queue - Fetch pending/processing messages
  • GET /api/queue/recently-processed - Show recent completions
  • POST /api/queue/retry/:id - Retry specific message
  • POST /api/queue/abort/:id - Remove from queue
  • POST /api/queue/recover/:sessionId - Force-restart stuck session

Integration

  • SessionManager tracks pending message IDs during processing
  • worker-service initializes WatchdogService and QueueRoutes
  • Original timestamp preservation for accurate observation dating

What's Excluded

Bun → Node.js runtime switch - Kept Bun runtime with bun:sqlite imports
Windows PowerShell fixes - Will be addressed in separate PR (see PR #335 conversation)
sqlite-compat.ts - Runtime abstraction layer not needed

Rationale

Based on PR #335 review:

The Windows PowerShell fixes are needed regardless of runtime (affects both Bun and Node) and will be submitted as a higher-priority separate PR.

Testing

✅ Build passes successfully
✅ All TypeScript compiles
✅ Queue UI components render
✅ PendingMessageStore database operations work

Related

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 [email protected]

Extracted queue-related features only (excluding Bun→Node runtime switch):

**Queue Infrastructure:**
- PendingMessageStore: SQLite-based persistent message queue
- pending_messages table (migration 12) in SessionStore
- Queue status tracking (pending/processing/processed/failed)
- Retry logic with configurable max retries

**Queue UI:**
- QueueButton: Header button with badge showing queue count
- QueueDrawer: Slide-out panel for queue management
- QueueMessageCard: Individual message display with actions
- useQueue hook: Real-time queue updates via SSE
- useNotifications hook: Toast notifications for queue events

**Auto-Recovery:**
- WatchdogService: 30-second polling for stuck messages
- Automatic reset of stuck processing messages
- Manual retry/abort controls in UI
- Session orphan recovery

**API Endpoints:**
- GET /api/queue: Fetch all pending/processing messages
- GET /api/queue/recently-processed: Show recent completions
- POST /api/queue/retry/:id: Retry specific message
- POST /api/queue/abort/:id: Remove message from queue
- POST /api/queue/recover/:sessionId: Force-restart stuck session

**Integration:**
- SessionManager tracks pending message IDs during processing
- worker-service initializes WatchdogService and QueueRoutes
- Original timestamp preservation for accurate observation dating

**Kept on Bun runtime:** All sqlite imports use 'bun:sqlite' (not sqlite-compat)

This enables visible queue monitoring, manual intervention for stuck messages,
and automatic recovery from worker crashes - solving real reliability issues
documented in PR #315.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
@claude
Copy link

claude bot commented Dec 16, 2025

Pull Request Review: Queue Monitoring System

I've completed a comprehensive review of PR #338. This is a well-architected feature that addresses real operational problems with stuck messages. Here's my detailed feedback:


Strengths

Architecture & Design

  • Clean separation of concerns: PendingMessageStore (persistence) → WatchdogService (recovery) → QueueRoutes (API) is well-structured
  • Event-driven design: The queue system integrates elegantly with the existing EventEmitter-based SessionManager
  • Thoughtful state machine: The pending → processing → processed/failed lifecycle with retry logic is robust
  • Original timestamp preservation: Excellent detail - ensures observations get accurate timestamps even after recovery (src/services/sqlite/SessionStore.ts:1293)

Database Schema

  • Well-designed migration (v12): The pending_messages table schema is comprehensive with proper foreign keys and CHECK constraints
  • Good indexing strategy: Indexes on session_db_id, status, and composite indexes optimize the common query patterns
  • Retention policy: Auto-cleanup of processed messages prevents unbounded growth

Error Handling & Recovery

  • Multiple recovery mechanisms:
    • Auto-retry with backoff (max 3 retries)
    • Watchdog polling every 30s for stuck messages (>2.5min)
    • Crash recovery on startup (resets processing → pending)
    • Manual retry/abort via UI
  • Graceful degradation: System continues working even if queue encounters errors

UI Implementation

  • Rich queue visualization: QueueMessageCard intelligently parses tool inputs to show human-readable summaries
  • Real-time updates: Server-Sent Events provide live status without polling overhead
  • Good UX affordances: Toast notifications, retry/abort buttons, session recovery actions

🔍 Issues & Recommendations

🟡 Medium Priority

1. SQL Injection Risk in QueueRoutes

Location: src/services/worker/http/routes/QueueRoutes.ts:88-89

The parseIntParam method likely validates integer parameters, but ensure it properly sanitizes input. If using direct string interpolation anywhere in SQL queries, this would be a security issue. ✅ Mitigated by prepared statements in PendingMessageStore, but worth double-checking.

2. Race Condition in WatchdogService Recovery

Location: src/services/worker/WatchdogService.ts:95-116

for (const sessionDbId of sessionsWithPending) {
  const session = this.sessionManager.getSession(sessionDbId);
  if (session) {
    if (!session.generatorPromise) {
      this.startGeneratorForSession(session);
    }
  }
}

Issue: Between checking !session.generatorPromise and calling startGeneratorForSession, another thread could start the generator, leading to duplicate SDK agents for the same session.

Recommendation: Add a lock or atomic check-and-set:

if (session && !session.generatorPromise) {
  session.generatorPromise = this.sdkAgent.startSession(session, this.workerRef)...
  // Rest of logic
}

3. Unbounded Retry Loop Potential

Location: src/services/sqlite/PendingMessageStore.ts:243-268

The markFailed method retries up to maxRetries=3, but there's no exponential backoff or delay between retries. If a message repeatedly fails instantly (e.g., malformed data), it could cause rapid retry cycles.

Recommendation: Consider adding a delay or exponential backoff before re-queuing to pending.

4. Memory Leak Risk in Debug Log

Location: src/services/worker/SessionManager.ts:60-72

The debug log ring buffer (DEBUG_LOG_MAX=100) is fine for production, but if debug logging is verbose during high-traffic periods, the shift() operation on every new entry could cause GC pressure.

Recommendation: This is likely fine as-is, but monitor in production. Consider using a circular buffer implementation if this becomes a hotspot.

5. Type Safety: any in WatchdogService

Location: src/services/worker/WatchdogService.ts:37

private workerRef: any;

Issue: Using any bypasses TypeScript's type checking and could hide errors.

Recommendation: Define a proper interface:

interface WorkerServiceRef {
  broadcastProcessingStatus: () => void;
}
private workerRef?: WorkerServiceRef;

🟢 Low Priority (Nice-to-Haves)

6. Test Coverage

I don't see tests for the new queue infrastructure in the diff. For a critical feature like message persistence and recovery, comprehensive tests would be valuable:

  • Unit tests for PendingMessageStore state transitions
  • Integration tests for WatchdogService recovery scenarios
  • Edge cases: corrupt JSON in tool_input, orphaned sessions, concurrent retries

7. Observability: Metrics Export

The current logging is good, but exporting metrics (stuck message count, retry rate, recovery success rate) to a monitoring system would help track queue health over time.

8. UI: Loading States

QueueMessageCard could show loading spinners when retry/abort actions are in flight (currently appears instant).

9. Documentation

Consider adding inline comments explaining:

  • Why 2.5min was chosen as the stuck threshold
  • The retry policy (3 attempts)
  • The 24-hour retention policy

🔒 Security Assessment

No critical security issues found

  • SQL Injection: Mitigated by prepared statements throughout
  • XSS: React automatically escapes content, no dangerouslySetInnerHTML usage
  • SSRF: No user-controlled URLs in the queue system
  • Access Control: Queue API endpoints should be localhost-only (verify CLAUDE_MEM_WORKER_HOST default)

📊 Performance Considerations

Good

  • Indexed queries in PendingMessageStore minimize DB overhead
  • Watchdog polling interval (30s) is reasonable - not too aggressive
  • Server-Sent Events avoid polling in the UI

⚠️ Watch Out For

  • If queue backlog grows large (100+ messages), the getQueueMessages() query could become slow (consider pagination)
  • cleanupProcessed() runs on every watchdog check - may want to reduce frequency (e.g., once per hour)

🎯 Final Verdict

Recommendation: ✅ Approve with minor fixes

This is high-quality work that solves a real problem (stuck messages) with a well-designed solution. The queue infrastructure is production-ready with a few caveats:

  1. Must fix: Race condition in WatchdogService (item ACCIDENTALLY TAGGED #2)
  2. Should fix: Type safety for workerRef (item Missing module error on MacOs #5)
  3. Nice to have: Add retry backoff (item import-history command fails with TypeError: path argument must be string #3), tests (item Fix missing @anthropic-ai/claude-agent-sdk dependency in hooks #6)

The scope is appropriate for extraction from PR #335 - you successfully isolated the queue system without pulling in the controversial Bun→Node migration.


📝 Specific File Feedback

PendingMessageStore.ts ⭐⭐⭐⭐⭐

Excellent implementation. Clear method names, good documentation, proper error handling.

WatchdogService.ts ⭐⭐⭐⭐

Solid design. Address the race condition and type safety issues noted above.

QueueRoutes.ts ⭐⭐⭐⭐⭐

Clean REST API design. Good use of BaseRouteHandler pattern.

QueueMessageCard.tsx ⭐⭐⭐⭐

Impressive tool input parsing with fallbacks. The safeJsonParse with double-encoding detection is clever.

SessionStore.ts (migration) ⭐⭐⭐⭐⭐

Database migration is thorough with proper version tracking and foreign keys.


Great work on this PR! The queue monitoring system will significantly improve reliability. Let me know if you'd like me to elaborate on any of these points.

Related: #335, #315

@thedotmack thedotmack mentioned this pull request Dec 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants