Skip to content

Tochemey/atto

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

29 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

atto

Stateful LLM agents in Go. Each session is an actor: it stays put on one node, frees memory when idle, and survives node loss.

Concurrent callers to the same session queue in order.
Backpressure is a typed error.
The same call works whether you run on one node or a cluster.

build Go Reference Go Report Card codecov Go version License

Atto is built for people shipping agents. You wire tool.Func, agent.NewLLM, and atto.New. You stream session.Event values like any other Go iterator.

Below that surface, each session is an actor. Turns happen in order even when two HTTP handlers touch the same session at the same time. If a session gets flooded you see ErrSessionBacklogFull, not silent corruption or an OOM. The same API runs on one node or across a cluster on top of GoAkt; sticky routing and storage reload live in the runtime.

If actors are new to you: it's the framework keeping one goroutine's worth of discipline per user. You don't have to maintain a map[string]*sync.Mutex or wonder what happens when two HTTP handlers touch the same chat history.

If you already work with actors, atto plugs in the LLM pieces on top: native OpenAI, Anthropic, and Gemini adapters, schema-validated tools from plain Go functions, optimistic concurrency on persisted snapshots, and the same atto.Runtime API single-node or clustered.

Day-to-day behaviour

  • Same user, two concurrent HTTP requests: turns run one after another; history stays coherent.
  • Different users: independent session actors run in parallel.
  • A caller floods one session: the runtime returns session.ErrSessionBacklogFull when the bounded stash is full.
  • The agent needs more information mid-task: it calls request_user_input to pause; the next message resumes the same task from history.
  • Idle chats pile up in RAM: configurable passivation frees memory; the next message reloads from your store.
  • Rolling restart or node loss: a shared store.SessionStore plus sticky placement keeps session state across process boundaries.
  • Race-free persistence: Snapshot.Version provides optimistic concurrency; conflicting writes come back as store.ErrConcurrentWrite.

Status: The public API in this release is the stability target. Breaking changes are still permitted between v0.x releases; the module path is stable through v1.

Installation

go get github.com/tochemey/atto

Requires Go 1.26.2+.

Features

The root atto package is the user-facing entry point; the rest of the surface lives under agent, session, tool, store/..., and llm/....

  • One front door, always actor-backed. atto.New(ctx, model, build) builds and starts an internal goakt actor system, registers atto's runtime extension, spawns the per-process model actor, and returns a *atto.Runtime ready for invocations. The build closure receives the model-actor-backed llm.LLM; the agent it returns uses that instance and inherits retry, passivation and (in cluster mode) placement transparently.

  • Session actors and backpressure. Behind the front door, a SessionActor per session owns the conversation; per-invocation RunWorker goroutines stream events through a buffered channel (atto.WithEventBufferSize, default 64). While a turn is in flight, extra work for that session queues in the actor stash up to atto.WithStashBound (default 32); beyond that you get session.ErrSessionBacklogFull. Idle sessions passivate after atto.WithPassivationAfter (default 15 minutes) and re-hydrate from the configured store on next use. A failed Save on commit rolls the turn back in memory before replying.

  • Model actor. Every completion runs through a singleton ModelActor: streaming via pipe-to tasks, exponential backoff retries for transient failures (caps configured via atto.WithModelMaxRetries, atto.WithModelBaseBackoff, atto.WithModelMaxBackoff), and cancellation tied to the caller so one stream does not abort another.

  • Cluster mode in one option. atto.WithCluster(provider, opts...) builds a cluster-enabled actor system: it registers atto's actor kinds, wires the remote serialisable types, and resolves dependencies through atto.Extension so actors can relocate without captured globals. provider is any atto/discovery.Provider; tune ClusterBind, ClusterQuorum, ClusterReplicaCount, ClusterPartitions, and ClusterBootstrapTimeout as needed. For goakt features outside the option set, build the actor system yourself and adopt it via atto.WithActorSystem(sys); atto.RegisterClusterPrerequisites and atto.RegisterRemotePrerequisites merge atto's kinds and wire types into the config you own.

  • Persistence contract with optimistic concurrency. store.SessionStore saves Snapshot values: history, state, UpdatedAt, and Version. Save rejects stale versions with store.ErrConcurrentWrite. Load and Delete round out the interface. Implementations live under store/inmemory, store/bolt, and store/postgres, all checked against store/storetest.

  • Pause for clarification. agent.WithClarification() registers a reserved request_user_input tool. When the model calls it, the agent loop intercepts before dispatch, emits session.EventInputRequired carrying the question and a synthesised assistant message, and ends the invocation. The question commits to history through the same path as a final message, so the paused conversation survives passivation and cluster relocation. The next Runtime.Run against the same session sees the question in history and continues naturally.

  • Atomic state delta. Each agent.Invocation carries a mutable session.State plus a session.StateDelta. Ops recorded in the delta commit atomically with the assistant message for that turn; partial turns leave no half-applied state behind.

  • Streaming events. Runtime.Run yields iter.Seq2[*session.Event, error]: text deltas, tool call and result notices, input-required pauses, the final assistant message, and terminal errors (session.EventKind).

  • LLM agent loop. agent.NewLLM streams completions and runs tool round trips until the model answers without more tool calls, or until WithMaxIterations stops the loop (default 10). Configure WithInstruction, WithTemperature, WithTools, WithName (shown as the event author), and any llm.LLM adapter.

  • Typed tools. tool.Func reflects the argument struct into JSON Schema (unless you pass WithSchema), attaches an optional description, and wraps ordinary Go functions. The registry validates JSON arguments before dispatch.

  • First-party model adapters. Streaming llm.LLM implementations for OpenAI-compatible HTTP APIs (llm/openai), native Anthropic (llm/anthropic, including Request.CacheKey for prompt caching), native Gemini (llm/gemini, AI Studio and Vertex), plus helpers for Azure, Ollama, and vLLM.

  • Structured logging. atto.WithLogger(*slog.Logger) forwards every goakt runtime message (cluster bootstrap, peer discovery, actor lifecycle, supervision, remoting) through your slog.Handler. The default is silent. The handler's level is the single source of truth: a slog.LevelDebug handler surfaces gossip and placement traffic while a slog.LevelWarn handler stays quiet.

  • A2A bridge. a2a.NewServer(rt, ...) exposes the runtime over A2A JSON-RPC + SSE; a2a.RemoteAgent calls remote A2A peers as agent.Agent sub-agents. Per-task SSE topics, cross-pod resubscribe, and input-required pause-and-resume survive node loss. See the A2A section below.

  • Test doubles. llm.NewFake replays scripted chunks; tool.Fake records calls. Combine them with store/inmemory for end-to-end tests without the network or goakt.

Five-minute tour

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/tochemey/atto"
    "github.com/tochemey/atto/agent"
    "github.com/tochemey/atto/llm"
    "github.com/tochemey/atto/session"
    "github.com/tochemey/atto/tool"
)

type weatherArgs struct{ City string `json:"city"` }

func main() {
    ctx := context.Background()

    weather := tool.Func("get_weather",
        func(_ context.Context, a weatherArgs) (string, error) {
            return fmt.Sprintf("18C and cloudy in %s.", a.City), nil
        },
        tool.WithDescription("Look up the weather in a city."),
    )

    // Swap for llm/openai, llm/anthropic, or llm/gemini when you have a key.
    model := llm.NewFake(
        llm.Script{Chunks: []*llm.Chunk{{
            ToolCalls: []session.ToolCall{{
                ID:        "call-1",
                Name:      "get_weather",
                Arguments: json.RawMessage(`{"city":"Lagos"}`),
            }},
            Done: true,
        }}},
        llm.Script{Chunks: []*llm.Chunk{
            {Delta: "The weather in Lagos is 18C and cloudy."},
            {Done: true},
        }},
    )

    build := func(m llm.LLM) agent.Agent {
        return agent.NewLLM(
            agent.WithModel(m),
            agent.WithInstruction("You are a helpful assistant."),
            agent.WithTools(weather),
        )
    }

    rt, err := atto.New(ctx, model, build)
    if err != nil {
        log.Fatal(err)
    }
    defer rt.Stop(ctx)

    for ev, err := range rt.Run(ctx, "user-123", session.UserText("What's the weather in Lagos?")) {
        if err != nil {
            log.Fatal(err)
        }
        if ev.Kind == session.EventTextDelta {
            fmt.Print(ev.TextDelta)
        }
    }
}

The surface area stays small: Agent, LLM, Tool, Event, Runtime. Session lifecycle, mailboxes, stash limits, passivation, and cluster serialisation live inside the runtime. You do not subclass actors or spawn a goroutine per chat by hand.

Motivation

Go has real agent frameworks now. LangChainGo and Google's Agent Development Kit for Go are the most widely cited, and many smaller modules ship on pkg.go.dev.

Across those projects the dominant pattern is composition inside one process: wire models, tools, and storage with the library's abstractions, then solve per-session ordering, restart survival, load shedding, and multi-replica deployment yourself. Clustering and sticky sessions are usually application architecture (databases, queues, load balancers), not one shared runtime primitive in the toolkit.

Atto puts those concerns on an actor system. GoAkt already provides the runtime primitives: cluster-aware placement, supervision, scheduling, bounded mailboxes, passivation. Atto maps sessions and completions onto actors, so Runtime.Run looks the same whether atto built the actor system or you handed it a clustered one. The public surface stays small: Agent, LLM, Tool, Event, Runtime, plus adapters and stores. The runtime does the work.

Architecture at a glance

atto.New is the façade. It builds and starts a private goakt ActorSystem, registers atto's runtime extension with your llm.LLM and store.SessionStore, spawns the per-process model actor, and returns a *atto.Runtime. The build closure receives the model-actor-backed LLM and hands it to the agent that the runtime drives. Per-invocation Worker goroutines talk to a SessionActor for history and commits and a ModelActor for retried completions.

internal/actor.SessionActor loads and saves through store.SessionStore, applies persisted turn deltas (CommitTurn), and uses goakt's stash so overlapping snapshot requests wait in line or error cleanly when the stash overflows.

The model-actor bridge wraps your raw llm.LLM so streaming completions go through ModelActor, where centralised retry and backoff live. That matters most on a cluster. The wiring is internal; the agent only ever sees llm.LLM.

store.SessionStore is the contract behind inmemory, bolt, and postgres. The shared storetest suite keeps every backend honest.

Single node, then a cluster: the same Run call

Single-process callers stay on the default atto.New(ctx, model, build). For cluster mode, pass atto a discovery.Provider and a shared store. Atto registers the actor kinds, wire messages, and runtime extension internally; your code never imports goakt:

rt, _ := atto.New(ctx, model, build,
    atto.WithStore(postgresStore),
    atto.WithCluster(provider,
        atto.ClusterQuorum(2),
        atto.ClusterBind("0.0.0.0", 3320, 3321, 3322),
    ),
)

provider implements atto/discovery.Provider: four methods (ID, Start, DiscoverPeers, Stop) over a context.Context-aware lifecycle. For any backend goakt supports, a custom discovery.Provider is typically a 50 to 60 line wrapper around goakt's existing implementation. examples/cluster shows that shape against goakt/v4/discovery/kubernetes, keeping the rest of user code goakt-free.

atto.WithCluster requires an explicit atto.WithStore: the in-memory default is private per node and would silently break session affinity, so atto returns ErrClusterNeedsSharedStore if you forget. Pair it with a backend every node can reach: store/postgres, or the durable cluster-friendly backends under atto-stores.

Bring your own actor system (advanced)

Some goakt features aren't exposed through atto's option set: custom logger, custom serdes, multi-data-centre placement. To use them, build the actor system yourself and adopt it via atto.WithActorSystem(sys). atto.RegisterClusterPrerequisites and atto.RegisterRemotePrerequisites merge atto's kinds and wire types into your config without disturbing your own:

clusterCfg := gactor.NewClusterConfig().
    WithDiscovery(myProvider).
    WithKinds(myKinds...).
    WithMinimumPeersQuorum(2)
atto.RegisterClusterPrerequisites(clusterCfg)

remoteCfg := remote.NewConfig("0.0.0.0", 3330,
    remote.WithSerializables(myMessages...),
)
atto.RegisterRemotePrerequisites(remoteCfg)

sys, _ := gactor.NewActorSystem("agents",
    gactor.WithRemote(remoteCfg),
    gactor.WithCluster(clusterCfg),
    gactor.WithExtensions(atto.Extension(
        atto.ExtensionWithStore(postgresStore),
        atto.ExtensionWithLLM(model),
    )),
)
_ = sys.Start(ctx)

rt, _ := atto.New(ctx, model, build, atto.WithActorSystem(sys))

Both Register… helpers are additive: kinds and serialisables you supply survive untouched. Dependencies resolve in PreStart through atto.Extension, so actors relocate without holding stale pointers.

A2A: serve and call remote agents

a2a.NewServer(rt, ...) puts the same *atto.Runtime behind the A2A protocol: one JSON-RPC handler, one SSE stream per task, and an AgentCard auto-generated from the tools you register. The agent does not change shape; the bridge dispatches into the same SessionActor the runtime drives without A2A.

The cluster guarantees carry across:

  • Per-task SSE topics on a goakt TopicActor. tasks/resubscribe reattaches from any pod. When a reconnect lands on a different node from the one that ran the turn, a TaskRegistryActor resolves taskID → sessionID, replays a synthetic Task from the persisted projection, then forwards live events through the cluster topic. Late reconnectors past the grace window (a2a.WithResubscribeGrace, default 60s) receive ErrTaskNotFound.
  • input-required survives pod loss. A paused turn commits through the same CommitTurn path as a final assistant message. The next message/send referencing the same taskId rehydrates history from the session store and resumes; the resume can land on a different pod than the pause.
  • Task state in the session snapshot. Each A2A task is a projection stored under a reserved __a2a_task_<taskID> key inside session.State. Optimistic concurrency, store reload, and cross-pod placement reuse the same machinery as a normal turn; there is no parallel task-store implementation to keep in sync.
  • a2a.RemoteAgent satisfies agent.Agent. A remote A2A endpoint plugs into a local orchestrator as a sub-agent. Local A → remote B → local C composes; the AgentCard is fetched once and cached at construction time.
  • Auth at the bridge boundary. auth.JWT and auth.APIKey middleware sit in front of the JSON-RPC handler. The authenticated principal flows through context.Context to a ContextIDResolver (default principal/contextID), so two tenants cannot reach each other's sessions even when they reuse a contextId.
rt, _ := atto.New(ctx, model, build,
    atto.WithStore(postgresStore),
    atto.WithCluster(provider, atto.ClusterQuorum(2)),
)

srv, _ := a2a.NewServer(rt,
    a2a.WithSkillFromTool(weather, a2a.SkillTags("weather", "live")),
    a2a.WithAuth(auth.JWT(jwtCfg)),
)
http.Handle("/", srv.Handler())

examples/a2a-cluster runs this bridge on a three-pod kind cluster; the demo script kills the pod that just served a turn and asserts the resume lands on a peer.

Examples

Example What it shows
quickstart Runnable tour with a scripted fake model. No API key required.
clarification Pause-and-resume across two turns of the same session. No API key required.
single-agent Live Gemini plus a real HTTP tool.
multi-agent Coordinator delegates to a specialist ("agent as tool").
cluster Three-pod atto cluster on a local kind cluster.
a2a-cluster A2A bridge on a three-pod kind cluster; demo proves pod-loss survival.
candidate-sourcing Multi-agent recruiter on docker-compose: clustered front-door, two A2A sub-agents wired through a2a.AgentAsTool, dnssd peer discovery, Postgres-backed sessions, web chat UI.

Testing

llm.NewFake replays scripted chunks; tool.Fake records invocations. Together with store/inmemory, they give you deterministic, fast tests. New store backends plug into storetest.Run(t, factory) against the shared contract.

Roadmap and status

The five-concept public API is the stability target through v0.x. v0.1.0 shipped the runtime: sessions, cluster mode, three native model adapters, three in-tree stores, the model actor. v0.2 adds the A2A bridge — serve A2A over JSON-RPC + SSE, call remote agents through a2a.RemoteAgent, and resume across input-required pauses on a different pod. Details live in CHANGELOG.md. v0.3 is the next milestone: push-notification webhooks, gRPC transport, OAuth2 / mTLS auth, OTel spans across A2A hops, and RemoteAgent resume across remote pauses.

Security

See SECURITY.md for the disclosure process.

Community

GitHub Discussions GitHub Issues

Contributing

Bug fixes, adapters, and stores are welcome. Atto uses Conventional Commits and runs go test -race, go vet, and golangci-lint as in CI. See CONTRIBUTING.md.

Why the name?

atto is Italian for "act", and also the metric prefix 10⁻¹⁸.

Packages

 
 
 

Contributors

Languages