forked from MervinPraison/PraisonAI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanager.py
More file actions
1182 lines (987 loc) · 41.8 KB
/
manager.py
File metadata and controls
1182 lines (987 loc) · 41.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Context Manager Facade for PraisonAI Agents.
Provides a unified interface for context management:
- Budgeting and allocation
- Token estimation with validation
- Composition within limits
- Optimization with benefit checking
- Monitoring with snapshot hooks
- Multi-agent orchestration support
- Optimization history tracking
This is the main entry point for context management in both SDK and CLI.
"""
import hashlib
import json
from praisonaiagents._logging import get_logger
logger = get_logger(__name__)
import threading
import atexit
from typing import Dict, List, Any, Optional, Literal, Callable, Tuple
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from enum import Enum
from .models import (
ContextLedger, BudgetAllocation, OptimizerStrategy, OptimizationResult,
ContextConfig as ManagerConfig # Deprecated alias for backward compatibility
)
from .tokens import estimate_tokens_heuristic, estimate_messages_tokens, estimate_message_tokens
from .budgeter import ContextBudgeter
from .ledger import ContextLedgerManager, MultiAgentLedger
from .optimizer import get_optimizer
from .monitor import ContextMonitor, MultiAgentMonitor
class SessionDeduplicationCache:
"""
Thread-safe session-level content deduplication cache.
Tracks content hashes across all agents in a workflow session
to prevent duplicate content from being sent to LLM.
"""
def __init__(self, max_size: int = 1000):
"""
Initialize session deduplication cache.
Args:
max_size: Maximum number of hashes to cache (LRU eviction)
"""
self._cache: Dict[str, str] = {} # hash -> agent_name
self._lock = threading.Lock()
self._max_size = max_size
self._stats = {"duplicates_prevented": 0, "tokens_saved": 0}
def check_and_add(self, content_hash: str, agent_name: str, tokens: int = 0) -> bool:
"""
Check if content hash exists and add if new.
Args:
content_hash: Hash of the content
agent_name: Name of the agent adding this content
tokens: Estimated tokens in this content
Returns:
True if duplicate (already exists), False if new
"""
with self._lock:
if content_hash in self._cache:
self._stats["duplicates_prevented"] += 1
self._stats["tokens_saved"] += tokens
return True # Duplicate
# LRU eviction if at capacity
if len(self._cache) >= self._max_size:
# Remove oldest entry
oldest_key = next(iter(self._cache))
del self._cache[oldest_key]
self._cache[content_hash] = agent_name
return False # New content
def get_stats(self) -> Dict[str, int]:
"""Get deduplication statistics."""
with self._lock:
return self._stats.copy()
def clear(self) -> None:
"""Clear the cache."""
with self._lock:
self._cache.clear()
self._stats = {"duplicates_prevented": 0, "tokens_saved": 0}
def deduplicate_topics(topics: list, key: str = "title", similarity_threshold: float = 0.8) -> list:
"""
Programmatic deduplication of topics/items before agent processing.
This helps prevent duplicate content from being passed to downstream agents,
reducing token waste and improving quality.
Args:
topics: List of topic dicts or strings
key: Key to use for comparison if topics are dicts (default: "title")
similarity_threshold: Similarity threshold for fuzzy matching (0.0-1.0)
Returns:
Deduplicated list of topics
"""
if not topics:
return topics
seen_hashes = set()
seen_normalized = set()
unique_topics = []
for topic in topics:
# Get the content to compare
if isinstance(topic, dict):
content = str(topic.get(key, topic.get("content", str(topic))))
else:
content = str(topic)
# Normalize for comparison
normalized = content.lower().strip()
# Remove common words for better matching
normalized = " ".join(w for w in normalized.split() if len(w) > 3)
# Check exact hash match
content_hash = hashlib.sha256(normalized.encode()).hexdigest()
if content_hash in seen_hashes:
continue
# Check fuzzy match using simple word overlap
is_duplicate = False
for seen in seen_normalized:
# Calculate Jaccard similarity
words1 = set(normalized.split())
words2 = set(seen.split())
if words1 and words2:
intersection = len(words1 & words2)
union = len(words1 | words2)
similarity = intersection / union if union > 0 else 0
if similarity >= similarity_threshold:
is_duplicate = True
break
if not is_duplicate:
seen_hashes.add(content_hash)
seen_normalized.add(normalized)
unique_topics.append(topic)
return unique_topics
class EstimationMode(str, Enum):
"""Token estimation modes."""
HEURISTIC = "heuristic"
ACCURATE = "accurate"
VALIDATED = "validated"
class ContextShareMode(str, Enum):
"""How context is shared between agents."""
NONE = "none"
SUMMARY = "summary"
FULL = "full"
class ToolShareMode(str, Enum):
"""How tools are shared between agents."""
NONE = "none"
SAFE = "safe"
FULL = "full"
class OptimizationEventType(str, Enum):
"""Types of optimization events."""
NORMALIZE = "normalize"
CAP_OUTPUTS = "cap_outputs"
PRUNE_TOOLS = "prune_tools"
SLIDING_WINDOW = "sliding_window"
SUMMARIZE = "summarize"
BENEFIT_CHECK = "benefit_check"
REVERT = "revert"
SNAPSHOT = "snapshot"
OVERFLOW_DETECTED = "overflow_detected"
AUTO_COMPACT = "auto_compact"
@dataclass
class ContextPolicy:
"""
Policy for context sharing during agent handoffs.
Controls how context is passed between agents in multi-agent scenarios.
"""
share: bool = False
share_mode: ContextShareMode = ContextShareMode.NONE
max_tokens: int = 0 # 0 = no limit
tools_share: ToolShareMode = ToolShareMode.NONE
preserve_system: bool = True
preserve_recent_turns: int = 3
def to_dict(self) -> Dict[str, Any]:
return {
"share": self.share,
"share_mode": self.share_mode.value,
"max_tokens": self.max_tokens,
"tools_share": self.tools_share.value,
"preserve_system": self.preserve_system,
"preserve_recent_turns": self.preserve_recent_turns,
}
@dataclass
class OptimizationEvent:
"""Record of an optimization event."""
timestamp: str
event_type: OptimizationEventType
strategy: Optional[str] = None
tokens_before: int = 0
tokens_after: int = 0
tokens_saved: int = 0
messages_affected: int = 0
details: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"timestamp": self.timestamp,
"event_type": self.event_type.value,
"strategy": self.strategy,
"tokens_before": self.tokens_before,
"tokens_after": self.tokens_after,
"tokens_saved": self.tokens_saved,
"messages_affected": self.messages_affected,
"details": self.details,
}
@dataclass
class EstimationMetrics:
"""Metrics for token estimation accuracy."""
heuristic_estimate: int = 0
accurate_estimate: int = 0
error_pct: float = 0.0
estimator_used: EstimationMode = EstimationMode.HEURISTIC
def to_dict(self) -> Dict[str, Any]:
return {
"heuristic_estimate": self.heuristic_estimate,
"accurate_estimate": self.accurate_estimate,
"error_pct": self.error_pct,
"estimator_used": self.estimator_used.value,
}
@dataclass
class PerToolBudget:
"""Per-tool token budget configuration."""
tool_name: str
max_output_tokens: int = 10000
protected: bool = False
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class SnapshotHookData:
"""Data captured at LLM call boundary for exact snapshot."""
timestamp: str
messages: List[Dict[str, Any]]
tools: List[Dict[str, Any]]
message_hash: str
tools_hash: str
ledger: Optional[ContextLedger] = None
budget: Optional[BudgetAllocation] = None
def to_dict(self) -> Dict[str, Any]:
return {
"timestamp": self.timestamp,
"message_count": len(self.messages),
"tools_count": len(self.tools),
"message_hash": self.message_hash,
"tools_hash": self.tools_hash,
"ledger": self.ledger.to_dict() if self.ledger else None,
"budget": self.budget.to_dict() if self.budget else None,
}
class ContextManager:
"""
Unified facade for context management.
Orchestrates budgeting, composition, optimization, and monitoring.
Provides hooks for exact LLM boundary snapshots.
Tracks optimization history for debugging.
Example:
manager = ContextManager(model="gpt-4o")
# Process messages before LLM call
result = manager.process(
messages=messages,
system_prompt=system_prompt,
tools=tools,
)
# Get optimized messages
optimized_messages = result["messages"]
# Check if optimization occurred
if result["optimized"]:
print(f"Saved {result['tokens_saved']} tokens")
"""
def __init__(
self,
model: str = "gpt-4o-mini",
config: Optional[ManagerConfig] = None,
session_id: str = "",
agent_name: str = "",
session_cache: Optional[SessionDeduplicationCache] = None,
llm_summarize_fn: Optional[Callable] = None,
):
"""
Initialize context manager.
Args:
model: Model name for budget calculation
config: Manager configuration
session_id: Session identifier
agent_name: Agent name for monitoring
session_cache: Shared session cache for cross-agent deduplication
llm_summarize_fn: Optional LLM function for intelligent summarization
"""
self.model = model
self.config = config or ManagerConfig.from_env()
self.session_id = session_id
self.agent_name = agent_name
# LLM summarization function (auto-wired from agent when llm_summarize=True)
self._llm_summarize_fn = llm_summarize_fn
# Session-level deduplication cache (shared across agents in workflow)
self._session_cache = session_cache
# Core components
self._budgeter = ContextBudgeter(
model=model,
output_reserve=self.config.output_reserve,
)
self._ledger = ContextLedgerManager(agent_id=agent_name)
self._monitor = ContextMonitor(
enabled=self.config.monitor_enabled,
path=self.config.monitor_path,
format=self.config.monitor_format,
frequency=self.config.monitor_frequency,
redact_sensitive=self.config.redact_sensitive,
)
# Budget allocation
self._budget = self._budgeter.allocate()
self._ledger.set_budget(self._budget)
# Optimization history
self._history: List[OptimizationEvent] = []
self._max_history = 100
# Estimation metrics
self._estimation_metrics: Optional[EstimationMetrics] = None
# Snapshot hook data
self._last_snapshot_hook: Optional[SnapshotHookData] = None
self._snapshot_callbacks: List[Callable[[SnapshotHookData], None]] = []
# Async write buffer
self._async_buffer: List[Tuple[Path, str]] = []
self._async_lock = threading.Lock()
self._async_thread: Optional[threading.Thread] = None
# Register exit handler for async flush
if self.config.monitor_write_mode == "async":
atexit.register(self._flush_async_writes)
# Message hash cache for estimation
self._estimation_cache: Dict[str, int] = {}
self._cache_max_size = 1000
def process(
self,
messages: List[Dict[str, Any]],
system_prompt: str = "",
tools: Optional[List[Dict[str, Any]]] = None,
trigger: Literal["turn", "tool_call", "manual", "overflow"] = "turn",
) -> Dict[str, Any]:
"""
Process messages through the context pipeline.
Applies budgeting, optimization, and monitoring.
Args:
messages: Conversation messages
system_prompt: System prompt content
tools: Tool schemas
trigger: What triggered this processing
Returns:
Dict with processed messages and metadata
"""
tools = tools or []
# Track segments
self._ledger.reset()
if system_prompt:
self._ledger.track_system_prompt(system_prompt)
if tools:
self._ledger.track_tools(tools)
self._ledger.track_history(messages)
# Check for overflow
utilization = self._ledger.get_utilization()
needs_optimization = utilization >= self.config.compact_threshold
result = {
"messages": messages,
"optimized": False,
"tokens_before": self._ledger.get_total(),
"tokens_after": self._ledger.get_total(),
"tokens_saved": 0,
"utilization": utilization,
"warnings": self._ledger.get_warnings(),
"optimization_result": None,
}
# Log overflow detection
if needs_optimization:
self._add_history_event(
OptimizationEventType.OVERFLOW_DETECTED,
tokens_before=result["tokens_before"],
details={"utilization": utilization, "threshold": self.config.compact_threshold},
)
# Step 1: Deduplicate messages before optimization
deduped_messages = self._deduplicate_messages(messages)
dedup_saved = estimate_messages_tokens(messages) - estimate_messages_tokens(deduped_messages)
if dedup_saved > 0:
self._add_history_event(
OptimizationEventType.CAP_OUTPUTS,
tokens_before=estimate_messages_tokens(messages),
tokens_after=estimate_messages_tokens(deduped_messages),
tokens_saved=dedup_saved,
details={"reason": "deduplication", "messages_removed": len(messages) - len(deduped_messages)},
)
messages = deduped_messages
result["tokens_before"] = estimate_messages_tokens(messages)
# Step 2: Auto-compact if needed
if self.config.auto_compact and needs_optimization:
optimized_messages, opt_result = self._optimize_with_benefit_check(
messages,
self._budget.history_budget,
)
if opt_result and opt_result.tokens_saved > 0:
result["messages"] = optimized_messages
result["optimized"] = True
result["tokens_after"] = opt_result.optimized_tokens
result["tokens_saved"] = opt_result.tokens_saved
result["optimization_result"] = opt_result
# Update ledger
self._ledger.track_history(optimized_messages)
result["utilization"] = self._ledger.get_utilization()
result["warnings"] = self._ledger.get_warnings()
# Pre-optimization snapshot
if self.config.snapshot_timing in ("pre_optimization", "both"):
self._take_snapshot(messages, tools, trigger, "pre_optimization")
# Post-optimization snapshot
if self.config.snapshot_timing in ("post_optimization", "both"):
self._take_snapshot(result["messages"], tools, trigger, "post_optimization")
return result
def _optimize_with_benefit_check(
self,
messages: List[Dict[str, Any]],
target_tokens: int,
) -> Tuple[List[Dict[str, Any]], Optional[OptimizationResult]]:
"""
Optimize with benefit checking to prevent token inflation.
Args:
messages: Messages to optimize
target_tokens: Target token count
Returns:
Tuple of (optimized_messages, result)
"""
original_tokens = estimate_messages_tokens(messages)
if original_tokens <= target_tokens:
return messages, None
# Get optimizer with LLM summarization if configured
optimizer = get_optimizer(
self.config.strategy,
preserve_recent=self.config.keep_recent_turns,
protected_tools=self.config.protected_tools,
llm_summarize_fn=self._llm_summarize_fn if self.config.llm_summarize else None,
smart_tool_summarize=self.config.smart_tool_summarize,
tool_summarize_limits=self.config.tool_summarize_limits,
)
# Try optimization
optimized, result = optimizer.optimize(messages, target_tokens, self._ledger.get_ledger())
# Benefit check
min_gain = self.config.compression_min_gain_pct / 100.0
actual_gain = (original_tokens - result.optimized_tokens) / original_tokens if original_tokens > 0 else 0
self._add_history_event(
OptimizationEventType.BENEFIT_CHECK,
strategy=self.config.strategy.value,
tokens_before=original_tokens,
tokens_after=result.optimized_tokens,
tokens_saved=result.tokens_saved,
details={
"min_gain_required": min_gain,
"actual_gain": actual_gain,
"beneficial": actual_gain >= min_gain,
},
)
if actual_gain < min_gain:
# Not beneficial - revert
self._add_history_event(
OptimizationEventType.REVERT,
tokens_before=result.optimized_tokens,
tokens_after=original_tokens,
details={"reason": "compression_not_beneficial"},
)
return messages, None
# Log successful optimization
self._add_history_event(
OptimizationEventType.AUTO_COMPACT,
strategy=self.config.strategy.value,
tokens_before=original_tokens,
tokens_after=result.optimized_tokens,
tokens_saved=result.tokens_saved,
messages_affected=result.messages_removed,
)
return optimized, result
def _deduplicate_messages(
self,
messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Remove duplicate content from messages.
Detects and removes messages with identical content hashes,
keeping only the first occurrence. Preserves message order.
Uses session-level cache for cross-agent deduplication if available.
Args:
messages: List of messages to deduplicate
Returns:
Deduplicated list of messages
"""
if not messages:
return messages
seen_hashes: set = set()
result: List[Dict[str, Any]] = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
# Always keep system messages (usually unique)
if role == "system":
result.append(msg)
continue
# Always keep assistant messages with tool_calls (they're actions)
if role == "assistant" and msg.get("tool_calls"):
result.append(msg)
continue
# For tool results and user/assistant content, check for duplicates
if isinstance(content, str) and len(content) > 100:
# Hash the content (first 2000 chars to avoid hashing huge content)
content_key = content[:2000]
content_hash = hashlib.sha256(content_key.encode()).hexdigest()[:16]
# Check local seen hashes first
if content_hash in seen_hashes:
logger.debug(f"[Context] Dedup: skipping local duplicate (hash={content_hash[:8]}, agent={self.agent_name})")
continue
# Check session-level cache for cross-agent deduplication
if self._session_cache:
tokens = estimate_message_tokens(msg)
if self._session_cache.check_and_add(content_hash, self.agent_name, tokens):
# Duplicate found in session cache - skip
logger.debug(f"[Context] Dedup: skipping session duplicate (hash={content_hash[:8]}, agent={self.agent_name}, tokens={tokens})")
continue
seen_hashes.add(content_hash)
result.append(msg)
return result
def _take_snapshot(
self,
messages: List[Dict[str, Any]],
tools: List[Dict[str, Any]],
trigger: str,
timing: str,
) -> None:
"""Take a context snapshot."""
if not self._monitor.enabled:
return
self._monitor.snapshot(
ledger=self._ledger.get_ledger(),
budget=self._budget,
messages=messages,
session_id=self.session_id,
agent_name=self.agent_name,
model_name=self.model,
trigger=trigger,
)
self._add_history_event(
OptimizationEventType.SNAPSHOT,
details={"timing": timing, "trigger": trigger},
)
def capture_llm_boundary(
self,
messages: List[Dict[str, Any]],
tools: List[Dict[str, Any]],
) -> SnapshotHookData:
"""
Capture exact state at LLM call boundary.
Call this immediately before sending to LLM to get exact snapshot.
Args:
messages: Exact messages being sent
tools: Exact tool schemas being sent
Returns:
SnapshotHookData with hashes for verification
"""
# Compute hashes
messages_json = json.dumps(messages, sort_keys=True, default=str)
tools_json = json.dumps(tools, sort_keys=True, default=str)
message_hash = hashlib.sha256(messages_json.encode()).hexdigest()[:16]
tools_hash = hashlib.sha256(tools_json.encode()).hexdigest()[:16]
from datetime import timezone
hook_data = SnapshotHookData(
timestamp=datetime.now(tz=timezone.utc).isoformat().replace('+00:00', 'Z'),
messages=messages,
tools=tools,
message_hash=message_hash,
tools_hash=tools_hash,
ledger=self._ledger.get_ledger(),
budget=self._budget,
)
self._last_snapshot_hook = hook_data
# Call registered callbacks
for callback in self._snapshot_callbacks:
try:
callback(hook_data)
except Exception:
pass # Don't let callbacks break the flow
return hook_data
def register_snapshot_callback(
self,
callback: Callable[[SnapshotHookData], None]
) -> None:
"""Register a callback for LLM boundary snapshots."""
self._snapshot_callbacks.append(callback)
def get_last_snapshot_hook(self) -> Optional[SnapshotHookData]:
"""Get the last LLM boundary snapshot."""
return self._last_snapshot_hook
def estimate_tokens(
self,
text: str,
validate: bool = False,
) -> Tuple[int, Optional[EstimationMetrics]]:
"""
Estimate tokens with optional validation.
Args:
text: Text to estimate
validate: Whether to validate against accurate count
Returns:
Tuple of (token_count, metrics)
"""
# Check cache
cache_key = hashlib.sha256(text.encode()).hexdigest()
if cache_key in self._estimation_cache:
return self._estimation_cache[cache_key], None
heuristic = estimate_tokens_heuristic(text)
metrics = None
if validate or self.config.estimation_mode == EstimationMode.VALIDATED:
try:
from .tokens import estimate_tokens_accurate
accurate = estimate_tokens_accurate(text, self.model)
error_pct = abs(heuristic - accurate) / accurate * 100 if accurate > 0 else 0
metrics = EstimationMetrics(
heuristic_estimate=heuristic,
accurate_estimate=accurate,
error_pct=error_pct,
estimator_used=EstimationMode.VALIDATED,
)
self._estimation_metrics = metrics
# Log if mismatch exceeds threshold
if self.config.log_estimation_mismatch and error_pct > self.config.mismatch_threshold_pct:
import logging
get_logger(__name__).warning(
f"Token estimation mismatch: heuristic={heuristic}, accurate={accurate}, error={error_pct:.1f}%"
)
# Use accurate if validated mode
if self.config.estimation_mode == EstimationMode.ACCURATE:
result = accurate
else:
result = heuristic
except ImportError:
result = heuristic
metrics = EstimationMetrics(
heuristic_estimate=heuristic,
estimator_used=EstimationMode.HEURISTIC,
)
else:
result = heuristic
# Cache result
if len(self._estimation_cache) < self._cache_max_size:
self._estimation_cache[cache_key] = result
return result, metrics
def get_tool_budget(self, tool_name: str) -> int:
"""Get token budget for a specific tool."""
if tool_name in self.config.tool_budgets:
return self.config.tool_budgets[tool_name].max_output_tokens
return self.config.default_tool_output_max
def set_tool_budget(self, tool_name: str, max_tokens: int, protected: bool = False) -> None:
"""Set token budget for a specific tool."""
self.config.tool_budgets[tool_name] = PerToolBudget(
tool_name=tool_name,
max_output_tokens=max_tokens,
protected=protected,
)
if protected and tool_name not in self.config.protected_tools:
self.config.protected_tools.append(tool_name)
def truncate_tool_output(self, tool_name: str, output: str) -> str:
"""Truncate tool output according to its budget."""
max_tokens = self.get_tool_budget(tool_name)
# Estimate current tokens
current_tokens, _ = self.estimate_tokens(output)
if current_tokens <= max_tokens:
return output
# Truncate by character ratio (approximate)
ratio = max_tokens / current_tokens
max_chars = int(len(output) * ratio * 0.9) # 10% safety margin
# Use smart truncation format that judge recognizes as OK
tail_size = min(max_chars // 5, 1000)
head = output[:max_chars - tail_size]
tail = output[-tail_size:] if tail_size > 0 else ""
truncated = f"{head}\n...[{len(output):,} chars, showing first/last portions]...\n{tail}"
self._add_history_event(
OptimizationEventType.CAP_OUTPUTS,
tokens_before=current_tokens,
tokens_after=max_tokens,
details={"tool_name": tool_name, "original_length": len(output)},
)
return truncated
def _add_history_event(
self,
event_type: OptimizationEventType,
strategy: Optional[str] = None,
tokens_before: int = 0,
tokens_after: int = 0,
tokens_saved: int = 0,
messages_affected: int = 0,
details: Optional[Dict[str, Any]] = None,
) -> None:
"""Add an event to optimization history."""
from datetime import timezone
event = OptimizationEvent(
timestamp=datetime.now(tz=timezone.utc).isoformat().replace('+00:00', 'Z'),
event_type=event_type,
strategy=strategy,
tokens_before=tokens_before,
tokens_after=tokens_after,
tokens_saved=tokens_saved,
messages_affected=messages_affected,
details=details or {},
)
self._history.append(event)
# Trim history if too long
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history:]
def get_history(self) -> List[Dict[str, Any]]:
"""Get optimization history."""
return [e.to_dict() for e in self._history]
def get_stats(self) -> Dict[str, Any]:
"""Get current context statistics."""
return {
"model": self.model,
"session_id": self.session_id,
"agent_name": self.agent_name,
"budget": self._budget.to_dict(),
"ledger": self._ledger.to_dict(),
"utilization": self._ledger.get_utilization(),
"warnings": self._ledger.get_warnings(),
"history_events": len(self._history),
"estimation_metrics": self._estimation_metrics.to_dict() if self._estimation_metrics else None,
"monitor_stats": self._monitor.get_stats(),
}
def emergency_truncate(
self,
messages: List[Dict[str, Any]],
target_tokens: int,
) -> List[Dict[str, Any]]:
"""
Emergency truncation when optimization isn't enough.
Aggressively removes messages to fit within target tokens.
Preserves system messages and most recent turns.
Args:
messages: Messages to truncate
target_tokens: Target token count
Returns:
Truncated messages list
"""
if not messages:
return messages
# Estimate current tokens
current_tokens = estimate_messages_tokens(messages)
if current_tokens <= target_tokens:
return messages
# Separate system and non-system messages
system_msgs = [m for m in messages if m.get("role") == "system"]
other_msgs = [m for m in messages if m.get("role") != "system"]
# Keep system messages (they're usually small)
result = list(system_msgs)
system_tokens = estimate_messages_tokens(system_msgs)
remaining_budget = target_tokens - system_tokens
if remaining_budget <= 0:
# Even system messages exceed budget - truncate system content
for msg in result:
if isinstance(msg.get("content"), str) and len(msg["content"]) > 500:
content = msg["content"]
tail_size = min(50, len(content) // 10)
msg["content"] = f"{content[:450]}\n...[{len(content):,} chars, showing first/last portions]...\n{content[-tail_size:] if tail_size > 0 else ''}"
return result
# Keep most recent messages that fit
kept_msgs = []
tokens_used = 0
for msg in reversed(other_msgs):
msg_tokens = estimate_message_tokens(msg)
if tokens_used + msg_tokens <= remaining_budget:
kept_msgs.insert(0, msg)
tokens_used += msg_tokens
else:
# Try to fit a truncated version
if msg.get("role") == "user" and isinstance(msg.get("content"), str):
# Keep user messages but truncate content
available = remaining_budget - tokens_used
if available > 50:
truncated_msg = msg.copy()
max_chars = available * 4 # ~4 chars per token
content = msg["content"]
tail_size = min(max_chars // 10, 100)
truncated_msg["content"] = f"{content[:max_chars - tail_size]}\n...[{len(content):,} chars, showing first/last portions]...\n{content[-tail_size:] if tail_size > 0 else ''}"
kept_msgs.insert(0, truncated_msg)
break
result.extend(kept_msgs)
# Log the emergency truncation
self._add_history_event(
OptimizationEventType.CAP_OUTPUTS,
tokens_before=current_tokens,
tokens_after=estimate_messages_tokens(result),
tokens_saved=current_tokens - estimate_messages_tokens(result),
messages_affected=len(messages) - len(result),
details={"reason": "emergency_truncation", "target_tokens": target_tokens},
)
return result
def get_resolved_config(self) -> Dict[str, Any]:
"""Get the fully resolved configuration with source info."""
return {
"config": self.config.to_dict(),
"precedence": "CLI > ENV > config.yaml > defaults",
"effective_budget": self._budget.to_dict(),
}
def _flush_async_writes(self) -> None:
"""Flush any pending async writes."""
with self._async_lock:
for path, content in self._async_buffer:
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
except Exception:
pass
self._async_buffer.clear()
def reset(self) -> None:
"""Reset manager state."""
self._ledger.reset()
self._history.clear()
self._estimation_cache.clear()
self._last_snapshot_hook = None
class MultiAgentContextManager:
"""
Context manager for multi-agent orchestration.
Provides per-agent isolation with controlled sharing policies.
"""
def __init__(
self,
config: Optional[ManagerConfig] = None,
default_policy: Optional[ContextPolicy] = None,
session_cache: Optional[SessionDeduplicationCache] = None,
):
"""
Initialize multi-agent context manager.
Args:
config: Base configuration
default_policy: Default context sharing policy
session_cache: Shared session deduplication cache
"""
self.config = config or ManagerConfig.from_env()
self.default_policy = default_policy or ContextPolicy()
# Session-level deduplication cache (shared across all agents)
self._session_cache = session_cache or SessionDeduplicationCache()
self._agents: Dict[str, ContextManager] = {}
self._multi_ledger = MultiAgentLedger()
self._multi_monitor = MultiAgentMonitor(
enabled=self.config.monitor_enabled,
format=self.config.monitor_format,
redact_sensitive=self.config.redact_sensitive,
)
# Agent policies
self._policies: Dict[str, ContextPolicy] = {}