77policies on discovered AI/LLM components.
88
Supported policy patterns:
  - forbid (principal, action, resource) when { ... };
  - forbid (principal, action == Action::"deploy", resource) when { ... };
  - forbid ... when { resource.severity == "critical" };
  - forbid ... when { resource.provider == "DeepSeek" };
  - forbid ... when { resource.component_type == "llm-api" };
  - forbid ... when { resource.risk_score > 75 };
1416
Usage:
    python3 cedar-gate.py <scan-results.json> <policy.cedar> [options]

Options:
    --summary <path>            Write violation report to file (GitHub Actions summary)
    --fail-on-severity <sev>    Only fail on violations at or above this severity
                                (critical, high, medium or low)
    --annotations               Emit GitHub Actions annotations (::error, ::warning)
    --entities <path>           Path to Cedar entities JSON file for additional context
1725
1826Exit codes:
1927 0 = all policies passed
2331
2432from __future__ import annotations
2533
34+ import argparse
2635import json
2736import re
2837import sys
@@ -50,18 +59,28 @@ class Violation:
5059 component_name : str
5160 component_type : str
5261 actual_value : Any
62+ severity : str = ""
63+ file_path : str = ""
64+ line_number : int = 0
5365
5466
5567SEVERITY_ORDER = {"critical" : 4 , "high" : 3 , "medium" : 2 , "low" : 1 , "info" : 0 , "none" : 0 }
5668
# Regex patterns for Cedar-like policy syntax.
# Both rule shapes end with the same `when { ... };` clause.  The clause body
# is captured up to the first closing brace, so a body containing `}` is not
# matched in full.
_WHEN_CLAUSE = r'\s*when\s*\{([^}]+)\}\s*;'
_RULE_FLAGS = re.MULTILINE | re.DOTALL

# Pattern 1: forbid (principal, action == Action::"deploy", resource) when { ... };
# Group 1 is the action name, group 2 is the condition body.
RULE_PATTERN_TYPED = re.compile(
    r'forbid\s*\(\s*principal\s*,\s*action\s*==\s*Action::"(\w+)"\s*,\s*resource\s*\)'
    + _WHEN_CLAUSE,
    _RULE_FLAGS,
)

# Pattern 2: forbid (principal, action, resource) when { ... };
# Group 1 is the condition body.
RULE_PATTERN_SIMPLE = re.compile(
    r'forbid\s*\(\s*principal\s*,\s*action\s*,\s*resource\s*\)' + _WHEN_CLAUSE,
    _RULE_FLAGS,
)
83+
6584# Matches conditions inside when { ... }
6685# e.g. resource.severity == "critical" or resource.risk_score > 75
6786CONDITION_PATTERN = re .compile (
@@ -76,7 +95,8 @@ def parse_policy(policy_text: str) -> list[PolicyRule]:
7695 # Strip comments (// style)
7796 cleaned = re .sub (r'//[^\n]*' , '' , policy_text )
7897
79- for match in RULE_PATTERN .finditer (cleaned ):
98+ # Match typed action rules: action == Action::"deploy"
99+ for match in RULE_PATTERN_TYPED .finditer (cleaned ):
80100 action = match .group (1 )
81101 body = match .group (2 ).strip ()
82102
@@ -85,15 +105,7 @@ def parse_policy(policy_text: str) -> list[PolicyRule]:
85105 operator = cond .group (2 )
86106 raw_value = cond .group (3 ).strip ()
87107
88- # Try to parse as number
89- try :
90- value : str | int | float = int (raw_value )
91- except ValueError :
92- try :
93- value = float (raw_value )
94- except ValueError :
95- value = raw_value
96-
108+ value = _parse_value (raw_value )
97109 rules .append (
98110 PolicyRule (
99111 action = action ,
@@ -104,9 +116,40 @@ def parse_policy(policy_text: str) -> list[PolicyRule]:
104116 )
105117 )
106118
119+ # Match simple rules: (principal, action, resource)
120+ for match in RULE_PATTERN_SIMPLE .finditer (cleaned ):
121+ body = match .group (1 ).strip ()
122+
123+ for cond in CONDITION_PATTERN .finditer (body ):
124+ field_name = cond .group (1 )
125+ operator = cond .group (2 )
126+ raw_value = cond .group (3 ).strip ()
127+
128+ value = _parse_value (raw_value )
129+ rules .append (
130+ PolicyRule (
131+ action = "*" ,
132+ field = field_name ,
133+ operator = operator ,
134+ value = value ,
135+ raw = match .group (0 ).strip (),
136+ )
137+ )
138+
107139 return rules
108140
109141
142+ def _parse_value (raw_value : str ) -> str | int | float :
143+ """Try to parse a value as number, fall back to string."""
144+ try :
145+ return int (raw_value )
146+ except ValueError :
147+ try :
148+ return float (raw_value )
149+ except ValueError :
150+ return raw_value
151+
152+
110153def evaluate_condition (rule : PolicyRule , component : dict [str , Any ]) -> bool :
111154 """Check if a single component violates a rule. Returns True if violated."""
112155 # Map Cedar field names to AI-BOM scan result keys
@@ -175,28 +218,65 @@ def evaluate_condition(rule: PolicyRule, component: dict[str, Any]) -> bool:
175218
176219
def evaluate(
    components: list[dict[str, Any]],
    rules: list[PolicyRule],
    entities: dict[str, Any] | list[Any] | None = None,
) -> list[Violation]:
    """Evaluate all components against all rules and collect the violations.

    Args:
        components: Scan-result components, one dict per component.
        rules: Parsed policy rules to check each component against.
        entities: Optional parsed Cedar entities document.  Both the plain
            JSON array of entity records and a ``{"entities": [...]}``
            wrapper object are accepted.  Attributes of the entity whose id
            equals a component's ``name`` are added to that component for
            evaluation; keys the component already has are never overridden.

    Returns:
        One ``Violation`` per (component, rule) pair for which
        ``evaluate_condition`` reports a violation.
    """
    entity_attrs = _index_entity_attrs(entities)
    violations: list[Violation] = []

    for component in components:
        # Work on a copy so the caller's component dicts are left untouched.
        enriched = dict(component)
        comp_name = component.get("name", "")
        for key, value in entity_attrs.get(comp_name, {}).items():
            # Scan results win over entity attributes on key collisions.
            enriched.setdefault(key, value)

        for rule in rules:
            if not evaluate_condition(rule, enriched):
                continue
            violations.append(
                Violation(
                    rule=rule,
                    component_name=enriched.get("name", "unknown"),
                    component_type=enriched.get("component_type", "unknown"),
                    actual_value=enriched.get(rule.field, "N/A"),
                    severity=str(enriched.get("severity", "")).lower(),
                    file_path=enriched.get("file_path", ""),
                    line_number=enriched.get("line_number", 0),
                )
            )

    return violations


def _index_entity_attrs(entities: Any) -> dict[Any, dict[str, Any]]:
    """Map entity id -> attribute dict from a parsed Cedar entities document."""
    if not entities:
        return {}

    # Cedar's own entities format is a top-level JSON array; the wrapper
    # object with an "entities" key is kept for backward compatibility.
    records = entities.get("entities", []) if isinstance(entities, dict) else entities
    if not isinstance(records, list):
        return {}

    index: dict[Any, dict[str, Any]] = {}
    for record in records:
        if not isinstance(record, dict):
            continue  # skip malformed entries instead of crashing the gate

        uid = record.get("uid", {})
        if isinstance(uid, dict):
            # Cedar also allows the explicit form {"__entity": {"type", "id"}}.
            inner = uid.get("__entity")
            if isinstance(inner, dict):
                uid = inner
            entity_id = uid.get("id", "")
        else:
            entity_id = str(uid)

        attrs = record.get("attrs", {})
        if entity_id and isinstance(attrs, dict):
            index[entity_id] = attrs

    return index
198261
199262
def filter_by_severity(
    violations: list[Violation], min_severity: str
) -> list[Violation]:
    """Keep only the violations whose severity meets *min_severity*.

    Severities are ranked through ``SEVERITY_ORDER``.  When the threshold
    ranks 0 (including names missing from the table) the input list is
    returned as-is; otherwise a new list is built.
    """
    minimum_rank = SEVERITY_ORDER.get(min_severity.lower(), 0)
    if minimum_rank == 0:
        # Every violation ranks at least 0, so there is nothing to drop.
        return violations

    return [
        violation
        for violation in violations
        # A violation with no recorded severity is ranked like "none".
        if SEVERITY_ORDER.get(violation.severity or "none", 0) >= minimum_rank
    ]
278+
279+
200280def extract_components (scan_data : dict [str , Any ]) -> list [dict [str , Any ]]:
201281 """Extract the component list from various AI-BOM output formats."""
202282 # Direct list at top level
@@ -225,6 +305,14 @@ def extract_components(scan_data: dict[str, Any]) -> list[dict[str, Any]]:
225305 "provider" : result .get ("properties" , {}).get ("provider" , "unknown" ),
226306 "risk_score" : result .get ("properties" , {}).get ("risk_score" , 0 ),
227307 }
308+ # Extract file location from SARIF
309+ locations = result .get ("locations" , [])
310+ if locations :
311+ phys = locations [0 ].get ("physicalLocation" , {})
312+ artifact = phys .get ("artifactLocation" , {})
313+ comp ["file_path" ] = artifact .get ("uri" , "" )
314+ region = phys .get ("region" , {})
315+ comp ["line_number" ] = region .get ("startLine" , 0 )
228316 components .append (comp )
229317 return components
230318
@@ -270,22 +358,52 @@ def format_violation_report(violations: list[Violation]) -> str:
270358 return "\n " .join (lines )
271359
272360
273- def main () -> int :
274- if len (sys .argv ) < 3 :
275- print (
276- "Usage: cedar-gate.py <scan-results.json> <policy.cedar> [--summary <path>]" ,
277- file = sys .stderr ,
def emit_annotations(violations: list[Violation]) -> None:
    """Print one GitHub Actions workflow command per violation.

    Violations with severity ``critical`` or ``high`` are emitted as
    ``::error``, everything else as ``::warning``.  ``file`` and ``line``
    properties are attached when the violation carries them.

    Message text and property values come from scanned data, so they are
    escaped the way the Actions toolkit does.  Without escaping, a newline in
    a component name would end the annotation early and could start another
    workflow command.
    """
    for v in violations:
        level = "error" if v.severity in ("critical", "high") else "warning"
        msg = (
            f"Policy violation: {v.component_name} ({v.component_type}) — "
            f"resource.{v.rule.field} {v.rule.operator} {v.rule.value} "
            f"(actual: {v.actual_value})"
        )

        properties = []
        if v.file_path:
            properties.append(f"file={_escape_annotation_property(v.file_path)}")
            # A line number is only meaningful together with a file.
            if v.line_number:
                properties.append(
                    f"line={_escape_annotation_property(v.line_number)}"
                )

        header = f"::{level} {','.join(properties)}" if properties else f"::{level}"
        print(f"{header}::{_escape_annotation_data(msg)}")


def _escape_annotation_data(value: object) -> str:
    """Escape a workflow-command message (``%``, CR and LF)."""
    # "%" must be replaced first so the escapes added below are not re-escaped.
    return str(value).replace("%", "%25").replace("\r", "%0D").replace("\n", "%0A")


def _escape_annotation_property(value: object) -> str:
    """Escape a workflow-command property value (message rules plus ``:`` and ``,``)."""
    return _escape_annotation_data(value).replace(":", "%3A").replace(",", "%2C")
280376
281- results_path = Path (sys .argv [1 ])
282- policy_path = Path (sys .argv [2 ])
283377
284- summary_path : Path | None = None
285- if "--summary" in sys .argv :
286- idx = sys .argv .index ("--summary" )
287- if idx + 1 < len (sys .argv ):
288- summary_path = Path (sys .argv [idx + 1 ])
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line arguments of the policy gate.

    Args:
        argv: Argument list to parse, without the program name.  ``None``
            (the default) makes argparse read ``sys.argv[1:]``, so existing
            callers are unaffected; passing a list allows programmatic use.

    Returns:
        Namespace with ``results``, ``policy``, ``summary``,
        ``fail_on_severity``, ``annotations`` and ``entities``.
    """
    parser = argparse.ArgumentParser(
        description="Cedar-like policy gate for AI-BOM scan results"
    )
    parser.add_argument("results", help="Path to scan results JSON file")
    parser.add_argument("policy", help="Path to Cedar policy file")
    parser.add_argument("--summary", help="Path to write violation report")
    parser.add_argument(
        "--fail-on-severity",
        # Lower-case before the choices check so "HIGH" or "High" is accepted.
        type=str.lower,
        choices=["critical", "high", "medium", "low"],
        help="Only fail on violations at or above this severity",
    )
    parser.add_argument(
        "--annotations",
        action="store_true",
        help="Emit GitHub Actions ::error/::warning annotations",
    )
    parser.add_argument(
        "--entities",
        help="Path to Cedar entities JSON file for additional context",
    )
    return parser.parse_args(argv)
400+
401+
402+ def main () -> int :
403+ args = parse_args ()
404+
405+ results_path = Path (args .results )
406+ policy_path = Path (args .policy )
289407
290408 # Load scan results
291409 if not results_path .exists ():
@@ -305,6 +423,16 @@ def main() -> int:
305423
306424 policy_text = policy_path .read_text (encoding = "utf-8" )
307425
426+ # Load entities (optional)
427+ entities : dict [str , Any ] | None = None
428+ if args .entities :
429+ entities_path = Path (args .entities )
430+ if entities_path .exists ():
431+ try :
432+ entities = json .loads (entities_path .read_text (encoding = "utf-8" ))
433+ except json .JSONDecodeError as e :
434+ print (f"Warning: invalid JSON in entities file: { e } " , file = sys .stderr )
435+
308436 # Parse
309437 rules = parse_policy (policy_text )
310438 if not rules :
@@ -320,14 +448,23 @@ def main() -> int:
320448 print (f"Evaluating { len (rules )} rule(s) against { len (components )} component(s)..." )
321449
322450 # Evaluate
323- violations = evaluate (components , rules )
451+ violations = evaluate (components , rules , entities )
452+
453+ # Filter by severity threshold if specified
454+ if args .fail_on_severity and violations :
455+ violations = filter_by_severity (violations , args .fail_on_severity )
324456
325457 if violations :
326458 report = format_violation_report (violations )
327459 print (report )
328460
461+ # Emit GitHub Actions annotations
462+ if args .annotations :
463+ emit_annotations (violations )
464+
329465 # Write GitHub Actions summary if path provided
330- if summary_path :
466+ if args .summary :
467+ summary_path = Path (args .summary )
331468 with open (summary_path , "a" , encoding = "utf-8" ) as f :
332469 f .write (report )
333470 f .write ("\n " )
0 commit comments