-
Notifications
You must be signed in to change notification settings - Fork 623
Expand file tree
/
Copy pathrun_agent.py
More file actions
4819 lines (4228 loc) · 235 KB
/
run_agent.py
File metadata and controls
4819 lines (4228 loc) · 235 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
AI Agent Runner with Tool Calling
This module provides a clean, standalone agent that can execute AI models
with tool calling capabilities. It handles the conversation loop, tool execution,
and response management.
Features:
- Automatic tool calling loop until completion
- Configurable model parameters
- Error handling and recovery
- Message history management
- Support for multiple model providers
Usage:
from run_agent import AIAgent
agent = AIAgent(base_url="http://localhost:30000/v1", model="claude-opus-4-20250514")
response = agent.run_conversation("Tell me about the latest Python updates")
"""
import copy
import hashlib
import json
import logging
logger = logging.getLogger(__name__)
import os
import random
import re
import sys
import time
import threading
from types import SimpleNamespace
import uuid
from typing import List, Dict, Any, Optional
from openai import OpenAI
import fire
from datetime import datetime
from pathlib import Path
# Load .env from ~/.hermes/.env first, then project root as dev fallback
from dotenv import load_dotenv
# Resolve the Hermes home directory (overridable via HERMES_HOME) and the two
# candidate .env locations: the user's Hermes home takes priority, the
# project root next to this file is the development fallback.
_hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
_user_env = _hermes_home / ".env"
_project_env = Path(__file__).parent / '.env'

# Load only the first candidate that exists.
for _env_candidate in (_user_env, _project_env):
    if not _env_candidate.exists():
        continue
    try:
        load_dotenv(dotenv_path=_env_candidate, encoding="utf-8")
    except UnicodeDecodeError:
        # The file is not valid UTF-8; retry with latin-1.
        load_dotenv(dotenv_path=_env_candidate, encoding="latin-1")
    logger.info("Loaded environment variables from %s", _env_candidate)
    break
else:
    # Neither candidate exists: rely on the process environment as-is.
    logger.info("No .env file found. Using system environment variables.")

# Point mini-swe-agent at the Hermes home so it shares our config.
# setdefault keeps any value the user already exported.
for _mswea_key, _mswea_value in (
    ("MSWEA_GLOBAL_CONFIG_DIR", str(_hermes_home)),
    ("MSWEA_SILENT_STARTUP", "1"),
):
    os.environ.setdefault(_mswea_key, _mswea_value)
# Import our tool system
from model_tools import get_tool_definitions, handle_function_call, check_toolset_requirements
from tools.terminal_tool import cleanup_vm
from tools.interrupt import set_interrupt as _set_interrupt
from tools.browser_tool import cleanup_browser
import requests
from hermes_constants import OPENROUTER_BASE_URL, OPENROUTER_MODELS_URL
# Agent internals extracted to agent/ package for modularity
from agent.prompt_builder import (
DEFAULT_AGENT_IDENTITY, PLATFORM_HINTS,
MEMORY_GUIDANCE, SESSION_SEARCH_GUIDANCE, SKILLS_GUIDANCE,
)
from agent.model_metadata import (
fetch_model_metadata, get_model_context_length,
estimate_tokens_rough, estimate_messages_tokens_rough,
get_next_probe_tier, parse_context_limit_from_error,
save_context_length,
)
from agent.context_compressor import ContextCompressor
from agent.prompt_caching import apply_anthropic_cache_control
from agent.prompt_builder import build_skills_system_prompt, build_context_files_prompt
from agent.display import (
KawaiiSpinner, build_tool_preview as _build_tool_preview,
get_cute_tool_message as _get_cute_tool_message_impl,
_detect_tool_failure,
)
from agent.trajectory import (
convert_scratchpad_to_think, has_incomplete_scratchpad,
save_trajectory as _save_trajectory_to_file,
)
class _SafeWriter:
    """Stream proxy that swallows ``OSError`` raised by a broken output pipe.

    When the process runs headless (systemd service, Docker container,
    daemon) the stdout pipe can go away, and every ``print()`` then raises
    ``OSError: [Errno 5] Input/output error``. That can crash the caller,
    notably when an ``except`` handler itself prints.

    The proxy forwards ``write``, ``flush`` and ``isatty`` to the wrapped
    stream and turns an ``OSError`` from any of them into a harmless return
    value. ``fileno`` and every other attribute are forwarded unguarded.
    """

    __slots__ = ("_inner",)

    def __init__(self, inner):
        # Assign through object.__setattr__ so the slot is populated directly.
        object.__setattr__(self, "_inner", inner)

    def write(self, data):
        """Write *data*; on ``OSError`` report ``len(data)`` for str, else 0."""
        stream = self._inner
        try:
            written = stream.write(data)
        except OSError:
            if isinstance(data, str):
                return len(data)
            return 0
        return written

    def flush(self):
        """Flush the wrapped stream, ignoring ``OSError``."""
        stream = self._inner
        try:
            stream.flush()
        except OSError:
            return None

    def fileno(self):
        """Return the wrapped stream's file descriptor (errors propagate)."""
        stream = self._inner
        return stream.fileno()

    def isatty(self):
        """Return the wrapped stream's ``isatty()``; ``False`` on ``OSError``."""
        stream = self._inner
        try:
            answer = stream.isatty()
        except OSError:
            answer = False
        return answer

    def __getattr__(self, name):
        # Only reached for attributes not defined on the proxy itself
        # (encoding, buffer, ...): forward them to the wrapped stream.
        stream = self._inner
        return getattr(stream, name)
class IterationBudget:
    """Lock-protected iteration counter shared by a parent agent and its children.

    Tracks how many LLM-call iterations have been consumed against a fixed
    cap. One instance is meant to be created by the parent agent and handed
    to every child so they all draw from the same cap.

    Iterations can be handed back with :meth:`refund` (the original design
    note mentions ``execute_code`` turns) so they do not count against the
    cap.

    Every read and write of the counter happens under the same lock,
    including the ``used`` property.
    """

    def __init__(self, max_total: int):
        """Create a budget allowing at most *max_total* iterations."""
        self.max_total = max_total
        self._used = 0
        self._lock = threading.Lock()

    def consume(self) -> bool:
        """Try to consume one iteration.

        Returns:
            True if an iteration was available and has been counted,
            False if the cap has already been reached.
        """
        with self._lock:
            if self._used >= self.max_total:
                return False
            self._used += 1
            return True

    def refund(self) -> None:
        """Give back one iteration; a no-op when nothing has been consumed."""
        with self._lock:
            if self._used > 0:
                self._used -= 1

    @property
    def used(self) -> int:
        """Number of iterations currently counted against the cap."""
        # Take the lock like every other accessor so the read is ordered
        # with concurrent consume()/refund() calls.
        with self._lock:
            return self._used

    @property
    def remaining(self) -> int:
        """Number of iterations still available (never negative)."""
        with self._lock:
            return max(0, self.max_total - self._used)
class AIAgent:
"""
AI Agent with tool calling capabilities.
This class manages the conversation flow, tool execution, and response handling
for AI models that support function calling.
"""
def __init__(
    self,
    base_url: str = None,
    api_key: str = None,
    provider: str = None,
    api_mode: str = None,
    model: str = "anthropic/claude-opus-4.6",  # OpenRouter format
    max_iterations: int = 90,  # Default tool-calling iterations (shared with subagents)
    tool_delay: float = 1.0,
    enabled_toolsets: List[str] = None,
    disabled_toolsets: List[str] = None,
    save_trajectories: bool = False,
    verbose_logging: bool = False,
    quiet_mode: bool = False,
    ephemeral_system_prompt: str = None,
    log_prefix_chars: int = 100,
    log_prefix: str = "",
    providers_allowed: List[str] = None,
    providers_ignored: List[str] = None,
    providers_order: List[str] = None,
    provider_sort: str = None,
    provider_require_parameters: bool = False,
    provider_data_collection: str = None,
    session_id: str = None,
    tool_progress_callback: callable = None,
    thinking_callback: callable = None,
    reasoning_callback: callable = None,
    clarify_callback: callable = None,
    step_callback: callable = None,
    max_tokens: int = None,
    reasoning_config: Dict[str, Any] = None,
    prefill_messages: List[Dict[str, Any]] = None,
    platform: str = None,
    skip_context_files: bool = False,
    skip_memory: bool = False,
    session_db=None,
    honcho_session_key: str = None,
    iteration_budget: "IterationBudget" = None,
    fallback_model: Dict[str, Any] = None,
    checkpoints_enabled: bool = False,
    checkpoint_max_snapshots: int = 50,
):
    """
    Initialize the AI Agent.

    Besides storing configuration, the constructor has side effects: it
    attaches a rotating WARNING+ error-log handler to the root logger (once
    per process), adjusts logger levels, builds the OpenAI client, loads
    tool definitions, creates the session log directory, and optionally
    initialises the session DB row, the on-disk memory store and the
    Honcho integration.

    Args:
        base_url (str): Base URL for the model API (optional; defaults to OPENROUTER_BASE_URL)
        api_key (str): API key for authentication (optional, falls back to the OPENROUTER_API_KEY env var)
        provider (str): Provider identifier (optional; normalised to lower case, defaults to "openrouter")
        api_mode (str): API mode override: "chat_completions" or "codex_responses".
            Any other value is ignored and the mode is derived from provider/base_url.
        model (str): Model name to use (default: "anthropic/claude-opus-4.6")
        max_iterations (int): Maximum number of tool calling iterations (default: 90)
        tool_delay (float): Delay between tool calls in seconds (default: 1.0)
        enabled_toolsets (List[str]): Only enable tools from these toolsets (optional)
        disabled_toolsets (List[str]): Disable tools from these toolsets (optional)
        save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False)
        verbose_logging (bool): Enable verbose logging for debugging (default: False)
        quiet_mode (bool): Suppress progress output for clean CLI experience (default: False)
        ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
        log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100)
        log_prefix (str): Prefix to add to all log messages for identification in parallel processing (default: "")
        providers_allowed (List[str]): OpenRouter providers to allow (optional)
        providers_ignored (List[str]): OpenRouter providers to ignore (optional)
        providers_order (List[str]): OpenRouter providers to try in order (optional)
        provider_sort (str): Sort providers by price/throughput/latency (optional)
        provider_require_parameters (bool): Stored as-is on the instance (default: False)
        provider_data_collection (str): Stored as-is on the instance (optional)
        session_id (str): Pre-generated session ID for logging (optional, auto-generated if not provided)
        tool_progress_callback (callable): Callback function(tool_name, args_preview) for progress notifications
        thinking_callback (callable): Stored on the instance; not invoked by the constructor
        reasoning_callback (callable): Stored on the instance; not invoked by the constructor
        clarify_callback (callable): Callback function(question, choices) -> str for interactive user questions.
            Provided by the platform layer (CLI or gateway). If None, the clarify tool returns an error.
        step_callback (callable): Stored on the instance; not invoked by the constructor
        max_tokens (int): Maximum tokens for model responses (optional, uses model default if not set)
        reasoning_config (Dict): OpenRouter reasoning configuration override (e.g. {"effort": "none"} to disable thinking).
            If None, defaults to {"enabled": True, "effort": "medium"} for OpenRouter. Set to disable/customize reasoning.
        prefill_messages (List[Dict]): Messages to prepend to conversation history as prefilled context.
            Useful for injecting a few-shot example or priming the model's response style.
            Example: [{"role": "user", "content": "Hi!"}, {"role": "assistant", "content": "Hello!"}]
        platform (str): The interface platform the user is on (e.g. "cli", "telegram", "discord", "whatsapp").
            Used to inject platform-specific formatting hints into the system prompt.
        skip_context_files (bool): If True, skip auto-injection of SOUL.md, AGENTS.md, and .cursorrules
            into the system prompt. Use this for batch processing and data generation to avoid
            polluting trajectories with user-specific persona or project instructions.
        skip_memory (bool): If True, neither the on-disk memory store nor Honcho is initialised.
        session_db: Optional session store; when truthy, ``create_session`` is called on it
            (failures are logged at debug level and ignored).
        honcho_session_key (str): Session key for Honcho integration (e.g., "telegram:123456" or CLI session_id).
            When provided and Honcho is enabled in config, enables persistent cross-session user modeling.
        iteration_budget (IterationBudget): Shared budget to reuse; a new one capped at
            ``max_iterations`` is created when omitted.
        fallback_model (Dict): Backup provider/model, e.g.
            {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}. Non-dict values are ignored.
        checkpoints_enabled (bool): Passed to the filesystem CheckpointManager (default: False)
        checkpoint_max_snapshots (int): Passed to the filesystem CheckpointManager (default: 50)

    Raises:
        RuntimeError: If the OpenAI client cannot be constructed (the original
            exception is chained as ``__cause__``).
    """
    self.model = model
    self.max_iterations = max_iterations
    # Shared iteration budget — parent creates, children inherit.
    # Consumed by every LLM turn across parent + all subagents.
    self.iteration_budget = iteration_budget or IterationBudget(max_iterations)
    self.tool_delay = tool_delay
    self.save_trajectories = save_trajectories
    self.verbose_logging = verbose_logging
    self.quiet_mode = quiet_mode
    self.ephemeral_system_prompt = ephemeral_system_prompt
    self.platform = platform  # "cli", "telegram", "discord", "whatsapp", etc.
    self.skip_context_files = skip_context_files
    self.log_prefix_chars = log_prefix_chars
    self.log_prefix = f"{log_prefix} " if log_prefix else ""

    # Store effective base URL for feature detection (prompt caching, reasoning, etc.)
    # When no base_url is provided, the client defaults to OpenRouter, so reflect that here.
    self.base_url = base_url or OPENROUTER_BASE_URL
    provider_name = provider.strip().lower() if isinstance(provider, str) and provider.strip() else None
    self.provider = provider_name or "openrouter"
    if api_mode in {"chat_completions", "codex_responses"}:
        self.api_mode = api_mode
    elif self.provider == "openai-codex":
        self.api_mode = "codex_responses"
    elif (provider_name is None) and "chatgpt.com/backend-api/codex" in self.base_url.lower():
        # No explicit provider, but the URL is the Codex backend: infer both.
        self.api_mode = "codex_responses"
        self.provider = "openai-codex"
    else:
        self.api_mode = "chat_completions"

    self.tool_progress_callback = tool_progress_callback
    self.thinking_callback = thinking_callback
    self.reasoning_callback = reasoning_callback
    self.clarify_callback = clarify_callback
    self.step_callback = step_callback
    self._last_reported_tool = None  # Track for "new tool" mode

    # Interrupt mechanism for breaking out of tool loops
    self._interrupt_requested = False
    self._interrupt_message = None  # Optional message that triggered interrupt

    # Subagent delegation state
    self._delegate_depth = 0  # 0 = top-level agent, incremented for children
    self._active_children = []  # Running child AIAgents (for interrupt propagation)

    # Store OpenRouter provider preferences
    self.providers_allowed = providers_allowed
    self.providers_ignored = providers_ignored
    self.providers_order = providers_order
    self.provider_sort = provider_sort
    self.provider_require_parameters = provider_require_parameters
    self.provider_data_collection = provider_data_collection

    # Store toolset filtering options
    self.enabled_toolsets = enabled_toolsets
    self.disabled_toolsets = disabled_toolsets

    # Model response configuration
    self.max_tokens = max_tokens  # None = use model default
    self.reasoning_config = reasoning_config  # None = use default (medium for OpenRouter)
    self.prefill_messages = prefill_messages or []  # Prefilled conversation turns

    # Anthropic prompt caching: auto-enabled for Claude models via OpenRouter.
    # Reduces input costs by ~75% on multi-turn conversations by caching the
    # conversation prefix. Uses system_and_3 strategy (4 breakpoints).
    is_openrouter = "openrouter" in self.base_url.lower()
    is_claude = "claude" in self.model.lower()
    self._use_prompt_caching = is_openrouter and is_claude
    self._cache_ttl = "5m"  # Default 5-minute TTL (1.25x write cost)

    # Iteration budget pressure: warn the LLM as it approaches max_iterations.
    # Warnings are injected into the last tool result JSON (not as separate
    # messages) so they don't break message structure or invalidate caching.
    self._budget_caution_threshold = 0.7  # 70% — nudge to start wrapping up
    self._budget_warning_threshold = 0.9  # 90% — urgent, respond now
    self._budget_pressure_enabled = True

    # Hermes home directory (HERMES_HOME override, else ~/.hermes). Used for
    # both the error log and the session logs so they stay together.
    hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))

    # Persistent error log -- always writes WARNING+ to <hermes home>/logs/errors.log
    # so tool failures, API errors, etc. are inspectable after the fact.
    from logging.handlers import RotatingFileHandler
    from agent.redact import RedactingFormatter
    _error_log_dir = hermes_home / "logs"
    _error_log_dir.mkdir(parents=True, exist_ok=True)
    _error_log_path = _error_log_dir / "errors.log"
    root_logger = logging.getLogger()
    # FileHandler stores its target as an absolute path in ``baseFilename``;
    # compare against that so the handler is attached only once per process.
    # Without this guard every AIAgent (including each subagent) would add
    # another handler, duplicating log lines and leaking file handles.
    _error_log_abspath = os.path.abspath(os.fspath(_error_log_path))
    _error_handler_present = any(
        isinstance(existing, RotatingFileHandler)
        and getattr(existing, "baseFilename", None) == _error_log_abspath
        for existing in root_logger.handlers
    )
    if not _error_handler_present:
        _error_file_handler = RotatingFileHandler(
            _error_log_path, maxBytes=2 * 1024 * 1024, backupCount=2,
        )
        _error_file_handler.setLevel(logging.WARNING)
        _error_file_handler.setFormatter(RedactingFormatter(
            '%(asctime)s %(levelname)s %(name)s: %(message)s',
        ))
        root_logger.addHandler(_error_file_handler)

    # NOTE(review): logging.basicConfig() does nothing when the root logger
    # already has handlers, which is the case here once the error-log handler
    # is attached — confirm whether the level/format below are meant to apply.
    if self.verbose_logging:
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )
        # Re-format every root handler (this includes the error-log handler)
        # through the redacting formatter.
        for handler in logging.getLogger().handlers:
            handler.setFormatter(RedactingFormatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                datefmt='%H:%M:%S',
            ))
        # Keep third-party libraries at WARNING level to reduce noise
        # We have our own retry and error logging that's more informative
        logging.getLogger('openai').setLevel(logging.WARNING)
        logging.getLogger('openai._base_client').setLevel(logging.WARNING)
        logging.getLogger('httpx').setLevel(logging.WARNING)
        logging.getLogger('httpcore').setLevel(logging.WARNING)
        logging.getLogger('asyncio').setLevel(logging.WARNING)
        # Suppress Modal/gRPC related debug spam
        logging.getLogger('hpack').setLevel(logging.WARNING)
        logging.getLogger('hpack.hpack').setLevel(logging.WARNING)
        logging.getLogger('grpc').setLevel(logging.WARNING)
        logging.getLogger('modal').setLevel(logging.WARNING)
        logging.getLogger('rex-deploy').setLevel(logging.INFO)  # Keep INFO for sandbox status
        logger.info("Verbose logging enabled (third-party library logs suppressed)")
    else:
        # Set logging to INFO level for important messages only
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )
        # Suppress noisy library logging
        logging.getLogger('openai').setLevel(logging.ERROR)
        logging.getLogger('openai._base_client').setLevel(logging.ERROR)
        logging.getLogger('httpx').setLevel(logging.ERROR)
        logging.getLogger('httpcore').setLevel(logging.ERROR)
        if self.quiet_mode:
            # In quiet mode (CLI default), suppress all tool/infra log
            # noise. The TUI has its own rich display for status; logger
            # INFO/WARNING messages just clutter it.
            for quiet_logger in [
                'tools',               # all tools.* (terminal, browser, web, file, etc.)
                'minisweagent',        # mini-swe-agent execution backend
                'run_agent',           # agent runner internals
                'trajectory_compressor',
                'cron',                # scheduler (only relevant in daemon mode)
                'hermes_cli',          # CLI helpers
            ]:
                logging.getLogger(quiet_logger).setLevel(logging.ERROR)

    # Initialize OpenAI client - defaults to OpenRouter
    client_kwargs = {}
    # Default to OpenRouter if no base_url provided
    if base_url:
        client_kwargs["base_url"] = base_url
    else:
        client_kwargs["base_url"] = OPENROUTER_BASE_URL
    # Handle API key - OpenRouter is the primary provider
    if api_key:
        client_kwargs["api_key"] = api_key
    else:
        # No explicit key: read OPENROUTER_API_KEY (empty string when unset)
        client_kwargs["api_key"] = os.getenv("OPENROUTER_API_KEY", "")
    # OpenRouter app attribution — shows hermes-agent in rankings/analytics
    effective_base = client_kwargs.get("base_url", "")
    if "openrouter" in effective_base.lower():
        client_kwargs["default_headers"] = {
            "HTTP-Referer": "https://github.com/NousResearch/hermes-agent",
            "X-OpenRouter-Title": "Hermes Agent",
            "X-OpenRouter-Categories": "productivity,cli-agent",
        }
    elif "api.kimi.com" in effective_base.lower():
        # Kimi Code API requires a recognized coding-agent User-Agent
        # (see https://github.com/MoonshotAI/kimi-cli)
        client_kwargs["default_headers"] = {
            "User-Agent": "KimiCLI/1.0",
        }
    self._client_kwargs = client_kwargs  # stored for rebuilding after interrupt
    try:
        self.client = OpenAI(**client_kwargs)
        if not self.quiet_mode:
            print(f"🤖 AI Agent initialized with model: {self.model}")
            if base_url:
                print(f"🔗 Using custom base URL: {base_url}")
            # Always show API key info (masked) for debugging auth issues
            key_used = client_kwargs.get("api_key", "none")
            if key_used and key_used != "dummy-key" and len(key_used) > 12:
                print(f"🔑 Using API key: {key_used[:8]}...{key_used[-4:]}")
            else:
                print(f"⚠️ Warning: API key appears invalid or missing (got: '{key_used[:20] if key_used else 'none'}...')")
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(f"Failed to initialize OpenAI client: {e}") from e

    # Provider fallback — a single backup model/provider tried when the
    # primary is exhausted (rate-limit, overload, connection failure).
    # Config shape: {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}
    self._fallback_model = fallback_model if isinstance(fallback_model, dict) else None
    self._fallback_activated = False
    if self._fallback_model:
        fb_p = self._fallback_model.get("provider", "")
        fb_m = self._fallback_model.get("model", "")
        if fb_p and fb_m and not self.quiet_mode:
            print(f"🔄 Fallback model: {fb_m} ({fb_p})")

    # Get available tools with filtering
    self.tools = get_tool_definitions(
        enabled_toolsets=enabled_toolsets,
        disabled_toolsets=disabled_toolsets,
        quiet_mode=self.quiet_mode,
    )
    # Show tool configuration and store valid tool names for validation
    self.valid_tool_names = set()
    if self.tools:
        self.valid_tool_names = {tool["function"]["name"] for tool in self.tools}
        tool_names = sorted(self.valid_tool_names)
        if not self.quiet_mode:
            print(f"🛠️ Loaded {len(self.tools)} tools: {', '.join(tool_names)}")
            # Show filtering info if applied
            if enabled_toolsets:
                print(f" ✅ Enabled toolsets: {', '.join(enabled_toolsets)}")
            if disabled_toolsets:
                print(f" ❌ Disabled toolsets: {', '.join(disabled_toolsets)}")
    elif not self.quiet_mode:
        print("🛠️ No tools loaded (all tools filtered out or unavailable)")

    # Check tool requirements
    if self.tools and not self.quiet_mode:
        requirements = check_toolset_requirements()
        missing_reqs = [name for name, available in requirements.items() if not available]
        if missing_reqs:
            print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}")

    # Show trajectory saving status
    if self.save_trajectories and not self.quiet_mode:
        print("📝 Trajectory saving enabled")

    # Show ephemeral system prompt status
    if self.ephemeral_system_prompt and not self.quiet_mode:
        prompt_preview = self.ephemeral_system_prompt[:60] + "..." if len(self.ephemeral_system_prompt) > 60 else self.ephemeral_system_prompt
        print(f"🔒 Ephemeral system prompt: '{prompt_preview}' (not saved to trajectories)")

    # Show prompt caching status
    if self._use_prompt_caching and not self.quiet_mode:
        print(f"💾 Prompt caching: ENABLED (Claude via OpenRouter, {self._cache_ttl} TTL)")

    # Session logging setup - auto-save conversation trajectories for debugging
    self.session_start = datetime.now()
    if session_id:
        # Use provided session ID (e.g., from CLI)
        self.session_id = session_id
    else:
        # Generate a new session ID
        timestamp_str = self.session_start.strftime("%Y%m%d_%H%M%S")
        short_uuid = uuid.uuid4().hex[:6]
        self.session_id = f"{timestamp_str}_{short_uuid}"

    # Session logs go into <hermes home>/sessions/ alongside gateway sessions
    self.logs_dir = hermes_home / "sessions"
    self.logs_dir.mkdir(parents=True, exist_ok=True)
    self.session_log_file = self.logs_dir / f"session_{self.session_id}.json"

    # Track conversation messages for session logging
    self._session_messages: List[Dict[str, Any]] = []

    # Cached system prompt -- built once per session, only rebuilt on compression
    self._cached_system_prompt: Optional[str] = None

    # Filesystem checkpoint manager (transparent — not a tool)
    from tools.checkpoint_manager import CheckpointManager
    self._checkpoint_mgr = CheckpointManager(
        enabled=checkpoints_enabled,
        max_snapshots=checkpoint_max_snapshots,
    )

    # SQLite session store (optional -- provided by CLI or gateway)
    self._session_db = session_db
    self._last_flushed_db_idx = 0  # tracks DB-write cursor to prevent duplicate writes
    if self._session_db:
        try:
            self._session_db.create_session(
                session_id=self.session_id,
                source=self.platform or "cli",
                model=self.model,
                model_config={
                    "max_iterations": self.max_iterations,
                    "reasoning_config": reasoning_config,
                    "max_tokens": max_tokens,
                },
                user_id=None,
            )
        except Exception as e:
            logger.debug("Session DB create_session failed: %s", e)

    # In-memory todo list for task planning (one per agent/session)
    from tools.todo_tool import TodoStore
    self._todo_store = TodoStore()

    # Persistent memory (MEMORY.md + USER.md) -- loaded from disk
    self._memory_store = None
    self._memory_enabled = False
    self._user_profile_enabled = False
    self._memory_nudge_interval = 10
    self._memory_flush_min_turns = 6
    if not skip_memory:
        try:
            from hermes_cli.config import load_config as _load_mem_config
            mem_config = _load_mem_config().get("memory", {})
            self._memory_enabled = mem_config.get("memory_enabled", False)
            self._user_profile_enabled = mem_config.get("user_profile_enabled", False)
            self._memory_nudge_interval = int(mem_config.get("nudge_interval", 10))
            self._memory_flush_min_turns = int(mem_config.get("flush_min_turns", 6))
            if self._memory_enabled or self._user_profile_enabled:
                from tools.memory_tool import MemoryStore
                self._memory_store = MemoryStore(
                    memory_char_limit=mem_config.get("memory_char_limit", 2200),
                    user_char_limit=mem_config.get("user_char_limit", 1375),
                )
                self._memory_store.load_from_disk()
        except Exception as e:
            # Memory is optional -- don't break agent init, but leave a trace.
            logger.debug("Memory init failed (non-fatal): %s", e)

    # Honcho AI-native memory (cross-session user modeling)
    # Reads ~/.honcho/config.json as the single source of truth.
    self._honcho = None  # HonchoSessionManager | None
    self._honcho_session_key = honcho_session_key
    if not skip_memory:
        try:
            from honcho_integration.client import HonchoClientConfig, get_honcho_client
            hcfg = HonchoClientConfig.from_global_config()
            if hcfg.enabled and hcfg.api_key:
                from honcho_integration.session import HonchoSessionManager
                client = get_honcho_client(hcfg)
                self._honcho = HonchoSessionManager(
                    honcho=client,
                    config=hcfg,
                    context_tokens=hcfg.context_tokens,
                )
                # Resolve session key: explicit arg > global sessions map > fallback
                if not self._honcho_session_key:
                    self._honcho_session_key = (
                        hcfg.resolve_session_name()
                        or "hermes-default"
                    )
                # Ensure session exists in Honcho
                self._honcho.get_or_create(self._honcho_session_key)
                # Inject session context into the honcho tool module
                from tools.honcho_tools import set_session_context
                set_session_context(self._honcho, self._honcho_session_key)
                logger.info(
                    "Honcho active (session: %s, user: %s, workspace: %s)",
                    self._honcho_session_key, hcfg.peer_name, hcfg.workspace_id,
                )
            else:
                if not hcfg.enabled:
                    logger.debug("Honcho disabled in global config")
                elif not hcfg.api_key:
                    logger.debug("Honcho enabled but no API key configured")
        except Exception as e:
            logger.debug("Honcho init failed (non-fatal): %s", e)
            self._honcho = None

    # Skills config: nudge interval for skill creation reminders
    self._skill_nudge_interval = 15
    try:
        from hermes_cli.config import load_config as _load_skills_config
        skills_config = _load_skills_config().get("skills", {})
        self._skill_nudge_interval = int(skills_config.get("creation_nudge_interval", 15))
    except Exception as e:
        # Keep the default interval; the config is optional.
        logger.debug("Skills config load failed (non-fatal): %s", e)

    # Initialize context compressor for automatic context management
    # Compresses conversation when approaching model's context limit
    # Configuration via config.yaml (compression section) or environment variables
    compression_threshold = float(os.getenv("CONTEXT_COMPRESSION_THRESHOLD", "0.85"))
    compression_enabled = os.getenv("CONTEXT_COMPRESSION_ENABLED", "true").lower() in ("true", "1", "yes")
    compression_summary_model = os.getenv("CONTEXT_COMPRESSION_MODEL") or None
    self.context_compressor = ContextCompressor(
        model=self.model,
        threshold_percent=compression_threshold,
        protect_first_n=3,
        protect_last_n=4,
        summary_target_tokens=500,
        summary_model_override=compression_summary_model,
        quiet_mode=self.quiet_mode,
        base_url=self.base_url,
    )
    self.compression_enabled = compression_enabled
    self._user_turn_count = 0

    # Cumulative token usage for the session
    self.session_prompt_tokens = 0
    self.session_completion_tokens = 0
    self.session_total_tokens = 0
    self.session_api_calls = 0

    if not self.quiet_mode:
        if compression_enabled:
            print(f"📊 Context limit: {self.context_compressor.context_length:,} tokens (compress at {int(compression_threshold*100)}% = {self.context_compressor.threshold_tokens:,})")
        else:
            print(f"📊 Context limit: {self.context_compressor.context_length:,} tokens (auto-compression disabled)")
def _max_tokens_param(self, value: int) -> dict:
"""Return the correct max tokens kwarg for the current provider.
OpenAI's newer models (gpt-4o, o-series, gpt-5+) require
'max_completion_tokens'. OpenRouter, local models, and older
OpenAI models use 'max_tokens'.
"""
_is_direct_openai = (
"api.openai.com" in self.base_url.lower()
and "openrouter" not in self.base_url.lower()
)
if _is_direct_openai:
return {"max_completion_tokens": value}
return {"max_tokens": value}
def _has_content_after_think_block(self, content: str) -> bool:
"""
Check if content has actual text after any <think></think> blocks.
This detects cases where the model only outputs reasoning but no actual
response, which indicates an incomplete generation that should be retried.
Args:
content: The assistant message content to check
Returns:
True if there's meaningful content after think blocks, False otherwise
"""
if not content:
return False
# Remove all <think>...</think> blocks (including nested ones, non-greedy)
cleaned = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
# Check if there's any non-whitespace content remaining
return bool(cleaned.strip())
def _strip_think_blocks(self, content: str) -> str:
"""Remove <think>...</think> blocks from content, returning only visible text."""
if not content:
return ""
return re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
def _looks_like_codex_intermediate_ack(
self,
user_message: str,
assistant_content: str,
messages: List[Dict[str, Any]],
) -> bool:
"""Detect a planning/ack message that should continue instead of ending the turn."""
if any(isinstance(msg, dict) and msg.get("role") == "tool" for msg in messages):
return False
assistant_text = self._strip_think_blocks(assistant_content or "").strip().lower()
if not assistant_text:
return False
if len(assistant_text) > 1200:
return False
has_future_ack = bool(
re.search(r"\b(i['’]ll|i will|let me|i can do that|i can help with that)\b", assistant_text)
)
if not has_future_ack:
return False
action_markers = (
"look into",
"look at",
"inspect",
"scan",
"check",
"analyz",
"review",
"explore",
"read",
"open",
"run",
"test",
"fix",
"debug",
"search",
"find",
"walkthrough",
"report back",
"summarize",
)
workspace_markers = (
"directory",
"current directory",
"current dir",
"cwd",
"repo",
"repository",
"codebase",
"project",
"folder",
"filesystem",
"file tree",
"files",
"path",
)
user_text = (user_message or "").strip().lower()
user_targets_workspace = (
any(marker in user_text for marker in workspace_markers)
or "~/" in user_text
or "/" in user_text
)
assistant_mentions_action = any(marker in assistant_text for marker in action_markers)
assistant_targets_workspace = any(
marker in assistant_text for marker in workspace_markers
)
return (user_targets_workspace or assistant_targets_workspace) and assistant_mentions_action
def _extract_reasoning(self, assistant_message) -> Optional[str]:
"""
Extract reasoning/thinking content from an assistant message.
OpenRouter and various providers can return reasoning in multiple formats:
1. message.reasoning - Direct reasoning field (DeepSeek, Qwen, etc.)
2. message.reasoning_content - Alternative field (Moonshot AI, Novita, etc.)
3. message.reasoning_details - Array of {type, summary, ...} objects (OpenRouter unified)
Args:
assistant_message: The assistant message object from the API response
Returns:
Combined reasoning text, or None if no reasoning found
"""
reasoning_parts = []
# Check direct reasoning field
if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning:
reasoning_parts.append(assistant_message.reasoning)
# Check reasoning_content field (alternative name used by some providers)
if hasattr(assistant_message, 'reasoning_content') and assistant_message.reasoning_content:
# Don't duplicate if same as reasoning
if assistant_message.reasoning_content not in reasoning_parts:
reasoning_parts.append(assistant_message.reasoning_content)
# Check reasoning_details array (OpenRouter unified format)
# Format: [{"type": "reasoning.summary", "summary": "...", ...}, ...]
if hasattr(assistant_message, 'reasoning_details') and assistant_message.reasoning_details:
for detail in assistant_message.reasoning_details:
if isinstance(detail, dict):
# Extract summary from reasoning detail object
summary = detail.get('summary') or detail.get('content') or detail.get('text')
if summary and summary not in reasoning_parts:
reasoning_parts.append(summary)
# Combine all reasoning parts
if reasoning_parts:
return "\n\n".join(reasoning_parts)
return None
def _cleanup_task_resources(self, task_id: str) -> None:
"""Clean up VM and browser resources for a given task."""
try:
cleanup_vm(task_id)
except Exception as e:
if self.verbose_logging:
logging.warning(f"Failed to cleanup VM for task {task_id}: {e}")
try:
cleanup_browser(task_id)
except Exception as e:
if self.verbose_logging:
logging.warning(f"Failed to cleanup browser for task {task_id}: {e}")
def _persist_session(self, messages: List[Dict], conversation_history: List[Dict] = None):
"""Save session state to both JSON log and SQLite on any exit path.
Ensures conversations are never lost, even on errors or early returns.
"""
self._session_messages = messages
self._save_session_log(messages)
self._flush_messages_to_session_db(messages, conversation_history)
def _flush_messages_to_session_db(self, messages: List[Dict], conversation_history: List[Dict] = None):
"""Persist any un-flushed messages to the SQLite session store.
Uses _last_flushed_db_idx to track which messages have already been
written, so repeated calls (from multiple exit paths) only write
truly new messages — preventing the duplicate-write bug (#860).
"""
if not self._session_db:
return
try:
start_idx = len(conversation_history) if conversation_history else 0
flush_from = max(start_idx, self._last_flushed_db_idx)
for msg in messages[flush_from:]:
role = msg.get("role", "unknown")
content = msg.get("content")
tool_calls_data = None
if hasattr(msg, "tool_calls") and msg.tool_calls:
tool_calls_data = [
{"name": tc.function.name, "arguments": tc.function.arguments}
for tc in msg.tool_calls
]
elif isinstance(msg.get("tool_calls"), list):
tool_calls_data = msg["tool_calls"]
self._session_db.append_message(
session_id=self.session_id,
role=role,
content=content,
tool_name=msg.get("tool_name"),
tool_calls=tool_calls_data,
tool_call_id=msg.get("tool_call_id"),
finish_reason=msg.get("finish_reason"),
)
self._last_flushed_db_idx = len(messages)
except Exception as e:
logger.debug("Session DB append_message failed: %s", e)
def _get_messages_up_to_last_assistant(self, messages: List[Dict]) -> List[Dict]:
"""
Get messages up to (but not including) the last assistant turn.
This is used when we need to "roll back" to the last successful point
in the conversation, typically when the final assistant message is
incomplete or malformed.
Args:
messages: Full message list
Returns:
Messages up to the last complete assistant turn (ending with user/tool message)
"""
if not messages:
return []
# Find the index of the last assistant message
last_assistant_idx = None
for i in range(len(messages) - 1, -1, -1):
if messages[i].get("role") == "assistant":
last_assistant_idx = i
break
if last_assistant_idx is None:
# No assistant message found, return all messages
return messages.copy()
# Return everything up to (not including) the last assistant message
return messages[:last_assistant_idx]
def _format_tools_for_system_message(self) -> str:
"""
Format tool definitions for the system message in the trajectory format.
Returns:
str: JSON string representation of tool definitions
"""
if not self.tools:
return "[]"
# Convert tool definitions to the format expected in trajectories
formatted_tools = []
for tool in self.tools:
func = tool["function"]
formatted_tool = {
"name": func["name"],
"description": func.get("description", ""),
"parameters": func.get("parameters", {}),
"required": None # Match the format in the example
}
formatted_tools.append(formatted_tool)
return json.dumps(formatted_tools, ensure_ascii=False)
def _convert_to_trajectory_format(self, messages: List[Dict[str, Any]], user_query: str, completed: bool) -> List[Dict[str, Any]]:
"""
Convert internal message format to trajectory format for saving.
Args:
messages (List[Dict]): Internal message history
user_query (str): Original user query
completed (bool): Whether the conversation completed successfully
Returns:
List[Dict]: Messages in trajectory format
"""
trajectory = []
# Add system message with tool definitions
system_msg = (
"You are a function calling AI model. You are provided with function signatures within <tools> </tools> XML tags. "
"You may call one or more functions to assist with the user query. If available tools are not relevant in assisting "
"with user query, just respond in natural conversational language. Don't make assumptions about what values to plug "
"into functions. After calling & executing the functions, you will be provided with function results within "
"<tool_response> </tool_response> XML tags. Here are the available tools:\n"
f"<tools>\n{self._format_tools_for_system_message()}\n</tools>\n"
"For each function call return a JSON object, with the following pydantic model json schema for each:\n"
"{'title': 'FunctionCall', 'type': 'object', 'properties': {'name': {'title': 'Name', 'type': 'string'}, "
"'arguments': {'title': 'Arguments', 'type': 'object'}}, 'required': ['name', 'arguments']}\n"
"Each function call should be enclosed within <tool_call> </tool_call> XML tags.\n"
"Example:\n<tool_call>\n{'name': <function-name>,'arguments': <args-dict>}\n</tool_call>"
)
trajectory.append({
"from": "system",
"value": system_msg
})
# Add the actual user prompt (from the dataset) as the first human message
trajectory.append({
"from": "human",
"value": user_query
})
# Skip the first message (the user query) since we already added it above.
# Prefill messages are injected at API-call time only (not in the messages
# list), so no offset adjustment is needed here.
i = 1
while i < len(messages):
msg = messages[i]
if msg["role"] == "assistant":
# Check if this message has tool calls