
Embed sync.Mutex in all agent types for thread-safe serialized execution #39

@fogfish

Description

Problem: Agents are stateful actors that must process one request at a time, but this is not enforced. Users can accidentally call Prompt() concurrently, causing data races and state corruption. Relying on documentation alone is insufficient.
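
Nothing prevents two goroutines from driving the same agent at once today; under `go test -race` this surfaces as a data race on the agent's internal state. A minimal sketch of the misuse (using the `NewPrompter` constructor shown later in this issue; `ctx`, `llm`, and `encoder` are assumed to be in scope):

```go
bot := agent.NewPrompter(llm, encoder)

// Two in-flight Prompt() calls mutate the same agent state concurrently:
// a data race today; after this change they are serialized and safe.
go bot.Prompt(ctx, "first request")
go bot.Prompt(ctx, "second request")
```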

Context:

  • Agents follow the actor model - they maintain state and must serialize execution
  • Most deployment environments (web servers, workers) are concurrent by default
  • Mutex overhead (<1μs) is negligible compared to LLM call latency (100ms-10s+)
  • Memory already has proper locking and can be safely shared across agents

Design Decision: Embed sync.Mutex in agent types to make them safe by default. Calls to Prompt() will be automatically serialized. This prevents data races while maintaining the actor model semantics.

Required Changes:

  1. Add mutex to Prompter (agent/prompter.go):

```go
type Prompter[A any] struct {
    // mu serializes Prompt() calls. Only one request can be processed at a time.
    // This enforces the actor model: agents process one message at a time.
    mu sync.Mutex

    *Automata[A, *chatter.Reply]
}
```
  2. Add mutex to Manifold (agent/manifold.go):

```go
type Manifold[A, B any] struct {
    // mu serializes Prompt() calls. Only one request can be processed at a time.
    // This enforces the actor model: agents process one message at a time.
    mu sync.Mutex

    llm      chatter.Chatter
    encoder  thinker.Encoder[A]
    decoder  thinker.Decoder[B]
    registry thinker.Registry
}

func (manifold *Manifold[A, B]) Prompt(ctx context.Context, input A, opt ...chatter.Opt) (B, error) {
    manifold.mu.Lock()
    defer manifold.mu.Unlock()

    var nul B
    // ... rest of existing implementation
}
```
  3. Add mutex to Automata (agent/automata.go):

```go
type Automata[A, B any] struct {
    // mu serializes Prompt() calls. Only one request can be processed at a time.
    // This enforces the actor model: agents process one message at a time.
    mu sync.Mutex

    llm      chatter.Chatter
    memory   thinker.Memory
    reasoner thinker.Reasoner[B]
    encoder  thinker.Encoder[A]
    decoder  thinker.Decoder[B]
}

func (automata *Automata[A, B]) Prompt(ctx context.Context, input A, opt ...chatter.Opt) (B, error) {
    automata.mu.Lock()
    defer automata.mu.Unlock()

    var nul B
    // ... rest of existing implementation
}

func (automata *Automata[A, B]) PromptOnce(ctx context.Context, input A, opt ...chatter.Opt) (B, error) {
    automata.mu.Lock()
    defer automata.mu.Unlock()

    automata.Purge()
    // Call the internal unlocked version (or inline the logic directly)
    return automata.prompt(ctx, input, opt...)
}
```
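
Since both Prompt() and PromptOnce() now take the lock, the existing body moves into an unexported helper that both wrappers delegate to. A minimal sketch, assuming the helper is named `prompt` (the name and the exact split are not fixed by this issue):

```go
// prompt carries the existing Prompt implementation, moved verbatim.
// It performs no locking of its own; the exported wrappers (Prompt, PromptOnce)
// acquire automata.mu before calling it.
func (automata *Automata[A, B]) prompt(ctx context.Context, input A, opt ...chatter.Opt) (B, error) {
    var nul B
    // ... rest of existing implementation
    return nul, nil // placeholder; the real body returns the decoded result
}
```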
  4. Add comprehensive godoc to each agent type:

Example for Automata (agent/automata.go):

```go
// Automata is a generic agent that orchestrates LLM interactions with
// memory and reasoning capabilities.
//
// Thread Safety:
//
// Automata is safe for concurrent use. Multiple goroutines can call Prompt()
// on the same agent instance. However, calls are SERIALIZED - only one Prompt()
// executes at a time per instance. This enforces the actor model where agents
// process one message at a time to maintain state consistency.
//
// For true parallel processing of multiple requests, create separate agent
// instances per goroutine:
//
//   // Pattern 1: Agent per session (different memory contexts)
//   for i := 0; i < workers; i++ {
//       memory := memory.NewStream(100, systemPrompt)
//       agent := NewAutomata(llm, memory, encoder, decoder, reasoner)
//       go processSession(agent, sessions[i])
//   }
//
//   // Pattern 2: Shared memory across agents (reflected memory architecture)
//   sharedMemory := memory.NewStream(memory.INFINITE, systemPrompt)
//   for i := 0; i < workers; i++ {
//       agent := NewAutomata(llm, sharedMemory, encoder, decoder, reasoner)
//       go processRequest(agent, requests[i])
//   }
//
//   // Pattern 3: Single agent with serialized queue (OK but slower)
//   agent := NewAutomata(llm, memory, encoder, decoder, reasoner)
//   for i := 0; i < workers; i++ {
//       go func(req Request) {
//           // Calls are serialized by mutex - executes one at a time
//           result, _ := agent.Prompt(ctx, req.Input)
//           sendResponse(result)
//       }(requests[i])
//   }
//
// Memory Sharing:
//
// Memory implementations (Stream, Void) have their own thread-safety guarantees
// and can be safely shared across multiple agent instances. This is useful for
// implementing reflection-based memory architectures where multiple agents
// contribute to a shared knowledge base.
type Automata[A, B any] struct { ... }
```
  5. Update README.md concurrency section:

Add new section after "Agent composition":

## Concurrency

### Thread Safety Guarantees

All agent types (Prompter, Manifold, Automata) are safe for concurrent use:
- ✅ Multiple goroutines can call `Prompt()` on the same agent
- ⚠️ Calls are serialized - only one executes at a time per agent instance
- 🎯 This enforces the actor model: agents process one message at a time

### Concurrency Patterns

**Pattern 1: Agent per goroutine (RECOMMENDED for parallel processing)**
```go
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
    agent := agent.NewPrompter(llm, encoder)
    wg.Add(1)
    go func(input string) {
        defer wg.Done()
        result, _ := agent.Prompt(ctx, input)
        processResult(result)
    }(inputs[i])
}
wg.Wait()
```

**Pattern 2: Shared agent with work queue (OK for simple cases)**

```go
agent := agent.NewPrompter(llm, encoder)

// Multiple workers, serialized execution
for i := 0; i < numWorkers; i++ {
    go func() {
        for input := range workQueue {
            result, _ := agent.Prompt(ctx, input)
            resultQueue <- result
        }
    }()
}
```

**Pattern 3: Shared memory across agents (for reflection architectures)**

```go
// Memory is thread-safe and can be shared
sharedMemory := memory.NewStream(100, "You are a helpful assistant")

// Each agent processes independently but shares memory
for i := 0; i < numWorkers; i++ {
    agent := agent.NewAutomata(
        llm, 
        sharedMemory,  // Shared!
        encoder, 
        decoder, 
        reasoner,
    )
    go processWithAgent(agent, requests[i])
}
```

### Performance Considerations

- Mutex overhead: <1μs per lock/unlock
- LLM call latency: 100ms-10s+ (dominates runtime)
- Recommendation: Use Pattern 1 (agent per goroutine) for best throughput
- Pattern 2 acceptable when request rate is low or ordering matters

See examples/10_concurrent for complete working examples.

  6. Add FAQ entry to README.md:

Add to existing FAQ section:

<details>
<summary>Are agents safe for concurrent use?</summary>

Yes! All agent types embed a mutex and are safe for concurrent use. However, calls to `Prompt()` are serialized per agent instance - only one can execute at a time.

**For parallel processing**, create multiple agent instances:
```go
// Each goroutine gets its own agent
for i := 0; i < workers; i++ {
    agent := agent.NewPrompter(llm, encoder)
    go agent.Prompt(ctx, inputs[i])
}
```

Memory can be shared safely across agents:

```go
sharedMem := memory.NewStream(100, systemPrompt)
for i := 0; i < workers; i++ {
    agent := agent.NewAutomata(llm, sharedMem, ...)
    go agent.Prompt(ctx, inputs[i])
}
```

The mutex overhead (<1μs) is negligible compared to LLM latency (100ms-10s).

</details>
  7. Add tests (agent/automata_test.go):

```go
func TestAutomataConcurrentSafety(t *testing.T) {
    // Test that concurrent calls don't cause data races
    llm := newMockLLM()
    agent := NewAutomata(llm, memory.NewVoid(""), ...)
    
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            _, err := agent.Prompt(context.Background(), fmt.Sprintf("input-%d", n))
            if err != nil {
                t.Errorf("Concurrent call failed: %v", err)
            }
        }(i)
    }
    wg.Wait()
    
    // Run with: go test -race
}

func TestAutomataConcurrentSerialization(t *testing.T) {
    // Test that calls are actually serialized (not parallel)
    llm := &slowMockLLM{delay: 100 * time.Millisecond}
    agent := NewAutomata(llm, memory.NewVoid(""), ...)
    
    start := time.Now()
    
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            agent.Prompt(context.Background(), fmt.Sprintf("input-%d", n))
        }(i)
    }
    wg.Wait()
    
    duration := time.Since(start)
    
    // Should take ~300ms (3 * 100ms) not ~100ms (parallel)
    if duration < 250*time.Millisecond {
        t.Errorf("Calls appear to be parallel, expected serialized: %v", duration)
    }
}
```

Estimated Effort: 3 hours
Skills Required:

  • Go concurrency (mutexes, data races)
  • Technical writing (documentation)
  • Testing (race detector, concurrent tests)

Breaking Changes: None. This is purely additive.

Migration Guide: Not needed. Existing code continues to work. Thread safety is now automatic.

Rationale:

  1. Correctness over performance: Prevents data races by default
  2. Actor model alignment: Agents ARE actors - one message at a time
  3. Pragmatic: Most deployments are concurrent (web servers, workers)
  4. Minimal overhead: <1μs vs 100ms-10s LLM calls (0.001% overhead)
  5. Memory can still be shared: Orthogonal concern with its own locking
  6. Future-proof: Can optimize internally without breaking API

Testing:

  • Run tests with -race flag to verify no data races
  • Add concurrent execution tests
  • Add serialization verification tests
  • Test shared memory patterns (see the sketch below)
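
The shared-memory pattern is not covered by the tests in item 7; a sketch of such a test, reusing the mock helper (newMockLLM) and the elided constructor arguments from those tests:

```go
func TestAutomataSharedMemory(t *testing.T) {
    // Several agents share one memory instance. Memory has its own locking,
    // so concurrent Prompt() calls across agents must not race on it.
    llm := newMockLLM()
    sharedMemory := memory.NewStream(100, "system prompt")

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            agent := NewAutomata(llm, sharedMemory, ...)
            if _, err := agent.Prompt(context.Background(), fmt.Sprintf("input-%d", n)); err != nil {
                t.Errorf("shared-memory call failed: %v", err)
            }
        }(i)
    }
    wg.Wait()

    // Run with: go test -race
}
```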
