-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore_cli.py
More file actions
100 lines (84 loc) · 4.06 KB
/
core_cli.py
File metadata and controls
100 lines (84 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
core_cli.py — Master Orchestrator for the Zenith AI Security Pipeline
======================================================================
ROLE IN PIPELINE: The conductor. Ties all four modules together into one CLI.
WHAT IT DOES:
1. Parses CLI arguments (--file or --prompt).
2. Imports each module lazily at runtime with importlib.
3. Runs each module in sequence, threading ZenithPayload through the chain.
4. If a module is missing or throws, prints a yellow SKIP / red ERROR and
continues — the pipeline NEVER crashes due to a missing module.
5. Prints a coloured terminal report after all modules complete.
WHY BUILT THIS WAY:
Lazy imports (importlib.import_module inside try/except) mean each dev can
work on their own branch without breaking the demo on main. A teammate who
hasn't written their module yet doesn't stall the whole team — their slot
just prints ⚠ SKIP and the next module picks up the same payload.
USAGE:
python core_cli.py --file path/to/file.py
python core_cli.py --prompt "Ignore previous instructions..."
python core_cli.py ← demo mode (uses tests/fixtures/vulnerable.py)
BRANCH RULE:
This file lives on main only. Individual module branches NEVER modify it.
"""
import argparse
import sys
import os
from colorama import Fore, Style, init
init(autoreset=True)
def run_pipeline(input_path: str = None, prompt: str = None):
"""
Run the full Zenith pipeline end to end.
Each module is imported lazily inside a try/except.
If a module is missing, it prints a yellow SKIP and continues.
The pipeline NEVER crashes due to a missing module.
Parameters
----------
input_path : str, optional
Path to a source file or dependency manifest to analyse.
prompt : str, optional
Raw prompt string to scan for injection patterns.
Returns
-------
ZenithPayload
The fully (or partially) populated payload after all modules have run.
"""
from src.shared.payload import ZenithPayload
raw = open(input_path).read() if input_path else (prompt or "")
payload = ZenithPayload(input_type="unknown", raw_input=raw, file_path=input_path)
MODULES = [
("Module 1 · Ingress + Shield", "src.dev1_ingress.ingress", "run_ingress", {"raw_input": raw, "file_path": input_path}),
("Module 2 · SAST + CVE", "src.dev2_sast.sast_runner", "run_sast", {"payload": None}),
("Module 3 · Red/Blue Clash", "src.dev3_clash.clash_runner", "run_clash", {"payload": None}),
("Module 4 · Verify + Dashboard", "src.dev4_verify.verify_runner","run_verify", {"payload": None}),
]
for name, module_path, func_name, kwargs in MODULES:
print(f"\n{'─'*50}")
print(f" ▶ {name}")
try:
import importlib
mod = importlib.import_module(module_path)
fn = getattr(mod, func_name)
if func_name == "run_ingress":
payload = fn(**kwargs)
else:
kwargs["payload"] = payload
payload = fn(**kwargs)
print(f" {Fore.GREEN}✓ {name} — COMPLETE{Style.RESET_ALL}")
except ImportError:
print(f" {Fore.YELLOW}⚠ SKIP — {name} not yet built{Style.RESET_ALL}")
except Exception as e:
print(f" {Fore.RED}✗ ERROR in {name}: {e}{Style.RESET_ALL}")
print(f" {Fore.YELLOW} → Pipeline continuing anyway...{Style.RESET_ALL}")
return payload
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Zenith AI Security Pipeline")
parser.add_argument("--file", help="Path to code file to analyze")
parser.add_argument("--prompt", help="Raw prompt string to analyze for injection")
args = parser.parse_args()
if not args.file and not args.prompt:
# Default demo mode
print(f"{Fore.CYAN} Running in DEMO MODE with built-in vulnerable fixture{Style.RESET_ALL}")
run_pipeline(input_path="tests/fixtures/vulnerable.py")
else:
run_pipeline(input_path=args.file, prompt=args.prompt)