forked from evalstate/fast-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_agent.py
More file actions
2318 lines (2004 loc) · 91.1 KB
/
mcp_agent.py
File metadata and controls
2318 lines (2004 loc) · 91.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Base Agent class that implements the AgentProtocol interface.
This class provides default implementations of the standard agent methods
and delegates operations to an attached FastAgentLLMProtocol instance.
"""
import asyncio
import fnmatch
import re
import time
from abc import ABC
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterable,
Literal,
Mapping,
Sequence,
TypeVar,
Union,
cast,
)
import mcp
from a2a.types import AgentCard, AgentSkill
from mcp.types import (
CallToolResult,
ContentBlock,
EmbeddedResource,
GetPromptResult,
ListToolsResult,
PromptMessage,
ReadResourceResult,
TextContent,
Tool,
)
from pydantic import BaseModel
from fast_agent.agents.agent_types import AgentConfig, AgentType
from fast_agent.agents.llm_agent import DEFAULT_CAPABILITIES
from fast_agent.agents.tool_agent import ToolAgent
from fast_agent.config import MCPServerSettings
from fast_agent.constants import (
HUMAN_INPUT_TOOL_NAME,
SHELL_NOTICE_PREFIX,
should_parallelize_tool_calls,
)
from fast_agent.core.exceptions import AgentConfigError, PromptExitError
from fast_agent.core.logging.logger import get_logger
from fast_agent.interfaces import FastAgentLLMProtocol
from fast_agent.llm.model_database import ModelDatabase
from fast_agent.llm.provider_types import Provider
from fast_agent.llm.terminal_output_limits import (
calculate_terminal_output_limit_for_model,
calculate_terminal_output_limit_for_resolved_model,
)
from fast_agent.mcp.common import (
create_namespaced_name,
get_resource_name,
get_server_name,
is_namespaced_name,
)
from fast_agent.mcp.experimental_session_client import ExperimentalSessionClient
from fast_agent.mcp.mcp_aggregator import (
MCPAggregator,
MCPAttachOptions,
MCPAttachResult,
MCPDetachResult,
NamespacedTool,
ServerStatus,
)
from fast_agent.mcp.mcp_skills_loader import (
load_mcp_skill_manifests,
merge_filesystem_and_mcp_manifests,
)
from fast_agent.mcp.provider_management import (
ProviderManagedMCPState,
build_provider_managed_mcp_state,
split_managed_server_names,
)
from fast_agent.skills import SKILLS_DEFAULT, SkillManifest
from fast_agent.skills.registry import SkillRegistry
from fast_agent.tools.composite_filesystem_runtime import CompositeFilesystemRuntime
from fast_agent.tools.elicitation import (
get_elicitation_tool,
run_elicitation_form,
set_elicitation_input_callback,
)
from fast_agent.tools.filesystem_runtime_protocol import FilesystemRuntime
from fast_agent.tools.local_filesystem_runtime import LocalFilesystemRuntime
from fast_agent.tools.shell_runtime import ShellRuntime
from fast_agent.tools.skill_reader import SkillReader
from fast_agent.types import (
PromptMessageExtended,
RequestParams,
ToolTimingInfo,
)
from fast_agent.ui import console
from fast_agent.ui.message_display_helpers import resolve_highlight_index
from fast_agent.utils.async_utils import gather_with_cancel
# Define a TypeVar for models
ModelT = TypeVar("ModelT", bound=BaseModel)
# Generic item type for server-collection filtering helpers.
ItemT = TypeVar("ItemT")
# Any LLM attached to an agent must satisfy FastAgentLLMProtocol.
LLM = TypeVar("LLM", bound=FastAgentLLMProtocol)
# Display name overrides for tools shown in the bottom bar
TOOL_DISPLAY_NAMES: dict[str, str] = {
    "read_skill": "skill",
}
# Import-cycle-only annotations; these names exist solely for type checkers.
if TYPE_CHECKING:
    from rich.text import Text

    from fast_agent.context import Context
    from fast_agent.llm.usage_tracking import UsageAccumulator
class McpAgent(ABC, ToolAgent):
"""
A base Agent class that implements the AgentProtocol interface.
This class provides default implementations of the standard agent methods
and delegates LLM operations to an attached FastAgentLLMProtocol instance.
"""
    def __init__(
        self,
        config: AgentConfig,
        connection_persistence: bool = True,
        context: "Context | None" = None,
        **kwargs,
    ) -> None:
        """Construct the MCP-capable agent.

        Splits configured servers into client-managed vs provider-managed,
        creates the MCP aggregator, loads skill manifests, and conditionally
        activates the shell runtime.

        Args:
            config: Agent configuration (servers, instruction, skills, shell, ...).
            connection_persistence: Whether the aggregator keeps server
                connections open between calls.
            context: Optional application context (config, executor, skill registry).
            **kwargs: Forwarded to both the base agent and the MCPAggregator.
        """
        super().__init__(
            config=config,
            context=context,
            **kwargs,
        )
        configured_servers = tuple(self.config.servers)
        server_settings_by_name = None
        if context and context.config and context.config.mcp:
            server_settings_by_name = context.config.mcp.servers
        self._provider_managed_mcp_state = ProviderManagedMCPState()
        client_managed_servers = list(configured_servers)
        provider_managed_servers: list[str] = []
        if server_settings_by_name is not None:
            # Partition servers: provider-managed ones are handled by the LLM
            # provider rather than attached through the aggregator.
            self._provider_managed_mcp_state = build_provider_managed_mcp_state(
                agent_config=self.config,
                server_settings_by_name=server_settings_by_name,
            )
            client_managed_servers, provider_managed_servers = split_managed_server_names(
                configured_servers,
                server_settings_by_name,
            )
        self._configured_server_names = tuple(configured_servers)
        self._provider_managed_server_names = tuple(provider_managed_servers)
        # Create aggregator with composition
        self._aggregator = MCPAggregator(
            server_names=client_managed_servers,
            connection_persistence=connection_persistence,
            name=self.config.name,
            context=context,
            config=self.config,  # Pass the full config for access to elicitation_handler
            **kwargs,
        )
        self._aggregator.set_supplemental_attached_servers(self._provider_managed_server_names)
        # Store the original template - resolved instruction set after build()
        self._instruction_template = self.config.instruction
        self._instruction = self.config.instruction  # Will be replaced by builder output
        self.executor = context.executor if context else None
        self.logger = get_logger(f"{__name__}.{self._name}")
        # Skill manifests: explicit config wins; otherwise fall back to the
        # context's skill registry when the agent uses the SKILLS_DEFAULT sentinel.
        manifests: list[SkillManifest] = list(self.config.skill_manifests or [])
        if (
            self.config.skills is SKILLS_DEFAULT
            and not manifests
            and context
            and context.skill_registry
        ):
            try:
                manifests = list(context.skill_registry.load_manifests())
            except Exception:
                # Manifest loading is best-effort; never block agent construction.
                manifests = []
        # Shell runtime state, all disabled until explicitly activated below.
        self._shell_runtime: ShellRuntime | None = None
        self._shell_notice_emitted = False
        self._allow_shell_notice = False
        self._shell_runtime_enabled = False
        self._show_shell_tool_call_id = False
        self._defer_shell_display_to_tool_result = False
        self._shell_access_modes: tuple[str, ...] = ()
        self._bash_tool: Tool | None = None
        # Allow external runtime injection (e.g., for ACP terminal support)
        self._external_runtime = None
        # Allow filesystem runtime injection (e.g., for ACP filesystem support)
        self._filesystem_runtime: FilesystemRuntime | None = None
        self._skill_manifests: list[SkillManifest] = []
        self._skill_map: dict[str, SkillManifest] = {}
        self._skill_reader: SkillReader | None = None
        self.set_skill_manifests(manifests)
        self.skill_registry: SkillRegistry | None = None
        if isinstance(self.config.skills, SkillRegistry):
            self.skill_registry = self.config.skills
        elif self.config.skills is SKILLS_DEFAULT and context and context.skill_registry:
            self.skill_registry = context.skill_registry
        self._warnings: list[str] = []
        self._warning_messages_seen: set[str] = set()
        # Shell activation can be requested three ways; record which applies
        # so the activation reason shown to the user is accurate.
        shell_flag_requested = bool(context and getattr(context, "shell_runtime", False))
        shell_config_requested = bool(self.config.shell)
        skills_configured = bool(self._skill_manifests)
        self._shell_runtime_activation_reason: str | None = None
        reasons: list[str] = []
        if shell_flag_requested:
            reasons.append("--shell flag")
        if shell_config_requested:
            reasons.append("agent config")
        if skills_configured:
            reasons.append("agent skills configuration")
        if reasons:
            if reasons == ["agent skills configuration"]:
                self._shell_runtime_activation_reason = "because agent skills are configured"
            else:
                self._shell_runtime_activation_reason = "via " + " and ".join(reasons)
        # Derive skills directory from this agent's manifests (respects per-agent config).
        # URI-backed (Skills-over-MCP) manifests have no filesystem path, so pick
        # the first filesystem-backed manifest rather than indexing [0] blindly.
        skills_directory = None
        first_fs_manifest = next(
            (m for m in self._skill_manifests if m.path is not None), None
        )
        if first_fs_manifest is not None:
            # Path structure: <env>/skills/skill-name/SKILL.md -> parent.parent
            skills_directory = first_fs_manifest.path.parent.parent
        # NOTE(review): _shell_access_modes was already initialized above;
        # this re-annotation/assignment looks redundant — confirm intentional.
        self._shell_access_modes: tuple[str, ...] = ()
        if self._shell_runtime_activation_reason is not None:
            modes: list[str] = []
            if skills_configured:
                modes.append("skills")
            if shell_flag_requested:
                modes.append("switch")
            self._shell_access_modes = tuple(modes)
            self._activate_shell_runtime(
                self._shell_runtime_activation_reason,
                working_directory=self.config.cwd,
                skills_directory=skills_directory,
                access_modes=self._shell_access_modes,
            )
        # Store instruction context for template resolution
        self._instruction_context: dict[str, str] = {}
        self._allow_shell_notice = True
        # Store the default request params from config
        self._default_request_params = self.config.default_request_params
        # set with the "attach" method
        self._llm: FastAgentLLMProtocol | None = None
        # Instantiate human input tool once if enabled in config
        self._human_input_tool: Tool | None = None
        if self.config.human_input:
            try:
                self._human_input_tool = get_elicitation_tool()
            except Exception:
                self._human_input_tool = None
        # Register the MCP UI handler as the elicitation callback so fast_agent.tools can call it
        # without importing MCP types. This avoids circular imports and ensures the callback is ready.
        try:
            from fast_agent.human_input.elicitation_handler import elicitation_input_callback
            from fast_agent.human_input.types import HumanInputRequest

            async def _mcp_elicitation_adapter(
                request_payload: dict,
                agent_name: str | None = None,
                server_name: str | None = None,
                server_info: dict | None = None,
            ) -> str:
                # Adapt the dict payload into the typed human-input request and
                # stringify whatever response comes back.
                req = HumanInputRequest(**request_payload)
                resp = await elicitation_input_callback(
                    request=req,
                    agent_name=agent_name,
                    server_name=server_name,
                    server_info=server_info,
                )
                return resp.response if isinstance(resp.response, str) else str(resp.response)

            set_elicitation_input_callback(_mcp_elicitation_adapter)
        except Exception:
            # If UI handler import fails, leave callback unset; tool will error with a clear message
            pass
    async def __aenter__(self):
        """Initialize the agent and its MCP aggregator."""
        await self._aggregator.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up the agent and its MCP aggregator."""
        await self._aggregator.__aexit__(exc_type, exc_val, exc_tb)
    async def initialize(self) -> None:
        """
        Initialize the agent and connect to the MCP servers.

        NOTE: This method is called automatically when the agent is used as an async context manager.
        """
        await self.__aenter__()
        # Discover Skills-over-MCP skills from connected servers and merge
        # them with any filesystem manifests before the instruction template
        # is built (so the frontmatter lands in {{agentSkills}}).
        await self._load_mcp_skill_manifests()
        # Apply template substitution to the instruction with server instructions
        await self._apply_instruction_templates()
        # Base-class initialization runs last, after servers and instruction are ready.
        await super().initialize()
    async def shutdown(self) -> None:
        """
        Shutdown the agent and close all MCP server connections.

        NOTE: This method is called automatically when the agent is used as an async context manager.
        """
        if self._shutdown_complete:
            return  # Idempotent: only run the shutdown sequence once.
        await self._run_lifecycle_hook("on_shutdown")
        await self._aggregator.close()
        # The hook already ran above, so finalization must not run it again.
        await self._finalize_shutdown(run_hook=False)
def enable_shell(self, working_directory: Path | None = None) -> None:
"""
Enable shell runtime on this agent after creation.
This allows adding shell access to agents loaded from cards or created dynamically.
Args:
working_directory: Optional custom working directory for shell commands.
If not specified, uses the current working directory.
"""
if self._shell_runtime_enabled:
# Already enabled, but update working directory if specified
shell_runtime = self._shell_runtime
if working_directory is not None and shell_runtime is not None:
shell_runtime._working_directory = working_directory
local_runtime = self._local_filesystem_runtime()
if local_runtime is not None:
local_runtime.set_working_directory(working_directory)
self._maybe_enable_local_filesystem_runtime(working_directory)
return
self._activate_shell_runtime(
"via enable_shell() call",
working_directory=working_directory,
access_modes=("[red]direct[/red]",),
)
    async def get_server_status(self) -> dict[str, ServerStatus]:
        """Expose server status details for UI and diagnostics consumers.

        Returns the aggregator's collected status, augmented with synthesized
        entries for provider-managed servers (which the aggregator never
        connects to directly).
        """
        if not self._aggregator:
            return {}
        status_map = await self._aggregator.collect_server_status()
        server_settings_by_name = None
        if self._context and self._context.config and self._context.config.mcp:
            server_settings_by_name = self._context.config.mcp.servers
        if not server_settings_by_name:
            # Without server settings we cannot synthesize provider-managed entries.
            return status_map
        auto_sampling = True
        if self._context and self._context.config:
            auto_sampling = self._context.config.auto_sampling
        # Fill in provider-managed servers missing from the aggregator's map.
        for server_name in self._provider_managed_server_names:
            if server_name in status_map:
                continue
            server_cfg = server_settings_by_name.get(server_name)
            if server_cfg is None:
                continue
            roots = server_cfg.roots
            elicitation = server_cfg.elicitation
            sampling_cfg = server_cfg.sampling
            status_map[server_name] = ServerStatus(
                server_name=server_name,
                transport=server_cfg.transport,
                is_connected=True,  # provider-managed: reported as connected
                instructions_enabled=server_cfg.include_instructions,
                roots_configured=bool(roots),
                roots_count=len(roots) if roots else 0,
                elicitation_mode=elicitation.mode if elicitation else None,
                sampling_mode=(
                    "configured"
                    if sampling_cfg is not None
                    else ("auto" if auto_sampling else "off")
                ),
            )
        return status_map
    async def attach_mcp_server(
        self,
        *,
        server_name: str,
        server_config: MCPServerSettings | None = None,
        options: MCPAttachOptions | None = None,
    ) -> MCPAttachResult:
        """Attach an MCP server to this agent's aggregator at runtime.

        Args:
            server_name: Name of the server to attach.
            server_config: Explicit settings; when None, looked up in the
                context's MCP config for validation.
            options: Optional attach behavior options.

        Raises:
            AgentConfigError: If the server is configured as provider-managed.
        """
        resolved_server_config = server_config
        if (
            resolved_server_config is None
            and self._context
            and self._context.config
            and self._context.config.mcp
        ):
            resolved_server_config = self._context.config.mcp.servers.get(server_name)
        if resolved_server_config is not None and resolved_server_config.management == "provider":
            raise AgentConfigError(
                f"Provider-managed MCP server '{server_name}' cannot be attached locally."
            )
        # NOTE(review): the original (possibly None) server_config is forwarded
        # here, not resolved_server_config — presumably attach_server re-resolves
        # from context; confirm this is intentional.
        return await self._aggregator.attach_server(
            server_name=server_name,
            server_config=server_config,
            options=options,
        )
    async def detach_mcp_server(self, server_name: str) -> MCPDetachResult:
        """Detach a previously attached MCP server from the aggregator."""
        return await self._aggregator.detach_server(server_name)

    def list_attached_mcp_servers(self) -> list[str]:
        """List currently attached server names, deduplicated in order."""
        return self._unique_preserving_order(self._aggregator.list_attached_servers())

    async def list_servers(self) -> list[str]:
        """List all server names: aggregator-managed plus provider-managed."""
        return self._unique_preserving_order(
            [*self._aggregator.server_names, *self._provider_managed_server_names]
        )
    @property
    def aggregator(self) -> MCPAggregator:
        """Expose the MCP aggregator for UI integrations."""
        return self._aggregator

    @property
    def experimental_sessions(self) -> ExperimentalSessionClient:
        """Expose focused experimental-session cookie controls for this agent."""
        return self._aggregator.experimental_sessions

    @property
    def instruction_template(self) -> str:
        """The original instruction template with placeholders (empty string when unset)."""
        return self._instruction_template or ""

    @property
    def instruction_context(self) -> dict[str, str]:
        """Context values for instruction template resolution."""
        return self._instruction_context

    @property
    def skill_manifests(self) -> list[SkillManifest]:
        """List of skill manifests configured for this agent."""
        return self._skill_manifests

    @property
    def has_filesystem_runtime(self) -> bool:
        """Whether any filesystem runtime is attached."""
        return self._filesystem_runtime is not None
def _local_filesystem_runtime(self) -> LocalFilesystemRuntime | None:
runtime = self._filesystem_runtime
if isinstance(runtime, LocalFilesystemRuntime):
return runtime
if isinstance(runtime, CompositeFilesystemRuntime):
primary = runtime.primary
if isinstance(primary, LocalFilesystemRuntime):
return primary
fallback = runtime.fallback
if isinstance(fallback, LocalFilesystemRuntime):
return fallback
return None
@property
def has_filesystem_read_text_file_tool(self) -> bool:
"""Whether the active filesystem runtime currently exposes read_text_file."""
if self._filesystem_runtime is None:
return False
return any(tool.name == "read_text_file" for tool in self._filesystem_runtime.tools if tool)
@property
def skill_read_tool_name(self) -> str:
"""Tool the model uses to read skill content. Forced to ``read_skill`` when any manifest is URI-backed (only it accepts both filesystem paths and resource URIs)."""
if any(manifest.uri for manifest in self._skill_manifests):
return "read_skill"
return "read_text_file" if self.has_filesystem_read_text_file_tool else "read_skill"
    @property
    def initialized(self) -> bool:
        """Check if both the agent and aggregator are initialized."""
        return self._initialized and self._aggregator.initialized

    @initialized.setter
    def initialized(self, value: bool) -> None:
        """Set the initialized state of both agent and aggregator."""
        self._initialized = value
        self._aggregator.initialized = value
    async def _apply_instruction_templates(self) -> None:
        """
        Apply template substitution to the instruction, including server instructions.

        This is called during initialization after servers are connected.
        No-op when the agent has no instruction template.
        """
        from fast_agent.core.instruction_refresh import (
            build_instruction,
            format_agent_skills,
            resolve_instruction_skill_manifests,
        )

        if not self._instruction_template:
            return
        # Build the instruction using the central helper
        new_instruction = await build_instruction(
            self._instruction_template,
            aggregator=self._aggregator,
            skill_manifests=resolve_instruction_skill_manifests(self, self._skill_manifests),
            skill_read_tool_name=self.skill_read_tool_name,
            context=self._instruction_context,
            source=self._name,
        )
        self.set_instruction(new_instruction)
        # Warn when skills are configured but not surfaced in the final instruction.
        # This check must use the rendered instruction to account for internal
        # templates like {{internal:smart_prompt}} that include {{agentSkills}}.
        if self._skill_manifests and "{{agentSkills}}" not in self._instruction_template:
            formatted_skills = format_agent_skills(
                self._skill_manifests,
                self.skill_read_tool_name,
            )
            # Only warn if the rendered instruction also lacks the formatted skills.
            if formatted_skills and formatted_skills not in new_instruction:
                warning_message = f"[dim]Agent '{self._name}' skills are configured but no {{{{agentSkills}}}} in system prompt.[/dim]"
                self._record_warning(warning_message, surface="startup_once")
        self.logger.debug(f"Applied instruction templates for agent {self._name}")
@staticmethod
def _resolve_shell_working_directory(path: Path) -> Path:
"""Resolve a configured shell working directory for validation messages."""
if path.is_absolute():
return path.resolve()
return (Path.cwd() / path).resolve()
def _warn_if_invalid_shell_working_directory(self, working_directory: Path | None) -> None:
"""Emit a startup warning when a configured shell cwd is missing/invalid."""
if working_directory is None:
return
resolved = self._resolve_shell_working_directory(working_directory)
if not resolved.exists():
self._record_warning(
" ".join(
[
f"[dim]Agent '{self._name}' has shell cwd that does not exist: {resolved}.",
f"Configured cwd: {working_directory}.",
"Shell commands will fail until this path exists.[/dim]",
]
),
surface="startup_once",
)
return
if not resolved.is_dir():
self._record_warning(
" ".join(
[
f"[dim]Agent '{self._name}' has shell cwd that is not a directory: {resolved}.",
f"Configured cwd: {working_directory}.",
"Shell commands will fail until this points to a directory.[/dim]",
]
),
surface="startup_once",
)
    async def _load_mcp_skill_manifests(self) -> None:
        """Fetch skills served by connected MCP servers per the Skills-over-MCP SEP.

        Merges discovered MCP manifests with any pre-existing filesystem
        manifests and updates the skill reader. On name collision, the
        filesystem manifest wins (consistent with SkillRegistry dedup).

        Disabled per-server via MCPServerSettings.mcp_skills.
        """
        server_names = tuple(self._aggregator.server_names or ())
        if not server_names:
            return
        # Collect per-server opt-out from config (default: enabled).
        enabled_servers: set[str] | None = None
        if self._context and self._context.config and self._context.config.mcp:
            server_settings = self._context.config.mcp.servers or {}
            enabled_servers = {
                name
                for name in server_names
                if getattr(server_settings.get(name), "mcp_skills", True)
            }
            if not enabled_servers:
                # Every configured server opted out; nothing to discover.
                return
        try:
            mcp_manifests = await load_mcp_skill_manifests(
                self._aggregator,
                server_names,
                enabled_servers=enabled_servers,
            )
        except Exception as exc:
            # Discovery must not break agent startup.
            self.logger.error(
                "Failed to load MCP skills",
                data={"error": str(exc)},
                exc_info=True,
            )
            return
        if not mcp_manifests:
            return
        merged, warnings = merge_filesystem_and_mcp_manifests(
            self._skill_manifests, mcp_manifests
        )
        # Surface collision/merge warnings once at startup.
        for message in warnings:
            self._record_warning(f"[dim]{message}[/dim]", surface="startup_once")
        self.set_skill_manifests(merged)
def set_skill_manifests(self, manifests: Sequence[SkillManifest]) -> None:
self._skill_manifests = list(manifests)
self._skill_map = {manifest.name: manifest for manifest in self._skill_manifests}
if self._skill_manifests:
# The aggregator is only needed when any manifest is URI-backed
# (Skills-over-MCP), but passing it unconditionally is cheap and
# keeps the reader uniform.
self._skill_reader = SkillReader(
self._skill_manifests,
self.logger,
aggregator=self._aggregator,
)
self._ensure_shell_runtime_for_skills()
else:
self._skill_reader = None
def _ensure_shell_runtime_for_skills(self) -> None:
if self._shell_runtime_enabled:
return
if self._external_runtime is not None:
return
# Derive skills directory from manifests (respects per-agent config).
# Skip URI-backed manifests — they have no filesystem root to anchor the shell runtime.
skills_directory = None
first_fs_manifest = next(
(m for m in self._skill_manifests if m.path is not None), None
)
if first_fs_manifest is not None:
skills_directory = first_fs_manifest.path.parent.parent
self._activate_shell_runtime(
"because agent skills are configured",
skills_directory=skills_directory,
working_directory=self.config.cwd,
access_modes=("skills",),
show_shell_notice=True,
)
def _resolve_shell_runtime_settings(self) -> tuple[int, int, int]:
timeout_seconds = 90
warning_interval_seconds = 30
config_output_byte_limit = None
shell_config = None
if self._context and self._context.config:
shell_config = self._context.config.shell_execution
if shell_config:
timeout_seconds = shell_config.timeout_seconds
warning_interval_seconds = shell_config.warning_interval_seconds
config_output_byte_limit = shell_config.output_byte_limit
if config_output_byte_limit is not None:
output_byte_limit = config_output_byte_limit
else:
model_name = self.config.model
if not model_name and self._context and self._context.config:
model_name = self._context.config.default_model
output_byte_limit = calculate_terminal_output_limit_for_model(model_name)
return timeout_seconds, warning_interval_seconds, output_byte_limit
def _shell_read_text_file_enabled(self) -> bool:
"""Return whether shell-enabled agents should expose local read_text_file."""
if not self._context or not self._context.config:
return True
return self._context.config.shell_execution.enable_read_text_file
def _resolve_shell_edit_tool_mode(self) -> Literal["write_text_file", "apply_patch", "off"]:
"""Return which shell edit tool should be exposed for the current model/config."""
default_mode: Literal["write_text_file", "apply_patch"]
if self._prefers_apply_patch_model(self._resolve_shell_tool_model_name()):
default_mode = "apply_patch"
else:
default_mode = "write_text_file"
if not self._context or not self._context.config:
return default_mode
mode_raw = self._context.config.shell_execution.write_text_file_mode
mode = mode_raw.strip().lower() if isinstance(mode_raw, str) else None
if mode == "on":
return "write_text_file"
if mode == "off":
return "off"
if mode == "apply_patch":
return "apply_patch"
if mode == "auto":
return default_mode
return default_mode
def _resolve_shell_tool_model_name(self) -> str | None:
"""Resolve the best-available model name for shell tool policy decisions."""
llm = self._llm
llm_model = llm.model_name if llm is not None else None
if isinstance(llm_model, str) and llm_model.strip():
return llm_model.strip()
model_name = self.config.model
if not model_name and self._context and self._context.config:
model_name = self._context.config.default_model
return model_name
@staticmethod
def _prefers_apply_patch_model(model_name: str | None) -> bool:
"""Return True for Codex and GPT-5.2+ models."""
if not model_name:
return False
normalized = ModelDatabase.normalize_model_name(model_name)
if "codex" in normalized:
return True
match = re.match(r"^gpt-5(?:\.(\d+))?", normalized)
if match is None:
return False
minor = match.group(1)
if minor is None:
return False
return int(minor) >= 2
    def _maybe_enable_local_filesystem_runtime(self, working_directory: Path | None = None) -> None:
        """Enable local filesystem runtime when shell mode is active and configured.

        If a local runtime already exists, its settings are refreshed in place;
        otherwise a new one is created and attached (composed with any existing
        non-local runtime as the fallback).

        Args:
            working_directory: Optional working directory override; falls back
                to the agent config's cwd when creating a new runtime.
        """
        if not self._shell_runtime_enabled:
            return
        # Resolve which filesystem tools should be exposed for the current config/model.
        enable_read = self._shell_read_text_file_enabled()
        edit_mode = self._resolve_shell_edit_tool_mode()
        enable_write = edit_mode == "write_text_file"
        enable_apply_patch = edit_mode == "apply_patch"
        enable_edit_file = edit_mode == "write_text_file"
        local_runtime = self._local_filesystem_runtime()
        if local_runtime is not None:
            # Refresh the existing runtime rather than creating a duplicate.
            if working_directory is not None:
                local_runtime.set_working_directory(working_directory)
            local_runtime.set_tool_handler_resolver(self._get_tool_handler)
            local_runtime.set_enabled_tools(
                enable_read=enable_read,
                enable_write=enable_write,
                enable_apply_patch=enable_apply_patch,
                enable_edit_file=enable_edit_file,
            )
            return
        runtime_working_directory = (
            working_directory if working_directory is not None else self.config.cwd
        )
        local_runtime = LocalFilesystemRuntime(
            self.logger,
            working_directory=runtime_working_directory,
            enable_read=enable_read,
            enable_write=enable_write,
            enable_apply_patch=enable_apply_patch,
            enable_edit_file=enable_edit_file,
            tool_handler_resolver=self._get_tool_handler,
        )
        if self._filesystem_runtime is None:
            self._filesystem_runtime = local_runtime
        else:
            # Keep any injected runtime (e.g. ACP) primary; local is the fallback.
            self._filesystem_runtime = CompositeFilesystemRuntime(
                primary=self._filesystem_runtime,
                fallback=local_runtime,
            )
        self.logger.info(
            "Local filesystem runtime enabled",
            runtime_type=type(self._filesystem_runtime).__name__,
            read_enabled=enable_read,
            write_enabled=enable_write,
            apply_patch_enabled=enable_apply_patch,
            edit_file_enabled=enable_edit_file,
        )
def _shell_output_limit_overridden(self) -> bool:
"""Return True when shell output byte limit is explicitly configured."""
if not self._context or not self._context.config:
return False
return self._context.config.shell_execution.output_byte_limit is not None
    def _on_llm_attached(self, llm: FastAgentLLMProtocol) -> None:
        """React to an LLM being attached: validate provider-managed MCP support,
        refresh filesystem tool enablement, and retune the shell output limit.

        Raises:
            AgentConfigError: If provider-managed MCP/connectors are configured
                for an unsupported provider.
        """
        super()._on_llm_attached(llm)
        if self._provider_managed_mcp_state.has_servers():
            # Connectors are a Responses-only feature.
            if self._provider_managed_mcp_state.has_connectors() and llm.provider != Provider.RESPONSES:
                raise AgentConfigError(
                    "Provider-managed connectors are only supported for the OpenAI Responses provider."
                )
            if llm.provider not in {Provider.ANTHROPIC, Provider.RESPONSES}:
                raise AgentConfigError(
                    "Provider-managed MCP is only supported for Anthropic Messages "
                    "and the OpenAI Responses provider."
                )
            # Hand the state to LLMs that support it (duck-typed).
            if hasattr(llm, "set_provider_managed_mcp_state"):
                cast("Any", llm).set_provider_managed_mcp_state(self._provider_managed_mcp_state)
        # The edit-tool mode may depend on the model, so recompute now.
        local_runtime = self._local_filesystem_runtime()
        if local_runtime is not None:
            edit_mode = self._resolve_shell_edit_tool_mode()
            local_runtime.set_enabled_tools(
                enable_read=self._shell_read_text_file_enabled(),
                enable_write=edit_mode == "write_text_file",
                enable_apply_patch=edit_mode == "apply_patch",
                enable_edit_file=edit_mode == "write_text_file",
            )
        if self._shell_runtime is None:
            return
        if self._shell_output_limit_overridden():
            # Explicit config wins over model-derived limits.
            return
        output_byte_limit = calculate_terminal_output_limit_for_resolved_model(llm.resolved_model)
        self._shell_runtime.set_output_byte_limit(output_byte_limit)
    def _activate_shell_runtime(
        self,
        activation_reason: str | None,
        *,
        working_directory: Path | None = None,
        skills_directory: Path | None = None,
        access_modes: tuple[str, ...] = (),
        show_shell_notice: bool = False,
    ) -> None:
        """Create and enable the ShellRuntime for this agent.

        Args:
            activation_reason: Human-readable reason shown to the user.
            working_directory: Optional shell working directory (validated first).
            skills_directory: Optional root directory for skills.
            access_modes: Labels describing how shell access was granted.
            show_shell_notice: Whether to print the one-time shell notice.
        """
        # An injected external runtime (e.g. ACP terminal) takes precedence.
        if activation_reason is not None and self._external_runtime is not None:
            return
        self._warn_if_invalid_shell_working_directory(working_directory)
        timeout_seconds, warning_interval_seconds, output_byte_limit = (
            self._resolve_shell_runtime_settings()
        )
        self._shell_runtime_activation_reason = activation_reason
        self._shell_runtime = ShellRuntime(
            activation_reason,
            self.logger,
            timeout_seconds=timeout_seconds,
            warning_interval_seconds=warning_interval_seconds,
            skills_directory=skills_directory,
            working_directory=working_directory,
            output_byte_limit=output_byte_limit,
            config=self._context.config if self._context else None,
            agent_name=self._name,
        )
        # The runtime itself decides whether it is actually enabled.
        self._shell_runtime_enabled = self._shell_runtime.enabled
        self._bash_tool = self._shell_runtime.tool
        self._shell_access_modes = access_modes if self._shell_runtime_enabled else ()
        self._maybe_enable_local_filesystem_runtime(working_directory)
        if self._shell_runtime_enabled:
            self._shell_runtime.announce()
            # Print the shell notice at most once per agent.
            if show_shell_notice and self._allow_shell_notice and not self._shell_notice_emitted:
                self._shell_notice_emitted = True
                try:
                    console.console.print(SHELL_NOTICE_PREFIX)
                except Exception:  # pragma: no cover - console fallback
                    pass
    @property
    def shell_runtime_enabled(self) -> bool:
        """Whether the shell runtime is active for this agent."""
        return self._shell_runtime_enabled

    @property
    def shell_access_modes(self) -> tuple[str, ...]:
        """Labels describing how shell access was granted (e.g. "skills", "switch")."""
        return self._shell_access_modes

    @property
    def shell_runtime(self) -> ShellRuntime | None:
        """The active shell runtime, or None when shell is not enabled."""
        return self._shell_runtime
def shell_notice_line(self) -> "Text | None":
if not self._shell_runtime_enabled or self._shell_runtime is None:
return None
from fast_agent.ui.shell_notice import format_shell_notice
return format_shell_notice(self._shell_access_modes, self._shell_runtime)
def _record_warning(
self,
message: str,
*,
surface: Literal["runtime_toolbar", "startup_once"] = "runtime_toolbar",
) -> None:
if message in self._warning_messages_seen:
return
self._warning_messages_seen.add(message)
self._warnings.append(message)
self.logger.warning(message)
try:
from fast_agent.ui import notification_tracker
notification_tracker.add_warning(message, surface=surface)
except Exception:
pass
    @property
    def warnings(self) -> list[str]:
        """Copy of all warnings recorded for this agent so far."""
        return list(self._warnings)
def set_instruction_context(self, context: dict[str, str]) -> None:
"""
Set session-level context variables for instruction template resolution.
This should be called when an ACP session is established to provide
variables like {{env}}, {{workspaceRoot}} etc. that are resolved per-session.
Args:
context: Dict mapping placeholder names to values (e.g., {"env": "...", "workspaceRoot": "/path"})
"""
self._instruction_context.update(context)
self.logger.debug(f"Set instruction context for agent {self._name}: {list(context.keys())}")
    async def __call__(
        self,
        message: Union[
            str,
            PromptMessage,
            PromptMessageExtended,
            Sequence[Union[str, PromptMessage, PromptMessageExtended]],
        ],
    ) -> str:
        """Allow calling the agent instance directly; delegates to send()."""
        return await self.send(message)
def _matches_pattern(self, name: str, pattern: str) -> bool:
"""
Check if a name matches a pattern for a specific server.
Args:
name: The name to match (could be tool name, resource URI, or prompt name)
pattern: The pattern to match against (e.g., "add", "math*", "resource://math/*")
Returns:
True if the name matches the pattern
"""
# For resources and prompts, match directly against the pattern
return fnmatch.fnmatch(name, pattern)
def _filter_namespaced_tools(self, tools: Sequence[Tool] | None) -> list[Tool]:
"""
Apply configuration-based filtering to a collection of tools.
"""
if not tools:
return []
return [
tool
for tool in tools
if is_namespaced_name(tool.name) and self._tool_matches_filter(tool.name)
]
def _filter_server_collections(
self,
items_by_server: Mapping[str, Sequence[ItemT]],
filters: Mapping[str, Sequence[str]] | None,
value_getter: Callable[[ItemT], str],