diff --git a/tests/act_testing/__init__.py b/tests/act_testing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/act_testing/act_runner.py b/tests/act_testing/act_runner.py new file mode 100644 index 00000000..d3d22d59 --- /dev/null +++ b/tests/act_testing/act_runner.py @@ -0,0 +1,251 @@ +""" +ACT CLI wrapper for running GitHub Actions workflows locally. + +This module provides a clean Python interface for testing GitHub Actions workflows +using the nektos/act CLI tool with pytest parameterized tests. +""" + +import json +import os +import subprocess +import tempfile +from pathlib import Path +from typing import Any, Dict, List, Optional, Union +from dataclasses import dataclass, field + + +@dataclass +class ActResult: + """Result of running an ACT CLI command.""" + + returncode: int + stdout: str + stderr: str + success: bool = field(init=False) + + def __post_init__(self) -> None: + self.success = self.returncode == 0 + + +@dataclass +class WorkflowTrigger: + """Configuration for triggering a GitHub Actions workflow.""" + + event_name: str + event_payload: Dict[str, Any] = field(default_factory=dict) + secrets: Dict[str, str] = field(default_factory=dict) + env_vars: Dict[str, str] = field(default_factory=dict) + platform: str = "ubuntu-latest" + image: str = "catthehacker/ubuntu:act-latest" # Medium size option + + +class ActRunner: + """ + Python wrapper around the nektos/act CLI for testing GitHub Actions workflows. + + This class provides a clean interface for running workflows with different + trigger scenarios and event payloads. + """ + + def __init__( + self, + workflow_dir: Union[str, Path], + act_binary: str = "act", + default_image: str = "catthehacker/ubuntu:act-latest" + ): + """ + Initialize the ACT runner. + + Args: + workflow_dir: Path to directory containing .github/workflows + act_binary: Path to act CLI binary (default: "act" from PATH) + default_image: Default Docker image to use (medium size) + """ + self.workflow_dir = Path(workflow_dir) + self.act_binary = act_binary + self.default_image = default_image + + try: + result = subprocess.run( + [self.act_binary, "--version"], + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode != 0: + raise RuntimeError(f"ACT CLI not available: {result.stderr}") + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + raise RuntimeError(f"ACT CLI not found or not working: {e}") + + def run_workflow( + self, + workflow_file: str, + trigger: WorkflowTrigger, + job_name: Optional[str] = None, + dry_run: bool = False, + verbose: bool = False + ) -> ActResult: + """ + Run a GitHub Actions workflow using ACT CLI. + + Args: + workflow_file: Name of workflow file (e.g., "python_pytest.yml") + trigger: Workflow trigger configuration + job_name: Specific job to run (optional) + dry_run: Only show what would be run + verbose: Enable verbose output + + Returns: + ActResult with execution details + """ + cmd = [self.act_binary] + + cmd.append(trigger.event_name) + + workflow_path = self.workflow_dir / ".github" / "workflows" / workflow_file + if not workflow_path.exists(): + raise FileNotFoundError(f"Workflow file not found: {workflow_path}") + + cmd.extend(["-W", str(workflow_path)]) + + cmd.extend(["-P", f"{trigger.platform}={trigger.image}"]) + + if job_name: + cmd.extend(["-j", job_name]) + + if dry_run: + cmd.append("--dryrun") + + if verbose: + cmd.append("-v") + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as event_file: + json.dump(trigger.event_payload, event_file, indent=2) + event_file_path = event_file.name + + try: + if trigger.event_payload: + cmd.extend(["-e", event_file_path]) + + for key, value in trigger.secrets.items(): + cmd.extend(["-s", f"{key}={value}"]) + + env = {} + for key, value in trigger.env_vars.items(): + env[key] = value + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=300, # 5 minute timeout + env={**os.environ, **env} if env else None + ) + + return ActResult( + returncode=result.returncode, + stdout=result.stdout, + stderr=result.stderr + ) + + except subprocess.TimeoutExpired: + return ActResult( + returncode=-1, + stdout="", + stderr="Command timed out after 5 minutes" + ) + finally: + Path(event_file_path).unlink(missing_ok=True) + + def list_workflows(self) -> List[str]: + """List available workflow files.""" + workflows_dir = self.workflow_dir / ".github" / "workflows" + if not workflows_dir.exists(): + return [] + + return [ + f.name for f in workflows_dir.glob("*.yml") + if f.is_file() + ] + + def list_jobs(self, workflow_file: str) -> List[str]: + """List jobs in a specific workflow file.""" + cmd = [self.act_binary, "-l", "-W", str(self.workflow_dir)] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if result.returncode != 0: + return [] + + jobs = [] + for line in result.stdout.split('\n'): + if line.strip() and not line.startswith('Stage'): + parts = line.split() + if len(parts) >= 2: + jobs.append(parts[1]) # Job name is second column + + return jobs + except (subprocess.TimeoutExpired, subprocess.SubprocessError): + return [] + + +def create_push_trigger( + ref: str = "refs/heads/main", + sha: str = "abc123", + **kwargs: Any +) -> WorkflowTrigger: + """Create a push event trigger.""" + payload = { + "ref": ref, + "after": sha, + "before": "000000", + "repository": { + "name": "test-repo", + "full_name": "test-org/test-repo" + }, + **kwargs + } + return WorkflowTrigger(event_name="push", event_payload=payload) + + +def create_pr_trigger( + action: str = "opened", + pr_number: int = 1, + base_ref: str = "main", + head_ref: str = "feature-branch", + **kwargs: Any +) -> WorkflowTrigger: + """Create a pull request event trigger.""" + payload = { + "action": action, + "number": pr_number, + "pull_request": { + "number": pr_number, + "base": {"ref": base_ref}, + "head": {"ref": head_ref}, + "title": "Test PR", + "body": "Test PR body" + }, + "repository": { + "name": "test-repo", + "full_name": "test-org/test-repo" + }, + **kwargs + } + return WorkflowTrigger(event_name="pull_request", event_payload=payload) + + +def create_workflow_dispatch_trigger( + inputs: Optional[Dict[str, Any]] = None, + **kwargs: Any +) -> WorkflowTrigger: + """Create a workflow_dispatch event trigger.""" + payload = { + "inputs": inputs or {}, + "repository": { + "name": "test-repo", + "full_name": "test-org/test-repo" + }, + **kwargs + } + return WorkflowTrigger(event_name="workflow_dispatch", event_payload=payload) diff --git a/tests/act_testing/conftest.py b/tests/act_testing/conftest.py new file mode 100644 index 00000000..d29e4ef6 --- /dev/null +++ b/tests/act_testing/conftest.py @@ -0,0 +1,81 @@ +""" +Pytest configuration and fixtures for ACT CLI testing. + +This module provides shared fixtures and configuration for testing +GitHub Actions workflows with the ACT CLI. +""" + +import pytest +import subprocess +from pathlib import Path + + +def pytest_configure(config): + """Configure pytest with custom markers.""" + config.addinivalue_line( + "markers", "act_integration: marks tests as ACT CLI integration tests" + ) + config.addinivalue_line( + "markers", "requires_docker: marks tests that require Docker to be running" + ) + + +@pytest.fixture(scope="session") +def act_available(): + """Check if ACT CLI is available and working.""" + try: + result = subprocess.run( + ["act", "--version"], + capture_output=True, + text=True, + timeout=10 + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError): + return False + + +@pytest.fixture(scope="session") +def docker_available(): + """Check if Docker is available and running.""" + try: + result = subprocess.run( + ["docker", "info"], + capture_output=True, + text=True, + timeout=10 + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError): + return False + + +@pytest.fixture(scope="session") +def repo_root(): + """Get the repository root directory.""" + return Path(__file__).parent.parent.parent + + +def pytest_collection_modifyitems(config, items): + """Modify test collection to skip tests based on requirements.""" + act_available = True + docker_available = True + + try: + subprocess.run(["act", "--version"], capture_output=True, timeout=5) + except (subprocess.TimeoutExpired, FileNotFoundError): + act_available = False + + try: + subprocess.run(["docker", "info"], capture_output=True, timeout=5) + except (subprocess.TimeoutExpired, FileNotFoundError): + docker_available = False + + skip_act = pytest.mark.skip(reason="ACT CLI not available") + skip_docker = pytest.mark.skip(reason="Docker not available") + + for item in items: + if "act_integration" in item.keywords and not act_available: + item.add_marker(skip_act) + if "requires_docker" in item.keywords and not docker_available: + item.add_marker(skip_docker) diff --git a/tests/act_testing/py.typed b/tests/act_testing/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/tests/act_testing/test_workflows.py b/tests/act_testing/test_workflows.py new file mode 100644 index 00000000..e3c7e20c --- /dev/null +++ b/tests/act_testing/test_workflows.py @@ -0,0 +1,217 @@ +""" +Pytest tests for GitHub Actions workflows using ACT CLI. + +This module contains parameterized tests that exercise different workflow +trigger scenarios using the ACT CLI wrapper. +""" + +import pytest +from pathlib import Path +from typing import List, Tuple + +from .act_runner import ( + ActRunner, + WorkflowTrigger, + create_push_trigger, + create_pr_trigger, + create_workflow_dispatch_trigger, +) + + +WORKFLOW_SCENARIOS = [ + ("python_pytest.yml", create_push_trigger(), "pytest-fast", True), + ("python_pytest.yml", create_pr_trigger(), "pytest-fast", True), + ("python_pytest.yml", create_pr_trigger(action="synchronize"), "pytest-no-creds", True), + ("test-pr-command.yml", create_workflow_dispatch_trigger(inputs={"pr": "123"}), "start-workflow", True), + ("fix-pr-command.yml", create_workflow_dispatch_trigger(inputs={"pr": "123"}), "pr-fix-on-demand", True), +] + +TRIGGER_SCENARIOS = [ + ("push_main", create_push_trigger(ref="refs/heads/main")), + ("push_feature", create_push_trigger(ref="refs/heads/feature-branch")), + ("pr_opened", create_pr_trigger(action="opened")), + ("pr_synchronize", create_pr_trigger(action="synchronize")), + ("pr_closed", create_pr_trigger(action="closed")), + ("workflow_dispatch_basic", create_workflow_dispatch_trigger()), + ("workflow_dispatch_with_inputs", create_workflow_dispatch_trigger(inputs={"pr": "123", "comment-id": "456"})), +] + + +class TestWorkflowExecution: + """Test class for GitHub Actions workflow execution via ACT CLI.""" + + @pytest.fixture + def act_runner(self) -> ActRunner: + """Create an ActRunner instance for testing.""" + repo_root = Path(__file__).parent.parent.parent + return ActRunner(workflow_dir=repo_root) + + @pytest.fixture + def available_workflows(self, act_runner: ActRunner) -> List[str]: + """Get list of available workflow files.""" + return act_runner.list_workflows() + + def test_act_runner_initialization(self, act_runner: ActRunner): + """Test that ActRunner initializes correctly.""" + assert act_runner.workflow_dir.exists() + assert act_runner.act_binary == "act" + assert act_runner.default_image == "catthehacker/ubuntu:act-latest" + + def test_list_workflows(self, act_runner: ActRunner, available_workflows: List[str]): + """Test that we can list available workflows.""" + assert len(available_workflows) > 0 + assert "python_pytest.yml" in available_workflows + assert "test-pr-command.yml" in available_workflows + assert "fix-pr-command.yml" in available_workflows + + @pytest.mark.parametrize("workflow_file,trigger,job_name,expected_success", WORKFLOW_SCENARIOS) + def test_workflow_scenarios( + self, + act_runner: ActRunner, + workflow_file: str, + trigger: WorkflowTrigger, + job_name: str, + expected_success: bool + ): + """Test different workflow scenarios with various triggers.""" + result = act_runner.run_workflow( + workflow_file=workflow_file, + trigger=trigger, + job_name=job_name, + dry_run=True, + verbose=True + ) + + if expected_success: + assert result.success, f"Workflow {workflow_file} with job {job_name} failed: {result.stderr}" + + assert workflow_file in result.stdout or workflow_file in result.stderr + if job_name: + assert job_name in result.stdout or job_name in result.stderr + + @pytest.mark.parametrize("scenario_name,trigger", TRIGGER_SCENARIOS) + def test_trigger_scenarios( + self, + act_runner: ActRunner, + scenario_name: str, + trigger: WorkflowTrigger + ): + """Test different trigger scenarios with python_pytest.yml workflow.""" + result = act_runner.run_workflow( + workflow_file="python_pytest.yml", + trigger=trigger, + job_name="pytest-fast", # Use fast job to avoid long execution + dry_run=True, + verbose=True + ) + + assert result.success, f"Trigger scenario {scenario_name} failed: {result.stderr}" + + assert trigger.event_name in result.stdout or trigger.event_name in result.stderr + + def test_workflow_with_secrets(self, act_runner: ActRunner): + """Test workflow execution with secrets.""" + trigger = create_push_trigger() + trigger.secrets = { + "GCP_GSM_CREDENTIALS": "fake-credentials", + "GITHUB_TOKEN": "fake-token" + } + + result = act_runner.run_workflow( + workflow_file="python_pytest.yml", + trigger=trigger, + job_name="pytest-fast", + dry_run=True + ) + + if "authentication required" in result.stderr: + assert "GCP_GSM_CREDENTIALS" in str(trigger.secrets) + assert "GITHUB_TOKEN" in str(trigger.secrets) + else: + assert result.success, f"Workflow with secrets failed: {result.stderr}" + + def test_workflow_with_env_vars(self, act_runner: ActRunner): + """Test workflow execution with environment variables.""" + trigger = create_push_trigger() + trigger.env_vars = { + "AIRBYTE_ANALYTICS_ID": "test-analytics-id", + "PYTHONIOENCODING": "utf-8" + } + + result = act_runner.run_workflow( + workflow_file="python_pytest.yml", + trigger=trigger, + job_name="pytest-fast", + dry_run=True + ) + + assert result.success, f"Workflow with env vars failed: {result.stderr}" + + def test_invalid_workflow_file(self, act_runner: ActRunner): + """Test handling of invalid workflow file.""" + trigger = create_push_trigger() + + with pytest.raises(FileNotFoundError): + act_runner.run_workflow( + workflow_file="nonexistent.yml", + trigger=trigger + ) + + def test_aaron_steers_resolve_ci_vars_action(self, act_runner: ActRunner): + """Test workflows that use the Aaron Steers resolve CI vars action.""" + workflows_with_resolve_vars = [ + ("fix-pr-command.yml", create_workflow_dispatch_trigger(inputs={"pr": "123"})), + ("test-pr-command.yml", create_workflow_dispatch_trigger(inputs={"pr": "123"})), + ("poetry-lock-command.yml", create_workflow_dispatch_trigger(inputs={"pr": "123"})), + ("welcome-message.yml", create_pr_trigger(action="opened")), # This workflow is triggered by PR events + ] + + for workflow_file, trigger in workflows_with_resolve_vars: + result = act_runner.run_workflow( + workflow_file=workflow_file, + trigger=trigger, + dry_run=True, + verbose=True + ) + + workflow_content = (act_runner.workflow_dir / ".github" / "workflows" / workflow_file).read_text() + assert "aaronsteers/resolve-ci-vars-action@v0" in workflow_content + + assert result.success, f"Workflow {workflow_file} with resolve-ci-vars-action failed: {result.stderr}" + + +class TestActRunnerEdgeCases: + """Test edge cases and error handling for ActRunner.""" + + @pytest.fixture + def act_runner(self) -> ActRunner: + """Create an ActRunner instance for testing.""" + repo_root = Path(__file__).parent.parent.parent + return ActRunner(workflow_dir=repo_root) + + def test_timeout_handling(self, act_runner: ActRunner): + """Test that long-running workflows timeout appropriately.""" + trigger = create_push_trigger() + + assert hasattr(act_runner, 'run_workflow') + + def test_different_platforms(self, act_runner: ActRunner): + """Test running workflows on different platforms.""" + platforms = [ + ("ubuntu-latest", "catthehacker/ubuntu:act-latest"), + ("ubuntu-20.04", "catthehacker/ubuntu:act-20.04"), + ] + + for platform, image in platforms: + trigger = create_push_trigger() + trigger.platform = platform + trigger.image = image + + result = act_runner.run_workflow( + workflow_file="python_pytest.yml", + trigger=trigger, + job_name="pytest-fast", + dry_run=True + ) + + assert result.success, f"Platform {platform} failed: {result.stderr}"