Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added tests/act_testing/__init__.py
Empty file.
251 changes: 251 additions & 0 deletions tests/act_testing/act_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
"""
ACT CLI wrapper for running GitHub Actions workflows locally.

This module provides a clean Python interface for testing GitHub Actions workflows
using the nektos/act CLI tool with pytest parameterized tests.
"""

import json
import os
import subprocess
import tempfile
from pathlib import Path
Comment on lines +1 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Fix linter: ruff format is failing this file

CI reports Ruff would reformat this file. Would you run ruff format to align with our formatter, wdyt?


🏁 Script executed:

#!/bin/bash
ruff format tests/act_testing/act_runner.py

Length of output: 283


Align code with Ruff’s formatter and update deprecated settings

It looks like running ruff format tests/act_testing/act_runner.py reformatted the file, but Ruff also flagged that top-level select in .ruff.toml is deprecated in favor of lint.select. Could you:

  • Commit the reformatted tests/act_testing/act_runner.py (via ruff format)
  • Update .ruff.toml by moving any top-level select entries under a lint.select section

WDYT?

🧰 Tools
🪛 GitHub Actions: Run Linters

[error] 1-1: Ruff format check would reformat this file. Would reformat: tests/act_testing/act_runner.py. Run 'ruff format' to apply formatting.

🤖 Prompt for AI Agents
In tests/act_testing/act_runner.py lines 1-12, the file should be committed
after applying Ruff formatting (run `ruff format
tests/act_testing/act_runner.py` and stage the changes); additionally update the
project's .ruff.toml by moving any top-level `select` entries into a
`lint.select` table (remove the top-level `select` and add the same values under
`[lint]` -> `select = [...]`), then run `ruff check` to validate the config and
commit the .ruff.toml change.

from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field


@dataclass
class ActResult:
"""Result of running an ACT CLI command."""

returncode: int
stdout: str
stderr: str
success: bool = field(init=False)

def __post_init__(self) -> None:
self.success = self.returncode == 0


@dataclass
class WorkflowTrigger:
"""Configuration for triggering a GitHub Actions workflow."""

event_name: str
event_payload: Dict[str, Any] = field(default_factory=dict)
secrets: Dict[str, str] = field(default_factory=dict)
env_vars: Dict[str, str] = field(default_factory=dict)
platform: str = "ubuntu-latest"
image: str = "catthehacker/ubuntu:act-latest" # Medium size option


class ActRunner:
"""
Python wrapper around the nektos/act CLI for testing GitHub Actions workflows.

This class provides a clean interface for running workflows with different
trigger scenarios and event payloads.
"""

def __init__(
self,
workflow_dir: Union[str, Path],
act_binary: str = "act",
default_image: str = "catthehacker/ubuntu:act-latest"
):
"""
Initialize the ACT runner.

Args:
workflow_dir: Path to directory containing .github/workflows
act_binary: Path to act CLI binary (default: "act" from PATH)
default_image: Default Docker image to use (medium size)
"""
self.workflow_dir = Path(workflow_dir)
self.act_binary = act_binary
self.default_image = default_image

try:
result = subprocess.run(
[self.act_binary, "--version"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
raise RuntimeError(f"ACT CLI not available: {result.stderr}")
except (subprocess.TimeoutExpired, FileNotFoundError) as e:
raise RuntimeError(f"ACT CLI not found or not working: {e}")

def run_workflow(
self,
workflow_file: str,
trigger: WorkflowTrigger,
job_name: Optional[str] = None,
dry_run: bool = False,
verbose: bool = False
) -> ActResult:
"""
Run a GitHub Actions workflow using ACT CLI.

Args:
workflow_file: Name of workflow file (e.g., "python_pytest.yml")
trigger: Workflow trigger configuration
job_name: Specific job to run (optional)
dry_run: Only show what would be run
verbose: Enable verbose output

Returns:
ActResult with execution details
"""
cmd = [self.act_binary]

cmd.append(trigger.event_name)

workflow_path = self.workflow_dir / ".github" / "workflows" / workflow_file
if not workflow_path.exists():
raise FileNotFoundError(f"Workflow file not found: {workflow_path}")

cmd.extend(["-W", str(workflow_path)])

cmd.extend(["-P", f"{trigger.platform}={trigger.image}"])

if job_name:
cmd.extend(["-j", job_name])

if dry_run:
cmd.append("--dryrun")

if verbose:
cmd.append("-v")

with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as event_file:
json.dump(trigger.event_payload, event_file, indent=2)
event_file_path = event_file.name

try:
if trigger.event_payload:
cmd.extend(["-e", event_file_path])

for key, value in trigger.secrets.items():
cmd.extend(["-s", f"{key}={value}"])

env = {}
for key, value in trigger.env_vars.items():
env[key] = value

result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
env={**os.environ, **env} if env else None
)

return ActResult(
returncode=result.returncode,
stdout=result.stdout,
stderr=result.stderr
)

except subprocess.TimeoutExpired:
return ActResult(
returncode=-1,
stdout="",
stderr="Command timed out after 5 minutes"
)
finally:
Path(event_file_path).unlink(missing_ok=True)

def list_workflows(self) -> List[str]:
"""List available workflow files."""
workflows_dir = self.workflow_dir / ".github" / "workflows"
if not workflows_dir.exists():
return []

return [
f.name for f in workflows_dir.glob("*.yml")
if f.is_file()
]

def list_jobs(self, workflow_file: str) -> List[str]:
"""List jobs in a specific workflow file."""
cmd = [self.act_binary, "-l", "-W", str(self.workflow_dir)]

try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return []

jobs = []
for line in result.stdout.split('\n'):
if line.strip() and not line.startswith('Stage'):
parts = line.split()
if len(parts) >= 2:
jobs.append(parts[1]) # Job name is second column

return jobs
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
return []
Comment on lines +171 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: list_jobs ignores the workflow_file argument

list_jobs() currently lists jobs for the entire repo and ignores the workflow_file parameter. This can produce incorrect results for callers expecting per-file jobs. Shall we scope -W to the specific workflow file, wdyt?

-    def list_jobs(self, workflow_file: str) -> List[str]:
+    def list_jobs(self, workflow_file: str) -> List[str]:
         """List jobs in a specific workflow file."""
-        cmd = [self.act_binary, "-l", "-W", str(self.workflow_dir)]
+        workflow_path = self.workflow_dir / ".github" / "workflows" / workflow_file
+        if not workflow_path.exists():
+            return []
+        cmd = [self.act_binary, "-l", "-W", str(workflow_path)]
@@
-            jobs = []
-            for line in result.stdout.split('\n'):
-                if line.strip() and not line.startswith('Stage'):
-                    parts = line.split()
-                    if len(parts) >= 2:
-                        jobs.append(parts[1])  # Job name is second column
+            jobs: List[str] = []
+            for line in result.stdout.splitlines():
+                s = line.strip()
+                if not s or s.startswith("Stage"):
+                    continue
+                parts = s.split()
+                if parts:
+                    # Heuristic: second column tends to be the job name; fall back to first.
+                    jobs.append(parts[1] if len(parts) > 1 else parts[0])
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def list_jobs(self, workflow_file: str) -> List[str]:
"""List jobs in a specific workflow file."""
cmd = [self.act_binary, "-l", "-W", str(self.workflow_dir)]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return []
jobs = []
for line in result.stdout.split('\n'):
if line.strip() and not line.startswith('Stage'):
parts = line.split()
if len(parts) >= 2:
jobs.append(parts[1]) # Job name is second column
return jobs
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
return []
def list_jobs(self, workflow_file: str) -> List[str]:
"""List jobs in a specific workflow file."""
# Scope -W to the actual workflow file rather than the entire directory
workflow_path = self.workflow_dir / ".github" / "workflows" / workflow_file
if not workflow_path.exists():
return []
cmd = [self.act_binary, "-l", "-W", str(workflow_path)]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return []
jobs: List[str] = []
for line in result.stdout.splitlines():
s = line.strip()
if not s or s.startswith("Stage"):
continue
parts = s.split()
if parts:
# Heuristic: second column tends to be the job name; fall back to first.
jobs.append(parts[1] if len(parts) > 1 else parts[0])
return jobs
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
return []
🤖 Prompt for AI Agents
In tests/act_testing/act_runner.py around lines 171 to 189, list_jobs currently
ignores the workflow_file argument and scopes -W to the whole workflow_dir;
change the cmd to point -W at the specific workflow file path (e.g., join
self.workflow_dir and workflow_file or pass the full path) so act lists jobs
only for that workflow file, and handle the case where the workflow file path
may be invalid before running subprocess.



def create_push_trigger(
ref: str = "refs/heads/main",
sha: str = "abc123",
**kwargs: Any
) -> WorkflowTrigger:
"""Create a push event trigger."""
payload = {
"ref": ref,
"after": sha,
"before": "000000",
"repository": {
"name": "test-repo",
"full_name": "test-org/test-repo"
},
**kwargs
}
return WorkflowTrigger(event_name="push", event_payload=payload)


def create_pr_trigger(
action: str = "opened",
pr_number: int = 1,
base_ref: str = "main",
head_ref: str = "feature-branch",
**kwargs: Any
) -> WorkflowTrigger:
"""Create a pull request event trigger."""
payload = {
"action": action,
"number": pr_number,
"pull_request": {
"number": pr_number,
"base": {"ref": base_ref},
"head": {"ref": head_ref},
"title": "Test PR",
"body": "Test PR body"
},
"repository": {
"name": "test-repo",
"full_name": "test-org/test-repo"
},
**kwargs
}
return WorkflowTrigger(event_name="pull_request", event_payload=payload)


def create_workflow_dispatch_trigger(
inputs: Optional[Dict[str, Any]] = None,
**kwargs: Any
) -> WorkflowTrigger:
"""Create a workflow_dispatch event trigger."""
payload = {
"inputs": inputs or {},
"repository": {
"name": "test-repo",
"full_name": "test-org/test-repo"
},
**kwargs
}
return WorkflowTrigger(event_name="workflow_dispatch", event_payload=payload)
81 changes: 81 additions & 0 deletions tests/act_testing/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
Pytest configuration and fixtures for ACT CLI testing.

This module provides shared fixtures and configuration for testing
GitHub Actions workflows with the ACT CLI.
"""

import pytest
import subprocess
from pathlib import Path


def pytest_configure(config):
"""Configure pytest with custom markers."""
config.addinivalue_line(
"markers", "act_integration: marks tests as ACT CLI integration tests"
)
config.addinivalue_line(
"markers", "requires_docker: marks tests that require Docker to be running"
)


@pytest.fixture(scope="session")
def act_available():
"""Check if ACT CLI is available and working."""
try:
result = subprocess.run(
["act", "--version"],
capture_output=True,
text=True,
timeout=10
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False


@pytest.fixture(scope="session")
def docker_available():
"""Check if Docker is available and running."""
try:
result = subprocess.run(
["docker", "info"],
capture_output=True,
text=True,
timeout=10
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False


@pytest.fixture(scope="session")
def repo_root():
"""Get the repository root directory."""
return Path(__file__).parent.parent.parent


def pytest_collection_modifyitems(config, items):
"""Modify test collection to skip tests based on requirements."""
act_available = True
docker_available = True

try:
subprocess.run(["act", "--version"], capture_output=True, timeout=5)
except (subprocess.TimeoutExpired, FileNotFoundError):
act_available = False

try:
subprocess.run(["docker", "info"], capture_output=True, timeout=5)
except (subprocess.TimeoutExpired, FileNotFoundError):
docker_available = False

skip_act = pytest.mark.skip(reason="ACT CLI not available")
skip_docker = pytest.mark.skip(reason="Docker not available")

for item in items:
if "act_integration" in item.keywords and not act_available:
item.add_marker(skip_act)
if "requires_docker" in item.keywords and not docker_available:
item.add_marker(skip_docker)
Empty file added tests/act_testing/py.typed
Empty file.
Loading
Loading