Skip to content

Commit d1602a8

Browse files
authored
release v0.0.6
- nats memory support - improve a2a protocol - nats-agents protocol support - enable parallel tool calling
1 parent 536f98d commit d1602a8

58 files changed

Lines changed: 5910 additions & 210 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ go.work.sum
3333

3434
.github/skills
3535
.github/copilot-instructions.md
36-
TODO.md
36+
TODO.md
37+
CLAUDE.md

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Phero is a modern Go framework for building multi-agent AI systems. Like ants in
2727
- **🤝 Agent orchestration** Multi-agent workflows with role specialization, coordination, and runtime handoffs
2828
- **🔀 Agent handoffs** Transfer control between agents at runtime; `Result.HandoffAgent` tells you where to route next
2929
- **🌐 A2A protocol** Expose any agent as an HTTP A2A server, or call remote A2A agents as local tools
30+
- **🔀 NATS Agent Protocol** Register agents as NATS micro services and discover/call them over pub/sub; wire-compatible with TypeScript and Python SDKs
3031
- **🧩 LLM abstraction** Work with OpenAI-compatible endpoints (OpenAI, Ollama, etc.) and Anthropic
3132
- **🖼️ Multimodal input** Mix text and images with typed content parts (`llm.Text`, `llm.ImageURL`, `llm.ImageFile`)
3233
- **🔊 Audio I/O** OpenAI backend supports speech-to-text and text-to-speech via `llm.Transcriber` and `llm.SpeechSynthesizer`
@@ -77,7 +78,7 @@ Phero is organized into focused packages, each solving a specific problem:
7778
### 🤖 Agent Layer
7879

7980
- **`agent`** Core orchestration for LLM-based agents with tool execution, chat loops, and runtime handoffs
80-
- **`memory`** Conversational context management for multi-turn interactions (in-process, file-backed, RAG-backed, or PostgreSQL-backed)
81+
- **`memory`** Conversational context management for multi-turn interactions (in-process, file-backed, RAG-backed, PostgreSQL-backed, or NATS JetStream KV-backed)
8182

8283
### 💬 LLM Layer
8384

@@ -103,6 +104,7 @@ Phero is organized into focused packages, each solving a specific problem:
103104
- **`skill`** Parse SKILL.md files and expose them as agent capabilities
104105
- **`mcp`** Model Context Protocol adapter for external tool integration
105106
- **`a2a`** Agent-to-Agent (A2A) protocol — expose agents as HTTP servers or call remote agents as tools
107+
- **`nats`** NATS Agent Protocol v0.3 — register agents as NATS micro services; discover and call them over pub/sub
106108
- **`trace`** Typed observability events; `trace/text` for human-readable colorized output; `trace/jsonfile` for NDJSON file logging; `trace.NewLLM` for raw LLM call wrapping
107109
- **`tool/agent`** Create and run a sub-agent at runtime as a delegated tool
108110
- **`tool/file`** Filesystem tools (`read`, `write`, `edit`, `glob`, `grep`)
@@ -124,9 +126,13 @@ Comprehensive examples are included in the [`examples/`](examples/) directory:
124126
| [LLM Middleware](examples/llm-middleware/) | Wrap an LLM with composable middleware for logging and other cross-cutting concerns |
125127
| [Conversational Agent](examples/conversational-agent/) | REPL-style chatbot with short-term conversational memory and a simple built-in tool |
126128
| [Long-Term Memory](examples/long-term-memory/) | REPL-style chatbot with semantic long-term memory (RAG) backed by Qdrant |
129+
| [NATS Memory](examples/nats-memory/) | Persistent chatbot backed by NATS JetStream KV; conversation survives process restarts and supports named sessions |
127130
| [Handoff](examples/handoff/) | One agent hands work off to a specialist agent at runtime using the built-in handoff mechanism |
128131
| [A2A Server](examples/a2a/server/) | Expose a Phero agent as an A2A-compliant HTTP server for cross-process agent calls |
129132
| [A2A Client](examples/a2a/client/) | Connect to a remote A2A agent and use it as a local tool inside an orchestrator |
133+
| [A2A Multi-Agent Newsroom](examples/a2a/multi-agent/) | Three specialised agents (researcher, writer, editor) each running as an independent A2A server, coordinated by a local orchestrator |
134+
| [NATS Agent](examples/nats-agent/) | Register a Phero agent as a NATS micro service and interact with it from an interactive client using the NATS Agent Protocol |
135+
| [NATS Multi-Agent Newsroom](examples/nats-agent/multi-agent/) | Three specialised agents running as NATS micro services, orchestrated via service discovery and `Client.AsTool()` |
130136
| [Debate Committee](examples/debate-committee/) | Multi-agent architecture where committee members debate independently and a judge synthesizes the final decision |
131137
| [Evaluator-Optimizer](examples/evaluator-optimizer/) | Iterative generation loop where an optimizer proposes drafts and an evaluator critiques them until quality criteria are met |
132138
| [Human-in-the-Loop](examples/human-in-the-loop/) | Multi-agent flow that pauses for explicit human approval/input before continuing |

a2a/client.go

Lines changed: 158 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,78 @@ package a2a
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"net/url"
21+
"time"
2022

2123
sdka2a "github.com/a2aproject/a2a-go/v2/a2a"
2224
"github.com/a2aproject/a2a-go/v2/a2aclient"
2325
"github.com/a2aproject/a2a-go/v2/a2aclient/agentcard"
2426

27+
"github.com/henomis/phero/agent"
2528
"github.com/henomis/phero/llm"
2629
)
2730

2831
// ClientOption configures a [Client].
2932
type ClientOption func(*clientConfig)
3033

3134
type clientConfig struct {
32-
resolver *agentcard.Resolver
35+
resolver *agentcard.Resolver
36+
pushConfig *sdka2a.PushConfig
37+
acceptedOutputModes []string
38+
preferredTransports []sdka2a.TransportProtocol
39+
interceptors []a2aclient.CallInterceptor
40+
pollingInterval time.Duration
3341
}
3442

3543
// WithResolver overrides the default [agentcard.Resolver] used to fetch the
3644
// remote AgentCard.
3745
func WithResolver(r *agentcard.Resolver) ClientOption {
46+
return func(c *clientConfig) { c.resolver = r }
47+
}
48+
49+
// WithPushConfig sets the default push notification configuration applied to
50+
// every task sent by this client.
51+
func WithPushConfig(cfg *sdka2a.PushConfig) ClientOption {
52+
return func(c *clientConfig) { c.pushConfig = cfg }
53+
}
54+
55+
// WithAcceptedOutputModes declares the MIME types the client can consume.
56+
// Agents may use this to decide which output format to produce.
57+
func WithAcceptedOutputModes(modes ...string) ClientOption {
58+
return func(c *clientConfig) { c.acceptedOutputModes = modes }
59+
}
60+
61+
// WithPreferredTransports sets the ordered list of preferred transport protocols.
62+
// The first protocol supported by both client and server will be selected.
63+
func WithPreferredTransports(protocols ...sdka2a.TransportProtocol) ClientOption {
64+
return func(c *clientConfig) { c.preferredTransports = protocols }
65+
}
66+
67+
// WithClientInterceptors registers one or more [a2aclient.CallInterceptor] values
68+
// that run before and after every outgoing A2A call. Use this to inject
69+
// authentication headers (e.g. Authorization: Bearer …), distributed tracing
70+
// spans, or custom logging.
71+
func WithClientInterceptors(interceptors ...a2aclient.CallInterceptor) ClientOption {
72+
return func(c *clientConfig) { c.interceptors = append(c.interceptors, interceptors...) }
73+
}
74+
75+
// WithPollingInterval sets the interval used when polling GetTask for completion
76+
// after a streaming subscription fails or is unavailable. Defaults to 500 ms.
77+
// Non-positive values are ignored and the default is kept.
78+
func WithPollingInterval(d time.Duration) ClientOption {
3879
return func(c *clientConfig) {
39-
c.resolver = r
80+
if d > 0 {
81+
c.pollingInterval = d
82+
}
4083
}
4184
}
4285

4386
// Client wraps a remote A2A agent and can expose it as an [llm.Tool].
4487
type Client struct {
4588
card *sdka2a.AgentCard
4689
client *a2aclient.Client
90+
cfg *clientConfig
4791
}
4892

4993
// NewClient resolves the AgentCard at baseURL and creates a transport-agnostic
@@ -63,7 +107,8 @@ func NewClient(ctx context.Context, baseURL string, opts ...ClientOption) (*Clie
63107
}
64108

65109
cfg := &clientConfig{
66-
resolver: agentcard.DefaultResolver,
110+
resolver: agentcard.DefaultResolver,
111+
pollingInterval: 500 * time.Millisecond,
67112
}
68113

69114
for _, o := range opts {
@@ -75,18 +120,43 @@ func NewClient(ctx context.Context, baseURL string, opts ...ClientOption) (*Clie
75120
return nil, err
76121
}
77122

78-
c, err := a2aclient.NewFromCard(ctx, card)
123+
clientCfg := a2aclient.Config{
124+
AcceptedOutputModes: cfg.acceptedOutputModes,
125+
PreferredTransports: cfg.preferredTransports,
126+
}
127+
if cfg.pushConfig != nil {
128+
clientCfg.PushConfig = cfg.pushConfig
129+
}
130+
131+
factoryOpts := []a2aclient.FactoryOption{a2aclient.WithConfig(clientCfg)}
132+
if len(cfg.interceptors) > 0 {
133+
factoryOpts = append(factoryOpts, a2aclient.WithCallInterceptors(cfg.interceptors...))
134+
}
135+
136+
c, err := a2aclient.NewFromCard(ctx, card, factoryOpts...)
79137
if err != nil {
80138
return nil, err
81139
}
82140

83-
return &Client{card: card, client: c}, nil
141+
return &Client{card: card, client: c, cfg: cfg}, nil
84142
}
85143

86-
// AsTool converts the remote agent into an [llm.Tool] that a phero agent can
87-
// call.
144+
// Card returns the AgentCard resolved for the remote agent.
145+
func (c *Client) Card() *sdka2a.AgentCard {
146+
return c.card
147+
}
148+
149+
// AsTool converts the remote agent into an [llm.Tool] that a phero agent can call.
150+
//
151+
// The tool name and description are taken from the remote AgentCard. The tool
152+
// handler sends a SendMessage request and waits for the task to reach a terminal
153+
// state, transparently handling both synchronous (inline Message) and asynchronous
154+
// (Task-based) responses. Async tasks are resolved via event subscription with a
155+
// polling fallback.
88156
//
89-
// The tool name and description are taken from the remote AgentCard.
157+
// The tool input and output are text-only. Non-text parts in the remote agent's
158+
// response (images, raw bytes) are not surfaced; if the response contains no text
159+
// the tool returns [ErrNoTextContent].
90160
func (c *Client) AsTool() (*llm.Tool, error) {
91161
type toolInput struct {
92162
Input string `json:"input" jsonschema:"description=Instructions or question for the remote agent."`
@@ -108,6 +178,28 @@ func (c *Client) AsTool() (*llm.Tool, error) {
108178
return nil, ErrEmptyResponse
109179
}
110180

181+
// If the server returned a non-terminal Task, wait until it completes.
182+
if task, ok := result.(*sdka2a.Task); ok && !task.Status.State.Terminal() {
183+
task, err = c.waitForTask(ctx, task)
184+
if err != nil {
185+
return nil, err
186+
}
187+
result = task
188+
}
189+
190+
// Translate task failure/cancellation to errors.
191+
if task, ok := result.(*sdka2a.Task); ok {
192+
switch task.Status.State {
193+
case sdka2a.TaskStateFailed:
194+
if reason := extractStatusMessage(task.Status.Message); reason != "" {
195+
return nil, fmt.Errorf("%w: %s", ErrTaskFailed, reason)
196+
}
197+
return nil, ErrTaskFailed
198+
case sdka2a.TaskStateCanceled:
199+
return nil, ErrTaskCanceled
200+
}
201+
}
202+
111203
text, err := extractTextFromResult(result)
112204
if err != nil {
113205
return nil, err
@@ -116,37 +208,73 @@ func (c *Client) AsTool() (*llm.Tool, error) {
116208
return &toolOutput{Output: text}, nil
117209
}
118210

119-
return llm.NewTool(c.card.Name, c.card.Description, handler)
211+
return llm.NewTool(agent.SanitizeToolName(c.card.Name), c.card.Description, handler)
120212
}
121213

122-
// extractTextFromResult extracts the first text content from a SendMessageResult.
123-
//
124-
// A result is either a *sdka2a.Message (when the agent responds inline) or a
125-
// *sdka2a.Task (when the server creates a task to track the work). Both cases
126-
// are handled here.
127-
func extractTextFromResult(result sdka2a.SendMessageResult) (string, error) {
128-
switch v := result.(type) {
129-
case *sdka2a.Message:
130-
for _, part := range v.Parts {
131-
if t := part.Text(); t != "" {
132-
return t, nil
133-
}
214+
215+
// extractStatusMessage returns the first text part from a task status message,
216+
// or an empty string if the message is nil or contains no text parts.
217+
func extractStatusMessage(msg *sdka2a.Message) string {
218+
if msg == nil {
219+
return ""
220+
}
221+
for _, part := range msg.Parts {
222+
if t := part.Text(); t != "" {
223+
return t
134224
}
225+
}
226+
return ""
227+
}
135228

136-
return "", ErrNoTextContent
229+
// waitForTask blocks until the task reaches a terminal state and returns the
230+
// final Task. It first attempts to subscribe to the task's event stream; if
231+
// that fails or the server does not support streaming, it falls back to polling
232+
// GetTask at the configured polling interval (default 500 ms).
233+
func (c *Client) waitForTask(ctx context.Context, task *sdka2a.Task) (*sdka2a.Task, error) {
234+
if task.Status.State.Terminal() {
235+
return task, nil
236+
}
237+
238+
// Try streaming subscription first (works when server supports streaming).
239+
subCtx, cancelSub := context.WithCancel(ctx)
240+
defer cancelSub()
137241

138-
case *sdka2a.Task:
139-
if v.Status.Message != nil {
140-
for _, part := range v.Status.Message.Parts {
141-
if t := part.Text(); t != "" {
142-
return t, nil
242+
for event, err := range c.client.SubscribeToTask(subCtx, &sdka2a.SubscribeToTaskRequest{ID: task.ID}) {
243+
if err != nil {
244+
break
245+
}
246+
switch v := event.(type) {
247+
case *sdka2a.Task:
248+
if v.Status.State.Terminal() {
249+
return v, nil
250+
}
251+
case *sdka2a.TaskStatusUpdateEvent:
252+
if v.Status.State.Terminal() {
253+
t, err := c.client.GetTask(ctx, &sdka2a.GetTaskRequest{ID: task.ID})
254+
if err != nil {
255+
return nil, err
143256
}
257+
return t, nil
144258
}
145259
}
260+
}
146261

147-
return "", ErrNoTextContent
262+
// Fall back to polling.
263+
ticker := time.NewTicker(c.cfg.pollingInterval)
264+
defer ticker.Stop()
148265

149-
default:
150-
return "", ErrNoTextContent
266+
for {
267+
select {
268+
case <-ctx.Done():
269+
return nil, ctx.Err()
270+
case <-ticker.C:
271+
t, err := c.client.GetTask(ctx, &sdka2a.GetTaskRequest{ID: task.ID})
272+
if err != nil {
273+
return nil, err
274+
}
275+
if t.Status.State.Terminal() {
276+
return t, nil
277+
}
278+
}
151279
}
152280
}

0 commit comments

Comments
 (0)