Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
09e4f74
adding function to print colored terminal output
yacinemebarki Mar 28, 2026
946deda
agent-governance-toolkit/packages/agent-os/tests
yacinemebarki Mar 28, 2026
50f70e9
keeping the same file format as the main branch
yacinemebarki Mar 28, 2026
5afb439
adding function to print colored terminal output
yacinemebarki Mar 28, 2026
6498c4e
add rich as runtime dependency and improve pytest coverage
yacinemebarki Mar 29, 2026
0944184
Merge branch 'main' into colored/terminal/output
yacinemebarki Mar 29, 2026
556f596
Add MIT license header to test_cli_output.py
yacinemebarki Mar 29, 2026
6a8d7f2
Merge branch 'main' into colored/terminal/output
imran-siddique Mar 30, 2026
93b2b0d
Merge branch 'main' into colored/terminal/output
imran-siddique Mar 30, 2026
8f5eebd
Merge branch 'main' into colored/terminal/output
yacinemebarki Mar 30, 2026
65339fd
Updated to print messages using Rich Console when available
yacinemebarki Mar 30, 2026
0515a12
improve error handling and redirect policy violations to stderr
yacinemebarki Mar 30, 2026
81383d9
Sanitize CLI output and document rich fallback
yacinemebarki Mar 30, 2026
26c2e7c
robust fallback for rich Text and safer output handling
yacinemebarki Mar 30, 2026
8c7c503
improve improve error handling
yacinemebarki Mar 30, 2026
14551fb
Fix test_policy_violation to capture stderr instead of stdout
yacinemebarki Mar 30, 2026
9ea93b8
Fix test_policy_cli to capture stderr and clean lint issues
yacinemebarki Mar 30, 2026
23d9413
Merge branch 'main' into colored/terminal/output
imran-siddique Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 69 additions & 111 deletions packages/agent-os/src/agent_os/policies/cli.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,3 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Policy-as-code CLI for Agent-OS governance.

Provides commands to validate, test, and diff declarative policy documents
without external dependencies beyond the standard library and pydantic/pyyaml.

Usage:
python -m agent_os.policies.cli validate <path>
python -m agent_os.policies.cli test <policy_path> <test_scenarios_path>
python -m agent_os.policies.cli diff <path1> <path2>

Exit codes:
0 - Success
1 - Validation failure / test failure
2 - Runtime error (file not found, parse error, etc.)
"""

from __future__ import annotations

import argparse
Expand All @@ -26,20 +7,43 @@
from typing import Any

# ---------------------------------------------------------------------------
# Lazy imports — keep startup fast and give clear messages when deps missing.
# Colored output functions using rich
# ---------------------------------------------------------------------------

try:
from rich import print as rprint
except ImportError:
# fallback to normal print if rich not installed
def rprint(msg, **kwargs):
print(msg)

def error(msg):
rprint(f"[red]{msg}[/red]", file=sys.stderr)

def success(msg):
rprint(f"[green]{msg}[/green]")

def warn(msg):
rprint(f"[yellow]{msg}[/yellow]")

def policy_violation(msg):
rprint(f"[bold red]{msg}[/bold red]")

def passed_check(msg):
rprint(f"[green]✔ {msg}[/green]")

# ---------------------------------------------------------------------------
# Lazy imports
# ---------------------------------------------------------------------------

def _import_yaml() -> Any:
try:
import yaml

return yaml
except ImportError:
print("ERROR: pyyaml is required — pip install pyyaml", file=sys.stderr)
error("ERROR: pyyaml is required — pip install pyyaml")
sys.exit(2)


def _load_file(path: Path) -> dict[str, Any]:
"""Load a YAML or JSON file and return the parsed dict."""
text = path.read_text(encoding="utf-8")
Expand All @@ -49,7 +53,7 @@ def _load_file(path: Path) -> dict[str, Any]:
elif path.suffix == ".json":
data = json.loads(text)
else:
# Try YAML first, fall back to JSON
# Try YAML first, fallback to JSON
yaml = _import_yaml()
try:
data = yaml.safe_load(text)
Expand All @@ -59,64 +63,56 @@ def _load_file(path: Path) -> dict[str, Any]:
raise ValueError(f"Expected a mapping at top level, got {type(data).__name__}")
return data


# ============================================================================
# validate
# ============================================================================


def cmd_validate(args: argparse.Namespace) -> int:
"""Validate a policy YAML/JSON file against the PolicyDocument schema."""
from .schema import PolicyDocument # noqa: E402

path = Path(args.path)
if not path.exists():
print(f"ERROR: file not found: {path}", file=sys.stderr)
error(f"ERROR: file not found: {path}")
return 2

try:
data = _load_file(path)
except Exception as exc:
print(f"ERROR: failed to parse {path}: {exc}", file=sys.stderr)
error(f"ERROR: failed to parse {path}: {exc}")
return 2

# --- Optional JSON-Schema validation (best-effort) --------------------
# Optional JSON-Schema validation
schema_path = Path(__file__).with_name("policy_schema.json")
if schema_path.exists():
try:
import jsonschema # type: ignore[import-untyped]

schema = json.loads(schema_path.read_text(encoding="utf-8"))
jsonschema.validate(instance=data, schema=schema)
except ImportError:
pass # jsonschema not installed — skip, rely on Pydantic
pass
except jsonschema.ValidationError as ve:
print(f"FAIL: {path}")
print(f" JSON-Schema error: {ve.message}")
policy_violation(f"FAIL: {path}")
error(f" JSON-Schema error: {ve.message}")
if ve.absolute_path:
print(f" Location: {' -> '.join(str(p) for p in ve.absolute_path)}")
warn(f" Location: {' -> '.join(str(p) for p in ve.absolute_path)}")
return 1

# --- Pydantic validation (authoritative) ------------------------------
try:
PolicyDocument.model_validate(data)
except Exception as exc:
print(f"FAIL: {path}")
policy_violation(f"FAIL: {path}")
for line in str(exc).splitlines():
print(f" {line}")
error(f" {line}")
return 1

print(f"OK: {path}")
success(f"OK: {path}")
return 0


# ============================================================================
# test
# ============================================================================


def cmd_test(args: argparse.Namespace) -> int:
"""Test a policy against a set of scenarios."""
from .evaluator import PolicyEvaluator # noqa: E402
from .schema import PolicyDocument # noqa: E402

Expand All @@ -125,29 +121,29 @@ def cmd_test(args: argparse.Namespace) -> int:

for p in (policy_path, scenarios_path):
if not p.exists():
print(f"ERROR: file not found: {p}", file=sys.stderr)
error(f"ERROR: file not found: {p}")
return 2

# Load the policy
# Load policy
try:
doc = PolicyDocument.from_yaml(policy_path)
except Exception as exc:
try:
doc = PolicyDocument.from_json(policy_path)
except Exception:
print(f"ERROR: failed to load policy {policy_path}: {exc}", file=sys.stderr)
error(f"ERROR: failed to load policy {policy_path}: {exc}")
return 2

# Load scenarios
try:
scenarios_data = _load_file(scenarios_path)
except Exception as exc:
print(f"ERROR: failed to parse scenarios {scenarios_path}: {exc}", file=sys.stderr)
error(f"ERROR: failed to parse scenarios {scenarios_path}: {exc}")
return 2

scenarios = scenarios_data.get("scenarios", [])
if not scenarios:
print("ERROR: no scenarios found in test file", file=sys.stderr)
error("ERROR: no scenarios found in test file")
return 2

evaluator = PolicyEvaluator(policies=[doc])
Expand All @@ -165,49 +161,42 @@ def cmd_test(args: argparse.Namespace) -> int:
errors: list[str] = []

if expected_allowed is not None and decision.allowed != expected_allowed:
errors.append(
f"expected allowed={expected_allowed}, got allowed={decision.allowed}"
)
errors.append(f"expected allowed={expected_allowed}, got allowed={decision.allowed}")
if expected_action is not None and decision.action != expected_action:
errors.append(
f"expected action='{expected_action}', got action='{decision.action}'"
)
errors.append(f"expected action='{expected_action}', got action='{decision.action}'")

if errors:
failed += 1
print(f" FAIL: {name}")
policy_violation(f"FAIL: {name}")
for err in errors:
print(f" - {err}")
error(f" - {err}")
else:
passed += 1
print(f" PASS: {name}")
passed_check(name)

print(f"\n{passed}/{total} scenarios passed.")
success(f"\n{passed}/{total} scenarios passed.")
return 1 if failed > 0 else 0


# ============================================================================
# diff
# ============================================================================


def cmd_diff(args: argparse.Namespace) -> int:
"""Show differences between two policy files."""
from .schema import PolicyDocument # noqa: E402

path1 = Path(args.path1)
path2 = Path(args.path2)

for p in (path1, path2):
if not p.exists():
print(f"ERROR: file not found: {p}", file=sys.stderr)
error(f"ERROR: file not found: {p}")
return 2

try:
doc1 = PolicyDocument.model_validate(_load_file(path1))
doc2 = PolicyDocument.model_validate(_load_file(path2))
except Exception as exc:
print(f"ERROR: failed to load policies: {exc}", file=sys.stderr)
error(f"ERROR: failed to load policies: {exc}")
return 2

differences: list[str] = []
Expand All @@ -222,23 +211,13 @@ def cmd_diff(args: argparse.Namespace) -> int:

# Default changes
if doc1.defaults.action != doc2.defaults.action:
differences.append(
f" defaults.action: '{doc1.defaults.action.value}' -> '{doc2.defaults.action.value}'"
)
differences.append(f" defaults.action: '{doc1.defaults.action.value}' -> '{doc2.defaults.action.value}'")
if doc1.defaults.max_tokens != doc2.defaults.max_tokens:
differences.append(
f" defaults.max_tokens: {doc1.defaults.max_tokens} -> {doc2.defaults.max_tokens}"
)
differences.append(f" defaults.max_tokens: {doc1.defaults.max_tokens} -> {doc2.defaults.max_tokens}")
if doc1.defaults.max_tool_calls != doc2.defaults.max_tool_calls:
differences.append(
f" defaults.max_tool_calls: "
f"{doc1.defaults.max_tool_calls} -> {doc2.defaults.max_tool_calls}"
)
differences.append(f" defaults.max_tool_calls: {doc1.defaults.max_tool_calls} -> {doc2.defaults.max_tool_calls}")
if doc1.defaults.confidence_threshold != doc2.defaults.confidence_threshold:
differences.append(
f" defaults.confidence_threshold: "
f"{doc1.defaults.confidence_threshold} -> {doc2.defaults.confidence_threshold}"
)
differences.append(f" defaults.confidence_threshold: {doc1.defaults.confidence_threshold} -> {doc2.defaults.confidence_threshold}")

# Rule changes
rules1 = {r.name: r for r in doc1.rules}
Expand All @@ -258,75 +237,54 @@ def cmd_diff(args: argparse.Namespace) -> int:
r1 = rules1[name]
r2 = rules2[name]
if r1.action != r2.action:
differences.append(
f" rule '{name}' action: '{r1.action.value}' -> '{r2.action.value}'"
)
differences.append(f" rule '{name}' action: '{r1.action.value}' -> '{r2.action.value}'")
if r1.priority != r2.priority:
differences.append(
f" rule '{name}' priority: {r1.priority} -> {r2.priority}"
)
differences.append(f" rule '{name}' priority: {r1.priority} -> {r2.priority}")
if r1.condition != r2.condition:
differences.append(f" rule '{name}' condition changed")
if r1.message != r2.message:
differences.append(f" rule '{name}' message changed")

if differences:
print(f"Differences between {path1} and {path2}:")
success(f"Differences between {path1} and {path2}:")
for diff in differences:
print(diff)
return 1
else:
print(f"No differences between {path1} and {path2}.")
success(f"No differences between {path1} and {path2}.")
return 0


# ============================================================================
# Main entry point
# ============================================================================


def main(argv: list[str] | None = None) -> int:
"""Parse arguments and dispatch to the appropriate subcommand."""
parser = argparse.ArgumentParser(
prog="policy-cli",
description="Agent-OS policy-as-code CLI for validation, testing, and diffing.",
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")

# -- validate ----------------------------------------------------------
p_validate = subparsers.add_parser(
"validate",
help="Validate a policy YAML/JSON file against the schema.",
)
# validate
p_validate = subparsers.add_parser("validate", help="Validate a policy YAML/JSON file.")
p_validate.add_argument("path", help="Path to the policy file to validate.")

# -- test --------------------------------------------------------------
p_test = subparsers.add_parser(
"test",
help="Test a policy against a set of scenarios.",
)
# test
p_test = subparsers.add_parser("test", help="Test a policy against scenarios.")
p_test.add_argument("policy_path", help="Path to the policy file.")
p_test.add_argument("test_scenarios_path", help="Path to the test scenarios YAML.")
p_test.add_argument("test_scenarios_path", help="Path to test scenarios YAML.")

# -- diff --------------------------------------------------------------
p_diff = subparsers.add_parser(
"diff",
help="Show differences between two policy files.",
)
p_diff.add_argument("path1", help="Path to the first policy file.")
p_diff.add_argument("path2", help="Path to the second policy file.")
# diff
p_diff = subparsers.add_parser("diff", help="Show differences between two policy files.")
p_diff.add_argument("path1", help="Path to first policy file.")
p_diff.add_argument("path2", help="Path to second policy file.")

args = parser.parse_args(argv)

if args.command is None:
parser.print_help()
return 2

dispatch = {
"validate": cmd_validate,
"test": cmd_test,
"diff": cmd_diff,
}
dispatch = {"validate": cmd_validate, "test": cmd_test, "diff": cmd_diff}
return dispatch[args.command](args)


Expand Down
25 changes: 25 additions & 0 deletions packages/agent-os/tests/test_cli_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# test_cli_run.py
from agent_os.policies.cli import success, error, warn, policy_violation, passed_check

def main():
print("=== Testing CLI functions with colored output ===\n")

print("1️⃣ success():")
success("This is a success message")

print("\n2️⃣ error():")
error("This is an error message")

print("\n3️⃣ warn():")
warn("This is a warning message")

print("\n4️⃣ policy_violation():")
policy_violation("Policy violation detected!")

print("\n5️⃣ passed_check():")
passed_check("Test passed successfully!")

print("\n=== Done testing CLI functions ===")

if __name__ == "__main__":
main()
Loading