-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzeus.py
More file actions
1332 lines (1191 loc) · 61.5 KB
/
Copy pathzeus.py
File metadata and controls
1332 lines (1191 loc) · 61.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
ZEUS — Supreme Orchestrator Agent
Responsibilities:
1. Own the pipeline — every agent is a child, no agent talks to another
2. Query the Knowledge Base before making the final trade decision
3. Run an LLM reasoning step (Claude) to evaluate the signal holistically
4. Write a full DecisionTrace to the KB for every signal processed
5. Manage circuit breakers — degrade gracefully if any agent fails
6. Start and monitor the Watchdog for zero-outage operation
Import rule: zeus.py is the ONLY file allowed to import from agents/*.
All other agents import from core.types only.
"""
from __future__ import annotations
import json
import logging
import os
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import anthropic
from agents.apollo import ApolloAgent
from agents.ares import AresAgent
from agents.ares_mock import AresMockAgent
from agents.argus import ArgusAgent
from agents.artemis import ArtemisAgent
from agents.hades import HadesAgent
from agents.icarus import IcarusAgent
from agents.pythia import PythiaAgent
from core.circuit_breaker import CircuitBreaker
from core.knowledge_base import KnowledgeBase
from core.milestone_manager import MilestoneManager
from core.redis_bridge import RedisBridge
from core.seniority import SeniorityEvaluator, SeniorityReport
from core.types import (
AgentHealth,
DecisionTrace,
FilteredSignal,
HealthReport,
MacroContext,
MarketRegime,
PipelineStatus,
RawSignal,
SizedSignal,
TradeResult,
)
from core.watchdog import Watchdog
logger = logging.getLogger("zeus")
def _fmt_vix(vix: float) -> str:
"""Render VIX for the LLM prompt. The -1.0 sentinel (Artemis macro fetch
failed) must read as UNAVAILABLE — never as a benign number ZEUS could
treat as safe."""
return "UNAVAILABLE" if vix is None or vix < 0 else f"{vix:.1f}"
@dataclass
class ZeusConfig:
max_portfolio_drawdown_pct: float = 0.08
max_open_positions: int = 3 # settings is authoritative — see build_zeus()
max_open_positions_per_ticker: int = 1 # never hold > 1 position per ticker
max_deployed_pct: float = 0.90 # cap: never deploy >90% of equity (keep cash buffer)
ticker_cooldown_hours: float = 48.0 # hours before re-trading same ticker
paper_trading: bool = True
mock_execution: bool = True
min_zeus_confidence: float = 0.55
use_llm_reasoning: bool = True
use_debate: bool = True # bull/bear debate before Director verdict
debate_rounds: int = 1 # bull→bear pairs per signal
debate_max_tokens: int = 400 # output cap per debate call
# Single source of truth for "how much money we have". Feeds BOTH the
# milestone stage/tier gate AND € position sizing, so the two can never
# reason about different balances. None = fail closed (raise), never guess.
# (Was two fields — starting_equity + default_account_equity — that could
# drift and silently misclassify the stage; consolidated to one.)
default_account_equity: Optional[float] = None
stop_loss_pct: float = 0.03
take_profit_pct: float = 0.06
ib_paper_port: int = 4002
ib_live_port: int = 4001
# Single source of truth for the Anthropic model IDs (was hardcoded in 4
# call sites). claude-sonnet-4-6 is the current Sonnet. Overridable via
# settings.json so an account-specific / upgraded ID changes in one place.
anthropic_model_director: str = "claude-sonnet-4-6"
anthropic_model_debate: str = "claude-sonnet-4-6"
@dataclass
class PipelineRun:
"""Full audit trail for one signal moving through the pipeline."""
run_id: str
started_at: datetime
raw_signal: Optional[RawSignal] = None
filtered_signal: Optional[FilteredSignal] = None
macro_context: Optional[MacroContext] = None
sized_signal: Optional[SizedSignal] = None
trade_result: Optional[TradeResult] = None
trace: Optional[DecisionTrace] = None
killed_at_stage: Optional[str] = None
kill_reason: Optional[str] = None
def kill(self, stage: str, reason: str, trace: "DecisionTrace | None" = None) -> "PipelineRun":
self.killed_at_stage = stage
self.kill_reason = reason
if trace is not None:
self.trace = trace
logger.info("[ZEUS] Signal killed at %s — %s", stage, reason)
return self
class ZeusOrchestrator:
"""
ZEUS owns the full pipeline.
Circuit breakers protect every agent call.
Watchdog runs as a background daemon.
LLM reasoning runs before final trade approval.
All decisions are written to the Knowledge Base.
"""
def __init__(self, config: ZeusConfig | None = None):
self.config = config or ZeusConfig()
# Settings is authoritative for these knobs — override ZeusConfig defaults
# so callers don't need to explicitly pass them through ZeusConfig.
from config.settings import load_settings
_s = load_settings()
if self.config.max_open_positions == 3 and "max_open_positions" in _s:
self.config.max_open_positions = int(_s["max_open_positions"])
if self.config.max_open_positions_per_ticker == 1:
self.config.max_open_positions_per_ticker = int(_s.get("max_open_positions_per_ticker", 1))
if self.config.max_deployed_pct == 0.90:
self.config.max_deployed_pct = float(_s.get("max_deployed_pct", 0.90))
if self.config.ticker_cooldown_hours == 48.0:
self.config.ticker_cooldown_hours = float(_s.get("ticker_cooldown_hours", 48.0))
if self.config.use_debate is True and "use_debate" in _s:
self.config.use_debate = bool(_s["use_debate"])
if self.config.debate_rounds == 1:
self.config.debate_rounds = int(_s.get("debate_rounds", 1))
if self.config.debate_max_tokens == 400:
self.config.debate_max_tokens = int(_s.get("debate_max_tokens", 400))
if self.config.anthropic_model_director == "claude-sonnet-4-6":
self.config.anthropic_model_director = str(_s.get("anthropic_model_director", "claude-sonnet-4-6"))
if self.config.anthropic_model_debate == "claude-sonnet-4-6":
self.config.anthropic_model_debate = str(_s.get("anthropic_model_debate", "claude-sonnet-4-6"))
# Account equity — single source of truth for stage gate AND € sizing.
# Prefer explicit config; else settings.json. FAIL CLOSED if neither
# provides it: a money-sizing system must never guess a balance.
if self.config.default_account_equity is None:
_eq = _s.get("account_equity", _s.get("default_account_equity"))
self.config.default_account_equity = float(_eq) if _eq is not None else None
if self.config.default_account_equity is None or self.config.default_account_equity <= 0:
raise ValueError(
"account equity is not configured — set 'default_account_equity' "
"(or 'account_equity') in settings.json or pass it to ZeusConfig. "
"Refusing to start with an unknown balance (fail closed)."
)
self.status = PipelineStatus.RUNNING
# Core infrastructure
self.kb = KnowledgeBase()
self.cb = CircuitBreaker(failure_threshold=3, window_seconds=300, reset_timeout=120)
self.watchdog = Watchdog(alert_fn=self._send_alert)
self.milestone = MilestoneManager(
account_equity=self.config.default_account_equity,
alert_fn=self._send_alert,
)
# Agents — ZEUS holds the only references
# Apollo first so its get_ticker resolver can be injected into Icarus
self.apollo = ApolloAgent(knowledge_base=self.kb)
self.icarus = IcarusAgent(
ticker_resolver=self.apollo.get_ticker,
)
self.hades = HadesAgent()
self.artemis = ArtemisAgent()
self.pythia = PythiaAgent(milestone_manager=self.milestone)
# ── Seniority evaluator (built early — its verdict gates real money) ──
self.seniority = SeniorityEvaluator(kb=self.kb, alert_fn=self._send_alert)
self._seniority_report: Optional[SeniorityReport] = None
# ── HARD real-money gate ──────────────────────────────────────────────
# Settings can REQUEST live trading (paper_trading=False), but the team
# only earns it by reaching the Senior tier AND an operator arming it.
# If either is missing, force paper mode here — before Ares/Argus are
# built — so no real order can ever be placed prematurely. This is the
# teeth behind "no real money until they're senior."
_gate_forced_paper = False
if not self.config.paper_trading:
from core.seniority import real_money_live
startup_report = self.seniority.evaluate()
if not real_money_live(startup_report.system_tier):
logger.warning(
"[ZEUS] Real money REQUESTED but BLOCKED — %s armed=%s. "
"Forcing PAPER mode. (Reach Senior + set ARM_REAL_MONEY=true to enable.)",
startup_report.summary_line().split(" | ")[0],
os.getenv("ARM_REAL_MONEY", "false"),
)
self.config.paper_trading = True
self.config.mock_execution = True
_gate_forced_paper = True
else:
logger.warning("[ZEUS] Real money LIVE — system is Senior and armed. Real capital at risk.")
# Resolve IB port once: env > settings.json > ZeusConfig default (4002/4001).
# SAFETY: when the gate forced paper mode, IGNORE any IB_PORT override —
# an explicit IB_PORT=4001 must never route a blocked system to the live
# broker port. The paper port is non-negotiable in that state.
_ib_port_default = self.config.ib_paper_port if self.config.paper_trading else self.config.ib_live_port
if _gate_forced_paper:
_ib_port = self.config.ib_paper_port
else:
_ib_port = int(os.getenv("IB_PORT", str(_ib_port_default)))
# Belt-and-suspenders: in paper mode the resolved port can never be live.
if self.config.paper_trading and _ib_port == self.config.ib_live_port:
logger.warning("[ZEUS] IB_PORT=%d is the LIVE port but system is paper — overriding to paper port %d",
_ib_port, self.config.ib_paper_port)
_ib_port = self.config.ib_paper_port
self.ares = (
AresMockAgent(
account_equity=self.config.default_account_equity,
stop_loss_pct=self.config.stop_loss_pct,
take_profit_pct=self.config.take_profit_pct,
)
if self.config.mock_execution
else AresAgent(
paper=self.config.paper_trading,
host=os.getenv("IB_HOST", "ibgateway"),
port=_ib_port,
stop_loss_pct=self.config.stop_loss_pct,
take_profit_pct=self.config.take_profit_pct,
)
)
self.argus = ArgusAgent(
max_drawdown_pct=self.config.max_portfolio_drawdown_pct,
on_kill=self._emergency_halt,
milestone_manager=self.milestone,
default_account_equity=self.config.default_account_equity,
ib_host=os.getenv("IB_HOST", "ibgateway"),
ib_port=_ib_port,
mock=self.config.mock_execution, # mirror mock_execution — no IB in mock mode
)
# Shadow learning layer — wire KB into Argus's OutcomeResolver
self.argus.set_knowledge_base(self.kb)
# LLM client for reasoning step
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise RuntimeError("ANTHROPIC_API_KEY is not set — LLM reasoning will fail. Check your .env.")
self._claude = anthropic.Anthropic(api_key=api_key)
self._verify_model_or_die()
self.bridge = RedisBridge() # SpendLens intelligence feed
self._register_watchdog()
self.watchdog.start()
# Evaluate on startup — log current readiness
self._run_seniority_evaluation()
# Intra-run state — reset each run_once() cycle
self._run_approved_trades: list[dict] = [] # tickers/sides approved this cycle
logger.info(
"[ZEUS] Initialised — paper=%s mock=%s llm_reasoning=%s",
self.config.paper_trading, self.config.mock_execution, self.config.use_llm_reasoning,
)
def _verify_model_or_die(self) -> None:
"""Boot-time dry-run of the director model. A bad/unauthorized model ID
must be a LOUD failure here — otherwise every LLM call throws at runtime
and ZEUS silently falls back to Pattern-only scoring, masking the cause.
Skipped when LLM reasoning is off (mock mode) or ZEUS_SKIP_MODEL_CHECK=1
(offline/tests)."""
if not self.config.use_llm_reasoning:
return
if os.getenv("ZEUS_SKIP_MODEL_CHECK") == "1":
logger.info("[ZEUS] Model dry-run skipped (ZEUS_SKIP_MODEL_CHECK=1)")
return
model = self.config.anthropic_model_director
try:
self._claude.messages.create(
model=model, max_tokens=1,
messages=[{"role": "user", "content": "ping"}],
)
logger.info("[ZEUS] Model dry-run OK — %s reachable", model)
except Exception as exc:
logger.critical(
"[ZEUS] Model dry-run FAILED for '%s': %s. Refusing to start — "
"fix anthropic_model_director in settings.json / the API key.", model, exc)
raise RuntimeError(f"Anthropic model '{model}' unusable: {exc}") from exc
def _budget_equity(self) -> float:
"""Equity the budget cap is computed against. Prefer LIVE equity from
Argus so the cap tracks gains/drawdown; fall back to the configured
starting balance when Argus hasn't refreshed yet (0.0 / None / error),
logging a WARN so the fallback is visible."""
try:
live = self.argus.portfolio_state().total_equity
except Exception as exc:
logger.warning("[ZEUS] could not read live equity (%s) — using config equity", exc)
live = None
if live and live > 0:
return float(live)
logger.warning("[ZEUS] live equity unavailable — budget cap on config €%.0f",
self.config.default_account_equity)
return float(self.config.default_account_equity)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def run_once(self) -> list[PipelineRun]:
if self.status != PipelineStatus.RUNNING:
logger.warning("[ZEUS] Pipeline is %s — skipping run.", self.status.value)
return []
# Fetch signals directly from Icarus (Supabase)
raw_signals = self.cb.call("icarus", fn=self.icarus.fetch, fallback=[])
logger.info("[ZEUS] Icarus returned %d signal(s).", len(raw_signals))
# Reset intra-run state so each cycle starts fresh
self._run_approved_trades = []
runs: list[PipelineRun] = []
for sig in raw_signals:
run = self._process_signal(sig)
runs.append(run)
self.cb.call("argus", fn=self.argus.refresh, fallback=None)
return runs
def run_research_cycle(self, historical: bool = False) -> dict:
"""Trigger Apollo's daily research cycle — ingest literature, update tickers, self-improve.
Pass historical=True for the one-shot bootstrap that loads 4 years of
earnings, Form 4, FRED macro, and EDGAR supply chain data before paper
trading begins.
"""
if historical:
result = self.cb.call(
"apollo",
fn=self.apollo.run_historical_ingestion,
fallback={"error": "circuit open"},
)
else:
result = self.cb.call(
"apollo",
fn=self.apollo.run_research_cycle,
fallback={"error": "circuit open"},
)
# Re-evaluate seniority after every research cycle — Apollo may have promoted agents
self._run_seniority_evaluation()
return result
def decide(self, sized: SizedSignal) -> dict:
"""
Expose the ZEUS LLM reasoning step as a standalone callable.
Used by ReplayEngine to re-evaluate past DecisionTraces without
going through the full pipeline.
Returns {"approved": bool, "reasoning": str}.
"""
dummy_trace = DecisionTrace(
trace_id=str(uuid.uuid4()),
signal_id=None,
timestamp=datetime.now(timezone.utc),
headline=sized.original.headline if sized.original else "",
supplier=sized.original.supplier if sized.original else "",
category=sized.original.category.value if sized.original else "",
severity=sized.original.severity.value if sized.original else "",
hades_passed=True, hades_notes=[],
trend_suppressed=False, trend_regime=None, trend_vix=sized.macro.vix,
pattern_confidence=sized.confidence, pattern_size_pct=sized.position_size_pct,
zeus_reasoning="", zeus_approved=False, zeus_override=False,
zeus_override_reason=None, trade_placed=False,
)
try:
approved, reasoning, _ = self._zeus_evaluate(sized, sized.macro, dummy_trace)
return {"approved": approved, "reasoning": reasoning}
except Exception as exc:
logger.warning("[ZEUS] decide() failed: %s", exc)
return {"approved": False, "reasoning": str(exc)}
def run_backtest(self) -> dict:
"""
Replay historical KB entries through Hades → Pythia to pre-seed
context key statistics before paper trading begins.
Safe to run multiple times — synthetic trades are additive.
"""
from datetime import datetime, timezone
from core.shadow_learning import Backtester
from core.types import MacroContext, MarketRegime
macro = MacroContext(
fetched_at=datetime.now(timezone.utc),
regime=MarketRegime.SIDEWAYS,
vix=18.0,
sp500_1m_return=0.0,
)
bt = Backtester(
hades_agent=self.hades,
pythia_agent=self.pythia,
macro_context=macro,
)
result = bt.run(self.kb)
logger.info("[ZEUS] Backtest complete: %s", result.summary())
return result.summary()
def run_replay(self, limit: int = 30) -> dict:
"""
Re-run the last N DecisionTraces through current ZEUS reasoning.
Returns agreement rate and changed-mind cases for review.
"""
from core.shadow_learning import ReplayEngine
engine = ReplayEngine(zeus_agent=self)
results = engine.replay_recent(self.kb, limit=limit)
changed = [r for r in results if r.changed_mind()]
return {
"replayed": len(results),
"agreement_rate": engine.agreement_rate(results),
"changed_mind": len(changed),
"changed_cases": [
{
"trace_id": r.trace_id,
"was": "APPROVED" if r.original_approved else "REJECTED",
"now": "APPROVED" if r.replay_approved else "REJECTED",
"original_reasoning": r.original_reasoning[:200],
"replay_reasoning": r.replay_reasoning[:200],
}
for r in changed
],
}
def get_seniority_report(self) -> dict:
"""Return current seniority levels for all agents — safe to expose publicly."""
if self._seniority_report is None:
self._run_seniority_evaluation()
report = self._seniority_report
# Public view: ranks only, no criteria details (knowledge base content stays private)
return {
"system_level": report.system_tier.label(),
"system_rank": f"{report.system_tier.label()} L{report.system_level}",
"system_tier_int": int(report.system_tier),
"real_money_unlocked": report.real_money_unlocked,
"real_money_armed": report.armed,
"live_trading_allowed": report.live_trading_allowed,
"max_position_pct": report.max_position_pct,
"evaluated_at": report.evaluated_at.isoformat(),
"agents": {
name: {
"tier": score.tier.label(),
"level": score.level,
"rank": score.rank_label(),
"wins": score.wins,
"level_int": int(score.tier), # back-compat: tier ordinal
}
for name, score in report.agents.items()
},
}
def halt(self, reason: str = "manual") -> None:
self.status = PipelineStatus.HALTED
try:
self.ares.cancel_all_pending()
except Exception as exc:
logger.warning("[ZEUS] cancel_all_pending failed during halt: %s", exc)
self._send_alert(f"ZEUS HALTED — {reason}")
logger.critical("[ZEUS] HALT — %s", reason)
def resume(self) -> None:
if self.status == PipelineStatus.SHUTDOWN:
raise RuntimeError("Cannot resume a shutdown ZEUS instance.")
self.status = PipelineStatus.RUNNING
logger.info("[ZEUS] Resumed.")
def health(self) -> AgentHealth:
if self.status == PipelineStatus.RUNNING:
return AgentHealth.HEALTHY
if self.status == PipelineStatus.HALTED:
return AgentHealth.FAILED
return AgentHealth.DEGRADED
def get_health_reports(self) -> list[HealthReport]:
return self.watchdog.poll_now()
def get_milestone_status(self) -> dict:
return self.milestone.status_dict()
# ------------------------------------------------------------------
# Pipeline
# ------------------------------------------------------------------
def _process_signal(self, raw: RawSignal) -> PipelineRun:
run_id = str(uuid.uuid4())
run = PipelineRun(run_id=run_id, started_at=datetime.now(timezone.utc), raw_signal=raw)
trace = self._init_trace(run_id, raw)
# Stage 1 — Hades compliance
filtered: Optional[FilteredSignal] = self.cb.call(
"hades",
fn=lambda: self.hades.filter(raw),
fallback=None,
)
trace.hades_passed = filtered is not None
if filtered is None:
trace.hades_notes = ["Hades killed or circuit open"]
trace.kill_reason = "compliance block or Hades circuit open"
trace.killed_at_stage = "hades"
self._write_trace(trace)
return run.kill("hades", trace.kill_reason, trace)
trace.hades_notes = filtered.notes
run.filtered_signal = filtered
self.bridge.push_supplier_risk(filtered) # → SpendLens vendor risk
# Stage 2 — Trend macro context
macro: MacroContext = self.cb.call(
"artemis",
fn=lambda: self.artemis.analyze(filtered),
fallback=MacroContext(
fetched_at=datetime.now(timezone.utc),
regime=MarketRegime.UNKNOWN,
vix=20.0,
sp500_1m_return=0.0,
suppress=False,
),
)
trace.trend_regime = macro.regime.value if hasattr(macro.regime, "value") else str(macro.regime)
trace.trend_vix = macro.vix
trace.trend_suppressed = macro.suppress
if macro.suppress:
trace.killed_at_stage = "trend"
trace.kill_reason = macro.suppress_reason or "macro suppression"
self._write_trace(trace)
return run.kill("artemis", trace.kill_reason, trace)
run.macro_context = macro
self.bridge.push_macro(macro) # → SpendLens category strategy
# Stage 3 — Pattern sizing
sized: SizedSignal = self.cb.call(
"pythia",
fn=lambda: self.pythia.size(filtered, macro),
fallback=SizedSignal(
original=filtered, macro=macro,
confidence=0.5, position_size_pct=0.01,
skip=False,
),
)
trace.pattern_confidence = sized.confidence
trace.pattern_size_pct = sized.position_size_pct
if sized.skip:
trace.killed_at_stage = "pattern"
trace.kill_reason = sized.skip_reason or "low confidence"
self._write_trace(trace)
return run.kill("pythia", trace.kill_reason, trace)
run.sized_signal = sized
# Stage 3b — Concentration control
# Reject before LLM to avoid burning tokens on a structurally-blocked ticker.
conc_kill = self._check_concentration(sized)
if conc_kill:
trace.killed_at_stage = "concentration"
trace.kill_reason = conc_kill
self._write_trace(trace)
return run.kill("concentration", conc_kill, trace)
# Stage 4 — Pre-decision enrichment: Apollo researches the company,
# giving Zeus real fundamentals + recent news before the LLM decides.
ticker = sized.affected_tickers[0] if sized.affected_tickers else ""
live_context = ""
ticker_history: list[dict] = []
if ticker:
try:
live_context = self.apollo.enrich_signal(ticker, sized.supplier)
except Exception as exc:
logger.warning("[ZEUS] Signal enrichment failed for %s: %s", ticker, exc)
try:
ticker_history = self.kb.query_ticker_history(ticker, n_results=5)
except Exception as exc:
logger.warning("[ZEUS] Ticker history query failed for %s: %s", ticker, exc)
# Stage 4 — ZEUS LLM reasoning + KB query (the final judge)
approved, reasoning, override_size = self._zeus_evaluate(
sized, macro, trace,
live_context=live_context,
intra_run_trades=list(self._run_approved_trades),
ticker_history=ticker_history,
)
trace.zeus_reasoning = reasoning
trace.zeus_approved = approved
if override_size is not None:
trace.zeus_override = True
trace.zeus_override_reason = f"ZEUS resized from {sized.position_size_pct:.3f} to {override_size:.3f}"
sized.position_size_pct = override_size
# Teach Icarus — feedback loop so it learns which patterns get approved
self.icarus.record_signal_outcome(raw, approved)
if not approved:
trace.killed_at_stage = "zeus"
trace.kill_reason = "ZEUS LLM reasoning rejected trade"
self._write_trace(trace)
return run.kill("zeus", trace.kill_reason, trace)
# Stage 5 — Portfolio headroom check
if self.argus.open_position_count() >= self.config.max_open_positions:
trace.killed_at_stage = "zeus"
trace.kill_reason = "max open positions reached"
self._write_trace(trace)
return run.kill("zeus", trace.kill_reason, trace)
# Stage 5a — Budget gate: ZEUS must not approve trades it can't fund.
# The goal is to GROW the account, not over-commit it. Check the real
# wallet (DB-based committed capital vs equity) before approving — this
# also fails SAFE when IB is down (committed_capital reads the DB, not a
# live position count that silently returns 0 when the broker is gone).
equity = self._budget_equity()
committed = self.argus.committed_capital()
trade_cost = equity * sized.position_size_pct
max_deployed = equity * self.config.max_deployed_pct
if committed + trade_cost > max_deployed:
trace.killed_at_stage = "zeus"
trace.kill_reason = (
f"budget: would deploy €{committed + trade_cost:,.0f} of €{equity:,.0f} "
f"(cap €{max_deployed:,.0f} @ {self.config.max_deployed_pct:.0%}); "
f"€{self.argus.available_cash():,.0f} cash free"
)
logger.info("[ZEUS] Trade rejected on budget — %s", trace.kill_reason)
self._write_trace(trace)
return run.kill("zeus", trace.kill_reason, trace)
# Stage 5b — Seniority position size ceiling
if self._seniority_report is not None:
max_pct = self._seniority_report.max_position_pct
if sized.position_size_pct > max_pct:
logger.info(
"[ZEUS] Position capped by seniority: %.2f%% → %.2f%% (system=%s)",
sized.position_size_pct * 100, max_pct * 100,
self._seniority_report.summary_line().split(" | ")[0],
)
sized.position_size_pct = max_pct
# Stage 6 — Execute (with pre-flight health gate and pending-order drain)
ares_healthy = self.ares.health() == AgentHealth.HEALTHY
if ares_healthy:
# Drain any previously queued approvals before placing the new one
try:
drained = self.ares.reconcile_pending()
if drained:
logger.info("[ZEUS] Reconciled %d pending order(s) before new placement", drained)
except Exception as _rec_exc:
logger.warning("[ZEUS] reconcile_pending failed (non-fatal): %s", _rec_exc)
if ares_healthy:
result: TradeResult = self.cb.call(
"ares",
fn=lambda: self.ares.place(sized),
fallback=TradeResult(
order_id="failed", symbol="", side="", fill_price=None, qty=0, status="circuit_open"
),
)
else:
# IB is down — enqueue instead of attempting the doomed 3-retry storm
logger.info("[ZEUS] IB pre-flight DEGRADED — queuing approval for signal %s", sized.signal_id)
result = self.ares.place(sized) # place() will hit _enqueue() via connectivity check
run.trade_result = result
trace.trade_placed = result.status not in ("circuit_open", "skipped", "error")
trace.symbol = result.symbol
trace.side = result.side
trace.fill_price = result.fill_price
# Record this approval so subsequent signals in this cycle know about it
if trace.trade_placed:
self._run_approved_trades.append({
"ticker": result.symbol,
"side": result.side,
"sector": sized.category.value,
"supplier": sized.supplier,
})
# In mock mode: register position with Argus immediately (no IB poll needed)
if self.config.mock_execution and result.fill_price and result.qty:
pos_side = "LONG" if result.side == "BUY" else "SHORT"
self.argus.add_mock_position(
symbol=result.symbol, side=pos_side,
qty=result.qty, avg_cost=result.fill_price,
current_price=result.fill_price,
stop_loss_price=result.stop_loss_price,
take_profit_price=result.take_profit_price,
)
# Feed outcome back to Pattern + KB
self.cb.call("pythia", fn=lambda: self.pythia.record_trade(sized, result), fallback=None)
self._write_trace(trace)
run.trace = trace
logger.info(
"[ZEUS] Trade placed — %s %s @ %s | order_id=%s | confidence=%.2f",
result.side, result.symbol, result.fill_price, result.order_id, sized.confidence,
)
return run
# ------------------------------------------------------------------
# ZEUS LLM reasoning — the final judge
# ------------------------------------------------------------------
def _zeus_evaluate(
self,
sized: SizedSignal,
macro: MacroContext,
trace: DecisionTrace,
live_context: str = "",
intra_run_trades: Optional[list] = None,
ticker_history: Optional[list] = None,
) -> tuple[bool, str, Optional[float]]:
"""
Query the KB, build the Director prompt, call Claude, parse response.
Returns (approved, reasoning_text, override_position_size_or_None).
"""
if not self.config.use_llm_reasoning:
return sized.confidence >= self.config.min_zeus_confidence, "LLM reasoning disabled.", None
kb_context = self._build_kb_context(sized, macro, trace)
self_critique = self._load_self_critique()
# Adversarial debate — bull builds the case, bear rebuts it (seeing the
# bull's argument). ZEUS reads both before adjudicating. Returns ("","")
# if debate is disabled or a call fails — ZEUS then reasons single-shot.
bull_case, bear_case = self._run_debate(sized, macro, kb_context, live_context)
prompt = self._build_director_prompt(
sized, macro, kb_context,
live_context=live_context,
intra_run_trades=intra_run_trades or [],
self_critique=self_critique,
ticker_history=ticker_history or [],
bull_case=bull_case,
bear_case=bear_case,
)
try:
response = self._claude.messages.create(
model=self.config.anthropic_model_director,
max_tokens=1500,
messages=[{"role": "user", "content": prompt}],
)
self._record_token_usage(response.usage, sized.affected_tickers[0] if sized.affected_tickers else "unknown", role="director")
approved, reasoning, override_size = self._parse_llm_response(
response.content[0].text.strip(), sized
)
# Fold the debate into the auditable reasoning text (no schema change).
if bull_case or bear_case:
reasoning += (
f" | DEBATE — BULL: {bull_case[:300]}"
f" || BEAR: {bear_case[:300]}"
)
return approved, reasoning, override_size
except Exception as exc:
logger.error("[ZEUS] LLM reasoning failed: %s — defaulting to Pattern score.", exc)
fallback_approved = sized.confidence >= self.config.min_zeus_confidence
return fallback_approved, f"LLM call failed ({exc}). Used Pattern confidence fallback.", None
def _run_debate(
self,
sized: SizedSignal,
macro: MacroContext,
kb_context: str,
live_context: str = "",
) -> tuple[str, str]:
"""
Run a one-round adversarial debate before the Director verdict.
Bull argues the strongest case to TAKE the trade. Bear then rebuts it,
seeing the bull's argument (sequential, so it's genuinely adversarial
rather than two independent opinions).
Strictly additive: returns ("", "") if debate is disabled or any call
fails — ZEUS then falls back to single-shot reasoning, exactly as before.
The debate produces evidence; ZEUS remains the sole judge.
"""
if not self.config.use_debate:
return "", ""
ticker = sized.affected_tickers[0] if sized.affected_tickers else "unknown"
bull_case = ""
bear_case = ""
try:
bull_prompt = self._build_debate_prompt(
"BULL", sized, macro, kb_context, live_context, opponent_case=""
)
bull_resp = self._claude.messages.create(
model=self.config.anthropic_model_debate,
max_tokens=self.config.debate_max_tokens,
messages=[{"role": "user", "content": bull_prompt}],
)
self._record_token_usage(bull_resp.usage, f"{ticker}:bull", role="bull")
bull_case = bull_resp.content[0].text.strip()
except Exception as exc:
logger.warning("[ZEUS] Bull debate call failed: %s — proceeding without it.", exc)
return "", ""
try:
bear_prompt = self._build_debate_prompt(
"BEAR", sized, macro, kb_context, live_context, opponent_case=bull_case
)
bear_resp = self._claude.messages.create(
model=self.config.anthropic_model_debate,
max_tokens=self.config.debate_max_tokens,
messages=[{"role": "user", "content": bear_prompt}],
)
self._record_token_usage(bear_resp.usage, f"{ticker}:bear", role="bear")
bear_case = bear_resp.content[0].text.strip()
except Exception as exc:
logger.warning("[ZEUS] Bear debate call failed: %s — using bull case only.", exc)
logger.info(
"[ZEUS] Debate complete for %s — bull=%d chars, bear=%d chars",
ticker, len(bull_case), len(bear_case),
)
return bull_case, bear_case
def _build_debate_prompt(
self,
side: str,
sized: SizedSignal,
macro: MacroContext,
kb_context: str,
live_context: str,
opponent_case: str,
) -> str:
"""Build a bull- or bear-side debate prompt. `side` is 'BULL' or 'BEAR'."""
live_block = f"\nLIVE FUNDAMENTALS (Apollo):\n{live_context}\n" if live_context else ""
if side == "BULL":
role = (
"You are the BULL analyst. Build the strongest evidence-based case to TAKE "
"this long trade. Ground every claim in the signal, the macro regime, the live "
"fundamentals, and KB precedent below. Do not invent numbers. End by naming the "
"single biggest risk to your own thesis — the honest bull names the bear's best shot."
)
opponent_block = ""
else:
role = (
"You are the BEAR analyst. The bull's case is below. Rebut it. Argue the strongest "
"evidence-based case to REJECT or skip this trade: why the event may not move this "
"ticker as claimed, why the regime or sizing could be wrong, what failure modes the "
"bull ignored. Ground every claim in the evidence. Do not invent numbers."
)
opponent_block = f"\n--- BULL'S CASE (rebut this) ---\n{opponent_case}\n"
return f"""{role}
═══════════════════════════════════════════════
SIGNAL
═══════════════════════════════════════════════
Headline: {sized.headline}
Supplier: {sized.supplier}
Category: {sized.category.value}
Severity: {sized.severity.value}
Tickers: {sized.affected_tickers}
MACRO: regime={macro.regime.value} | VIX={_fmt_vix(macro.vix)} | SPY 1m={macro.sp500_1m_return*100:.1f}%
QUANT: Pythia confidence={sized.confidence:.2f}, proposed size={sized.position_size_pct*100:.2f}%
{live_block}
═══════════════════════════════════════════════
KNOWLEDGE BASE & PRECEDENT
═══════════════════════════════════════════════
{kb_context}
{opponent_block}
Write 3-5 tight sentences. No preamble, no JSON — just your argument."""
def _record_token_usage(self, usage, symbol: str, role: str = "director") -> None:
# Sonnet 4.6 pricing: $3/MTok input, $15/MTok output
input_tok = getattr(usage, "input_tokens", 0) or 0
output_tok = getattr(usage, "output_tokens", 0) or 0
cost_usd = round((input_tok * 3 + output_tok * 15) / 1_000_000, 6)
logger.info("[ZEUS] LLM tokens — in=%d out=%d cost=$%.4f role=%s symbol=%s",
input_tok, output_tok, cost_usd, role, symbol)
try:
from datetime import datetime, timezone
import core.supabase_client as supa
# agent/role are explicit columns now (not parsed from `symbol`) so the
# dashboard's per-agent token panel reads them directly. ZEUS is the
# only LLM caller today; `agent` lets other agents attribute spend later.
supa.get_client().table("llm_usage").insert({
"model": self.config.anthropic_model_director,
"agent": "zeus",
"role": role,
"symbol": symbol,
"input_tokens": input_tok,
"output_tokens": output_tok,
"cost_usd": cost_usd,
"recorded_at": datetime.now(timezone.utc).isoformat(),
}).execute()
except Exception as exc:
logger.debug("[ZEUS] llm_usage insert skipped: %s", exc)
def _load_self_critique(self) -> str:
"""Read Zeus's own skill file — accumulated self-critique from Apollo's improvement loop."""
try:
skills_path = Path("knowledge/agents/zeus_skills.md")
if skills_path.exists():
content = skills_path.read_text(encoding="utf-8").strip()
if content:
return content
except Exception:
pass
return ""
def _build_kb_context(
self,
sized: SizedSignal,
macro: MacroContext,
trace: DecisionTrace,
) -> str:
"""Fetch KB doctrine, precedent, and outcome stats; return formatted context block."""
kb_query = (
f"{sized.category.value} signal in {macro.regime} market, "
f"VIX {macro.vix:.1f}, supplier {sized.supplier}"
)
try:
knowledge_chunks = self.kb.query_knowledge(kb_query, n_results=5)
except Exception as exc:
logger.warning("[ZEUS] KB knowledge query failed: %s", exc)
knowledge_chunks = []
try:
past_decisions = self.kb.query_similar_decisions(kb_query, n_results=4)
except Exception as exc:
logger.warning("[ZEUS] KB decisions query failed: %s", exc)
past_decisions = []
try:
outcome_stats = self.kb.query_outcomes_by_context(sized.category.value, trace.trend_regime)
except Exception as exc:
logger.warning("[ZEUS] KB outcomes query failed: %s", exc)
outcome_stats = "unavailable"
if not knowledge_chunks and not past_decisions:
return "No KB context loaded yet — operating on first principles."
return "\n\n".join([
"--- TRADING DOCTRINE (KB) ---",
*knowledge_chunks,
"--- PRECEDENT: SIMILAR PAST DECISIONS ---",
*past_decisions,
f"--- STATISTICAL OUTCOMES FOR THIS CONTEXT ---\n{outcome_stats}",
])
def _build_director_prompt(
self,
sized: SizedSignal,
macro: MacroContext,
kb_context: str,
live_context: str = "",
intra_run_trades: Optional[list] = None,
self_critique: str = "",
ticker_history: Optional[list] = None,
bull_case: str = "",
bear_case: str = "",
) -> str:
"""Assemble the full Director governance prompt from signal + portfolio state."""
open_positions = self.argus.open_position_count()
portfolio_state = self.argus.portfolio_state()
current_dd = portfolio_state.current_drawdown_pct
equity = portfolio_state.total_equity
seniority_level = (
f"{self._seniority_report.system_tier.label()} L{self._seniority_report.system_level}"
if self._seniority_report else "Trainee L1"
)
# Format trades already approved this cycle
if intra_run_trades:
trades_this_cycle = "\n".join(
f" • {t['ticker']} {t['side']} ({t['sector']}, supplier: {t['supplier']})"
for t in intra_run_trades
)
else:
trades_this_cycle = " None yet this cycle."
# Self-critique section — Zeus reads his own accumulated biases
self_critique_section = ""
if self_critique:
self_critique_section = f"""
═══════════════════════════════════════════════
YOUR KNOWN BIASES & SELF-CRITIQUE (from Apollo's analysis of your past decisions)
═══════════════════════════════════════════════
{self_critique}