Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
09e4f74
adding function to print colored terminal output
yacinemebarki Mar 28, 2026
946deda
agent-governance-toolkit/packages/agent-os/tests
yacinemebarki Mar 28, 2026
50f70e9
keeping the same file format as the main branch
yacinemebarki Mar 28, 2026
5afb439
adding function to print colored terminal output
yacinemebarki Mar 28, 2026
6498c4e
add rich as runtime dependency and improve pytest coverage
yacinemebarki Mar 29, 2026
0944184
Merge branch 'main' into colored/terminal/output
yacinemebarki Mar 29, 2026
556f596
Add MIT license header to test_cli_output.py
yacinemebarki Mar 29, 2026
6a8d7f2
Merge branch 'main' into colored/terminal/output
imran-siddique Mar 30, 2026
93b2b0d
Merge branch 'main' into colored/terminal/output
imran-siddique Mar 30, 2026
8f5eebd
Merge branch 'main' into colored/terminal/output
yacinemebarki Mar 30, 2026
65339fd
Updated to print messages using Rich Console when available
yacinemebarki Mar 30, 2026
0515a12
improve error handling and redirect policy violations to stderr
yacinemebarki Mar 30, 2026
81383d9
Sanitize CLI output and document rich fallback
yacinemebarki Mar 30, 2026
26c2e7c
robust fallback for rich Text and safer output handling
yacinemebarki Mar 30, 2026
8c7c503
improve improve error handling
yacinemebarki Mar 30, 2026
14551fb
Fix test_policy_violation to capture stderr instead of stdout
yacinemebarki Mar 30, 2026
9ea93b8
Fix test_policy_cli to capture stderr and clean lint issues
yacinemebarki Mar 30, 2026
23d9413
Merge branch 'main' into colored/terminal/output
imran-siddique Mar 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/agent-os/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ classifiers = [
# Core dependencies (minimal - just what's needed for kernel)
dependencies = [
"pydantic>=2.4.0",
"rich>=13.0.0"
]

[project.urls]
Expand Down
117 changes: 96 additions & 21 deletions packages/agent-os/src/agent_os/policies/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ def _import_yaml() -> Any:
print("ERROR: pyyaml is required — pip install pyyaml", file=sys.stderr)
sys.exit(2)

def _import_console(no_color: bool = False, use_stderr: bool = False):
'''Lazy import for rich.console.Console.'''
try:
from rich.console import Console

return Console(stderr=use_stderr, no_color=no_color)
except (ModuleNotFoundError, ImportError):
print("WARNING: rich library not installed, using plain text output", file=sys.stderr)
return None

def _import_rich_text() ->Any:
'''Lazy import for rich.text.Text.'''
try:
from rich.text import Text

return Text
except ImportError as e:
print(f"WARNING: {e}", file=sys.stderr)
return None

def _load_file(path: Path) -> dict[str, Any]:
"""Load a YAML or JSON file and return the parsed dict."""
Expand All @@ -60,6 +79,57 @@ def _load_file(path: Path) -> dict[str, Any]:
return data


# ---------------------------------------------------------------------------
# Colored output helpers — use rich when available, fall back to plain text.
# ---------------------------------------------------------------------------


def error(msg: str, no_color: bool = False) -> None:
"""Print an error message in red to stderr."""
console = _import_console(no_color, use_stderr=True)
Text = _import_rich_text()
if console and Text:
console.print(Text(msg), style="red")
else:
print(msg, file=sys.stderr)

def success(msg: str, no_color: bool = False) -> None:
"""Print a success message in green to stdout."""
console = _import_console(no_color, use_stderr=False)
Text = _import_rich_text()
if console and Text:
console.print(Text(msg), style="green")
else:
print(msg)

def warn(msg: str, no_color: bool = False) -> None:
"""Print a warning message in yellow to stdout."""
console = _import_console(no_color, use_stderr=False)
Text = _import_rich_text()
if console and Text:
console.print(Text(msg), style="yellow")
else:
print(msg)

def policy_violation(msg: str, no_color: bool = False) -> None:
"""Print a policy violation message in bold red to stdout."""
console = _import_console(no_color, use_stderr=True)
Text = _import_rich_text()
if console and Text:
console.print(Text(msg), style="bold red")
else:
print(msg)

def passed_check(msg: str, no_color: bool = False) -> None:
"""Print a passed-check message with a green checkmark to stdout."""
console = _import_console(no_color, use_stderr=False)
Text = _import_rich_text()
if console and Text:
console.print(Text(f"\u2714 {msg}"), style="green")
else:
print(f"\u2714 {msg}")


# ============================================================================
# validate
# ============================================================================
Expand All @@ -71,13 +141,13 @@ def cmd_validate(args: argparse.Namespace) -> int:

path = Path(args.path)
if not path.exists():
print(f"ERROR: file not found: {path}", file=sys.stderr)
error(f"ERROR: file not found: {path}")
return 2

try:
data = _load_file(path)
except Exception as exc:
print(f"ERROR: failed to parse {path}: {exc}", file=sys.stderr)
error(f"ERROR: failed to parse {path}: {exc}")
return 2

# --- Optional JSON-Schema validation (best-effort) --------------------
Expand All @@ -91,22 +161,22 @@ def cmd_validate(args: argparse.Namespace) -> int:
except ImportError:
pass # jsonschema not installed — skip, rely on Pydantic
except jsonschema.ValidationError as ve:
print(f"FAIL: {path}")
print(f" JSON-Schema error: {ve.message}")
policy_violation(f"FAIL: {path}")
policy_violation(f" JSON-Schema error: {ve.message}")
if ve.absolute_path:
print(f" Location: {' -> '.join(str(p) for p in ve.absolute_path)}")
policy_violation(f" Location: {' -> '.join(str(p) for p in ve.absolute_path)}")
return 1

# --- Pydantic validation (authoritative) ------------------------------
try:
PolicyDocument.model_validate(data)
except Exception as exc:
print(f"FAIL: {path}")
policy_violation(f"FAIL: {path}")
for line in str(exc).splitlines():
print(f" {line}")
policy_violation(f" {line}")
return 1

print(f"OK: {path}")
success(f"OK: {path}")
return 0


Expand All @@ -125,7 +195,7 @@ def cmd_test(args: argparse.Namespace) -> int:

for p in (policy_path, scenarios_path):
if not p.exists():
print(f"ERROR: file not found: {p}", file=sys.stderr)
error(f"ERROR: file not found: {p}")
return 2

# Load the policy
Expand All @@ -135,19 +205,19 @@ def cmd_test(args: argparse.Namespace) -> int:
try:
doc = PolicyDocument.from_json(policy_path)
except Exception:
print(f"ERROR: failed to load policy {policy_path}: {exc}", file=sys.stderr)
error(f"ERROR: failed to load policy {policy_path}: {exc}")
return 2

# Load scenarios
try:
scenarios_data = _load_file(scenarios_path)
except Exception as exc:
print(f"ERROR: failed to parse scenarios {scenarios_path}: {exc}", file=sys.stderr)
error(f"ERROR: failed to parse scenarios {scenarios_path}: {exc}")
return 2

scenarios = scenarios_data.get("scenarios", [])
if not scenarios:
print("ERROR: no scenarios found in test file", file=sys.stderr)
error("ERROR: no scenarios found in test file")
return 2

evaluator = PolicyEvaluator(policies=[doc])
Expand Down Expand Up @@ -175,14 +245,19 @@ def cmd_test(args: argparse.Namespace) -> int:

if errors:
failed += 1
print(f" FAIL: {name}")
policy_violation(f" FAIL: {name}")
for err in errors:
print(f" - {err}")
policy_violation(f" - {err}")
else:
passed += 1
print(f" PASS: {name}")
passed_check(f" PASS: {name}")

summary = f"\n{passed}/{total} scenarios passed."
if failed > 0:
warn(summary)
else:
success(summary)

print(f"\n{passed}/{total} scenarios passed.")
return 1 if failed > 0 else 0


Expand All @@ -200,14 +275,14 @@ def cmd_diff(args: argparse.Namespace) -> int:

for p in (path1, path2):
if not p.exists():
print(f"ERROR: file not found: {p}", file=sys.stderr)
error(f"ERROR: file not found: {p}")
return 2

try:
doc1 = PolicyDocument.model_validate(_load_file(path1))
doc2 = PolicyDocument.model_validate(_load_file(path2))
except Exception as exc:
print(f"ERROR: failed to load policies: {exc}", file=sys.stderr)
error(f"ERROR: failed to load policies: {exc}")
return 2

differences: list[str] = []
Expand Down Expand Up @@ -271,12 +346,12 @@ def cmd_diff(args: argparse.Namespace) -> int:
differences.append(f" rule '{name}' message changed")

if differences:
print(f"Differences between {path1} and {path2}:")
warn(f"Differences between {path1} and {path2}:")
for diff in differences:
print(diff)
warn(diff)
return 1
else:
print(f"No differences between {path1} and {path2}.")
success(f"No differences between {path1} and {path2}.")
return 0


Expand Down
33 changes: 33 additions & 0 deletions packages/agent-os/tests/test_cli_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from agent_os.policies.cli import success,error,warn,policy_violation,passed_check

def test_success(capsys):
success("This is a success message")
captured = capsys.readouterr()
assert "success message" in captured.out.lower()


def test_error(capsys):
error("This is an error message")
captured = capsys.readouterr()
assert "error message" in captured.err.lower()


def test_warn(capsys):
warn("This is a warning message")
captured = capsys.readouterr()
assert "warning message" in captured.out.lower()


def test_policy_violation_output(capsys):
policy_violation("FAIL: invalid policy")
captured = capsys.readouterr()
assert "fail" in captured.err.lower()


def test_passed_check(capsys):
passed_check("Test passed successfully!")
captured = capsys.readouterr()
assert "test passed" in captured.out.lower()
12 changes: 6 additions & 6 deletions packages/agent-os/tests/test_policy_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
import json
from pathlib import Path

import pytest
import yaml

from agent_os.policies.cli import cmd_diff, cmd_test, cmd_validate, main
from agent_os.policies.cli import main
from agent_os.policies.schema import (
PolicyAction,
PolicyCondition,
Expand Down Expand Up @@ -146,7 +145,7 @@ def test_validate_invalid_action(self, tmp_path, capsys):
policy_file = _write_yaml(tmp_path / "bad.yaml", data)
rc = main(["validate", str(policy_file)])
assert rc == 1
assert "FAIL" in capsys.readouterr().out
assert "FAIL" in capsys.readouterr().err

def test_validate_missing_required_field(self, tmp_path, capsys):
data = _valid_policy_dict()
Expand Down Expand Up @@ -228,9 +227,10 @@ def test_failing_scenario(self, tmp_path, capsys):
scenarios_file = _write_yaml(tmp_path / "scenarios.yaml", scenarios)
rc = main(["test", str(policy_file), str(scenarios_file)])
assert rc == 1
out = capsys.readouterr().out
assert "FAIL" in out
assert "0/1 scenarios passed" in out
captured = capsys.readouterr()
assert "FAIL" in captured.err
assert "0/1 scenarios passed" in captured.out


def test_missing_policy_file(self, tmp_path, capsys):
scenarios_file = _write_yaml(tmp_path / "scenarios.yaml", {"scenarios": [{"name": "x"}]})
Expand Down
Loading