diff --git a/src/fix/python.rs b/src/fix/python.rs index 3ffb19e..593e255 100644 --- a/src/fix/python.rs +++ b/src/fix/python.rs @@ -21,7 +21,7 @@ fn check_security(path: &Path) -> Result<(), String> { )); } - if metadata.permissions().mode() & 0o022 == 0 { + if metadata.permissions().mode() & 0o022 != 0 { return Err(format!( "{} Python rule '{}' is writable by non-owners. Aborting to prevent privilege escalation.", "SECURITY ERROR:".red().bold(), @@ -36,6 +36,9 @@ pub fn process_python_rules( command: &Command, rule_paths: Vec, ) -> Result, String> { + if rule_paths.is_empty() { + return Ok(vec![]); + } let module_path = get_common_parent(&rule_paths) .ok_or("No common parent found for rule paths".to_string())?; let mut fixed_commands: Vec = vec![]; @@ -70,24 +73,74 @@ pub fn process_python_rules( continue; } }; - let match_func = module.getattr("match")?; - let fix_func = module.getattr("fix")?; + let match_func = match module.getattr("match") { + Ok(func) => func, + Err(e) => { + eprintln!( + "{}{}{}", + "Failed to get 'match' function from rule '".yellow(), + rule_path.display(), + "': ".yellow(), + ); + eprintln!("{e}"); + continue; + } + }; + let fix_func = match module.getattr("fix") { + Ok(func) => func, + Err(e) => { + eprintln!( + "{}{}{}", + "Failed to get 'fix' function from rule '".yellow(), + rule_path.display(), + "': ".yellow(), + ); + eprintln!("{e}"); + continue; + } + }; if match_func.is_callable() && fix_func.is_callable() { - if match_func + let is_match = match match_func .call1(( command.command(), command.output().stdout(), command.output().stderr(), - ))? - .extract::()? + )) + .and_then(|result| result.extract::()) { - let fixed_command: String = fix_func + Ok(result) => result, + Err(e) => { + eprintln!( + "{}{}{}", + "Failed to execute 'match' function in rule '".yellow(), + rule_path.display(), + "': ".yellow(), + ); + eprintln!("{e}"); + continue; + } + }; + if is_match { + let fixed_command: String = match fix_func .call1(( command.command(), command.output().stdout(), command.output().stderr(), - ))? - .extract()?; + )) + .and_then(|result| result.extract()) + { + Ok(cmd) => cmd, + Err(e) => { + eprintln!( + "{}{}{}", + "Failed to execute 'fix' function in rule '".yellow(), + rule_path.display(), + "': ".yellow(), + ); + eprintln!("{e}"); + continue; + } + }; fixed_commands.push(fixed_command); } } else { @@ -162,3 +215,264 @@ fn get_common_parent(paths: &[PathBuf]) -> Option { Some(common.iter().collect()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::fix::structs::CommandOutput; + use std::fs; + use std::io::Write; + use tempfile::tempdir; + + #[cfg(unix)] + use std::os::unix::fs::PermissionsExt; + + fn dummy_command() -> Command { + let output = CommandOutput::new(String::new(), String::new()); + Command::new("test".to_string(), output) + } + + #[test] + fn common_parent_empty() { + assert_eq!(get_common_parent(&[]), None); + } + + #[test] + fn common_parent_single() { + let paths = vec![PathBuf::from("/a/b/c.py")]; + assert_eq!(get_common_parent(&paths), Some(PathBuf::from("/a/b"))); + } + + #[test] + fn common_parent_multiple_with_common() { + let paths = vec![ + PathBuf::from("/a/b/c/d.py"), + PathBuf::from("/a/b/c/e.py"), + PathBuf::from("/a/b/c/f/g.py"), + ]; + assert_eq!(get_common_parent(&paths), Some(PathBuf::from("/a/b/c"))); + } + + #[test] + fn common_parent_root() { + let paths = vec![PathBuf::from("/a/b/c.py"), PathBuf::from("/d/e/f.py")]; + assert_eq!(get_common_parent(&paths), Some(PathBuf::from("/"))); + } + + #[test] + fn module_name_valid() { + let modules_dir = PathBuf::from("/root/modules"); + let rule_path = PathBuf::from("/root/modules/sub/dir/rule.py"); + assert_eq!( + get_module_name(&modules_dir, &rule_path), + Some("sub.dir.rule".to_string()) + ); + } + + #[test] + fn module_name_not_subpath() { + let modules_dir = PathBuf::from("/root/modules"); + let rule_path = PathBuf::from("/other/place/rule.py"); + assert_eq!(get_module_name(&modules_dir, &rule_path), None); + } + + #[test] + fn module_name_no_file_stem() { + let modules_dir = PathBuf::from("/root"); + let rule_path = PathBuf::from("/"); + assert_eq!(get_module_name(&modules_dir, &rule_path), None); + } + + fn create_rule_file(dir: &Path, name: &str, content: &str) -> PathBuf { + let path = dir.join(name); + fs::create_dir_all(path.parent().unwrap()).unwrap(); + let mut file = fs::File::create(&path).unwrap(); + write!(file, "{}", content).unwrap(); + + #[cfg(unix)] + { + let mut perms = fs::metadata(&path).unwrap().permissions(); + perms.set_mode(0o600); + fs::set_permissions(&path, perms).unwrap(); + } + + path + } + + #[cfg(unix)] + #[test] + fn create_rule_file_sets_correct_permissions() { + let temp = tempdir().unwrap(); + let path = create_rule_file(temp.path(), "perm_check.py", "print('test')"); + let metadata = fs::metadata(&path).unwrap(); + let mode = metadata.permissions().mode() & 0o777; + assert_eq!(mode, 0o600, "File permissions should be set to 600"); + } + + #[cfg(unix)] + #[test] + fn import_fails_if_file_not_readable() { + let temp = tempdir().unwrap(); + let path = temp.path().join("no_read.py"); + { + let mut file = fs::File::create(&path).unwrap(); + writeln!(file, "def match(c,o,e): return True").unwrap(); + writeln!(file, "def fix(c,o,e): return 'fixed'").unwrap(); + } + let mut perms = fs::metadata(&path).unwrap().permissions(); + perms.set_mode(0o200); + fs::set_permissions(&path, perms).unwrap(); + + if fs::File::open(&path).is_ok() { + return; + } + + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![path]); + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn process_single_rule_match() { + let temp = tempdir().unwrap(); + let rule_path = create_rule_file( + temp.path(), + "match_ok.py", + r#" +def match(command, stdout, stderr): + return True +def fix(command, stdout, stderr): + return "fixed-command" +"#, + ); + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![rule_path]); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), vec!["fixed-command".to_string()]); + } + + #[test] + fn process_rule_no_match() { + let temp = tempdir().unwrap(); + let rule_path = create_rule_file( + temp.path(), + "no_match.py", + r#" +def match(command, stdout, stderr): + return False +def fix(command, stdout, stderr): + return "should-not-be-called" +"#, + ); + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![rule_path]); + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn process_rule_missing_match_func() { + let temp = tempdir().unwrap(); + let rule_path = create_rule_file( + temp.path(), + "missing_match.py", + r#" +def fix(command, stdout, stderr): + return "something" +"#, + ); + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![rule_path]); + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn process_rule_match_raises() { + let temp = tempdir().unwrap(); + let rule_path = create_rule_file( + temp.path(), + "match_raises.py", + r#" +def match(command, stdout, stderr): + raise ValueError("oops") +def fix(command, stdout, stderr): + return "fixed" +"#, + ); + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![rule_path]); + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn process_rule_fix_raises() { + let temp = tempdir().unwrap(); + let rule_path = create_rule_file( + temp.path(), + "fix_raises.py", + r#" +def match(command, stdout, stderr): + return True +def fix(command, stdout, stderr): + raise Exception("fix failed") +"#, + ); + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![rule_path]); + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn process_multiple_rules() { + let temp = tempdir().unwrap(); + let rule1 = create_rule_file( + temp.path(), + "multi1.py", + r#" +def match(c, o, e): return True +def fix(c, o, e): return "cmd1" +"#, + ); + let rule2 = create_rule_file( + temp.path(), + "multi2.py", + r#" +def match(c, o, e): return False +def fix(c, o, e): return "cmd2" +"#, + ); + let rule3 = create_rule_file( + temp.path(), + "multi3.py", + r#" +def match(c, o, e): return True +def fix(c, o, e): return "cmd3" +"#, + ); + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![rule1, rule2, rule3]); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), vec!["cmd1".to_string(), "cmd3".to_string()]); + } + + #[test] + fn process_no_common_parent() { + let paths = vec![PathBuf::from("a/b.py"), PathBuf::from("c/d.py")]; + let cmd = dummy_command(); + let result = process_python_rules(&cmd, paths); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("No common parent found")); + } + + #[test] + fn process_empty_rules() { + let cmd = dummy_command(); + let result = process_python_rules(&cmd, vec![]); + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } +}