Skip to content

Commit 20e75ca

Browse files
authored
Release/v0.0.7 (#16)
fix nats agent session fix linter errors add IngestOne for RAG add Count for vectorstores agent can return multiple handoffs
1 parent d1602a8 commit 20e75ca

29 files changed

Lines changed: 957 additions & 188 deletions

a2a/client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,6 @@ func (c *Client) AsTool() (*llm.Tool, error) {
211211
return llm.NewTool(agent.SanitizeToolName(c.card.Name), c.card.Description, handler)
212212
}
213213

214-
215214
// extractStatusMessage returns the first text part from a task status message,
216215
// or an empty string if the message is nil or contains no text parts.
217216
func extractStatusMessage(msg *sdka2a.Message) string {

a2a/message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121

2222
sdka2a "github.com/a2aproject/a2a-go/v2/a2a"
2323

24-
"github.com/henomis/phero/agent"
2524
pheroA2A "github.com/henomis/phero/a2a"
25+
"github.com/henomis/phero/agent"
2626
"github.com/henomis/phero/llm"
2727
)
2828

agent/agent.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ type Agent struct {
4444
// Result represents the final output of an agent after processing user input and executing any tool calls.
4545
type Result struct {
4646
// Parts holds the multimodal content of the final assistant message.
47-
Parts []llm.ContentPart
48-
HandoffAgent *Agent
49-
Summary *trace.RunSummary
47+
Parts []llm.ContentPart
48+
// HandoffAgents lists the agents that were handed work; more than one means fan-out.
49+
HandoffAgents []*Agent
50+
Summary *trace.RunSummary
5051
}
5152

5253
// TextContent returns the concatenation of all text parts in the result.
@@ -185,7 +186,7 @@ func (a *Agent) Run(ctx context.Context, parts ...llm.ContentPart) (result *Resu
185186
ctx = trace.WithTracer(ctx, a.tracer)
186187
ctx = trace.WithAgentName(ctx, a.name)
187188
stats := newRunStats(a.name)
188-
handoffAgentName := ""
189+
var handoffAgentNames []string
189190

190191
session, sessionIndex, err := a.prepareSession(ctx, parts, stats)
191192
if err != nil {
@@ -218,7 +219,7 @@ func (a *Agent) Run(ctx context.Context, parts ...llm.ContentPart) (result *Resu
218219
Timestamp: time.Now(),
219220
})
220221

221-
summary := stats.summary(iteration, handoffAgentName, err)
222+
summary := stats.summary(iteration, handoffAgentNames, err)
222223
if result != nil {
223224
result.Summary = summary
224225
}
@@ -249,13 +250,15 @@ func (a *Agent) Run(ctx context.Context, parts ...llm.ContentPart) (result *Resu
249250
}
250251

251252
session = iterationResult.session
252-
if iterationResult.handoffAgent != nil {
253-
handoffAgentName = iterationResult.handoffAgent.Name()
253+
if len(iterationResult.handoffAgents) > 0 {
254+
for _, ha := range iterationResult.handoffAgents {
255+
handoffAgentNames = append(handoffAgentNames, ha.Name())
256+
}
254257
}
255258

256259
// If finalMessage is nil, it means the agent executed tool calls and needs to call the LLM again.
257260
if iterationResult.lastMessage != nil {
258-
return &Result{Parts: iterationResult.lastMessage.Parts, HandoffAgent: iterationResult.handoffAgent}, nil
261+
return &Result{Parts: iterationResult.lastMessage.Parts, HandoffAgents: iterationResult.handoffAgents}, nil
259262
}
260263
}
261264
}
@@ -307,9 +310,9 @@ func (a *Agent) saveSession(ctx context.Context, messages []llm.Message, session
307310

308311
// agentIteration represents the result of one iteration of the agent loop.
309312
type agentIteration struct {
310-
session []llm.Message
311-
lastMessage *llm.Message
312-
handoffAgent *Agent
313+
session []llm.Message
314+
lastMessage *llm.Message
315+
handoffAgents []*Agent
313316
}
314317

315318
// handleAgentIteration executes one iteration of the agent loop: it calls the LLM with the current messages,
@@ -345,26 +348,23 @@ func (a *Agent) handleAgentIteration(ctx context.Context, session []llm.Message,
345348
}
346349
wg.Wait()
347350

348-
// Append results in order; find the first handoff, if any.
349-
var handoffAgent *Agent
351+
// Append results in order; collect all handoffs (fan-out when more than one).
352+
var handoffAgents []*Agent
353+
var handoffMsg *llm.Message
350354
for i, result := range results {
351355
session = append(session, *result)
352-
if hAgent, ok := a.handoffs[toolCalls[i].Function.Name]; ok && handoffAgent == nil {
353-
handoffAgent = hAgent
356+
if hAgent, ok := a.handoffs[toolCalls[i].Function.Name]; ok {
357+
handoffAgents = append(handoffAgents, hAgent)
358+
if handoffMsg == nil {
359+
handoffMsg = result
360+
}
354361
}
355362
}
356363

357-
if handoffAgent != nil {
358-
// All tool results are preserved in session. Return the handoff tool's
364+
if len(handoffAgents) > 0 {
365+
// All tool results are preserved in session. Return the first handoff tool's
359366
// result as lastMessage so callers receive it in Result.Parts.
360-
var handoffMsg *llm.Message
361-
for i, toolCall := range toolCalls {
362-
if _, ok := a.handoffs[toolCall.Function.Name]; ok {
363-
handoffMsg = results[i]
364-
break
365-
}
366-
}
367-
return agentIteration{session: session, lastMessage: handoffMsg, handoffAgent: handoffAgent}, nil
367+
return agentIteration{session: session, lastMessage: handoffMsg, handoffAgents: handoffAgents}, nil
368368
}
369369

370370
return agentIteration{session: session}, nil

agent/agent_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,8 @@ func TestRun_Simple(t *testing.T) {
262262
if result.TextContent() != "Hello, world!" {
263263
t.Fatalf("expected %q, got %q", "Hello, world!", result.TextContent())
264264
}
265-
if result.HandoffAgent != nil {
266-
t.Fatalf("expected no handoff agent, got %v", result.HandoffAgent)
265+
if len(result.HandoffAgents) != 0 {
266+
t.Fatalf("expected no handoff agents, got %v", result.HandoffAgents)
267267
}
268268
}
269269

@@ -436,11 +436,11 @@ func TestRun_Handoff(t *testing.T) {
436436
if err != nil {
437437
t.Fatalf("Run: unexpected error: %v", err)
438438
}
439-
if result.HandoffAgent == nil {
440-
t.Fatal("expected HandoffAgent to be set")
439+
if len(result.HandoffAgents) == 0 {
440+
t.Fatal("expected HandoffAgents to be set")
441441
}
442-
if result.HandoffAgent.Name() != "worker" {
443-
t.Fatalf("expected handoff to %q, got %q", "worker", result.HandoffAgent.Name())
442+
if result.HandoffAgents[0].Name() != "worker" {
443+
t.Fatalf("expected handoff to %q, got %q", "worker", result.HandoffAgents[0].Name())
444444
}
445445
}
446446

@@ -561,8 +561,8 @@ func multiToolCallResult(calls ...llm.ToolCall) *llm.Result {
561561

562562
func toolCall(toolName, callID, arguments string) llm.ToolCall {
563563
return llm.ToolCall{
564-
ID: callID,
565-
Type: llm.ToolTypeFunction,
564+
ID: callID,
565+
Type: llm.ToolTypeFunction,
566566
Function: llm.FunctionCall{Name: toolName, Arguments: arguments},
567567
}
568568
}
@@ -676,10 +676,10 @@ func TestRun_HandoffWithPrecedingToolCall(t *testing.T) {
676676
if !regularInvoked {
677677
t.Error("regular_tool was not invoked; expected it to run even when a handoff is in the same batch")
678678
}
679-
if result.HandoffAgent == nil {
680-
t.Fatal("expected HandoffAgent to be set")
679+
if len(result.HandoffAgents) == 0 {
680+
t.Fatal("expected HandoffAgents to be set")
681681
}
682-
if result.HandoffAgent.Name() != "worker" {
683-
t.Errorf("HandoffAgent = %q, want %q", result.HandoffAgent.Name(), "worker")
682+
if result.HandoffAgents[0].Name() != "worker" {
683+
t.Errorf("HandoffAgents[0] = %q, want %q", result.HandoffAgents[0].Name(), "worker")
684684
}
685685
}

agent/trace.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (s *runStats) recordMemorySave(count int, duration time.Duration) {
9494
s.memorySaved += count
9595
}
9696

97-
func (s *runStats) summary(iterations int, handoffAgent string, err error) *trace.RunSummary {
97+
func (s *runStats) summary(iterations int, handoffAgents []string, err error) *trace.RunSummary {
9898
tools := make([]trace.ToolCallSummary, 0, len(s.toolSummaries))
9999
toolNames := make([]string, 0, len(s.toolSummaries))
100100
for toolName := range s.toolSummaries {
@@ -129,7 +129,7 @@ func (s *runStats) summary(iterations int, handoffAgent string, err error) *trac
129129
Memory: s.memoryDuration,
130130
},
131131
Tools: tools,
132-
HandoffAgent: handoffAgent,
132+
HandoffAgents: handoffAgents,
133133
}
134134

135135
if err != nil {

examples/handoff/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ func main() {
168168
break
169169
}
170170

171-
if result.HandoffAgent != nil {
172-
fmt.Printf("[handoff] %s → %s\n", routingAgent.Name(), result.HandoffAgent.Name())
173-
routingAgent = result.HandoffAgent
171+
if len(result.HandoffAgents) > 0 {
172+
fmt.Printf("[handoff] %s → %s\n", routingAgent.Name(), result.HandoffAgents[0].Name())
173+
routingAgent = result.HandoffAgents[0]
174174
// Empty input: the specialist reads context from shared memory.
175175
currentInput = ""
176176
continue

examples/nats-agent/server/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"syscall"
3131
"time"
3232

33+
"github.com/google/uuid"
3334
"github.com/nats-io/nats.go"
3435

3536
"github.com/henomis/phero/agent"
@@ -66,7 +67,7 @@ func main() {
6667

6768
srv, err := natsagent.New(nc, a, *owner, *name,
6869
natsagent.WithAgentID("phero"),
69-
natsagent.WithSession(*name),
70+
natsagent.WithSession(uuid.NewString()),
7071
natsagent.WithHeartbeatInterval(10*time.Second),
7172
)
7273
if err != nil {

examples/rag-chatbot/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func main() {
123123
return
124124
}
125125

126-
if err := ragEngine.Ingest(bootstrapCtx, splitter); err != nil {
126+
if err := ragEngine.IngestOnce(bootstrapCtx, splitter); err != nil {
127127
fmt.Fprintf(os.Stderr, "failed to ingest file: %v\n", err)
128128
return
129129
}

nats/errors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ var (
2020
// ErrNilConn is returned when a nil *nats.Conn is passed to New or NewClient.
2121
ErrNilConn = errors.New("nats: NATS connection must not be nil")
2222

23-
// ErrNilAgent is returned when a nil *agent.Agent is passed to New.
24-
ErrNilAgent = errors.New("nats: agent must not be nil")
23+
// ErrNilHandler is returned when a nil Handler is passed to New.
24+
ErrNilHandler = errors.New("nats: handler must not be nil")
2525

2626
// ErrEmptyOwner is returned when the owner field is empty.
2727
ErrEmptyOwner = errors.New("nats: owner must not be empty")

nats/server.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,40 +21,47 @@ import (
2121
"time"
2222

2323
"github.com/henomis/phero/agent"
24+
"github.com/henomis/phero/llm"
2425
natsclient "github.com/nats-io/nats.go"
2526
natsio "github.com/nats-io/nats.go/micro"
2627
)
2728

2829
const protocolVersion = "0.3"
2930

30-
// Server registers a Phero agent as a NATS micro service implementing the
31+
// Handler is implemented by anything that can process a prompt — *agent.Agent
32+
// and workflow executors both satisfy it.
33+
type Handler interface {
34+
Run(ctx context.Context, parts ...llm.ContentPart) (*agent.Result, error)
35+
}
36+
37+
// Server registers a Handler as a NATS micro service implementing the
3138
// NATS Agent Protocol v0.3. It handles:
3239
//
3340
// - Service registration and discovery via $SRV.PING/INFO.agents (§3, §4).
3441
// - Streaming prompt responses on the prompt endpoint (§5, §6).
3542
// - Heartbeat publication on agents.hb.{agent}.{owner}.{name} (§8).
3643
// - On-demand status replies on the status endpoint (§8.7).
3744
type Server struct {
38-
nc *natsclient.Conn
39-
agent *agent.Agent
40-
cfg *serverConfig
41-
owner string
42-
name string
45+
nc *natsclient.Conn
46+
handler Handler
47+
cfg *serverConfig
48+
owner string
49+
name string
4350

4451
svc natsio.Service
4552
wg sync.WaitGroup
4653
}
4754

48-
// New creates a Server that wraps a ready to serve on NATS.
55+
// New creates a Server that wraps h and serves it on NATS.
4956
// owner and name are required positional arguments (§3.2):
5057
// - owner identifies the operator or account.
5158
// - name is the per-instance label (the 5th token in the subject hierarchy).
52-
func New(nc *natsclient.Conn, a *agent.Agent, owner, name string, opts ...ServerOption) (*Server, error) {
59+
func New(nc *natsclient.Conn, h Handler, owner, name string, opts ...ServerOption) (*Server, error) {
5360
if nc == nil {
5461
return nil, ErrNilConn
5562
}
56-
if a == nil {
57-
return nil, ErrNilAgent
63+
if h == nil {
64+
return nil, ErrNilHandler
5865
}
5966
if owner == "" {
6067
return nil, ErrEmptyOwner
@@ -70,12 +77,17 @@ func New(nc *natsclient.Conn, a *agent.Agent, owner, name string, opts ...Server
7077
}
7178
}
7279

80+
if cfg.session != "" {
81+
name = name + "-" + cfg.session
82+
cfg.session = name
83+
}
84+
7385
return &Server{
74-
nc: nc,
75-
agent: a,
76-
cfg: cfg,
77-
owner: owner,
78-
name: name,
86+
nc: nc,
87+
handler: h,
88+
cfg: cfg,
89+
owner: owner,
90+
name: name,
7991
}, nil
8092
}
8193

@@ -186,6 +198,9 @@ func (s *Server) processPrompt(ctx context.Context, req natsio.Request) {
186198
return
187199
}
188200

201+
// Mandatory first message: ack before any latency-inducing work (§6.4).
202+
_ = req.Respond(encodeStatusChunk("ack"))
203+
189204
// Keepalive: emit periodic ack chunks so the caller's inactivity timeout
190205
// does not fire during long-running agent work (§6.4).
191206
kaCtx, kaCancel := context.WithCancel(ctx)
@@ -203,7 +218,7 @@ func (s *Server) processPrompt(ctx context.Context, req natsio.Request) {
203218
}
204219
})
205220

206-
result, runErr := s.agent.Run(ctx, parts...)
221+
result, runErr := s.handler.Run(ctx, parts...)
207222

208223
// Stop keepalive before emitting the response/terminator so ack chunks
209224
// never race the terminator or the response chunk.

0 commit comments

Comments
 (0)