-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathagent_components.py
More file actions
1753 lines (1405 loc) · 72 KB
/
agent_components.py
File metadata and controls
1753 lines (1405 loc) · 72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from xai_components.base import InArg, OutArg, InCompArg, Component, BaseComponent, xai_component, dynalist, secret, SubGraphExecutor
import traceback
import abc
from collections import deque
from typing import NamedTuple
import json
import os
import requests
import random
import string
import copy
try:
import openai
except Exception as e:
pass
# Optional: If using NumpyMemory need numpy and OpenAI
try:
import numpy as np
except Exception as e:
pass
# Optional: If using vertexai provider.
try:
import vertexai
from vertexai.preview.generative_models import GenerativeModel
except Exception as e:
pass
def random_string(length):
    """Return a string made of `length` randomly chosen ASCII letters.

    Letters are picked one at a time with `random.choice` from
    `string.ascii_letters` (upper- and lower-case).
    """
    alphabet = string.ascii_letters
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def is_openai_model(model_name: str) -> bool:
    """Report whether `model_name` is an OpenAI model that supports system messages.

    Empty or None names and every name starting with 'gpt-5' yield False.
    Otherwise the name must start with one of 'o1', 'o3', 'o4' or 'gpt'.
    """
    if not model_name or model_name.startswith('gpt-5'):
        return False
    for prefix in ('o1', 'o3', 'o4', 'gpt'):
        if model_name.startswith(prefix):
            return True
    return False
def convert_old_tool_syntax_to_xml(text: str) -> str:
    """Convert old ``TOOL:`` syntax to the XML tool format in tool descriptions.

    Every line of the form ``TOOL: tool_name [args]`` is rewritten as
    ``<tool name="tool_name"></tool>`` when there are no arguments, or as a
    three-line ``<tool name="tool_name">`` / args / ``</tool>`` block when
    arguments are present. The line's leading indentation is reused for each
    emitted line; all other text is returned unchanged.

    Args:
        text: Tool description text that may contain old-style tool lines.

    Returns:
        The text with every old-style tool line converted.
    """
    import re

    # Only horizontal whitespace ([^\S\n]) may separate the pieces. A plain \s
    # also matches newlines, which let a single match swallow the *following*
    # line as "arguments" and pulled preceding blank lines into the indent.
    old_tool_pattern = r'^([^\S\n]*)TOOL:[^\S\n]+(\w+)(?:[^\S\n]+(.*))?$'

    def replace_match(match):
        indent = match.group(1)
        tool_name = match.group(2)
        # group(3) is None when nothing follows the tool name.
        args = (match.group(3) or '').strip()
        if args:
            return f'{indent}<tool name="{tool_name}">\n{indent}{args}\n{indent}</tool>'
        return f'{indent}<tool name="{tool_name}"></tool>'

    # MULTILINE makes ^ and $ anchor at every line, so each line is handled on its own.
    return re.sub(old_tool_pattern, replace_match, text, flags=re.MULTILINE)
def parse_xml_args_to_dict(xml_content: str) -> dict:
    """Turn flat XML-style arguments into a dictionary.

    Input such as::

        <arg1>value1</arg1>
        <arg2>value2</arg2>

    becomes ``{'arg1': 'value1', 'arg2': 'value2'}``; values are stripped of
    surrounding whitespace. None is returned when the input is empty, when it
    contains no ``<tag>...</tag>`` pair, or when non-whitespace text remains
    outside the matched pairs.
    """
    import re

    stripped = xml_content.strip()
    if not stripped:
        return None

    # <tag>content</tag>; DOTALL lets the (non-greedy) content span lines.
    pair_re = re.compile(r'<(\w+)>(.*?)</\1>', re.DOTALL)

    parsed = {}
    leftover = stripped
    for found in pair_re.finditer(stripped):
        parsed[found.group(1)] = found.group(2).strip()
        # Remove this pair (first occurrence only) to see what is left over.
        leftover = leftover.replace(found.group(0), '', 1)

    if not parsed:
        # Nothing matched: the input is not in XML-argument form.
        return None
    if leftover.strip():
        # Text outside the tags means this is not pure XML arguments.
        return None
    return parsed
def parse_tool_args(args_str: str) -> tuple:
    """Parse tool arguments and return ``(parsed_dict, raw_string)``.

    Supported inputs:
        1. JSON object: ``{"arg1": "value1", "arg2": "value2"}``
        2. XML format: ``<arg1>value1</arg1><arg2>value2</arg2>``
        3. Empty or None: returns ``(None, "")``
        4. Anything else: returns ``(None, raw_text)``

    Args:
        args_str: Raw argument text from a tool call; None is treated as empty.

    Returns:
        tuple: ``(parsed_dict or None, stripped raw string)``.
    """
    if args_str is None:
        return (None, "")

    args_str = args_str.strip()
    if not args_str:
        return (None, "")

    # Try JSON first. Only parse failures are swallowed; anything else
    # (e.g. KeyboardInterrupt) must propagate instead of being hidden.
    try:
        parsed = json.loads(args_str)
    except (ValueError, RecursionError):
        parsed = None
    # Valid JSON that is not an object (list, number, ...) falls through.
    if isinstance(parsed, dict):
        return (parsed, args_str)

    # Then the XML form.
    xml_dict = parse_xml_args_to_dict(args_str)
    if xml_dict is not None:
        return (xml_dict, args_str)

    # Neither JSON nor XML: hand back the raw content only.
    return (None, args_str)
# Convert an OpenAI-style conversation (list of {'role', 'content'} dicts) into
# the message list built below for model ids containing 'anthropic.claude-3'.
# ret_messages starts empty and is only appended to inside that model check.
# NOTE(review): this copy of the file has lost its indentation, so it cannot be
# confirmed from here whether the isinstance() branch is nested under the
# system-role check or runs for every message -- verify against the original.
def encode_prompt(model_id: str, conversation: list):
ret_messages = []
if 'anthropic.claude-3' in model_id.lower():
for message in conversation:
if message['role'] == 'system':
# Rewrites the role on the caller's own dict (the input message is mutated in place).
message['role'] = 'user'
if isinstance(message['content'], str):
# Plain string content becomes a single text part prefixed with 'SYSTEM:\n'.
ret_messages.append({
'role': message['role'],
'content': [
{
'type': 'text',
'text': 'SYSTEM:\n' + message['content']
}
]
})
else:
# Non-string content is iterated as a list of parts, each carrying a 'type' key.
new_contents = []
for content in message['content']:
if content['type'] == 'image_url':
# f"data:image/jpeg;base64,{base64_image}"
url = content['image_url']['url']
# Split the data URL: text before ';' holds 'data:<media type>', text after ',' is the payload.
(media_type, rest) = url.split(';', 1)
data = rest.split(',', 1)
media_type = media_type.split(':', 1)[1]
source = {
'type': 'base64',
'media_type': media_type,
'data': data[1]
}
new_contents.append({
'type': 'image',
'source': source
})
else:
# Every other part type is read through its 'text' key.
new_contents.append({
'type': 'text',
'text': content['text']
})
ret_messages.append({
'role': message['role'],
'content': new_contents
})
return ret_messages
class Memory(abc.ABC):
    """Base interface for agent memory backends.

    Neither method is marked abstract; the base implementations do nothing
    and return None, and concrete memories override them.
    """

    def query(self, query: str, n: int) -> list:
        """Query the memory; the base implementation returns None."""
        return None

    def add(self, id: str, text: str, metadata: dict) -> None:
        """Add an entry to the memory; the base implementation does nothing."""
        return None
class VectoMemoryImpl(Memory):
    """Memory implementation that delegates to a vector-space client object."""

    def __init__(self, vs):
        # `vs` only needs lookup() and ingest_text(); those are the sole calls made on it here.
        self.vs = vs

    def query(self, query: str, n: int) -> list:
        """Return the result of ``vs.lookup(query, 'TEXT', n)``."""
        hits = self.vs.lookup(query, 'TEXT', n)
        return hits

    def add(self, id: str, text: str, metadata: dict) -> None:
        """Ingest `text` with `metadata`; `id` is accepted but not forwarded."""
        self.vs.ingest_text(text, metadata)
def get_ada_embedding(text):
    """Return the "text-embedding-ada-002" embedding for `text`.

    Newlines are replaced with spaces first; the request goes through
    `openai.Embedding.create` and the first item's "embedding" is returned.
    """
    single_line = text.replace("\n", " ")
    response = openai.Embedding.create(input=[single_line], model="text-embedding-ada-002")
    first_item = response["data"][0]
    return first_item["embedding"]
class NumpyQueryResult(NamedTuple):
    """One hit returned by `NumpyMemoryImpl.query`."""

    # Identifier the entry was stored under.
    id: str
    # Score of the entry against the query vector.
    similarity: float
    # Metadata stored alongside the entry.
    attributes: dict
class NumpyMemoryImpl(Memory):
    """In-process memory that keeps embeddings in a NumPy matrix.

    `vectors` holds one embedding per row; `ids` and `metadata` are parallel
    lists with one entry per row. Embeddings come from `get_ada_embedding`.
    """

    def __init__(self, vectors=None, ids=None, metadata=None):
        # `vectors` may be None, a 2-D array, or a list of row vectors.
        self.vectors = vectors
        self.ids = ids
        self.metadata = metadata

    def _stack_vectors(self) -> None:
        """Normalize `self.vectors` to a 2-D array, or None when there are no rows."""
        if isinstance(self.vectors, list):
            # A list of any length must be stacked (a single-element list too),
            # otherwise `.shape` and `@` below would fail on a plain list.
            self.vectors = np.vstack(self.vectors) if self.vectors else None

    def query(self, query: str, n: int) -> list:
        """Return up to `n` stored entries, highest similarity first.

        Similarity is the product of each stored row with the query embedding.
        An empty memory or a non-positive `n` yields an empty list.
        """
        self._stack_vectors()
        if self.vectors is None or n <= 0:
            return []

        top_k = min(self.vectors.shape[0], n)
        query_vector = get_ada_embedding(query)
        similarities = self.vectors @ query_vector
        # Full sort, descending, then keep the best `top_k` rows.
        indices = np.argsort(similarities)[::-1][:top_k]
        return [
            NumpyQueryResult(self.ids[i], similarities[i], self.metadata[i])
            for i in indices
        ]

    def add(self, vector_id: str, text: str, metadata: dict) -> None:
        """Embed `text` and store it with `vector_id` and `metadata`."""
        self._stack_vectors()
        # Compute the embedding before touching any state so that a failed
        # embedding call cannot leave ids/vectors/metadata out of step.
        embedding = np.array(get_ada_embedding(text)).reshape((1, -1))
        if self.vectors is None:
            self.vectors = embedding
            self.ids = [vector_id]
            self.metadata = [metadata]
        else:
            self.vectors = np.vstack([self.vectors, embedding])
            self.ids.append(vector_id)
            self.metadata.append(metadata)
@xai_component
class AgentNumpyMemory(Component):
    """Provides a local, temporary memory in which the agent can store and query information.

    ##### outPorts:
    - memory: The Memory to set on AgentInit
    """
    memory: OutArg[Memory]

    def execute(self, ctx) -> None:
        # A new, empty NumpyMemoryImpl is created on every execution.
        fresh_memory = NumpyMemoryImpl()
        self.memory.value = fresh_memory
class Tool(NamedTuple):
    """Record describing a tool: its name, description, inputs and outputs."""

    # Name the tool is registered and invoked under.
    name: str
    # Human-readable description of the tool.
    description: str
    # Declared inputs of the tool.
    inputs: str
    # Declared outputs of the tool.
    outputs: str
class MutableVariable:
    """Wraps a zero-argument callable and exposes its result as `value`."""

    _fn: any

    def __init__(self):
        # No callable is installed until set_fn() is called.
        self._fn = None

    def set_fn(self, fn) -> None:
        """Install the callable that `value` evaluates."""
        self._fn = fn

    @property
    def value(self) -> any:
        """Call the installed callable and return its result."""
        producer = self._fn
        return producer()
@xai_component(type="Start", color="red")
class AgentDefineTool(Component):
    """Define a tool that the agent can call when it decides to.

    The components connected after this one run whenever the agent uses the
    tool. Perform the tool's work there and hand the result back with
    AgentToolOutput.

    ##### inPorts:
    - tool_name: The name of the tool.
    - description: The description of the tool.
    - for_toolbelt: The toolbelt to add the tool to. If not set, the tool is added to the default toolbelt.

    ##### outPorts:
    - tool_input: The raw input for the tool coming from the agent.
    - tool_args: The dictionary parse_tool_args produced from the input, or None when it produced none.
    """
    tool_name: InCompArg[str]
    description: InCompArg[str]
    for_toolbelt: InArg[str]
    tool_input: OutArg[str]
    tool_args: OutArg[dict]

    def init(self, ctx):
        # Register this component under its toolbelt so AgentMakeToolbelt can find it.
        belt = self.for_toolbelt.value
        if belt is None:
            belt = 'default'
        registry = ctx.setdefault('toolbelt_' + belt, {})
        registry[self.tool_name.value] = self
        self.tool_ref = InCompArg(None)

    def execute(self, ctx) -> None:
        component = self

        class CustomTool(Tool):
            name = component.tool_name.value
            description = component.description.value
            inputs = ["text"]
            output = ["text"]

            def __call__(self, prompt):
                # Publish the raw input and its parsed form on the out ports.
                component.tool_input.value = prompt
                parsed_args, _ = parse_tool_args(prompt)
                component.tool_args.value = parsed_args
                # Run the tool's subgraph, then collect and reset its output slot.
                SubGraphExecutor(component.next).do(ctx)
                result = ctx['tool_output']
                ctx['tool_output'] = None
                return result

        self.tool_ref.value = CustomTool(
            self.tool_name.value,
            self.description.value,
            ["text"],
            ["text"],
        )
@xai_component(color="red")
class AgentToolOutput(Component):
    """Output the result of the tool to the agent.

    The result is written to ctx['tool_output']: the single item when exactly
    one result is given, the whole list when several are given, and None when
    there are none.

    ##### inPorts:
    - results: The results of the tool to be returned to the agent.
    """
    results: InArg[dynalist]

    def execute(self, ctx) -> None:
        results = self.results.value or []
        if len(results) == 1:
            ctx['tool_output'] = results[0]
        elif results:
            # Several results: pass them all on instead of silently dropping them.
            ctx['tool_output'] = list(results)
        else:
            # Always write the slot so the tool call never reads a missing or stale value.
            ctx['tool_output'] = None
@xai_component
class AgentUseMCPTools(Component):
    """Declare that the tools of a pre-configured MCP server should be available to the agent.

    The server name is recorded in the context under ``mcp_servers_<toolbelt>``
    so that the toolbelt built for this run can list it.

    ##### inPorts:
    - server_name: The exact name of the MCP server as configured in the system.
    - toolbelt_name: The conceptual toolbelt group to associate this server with (default: 'default').
    """
    server_name: InCompArg[str]
    toolbelt_name: InArg[str]

    def execute(self, ctx) -> None:
        belt = self.toolbelt_name.value or 'default'
        # Create the per-toolbelt server list on first use.
        servers = ctx.setdefault(f"mcp_servers_{belt}", [])
        server = self.server_name.value
        # Each server name is listed at most once.
        if server not in servers:
            servers.append(server)
@xai_component
class AgentMakeToolbelt(Component):
"""Create a toolbelt for the agent to use.
##### inPorts:
- name: The name of the toolbelt.
##### outPorts:
- toolbelt_spec: The toolbelt to set on AgentInit
"""
name: InArg[str]
toolbelt_spec: OutArg[dict]
# Builds a dict with two keys: 'standard_tools' (tool name -> tool object taken
# from each component's tool_ref) and 'mcp_servers' (list of server names).
def execute(self, ctx) -> None:
standard_tools = {}
mcp_servers = []
toolbelt_name = self.name.value if self.name.value is not None else 'default'
# Process standard tools defined via AgentDefineTool or devcore tools
standard_tool_key = f"toolbelt_{toolbelt_name}"
if standard_tool_key in ctx:
for tool_name, tool_component in ctx[standard_tool_key].items():
# Handle any component that has a tool_ref attribute (AgentDefineTool, devcore tools, etc.)
if hasattr(tool_component, 'tool_ref'):
# Ensure the tool definition is executed to populate tool_ref
tool_component.execute(ctx)
# NOTE(review): indentation was lost in this copy, so it is unclear whether the
# check below (and its warning branch) is nested under the hasattr test above.
if hasattr(tool_component, 'tool_ref') and tool_component.tool_ref.value:
# Store the actual callable tool instance
# The key is the tool object's own .name, not the registry key tool_name.
standard_tools[tool_component.tool_ref.value.name] = tool_component.tool_ref.value
else:
print(f"Warning: Tool component '{tool_name}' did not produce a tool reference.")
# Note: We ignore components without tool_ref here, MCP servers are handled next
# Retrieve declared MCP server names
mcp_server_list_key = f"mcp_servers_{toolbelt_name}"
if mcp_server_list_key in ctx:
# This is the same list object stored in ctx, not a copy.
mcp_servers = ctx[mcp_server_list_key]
# Output the combined spec
self.toolbelt_spec.value = {
'standard_tools': standard_tools,
'mcp_servers': mcp_servers
}
@xai_component
class AgentVectoMemory(Component):
    """Creates a Vecto-backed memory for the agent to store and query information.

    An existing vector space with the given name is reused (and emptied first
    when `initialize` is true). If no such space exists, a new one is created
    with the 'QWEN2' model.

    ##### inPorts:
    - api_key: The API key for Vecto. Falls back to the VECTO_API_KEY environment variable.
    - vector_space: The name of the vector space to use.
    - initialize: Whether to delete the existing entries of the vector space.

    ##### outPorts:
    - memory: The Memory to set on AgentInit
    """
    api_key: InArg[secret]
    vector_space: InCompArg[str]
    initialize: InCompArg[bool]
    memory: OutArg[Memory]

    def execute(self, ctx) -> None:
        from vecto import Vecto

        api_key = os.getenv("VECTO_API_KEY") if self.api_key.value is None else self.api_key.value
        if not api_key:
            # Fail with a clear message instead of a TypeError from 'Bearer ' + None.
            raise ValueError(
                "No Vecto API key provided: set the api_key port or the VECTO_API_KEY environment variable."
            )

        headers = {'Authorization': 'Bearer ' + api_key}
        spaces_url = "https://api.vecto.ai/api/v0/account/space"

        response = requests.get(spaces_url, headers=headers)
        if response.status_code != 200:
            raise Exception(f"Failed to get vector space list: {response.text}")

        # Reuse the vector space when one with the requested name already exists.
        for space in response.json():
            if space['name'] == self.vector_space.value:
                vs = Vecto(api_key, space['id'])
                if self.initialize.value:
                    vs.delete_vector_space_entries()
                self.memory.value = VectoMemoryImpl(vs)
                # Returning here (rather than re-testing memory.value) keeps a value
                # left over from an earlier run from hiding a missing space.
                return

        # No matching space: create one using the 'QWEN2' model.
        vs = Vecto(api_key)
        model_id = next((model.id for model in vs.list_models() if model.name == 'QWEN2'), None)
        if model_id is None:
            raise Exception("Vecto model 'QWEN2' is not available; cannot create the vector space.")

        res = requests.post(spaces_url, headers=headers, json={
            "name": self.vector_space.value,
            "modelId": model_id
        })
        if not res.ok:
            raise Exception(f"Failed to create vector space '{self.vector_space.value}': {res.text}")

        data = res.json()
        self.memory.value = VectoMemoryImpl(Vecto(api_key, data['id']))
# TBD
#@xai_component
#class AgentToolbeltFolder(Component):
# folder: InCompArg[str]
#
# toolbelt_spec: OutArg[list]
#
# def execute(self, ctx) -> None:
# spec = []
# self.toolbelt_spec.value = spec
@xai_component
class AgentInit(Component):
    """Initialize an agent and store its configuration in the context.

    ##### inPorts:
    - agent_name: The name of the agent to create.
    - agent_provider: The provider of the agent (either openai, vertexai, or bedrock).
    - agent_model: The model that the agent should use (such as gpt-3.5-turbo, gemini-pro, or anthropic.claude-3-5-sonnet-20240620-v1:0).
    - agent_memory: The memory that the agent should use to store data it wants to remember.
    - system_prompt: The system prompt of the agent. Be sure to specify
        {tool_instruction} and {tools} to explain how to use them.
    - max_thoughts: The maximum number of thoughts/tools the agent can use before it must respond to the user (default: 5).
    - toolbelt_spec: The toolbelt the agent has access to.
    """
    agent_name: InCompArg[str]
    agent_provider: InCompArg[str]
    agent_model: InCompArg[str]
    agent_memory: InCompArg[Memory]
    system_prompt: InCompArg[str]
    max_thoughts: InArg[int]
    toolbelt_spec: InCompArg[dict]

    def execute(self, ctx) -> None:
        provider = self.agent_provider.value
        supported_providers = ('openai', 'vertexai', 'bedrock')
        if provider not in supported_providers:
            raise Exception(f"Agent provider '{provider}' is not supported.")

        spec = self.toolbelt_spec.value
        standard_tools = spec.get('standard_tools', {})
        mcp_servers = spec.get('mcp_servers', [])

        thought_limit = self.max_thoughts.value
        if thought_limit is None:
            # Default used when the max_thoughts port is not set.
            thought_limit = 5

        # The configuration lives in the context under 'agent_<name>'.
        ctx['agent_' + self.agent_name.value] = {
            'agent_toolbelt': standard_tools,
            'enabled_mcp_servers': mcp_servers,
            'agent_provider': provider,
            'agent_memory': self.agent_memory.value,
            'agent_model': self.agent_model.value,
            'agent_system_prompt': self.system_prompt.value,
            'max_thoughts': thought_limit,
        }
@xai_component
class AgentUpdate(Component):
    """Update the provider, model and/or system prompt of an already initialized agent.

    Only the ports that are set are applied; the others leave the agent's
    configuration untouched.

    ##### inPorts:
    - agent_name: The name of the agent to update (as given to AgentInit).
    - new_agent_provider: The new provider (openai, vertexai, or bedrock).
    - new_agent_model: The new model name.
    - new_system_prompt: The new system prompt.
    """
    agent_name: InCompArg[str]
    new_agent_provider: InArg[str]
    new_agent_model: InArg[str]
    new_system_prompt: InArg[str]

    def execute(self, ctx) -> None:
        # Validate the *new* provider, and only when one is supplied. The class
        # declares no `agent_provider` port, so reading it always failed.
        new_provider = self.new_agent_provider.value
        if new_provider is not None and new_provider not in ('openai', 'vertexai', 'bedrock'):
            raise Exception(f"agent provider: {new_provider} is not supported in this version of xai_agent.")

        agent = ctx['agent_' + self.agent_name.value]
        if new_provider is not None:
            agent['agent_provider'] = new_provider
        if self.new_agent_model.value is not None:
            agent['agent_model'] = self.new_agent_model.value
        if self.new_system_prompt.value is not None:
            agent['agent_system_prompt'] = self.new_system_prompt.value
@xai_component(type="Start", color="purple")  # Using purple for Start nodes
class AgentDefineRoute(Component):
    """Marks the starting point of one agent processing flow.

    AgentRouter uses the registered name and description to pick this flow.

    ##### inPorts:
    - agent_flow_name: A unique name for this agent route.
    - description: A description of what this agent route does, used by the router.
    """
    agent_flow_name: InCompArg[str]
    description: InCompArg[str]

    # No out ports: selecting this route triggers a subgraph execution instead.
    def init(self, ctx):
        super().init(ctx)
        flow_name = self.agent_flow_name.value
        if not flow_name:
            print("Warning: AgentDefineRoute requires an agent_flow_name.")
            return

        # Make this start point discoverable through the context.
        routes = ctx.setdefault('registered_agent_routes', {})
        routes[flow_name] = {
            'description': self.description.value,
            'component': self,
        }
        print(f"Registered agent start: {flow_name}")

    def execute(self, ctx) -> None:
        # Only logs; the components connected after this one are run by the
        # SubGraphExecutor that AgentRouter starts.
        print(f"Executing AgentDefineRoute: {self.agent_flow_name.value}")
@xai_component
class AgentRouter(Component):
"""An agent that routes a conversation to a specific agent route based on available AgentDefineRoute components.
It uses its own configured agent (specified by agent_name) to decide which flow is most appropriate.
##### inPorts:
- agent_name: The name of the agent configuration to use for the routing decision.
- conversation: The conversation history to be routed.
- final_context_key: The key in the context where the chosen agent route is expected to place its final result (default: 'agent_flow_result').
##### outPorts:
- result: The final result produced by the selected agent route.
- chosen_flow_name: The name of the agent route that was chosen.
"""
agent_name: InCompArg[str]
conversation: InArg[list]
final_context_key: InArg[str]
result: OutArg[any]
chosen_flow_name: OutArg[str]
# Reusing parts of AgentRun's logic for the LLM call might be needed.
# For simplicity, let's define a basic LLM call here.
# This assumes access to similar helper functions or context setup as AgentRun.
# Raises ValueError when the agent configuration or the routes are missing or the
# model's answer is not a registered route name, and ConnectionError when the LLM call fails.
def execute(self, ctx) -> None:
agent_context_key = 'agent_' + self.agent_name.value
if agent_context_key not in ctx:
raise ValueError(f"Agent configuration '{self.agent_name.value}' not found. Initialize it with AgentInit first.")
agent_config = ctx[agent_context_key]
provider = agent_config['agent_provider']
model_name = agent_config['agent_model']
# 1. Find available agent routes (AgentDefineRoute components)
registered_starts = ctx.get('registered_agent_routes', {})
if not registered_starts:
raise ValueError("No agent routes (AgentDefineRoute components) found registered in the context.")
# 2. Prepare descriptions for the routing prompt
flow_descriptions = []
for name, data in registered_starts.items():
flow_descriptions.append(f"- Name: {name}\n Description: {data['description']}")
available_flows_text = "\n".join(flow_descriptions)
# 3. Construct the prompt for the router agent
# The conversation is embedded as JSON text, so it must be JSON-serializable.
routing_prompt = f"""You are a routing agent. Based on the following conversation history, choose the most appropriate agent route to handle the latest user request.
Conversation History:
{json.dumps(self.conversation.value, indent=2)}
Available agent routes:
{available_flows_text}
Respond ONLY with the exact 'Name' of the best agent route to use. Do not add any explanation or other text."""
# Prepare conversation for the LLM call
router_conversation = [
{"role": "system", "content": "You are an expert routing agent."}, # Basic system prompt
{"role": "user", "content": routing_prompt}
]
# 4. Call the LLM using the shared dispatch function
# NOTE(review): _dispatch_llm_call is not defined in the visible part of this file;
# its result is read as a dict with a 'content' key below.
chosen_flow = None
print(f"Attempting routing with provider: {provider}, model: {model_name}")
try:
# Use a low temperature for deterministic routing
response = _dispatch_llm_call(ctx, provider, model_name, router_conversation, temperature=0.1)
chosen_flow = response['content'].strip()
except Exception as e:
print(f"Error calling LLM for routing via provider {provider}: {e}")
traceback.print_exc()
# NOTE(review): the original exception is not chained here (no `from e`).
raise ConnectionError(f"Failed to get routing decision from {provider}: {e}")
# Fallback for an answer that is not an exact route name: strip one pair of
# matching quotes and check again before giving up.
if not chosen_flow or chosen_flow not in registered_starts:
available_keys = list(registered_starts.keys())
# Sometimes the model might return the name in quotes, try stripping them
if chosen_flow and chosen_flow.startswith('"') and chosen_flow.endswith('"'):
chosen_flow = chosen_flow[1:-1]
elif chosen_flow and chosen_flow.startswith("'") and chosen_flow.endswith("'"):
chosen_flow = chosen_flow[1:-1]
if not chosen_flow or chosen_flow not in available_keys:
print(f"Router LLM returned an invalid route name: '{chosen_flow}'. Available: {available_keys}")
raise ValueError(f"AgentRouter failed to select a valid agent route. Response: '{chosen_flow}'")
print(f"AgentRouter chose route: {chosen_flow}")
self.chosen_flow_name.value = chosen_flow
# 5. Get the chosen AgentDefineRoute component
chosen_start_component = registered_starts[chosen_flow]['component']
# 6. Execute the chosen subgraph
# The conversation is handed to the subgraph through the context.
ctx['current_conversation_for_flow'] = self.conversation.value
result_key = self.final_context_key.value if self.final_context_key.value else 'agent_flow_result'
ctx[result_key] = None # Clear any previous result
print(f"Executing subgraph starting from: {chosen_flow}")
try:
# Execute the graph starting from the component *after* the AgentDefineRoute node
if chosen_start_component.next:
SubGraphExecutor(chosen_start_component.next).do(ctx)
else:
print(f"Warning: Chosen AgentDefineRoute '{chosen_flow}' has no connected components to execute.")
# Set a default result or handle as needed
ctx[result_key] = "No components connected to the chosen start flow."
except Exception as e:
print(f"Error executing chosen agent route '{chosen_flow}': {e}")
traceback.print_exc()
# The result port receives an error message before the exception is re-raised.
self.result.value = f"Error during execution of flow '{chosen_flow}': {e}"
raise # Re-raise to signal failure upstream
# 7. Retrieve the result from the context
final_result = ctx.get(result_key)
print(f"Subgraph execution finished. Result found in ctx['{result_key}']: {final_result}")
self.result.value = final_result
# Optional: Clean up context
# if 'current_conversation_for_flow' in ctx: del ctx['current_conversation_for_flow']
# if result_key in ctx: del ctx[result_key]
# Placeholder for querying core system about MCP tools
# In a real implementation, this would interact with the framework/environment
def query_core_system_for_mcp_tools(server_name: str) -> list:
    """Placeholder lookup of the tools provided by a specific MCP server.

    Logs the request and returns hard-coded example tool descriptions for the
    "filesystem" and "weather" servers; any other name yields an empty list.
    """
    print(f"[Placeholder] Querying core system for tools from MCP server: {server_name}")

    # Example data only; rebuilt on every call so callers always get fresh objects.
    example_tools = (
        ("filesystem", [
            {"name": "readFile", "description": "Reads a file"},
            {"name": "writeFile", "description": "Writes a file"},
        ]),
        ("weather", [
            {"name": "get_forecast", "description": "Gets weather forecast"},
        ]),
    )
    for known_server, tools in example_tools:
        if server_name == known_server:
            return tools
    return []
def make_tools_prompt(standard_toolbelt: dict, enabled_mcp_servers: list, metadata: dict, provided_system: str=None) -> dict:
    """Build the values used to describe tools in the system prompt.

    Args:
        standard_toolbelt: Mapping of tool name to tool object; each object's
            `description` attribute is used when present.
        enabled_mcp_servers: Names of MCP servers whose tools are listed via
            `query_core_system_for_mcp_tools`.
        metadata: Passed through unchanged under the 'metadata' key.
        provided_system: Passed through unchanged under 'provided_system'.

    Returns:
        dict with the keys 'tools', 'lookup_memory', 'create_memory', 'memory',
        'tool_instruction', 'metadata' and 'provided_system'.
    """
    tool_desc_parts = []

    # 1. Standard tools. Old "TOOL:" syntax in descriptions is converted to XML.
    for key, value in standard_toolbelt.items():
        desc = getattr(value, 'description', 'No description available.')
        desc = convert_old_tool_syntax_to_xml(desc)
        tool_desc_parts.append(f'{key}: {desc}')

    # 2. MCP tools. A failure for one server is reported in the prompt and does
    #    not stop the remaining servers from being listed.
    for server_name in enabled_mcp_servers:
        try:
            mcp_tools = query_core_system_for_mcp_tools(server_name)
            for tool_info in mcp_tools:
                # Same fallback text as for standard tools when no description is given.
                desc = convert_old_tool_syntax_to_xml(tool_info.get("description", 'No description available.'))
                tool_desc_parts.append(f'{tool_info["name"]}: {desc}')
        except Exception as e:
            print(f"Error querying MCP tools for server '{server_name}': {e}")
            tool_desc_parts.append(f"# Error loading tools for MCP server: {server_name}")

    tools_string = '\n'.join(tool_desc_parts)

    # Memory tool descriptions with both JSON and XML examples.
    # The example SYSTEM reply is well-formed JSON (the "summary" value is quoted).
    recall = 'lookup_memory: Fuzzily looks up a previously remembered JSON memo in your memory.\nEXAMPLE:\n\nUSER:\nWhat things did I have to do today?\nASSISTANT:\n<tool name="lookup_memory">\n{"query":"todo list"}\n</tool>\nSYSTEM:\n[{"id": 1, "summary": "Todo List for Februrary", "tasks": [{"title": "Send invoices", "due_date":"2025-02-01"}]}]\nASSISTANT:\n<tool name="get_current_time">\n</tool>\nSYSTEM:\n2024-02-01T09:30:03\nASSISTANT:\nLooks like you just had to send invoices today.\n\nAlternative XML format:\n<tool name="lookup_memory">\n<query>todo list</query>\n</tool>'
    remember = 'create_memory: Remembers a new json note for the future. Always provide json with a summary prompt that will serve as the lookup vector. The summary and entire json can be remembered later with lookup_memory.\nEXAMPLE:\n\nUSER:\nRemind me to send invoices on the first of Feburary.\nASSISTANT:\n<tool name="create_memory">\n{ "summary": "todo List for Februrary", "tasks": [{"title": "Send invoices", "due_date":"2025-02-01"}]}\n</tool>\n\nAlternative XML format:\n<tool name="create_memory">\n<summary>todo List for Februrary</summary>\n<tasks>[{"title": "Send invoices", "due_date":"2025-02-01"}]</tasks>\n</tool>'

    return {
        'tools': tools_string,
        'lookup_memory': recall,
        'create_memory': remember,
        # Blank line between the two descriptions so they do not run together.
        'memory': recall + '\n\n' + remember,
        'tool_instruction': 'To use a tool, use XML tags like this: <tool name="tool_name">arguments</tool>. Arguments can span multiple lines and can be in JSON format ({"arg": "value"}) or XML format (<arg>value</arg>). For tools with no arguments, use empty tags: <tool name="tool_name"></tool>. The system will respond with the results.',
        'metadata': metadata,
        'provided_system': provided_system
    }
def conversation_to_vertexai(conversation) -> str:
    """Flatten a conversation into a single text prompt.

    Each message contributes ``<role>:<content>`` followed by a blank line;
    an empty conversation gives an empty string.
    """
    chunks = [
        message['role'] + ":" + message['content'] + "\n\n"
        for message in conversation
    ]
    return "".join(chunks)
@xai_component
class AgentMergeSystem(Component):
    """Merge an additional system prompt into a conversation.

    The input conversation is deep-copied, so the original is not modified.
    If the copy starts with a system message, the new prompt is appended to it
    after a blank line; otherwise (including for an empty conversation) a new
    system message is inserted at the front.

    ##### inPorts:
    - new_system_prompt: The system prompt text to merge in.
    - conversation: The conversation (list of role/content messages) to extend.

    ##### outPorts:
    - out_conversation: The copied conversation with the merged system prompt.
    """
    new_system_prompt: InCompArg[str]
    conversation: InCompArg[any]
    out_conversation: OutArg[list]

    def execute(self, ctx) -> None:
        new_system_prompt = self.new_system_prompt.value
        # An unset conversation is treated like an empty one.
        conversation = copy.deepcopy(self.conversation.value) or []

        # Check for emptiness first: indexing [0] on an empty list would raise IndexError.
        if not conversation or conversation[0]['role'] != 'system':
            conversation.insert(0, {'role': 'system', 'content': new_system_prompt})
        else:
            provided_system = conversation[0]['content']
            conversation[0]['content'] = provided_system + '\n\n' + new_system_prompt

        self.out_conversation.value = conversation
# --- Standalone LLM Call Functions ---
def _run_llm_bedrock(ctx, model_name, conversation, temperature):
    """Send `conversation` to a Bedrock (Anthropic) model and return its reply.

    The client is read from ctx['bedrock_client']. A leading system message is
    sent in the request's "system" field and the remaining messages go through
    `encode_prompt`. Returns ``{"role": "assistant", "content": <text>}``.
    Any error is logged with its traceback and re-raised.
    """
    print("Calling Bedrock (Anthropic)...")

    # The client must have been placed in the context beforehand.
    client = ctx.get('bedrock_client')
    if client is None:
        raise Exception("Bedrock client has not been authorized or is not found in context.")

    system_text = None
    if conversation and conversation[0]['role'] == 'system':
        system_text = conversation[0]['content']
    # The system message is dropped from the list only when it has non-empty content.
    history = conversation[1:] if system_text else conversation

    request = {
        "messages": encode_prompt(model_name, history),
        "max_tokens": 8192,  # Consider making this configurable
        "temperature": temperature,
        "anthropic_version": "bedrock-2023-05-31",  # Check if this needs updates for newer models
    }
    if system_text:
        request["system"] = system_text
    payload = json.dumps(request)

    try:
        api_response = client.invoke_model(
            body=payload,
            modelId=model_name,
            accept="application/json",
            contentType="application/json",
        )
        response_body = json.loads(api_response.get('body').read())

        if 'content' in response_body and isinstance(response_body['content'], list) and len(response_body['content']) > 0:
            # Messages-style reply: only the first content block is used.
            first_block = response_body['content'][0]
            if first_block.get('type') == 'text':
                text = first_block['text']
            else:
                # Non-text block (e.g. tool_use): pass it on as JSON text.
                print(f"Warning: Bedrock returned non-text content block: {first_block}")
                text = json.dumps(first_block)
        elif 'completion' in response_body:
            # Older completion-style reply.
            text = response_body['completion']
        else:
            print(f"Warning: Unexpected Bedrock response structure: {response_body}")
            raise Exception('Unknown content structure returned from Bedrock model.')

        reply = {"role": "assistant", "content": text}
        print("Bedrock response processed.")
        return reply
    except Exception as e:
        print(f"Error during Bedrock API call: {e}")
        traceback.print_exc()
        raise  # Re-raise the exception
def _run_llm_vertexai(ctx, model_name, conversation, temperature):
    """Call the Vertex AI API and return the assistant reply.

    Args:
        ctx: Execution context; not used by this provider.
        model_name: Model name passed to ``GenerativeModel``.
        conversation: Conversation to send; converted with
            ``conversation_to_vertexai`` before the call.
        temperature: Sampling temperature forwarded in the generation config.

    Returns:
        dict: ``{"role": "assistant", "content": <stripped response text>}``.

    Raises:
        ImportError: If ``vertexai`` or ``GenerativeModel`` is not present in
            this module's globals.
        Exception: Any error raised by the API call or while reading the
            response is printed (with traceback) and re-raised.
    """
    print("Calling Vertex AI...")
    if 'vertexai' not in globals() or 'GenerativeModel' not in globals():
        raise ImportError("Vertex AI library not available or GenerativeModel not imported.")

    # Convert conversation format if necessary (using existing helper).
    inputs = conversation_to_vertexai(conversation)
    model = GenerativeModel(model_name)

    try:
        result = model.generate_content(
            inputs,
            generation_config={
                "max_output_tokens": 8192,  # Consider making configurable
                "stop_sequences": ["\n\nsystem:", "\n\nuser:", "\n\nassistant:"],  # Standard stops
                "temperature": temperature,
                "top_p": 1,  # Often recommended to adjust temp OR top_p, not both heavily
            },
            safety_settings=[],  # Adjust as needed
            stream=False,
        )

        # The default passed to getattr() only covers AttributeError. The
        # `text` accessor can raise ValueError instead, so catch it here and
        # remember it: the per-part fallback below gets a chance first.
        text_error = None
        try:
            text_response = getattr(result, 'text', '')
        except ValueError as err:
            text_error = err
            text_response = ''

        if not text_response and hasattr(result, 'candidates') and result.candidates:
            # Try getting content from the first candidate if text is empty.
            first_candidate = result.candidates[0]
            if hasattr(first_candidate, 'content') and hasattr(first_candidate.content, 'parts'):
                text_response = "".join(
                    part.text
                    for part in first_candidate.content.parts
                    if hasattr(part, 'text')
                )

        # If the accessor failed and the fallback recovered nothing, surface
        # the original error rather than silently returning empty content.
        if text_error is not None and not text_response:
            raise text_error

        # The text is used as-is (stripped); no role prefix is parsed out.
        response_content = text_response.strip()
        response = {"role": "assistant", "content": response_content}
        print("Vertex AI response processed.")
        return response
    except Exception as e:
        print(f"Error during Vertex AI API call: {e}")
        traceback.print_exc()
        raise  # Re-raise the exception
def _run_llm_openai(ctx, model_name, conversation, temperature):
    """Call the OpenAI chat-completions API and return the assistant reply.

    Exactly one request is issued per call; its shape depends on the model
    name prefix:

    * ``o1`` / ``o3`` / ``o4`` / ``gpt-5``: ``temperature`` is not sent.
      It is mapped onto ``reasoning_effort`` instead
      (``> 0.5`` -> ``'high'``, ``> 0.3`` -> ``'medium'``, else ``'low'``).
    * ``grok-4``: only model, messages and ``max_completion_tokens`` are sent.
    * anything else: a standard request with ``max_tokens``, stop sequences,
      ``temperature`` and a text ``response_format``.

    Args:
        ctx: Execution context; not used by this provider.
        model_name: Model identifier, also used to select the request shape.
        conversation: List of ``{"role": ..., "content": ...}`` messages. A
            trailing assistant message with empty content is dropped before
            sending (the caller's list is not modified).
        temperature: Sampling temperature, or the reasoning-effort selector
            for the reasoning-model branch.

    Returns:
        dict: ``{"role": "assistant", "content": <text>}``; content is ``""``
        when the API returns no choices or a ``None`` message content.

    Raises:
        ImportError: If ``openai`` is not present in this module's globals.
        Exception: Any error raised by the API call is printed (with
            traceback) and re-raised.
    """
    if 'openai' not in globals():
        raise ImportError("OpenAI library not available or imported.")

    # Clean conversation: remove trailing empty assistant message if present.
    if conversation and conversation[-1]['role'] == 'assistant' and not conversation[-1]['content']:
        conversation = conversation[:-1]

    try:
        if model_name.startswith(('o1', 'o3', 'o4', 'gpt-5')):
            # Test the highest threshold first; checking `> 0.3` before
            # `> 0.5` would make the 'high' branch unreachable.
            if temperature > 0.5:
                reasoning_effort = 'high'
            elif temperature > 0.3:
                reasoning_effort = 'medium'
            else:
                reasoning_effort = 'low'
            completion = openai.chat.completions.create(
                model=model_name,
                messages=conversation,
                max_completion_tokens=8192,
                reasoning_effort=reasoning_effort
            )
        # `elif` keeps the branches mutually exclusive so a reasoning model
        # does not fall through into the default request and call the API twice.
        elif model_name.startswith('grok-4'):
            completion = openai.chat.completions.create(
                model=model_name,
                messages=conversation,
                max_completion_tokens=8192
            )
        else:
            params = {
                "model": model_name,
                "messages": conversation,
                "max_tokens": 8192,
                "stop": ["\nsystem:\n", "\nSYSTEM:\n", "\nUSER:\n", "\nASSISTANT:\n"],
                "temperature": temperature,
                "response_format": {"type": "text"}
            }
            completion = openai.chat.completions.create(**params)

        # Ensure message content is accessed correctly.
        response_content = ""
        if completion.choices and completion.choices[0].message:
            response_content = completion.choices[0].message.content or ""  # Handle None content
        response = {"role": "assistant", "content": response_content}
        return response
    except Exception as e:
        print(f"Error during OpenAI API call: {e}")
        traceback.print_exc()
        raise  # Re-raise the exception
def _dispatch_llm_call(ctx, provider, model_name, conversation, temperature):
"""Dispatches the LLM call to the appropriate provider function."""
print(f"Dispatching LLM call to provider: {provider}")
if provider == 'openai':
return _run_llm_openai(ctx, model_name, conversation, temperature)
elif provider == 'vertexai':