Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions ddev/src/ddev/cli/ci/tests/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal
import re
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal

from ddev.event_bus.orchestrator import BaseMessage

if TYPE_CHECKING:
from ddev.utils.github_async.models import WorkflowJob

# Characters GitHub disallows in an artifact name (plus CR/LF).
ARTIFACT_NAME_DISALLOWED = re.compile(r'["\:<>|*?\\/\r\n]')
# Reserved separator between the artifact name's fields. Chosen so it never appears in a field
# value (target/environment/platform), which keeps the name reversible via a plain split.
ARTIFACT_NAME_SEPARATOR = "~"


@dataclass
class BatchJob:
Expand All @@ -21,6 +31,27 @@ class BatchJob:
unit_tests: bool
e2e_tests: bool

def artifact_name(self) -> str:
"""Reversible artifact name built from the job's target, environment, and platform.

Pure and deterministic. Each field is sanitized to GitHub's artifact-name constraints and
joined by a reserved separator absent from the values, so the name can be split back into
``(target, environment, platform)``. Uniqueness within a batch relies on those three fields
being distinct per job.
"""
fields = (self.target, self.environment, self.platform)
return ARTIFACT_NAME_SEPARATOR.join(ARTIFACT_NAME_DISALLOWED.sub("_", field) for field in fields)


def split_artifact_name(artifact_name: str) -> tuple[str, str, str]:
"""Reverse ``BatchJob.artifact_name`` into ``(target, environment, platform)``.

Raises ``ValueError`` when ``artifact_name`` is not the expected three-field shape, so callers
can skip artifacts that are not per-job test artifacts.
"""
target, environment, platform = artifact_name.split(ARTIFACT_NAME_SEPARATOR)
return target, environment, platform


@dataclass
class FailedCheck:
Expand All @@ -30,6 +61,23 @@ class FailedCheck:
url: str


@dataclass
class BatchJobResult:
"""Everything known about a single job in a finished batch, correlated by the producer.

``artifacts_path`` is the single downloaded folder for the job (named after the job's
``artifact_name``); the three ``*_artifact_name`` fields are the expected per-facet file names
inside that folder.
"""

job: BatchJob
workflow_job: WorkflowJob | None
artifacts_path: str | None
unit_artifact_name: str
e2e_artifact_name: str
coverage_artifact_name: str


@dataclass
class WorkflowStatus:
"""Status of a single GitHub Actions workflow run."""
Expand Down Expand Up @@ -58,6 +106,8 @@ class BatchFinished(BaseMessage):
run_id: int
workflow_url: str
artifacts_path: str
timed_out: bool = False
batch_jobs: list[BatchJobResult] = field(default_factory=list)


@dataclass
Expand Down
76 changes: 70 additions & 6 deletions ddev/src/ddev/cli/ci/tests/task_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from pathlib import Path
from typing import Any, Literal

from ddev.cli.ci.tests.messages import BatchFinished, TestBatch
from ddev.cli.ci.tests.messages import BatchFinished, BatchJob, BatchJobResult, TestBatch, split_artifact_name
from ddev.event_bus.orchestrator import AsyncProcessor
from ddev.utils.github_async import AsyncGitHubClient, GitHubResponse
from ddev.utils.github_async.models import WorkflowRun
from ddev.utils.github_async.models import WorkflowJob, WorkflowRun


def _conclusion_to_status(conclusion: str | None) -> Literal["success", "failure", "skipped"]:
Expand Down Expand Up @@ -98,15 +98,19 @@ async def process_message(self, message: TestBatch) -> None:
self._logger.warning("Workflow completed with null conclusion", extra=log_extra)
final_conclusion = raw or "neutral"

artifacts_path = await self._download_artifacts(run_id, log_extra)
artifacts_path, artifact_dirs = await self._download_artifacts(run_id, log_extra)
self._logger.info("Artifacts downloaded", extra=log_extra)

jobs = await self._list_jobs(run_id, log_extra)
batch_jobs = self._build_batch_jobs(message.job_list, jobs, artifact_dirs)

finished = BatchFinished(
id=message.id,
status=_conclusion_to_status(raw),
run_id=run_id,
workflow_url=workflow_url,
artifacts_path=str(artifacts_path),
batch_jobs=batch_jobs,
)
finally:
try:
Expand Down Expand Up @@ -134,16 +138,75 @@ async def _poll_until_complete(self, run_id: int, log_extra: dict[str, Any]) ->
self._logger.info("Workflow completed", extra=log_extra)
return run

async def _list_jobs(self, run_id: int, log_extra: dict[str, Any]) -> list[WorkflowJob]:
"""Fetch the workflow run's jobs; on failure log a warning and return an empty list."""
jobs: list[WorkflowJob] = []
try:
async for page in self._client.list_workflow_jobs(self._options.owner, self._options.repo, run_id):
jobs.extend(page.data.jobs)
except Exception:
self._logger.warning("Failed to list workflow jobs", extra=log_extra, exc_info=True)
return jobs

@staticmethod
def _build_batch_jobs(
job_list: list[BatchJob], jobs: list[WorkflowJob], artifact_dirs: dict[str, Path]
) -> list[BatchJobResult]:
"""Correlate each job's spec, its workflow-run result, and its artifact directory.

The workflow-job join is by name (tolerant of misses). Each downloaded artifact folder is
identified by reversing its name into ``(target, environment, platform)``, which is matched
to the batch job. That single folder holds the three per-facet files, whose names
(``unit-``/``e2e-``/``coverage-`` prefixed on the base name) are recorded for the gatherer.
A job missing from the API or from disk still yields a well-formed result.
"""
jobs_by_name = {job.name: job for job in jobs}
# Reverse each downloaded artifact's name to identify which target/env/platform it belongs to.
dirs_by_fields: dict[tuple[str, str, str], Path] = {}
for artifact_name, path in artifact_dirs.items():
try:
dirs_by_fields[split_artifact_name(artifact_name)] = path
except ValueError:
continue

results: list[BatchJobResult] = []
for batch_job in job_list:
base = batch_job.artifact_name()
artifact_dir = dirs_by_fields.get(split_artifact_name(base))
results.append(
BatchJobResult(
job=batch_job,
workflow_job=jobs_by_name.get(batch_job.name),
artifacts_path=str(artifact_dir) if artifact_dir is not None else None,
unit_artifact_name=f"unit-{base}",
e2e_artifact_name=f"e2e-{base}",
coverage_artifact_name=f"coverage-{base}",
)
)
return results

def _build_inputs(self, message: TestBatch) -> dict[str, str]:
return {
"batch_id": message.id,
"checkout_sha": self._options.checkout_sha,
"integrations": json.dumps(message.integrations),
"job_list": json.dumps([dataclasses.asdict(job) for job in message.job_list]),
"job_list": json.dumps([self._job_input(job) for job in message.job_list]),
}

async def _download_artifacts(self, run_id: int, log_extra: dict[str, Any]) -> Path:
@staticmethod
def _job_input(job: BatchJob) -> dict[str, Any]:
"""Serialize a job for the workflow, carrying the artifact name so all its files upload under
a single folder/zip named after it (splittable later via ``split_artifact_name``)."""
return {**dataclasses.asdict(job), "artifact_name": job.artifact_name()}

async def _download_artifacts(self, run_id: int, log_extra: dict[str, Any]) -> tuple[Path, dict[str, Path]]:
"""Download the run's artifacts and return the run directory plus an artifact-name -> path map.

The map keys on the GitHub artifact name (the contract a ``BatchJob`` reproduces via
``artifact_name``), letting the producer resolve each job's directory deterministically.
"""
run_path = self._options.artifacts_base_path / str(run_id)
artifact_dirs: dict[str, Path] = {}
failures: list[tuple[int, str]] = []
try:
async for page in self._client.list_workflow_run_artifacts(self._options.owner, self._options.repo, run_id):
Expand All @@ -167,6 +230,7 @@ async def _download_artifacts(self, run_id: int, log_extra: dict[str, Any]) -> P
target = run_path / f"{artifact.id}-{artifact.name}"
try:
await self._client.download_artifact(artifact.archive_download_url, target)
artifact_dirs[artifact.name] = target
self._logger.info("Downloaded artifact %s -> %s", artifact.id, target, extra=log_extra)
except Exception as exc:
self._logger.warning(
Expand All @@ -186,4 +250,4 @@ async def _download_artifacts(self, run_id: int, log_extra: dict[str, Any]) -> P
failures,
extra=log_extra,
)
return run_path
return run_path, artifact_dirs
29 changes: 29 additions & 0 deletions ddev/src/ddev/utils/github_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
PullRequest,
PullRequestReviewComment,
WorkflowDispatchResult,
WorkflowJobsList,
WorkflowRun,
)

Expand Down Expand Up @@ -251,6 +252,34 @@ async def list_workflow_run_artifacts(
async for response in self._paginated_request("GET", endpoint, timeout=timeout, params={"per_page": per_page}):
yield self._parse_response(response, ArtifactsList)

async def list_workflow_jobs(
self,
owner: str,
repo: str,
run_id: int,
per_page: int = 30,
timeout: float | None = None,
) -> AsyncIterator[GitHubResponse[WorkflowJobsList]]:
"""
Calls the GitHub API to list jobs for a workflow run (paginated).

GitHub API Documentation:
https://docs.github.com/en/rest/actions/workflow-jobs#list-jobs-for-a-workflow-run

Args:
owner: Repository owner (user or organisation).
repo: Repository name.
run_id: Numeric ID of the workflow run.
per_page: Number of jobs per page (default 30, max 100).
timeout: Optional timeout for this specific request. Defaults to the client's default_timeout.

Returns:
AsyncIterator[GitHubResponse[WorkflowJobsList]]: One page of jobs per iteration.
"""
endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}/jobs"
async for response in self._paginated_request("GET", endpoint, timeout=timeout, params={"per_page": per_page}):
yield self._parse_response(response, WorkflowJobsList)

async def create_issue_comment(
self,
owner: str,
Expand Down
6 changes: 6 additions & 0 deletions ddev/src/ddev/utils/github_async/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
from .user import GitHubUser as GitHubUser
from .workflow import Artifact as Artifact
from .workflow import ArtifactsList as ArtifactsList
from .workflow import JobStep as JobStep
from .workflow import WorkflowDispatchResult as WorkflowDispatchResult
from .workflow import WorkflowJob as WorkflowJob
from .workflow import WorkflowJobsList as WorkflowJobsList
from .workflow import WorkflowRun as WorkflowRun

# Map of exported attribute name -> submodule (relative to this package) that
Expand All @@ -45,11 +48,14 @@
'CheckRun': 'check_run',
'GitHubUser': 'user',
'IssueComment': 'comment',
'JobStep': 'workflow',
'Label': 'label',
'PullRequest': 'pull_request',
'PullRequestRef': 'pull_request',
'PullRequestReviewComment': 'comment',
'WorkflowDispatchResult': 'workflow',
'WorkflowJob': 'workflow',
'WorkflowJobsList': 'workflow',
'WorkflowRun': 'workflow',
}

Expand Down
36 changes: 35 additions & 1 deletion ddev/src/ddev/utils/github_async/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from __future__ import annotations

from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, Field


class WorkflowRun(BaseModel):
Expand Down Expand Up @@ -50,3 +50,37 @@ class ArtifactsList(BaseModel):

total_count: int
artifacts: list[Artifact]


class JobStep(BaseModel):
"""A single step within a GitHub Actions job."""

model_config = ConfigDict(extra="ignore")

name: str
status: str
conclusion: str | None = None
number: int | None = None


class WorkflowJob(BaseModel):
"""A single job within a GitHub Actions workflow run."""

model_config = ConfigDict(extra="ignore")

id: int
run_id: int
name: str
status: str
conclusion: str | None = None
html_url: str | None = None
steps: list[JobStep] = Field(default_factory=list)


class WorkflowJobsList(BaseModel):
"""A list of jobs with a total count."""

model_config = ConfigDict(extra="ignore")

total_count: int
jobs: list[WorkflowJob]
63 changes: 63 additions & 0 deletions ddev/tests/cli/ci/tests/test_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
"""Tests for the ci/tests pipeline messages."""

from __future__ import annotations

import pytest

from ddev.cli.ci.tests.messages import ARTIFACT_NAME_DISALLOWED, BatchJob, split_artifact_name


def _job(**overrides: object) -> BatchJob:
base = {
"name": "job-1",
"target": "ntp",
"runner": "ubuntu-latest",
"environment": "py3.13",
"platform": "linux",
"unit_tests": True,
"e2e_tests": False,
}
base.update(overrides)
return BatchJob(**base) # type: ignore[arg-type]


def test_artifact_name_is_deterministic() -> None:
assert _job().artifact_name() == _job().artifact_name()


def test_artifact_name_built_from_target_env_platform() -> None:
assert _job().artifact_name() == "ntp~py3.13~linux"


def test_artifact_name_is_reversible() -> None:
# split_artifact_name recovers (target, environment, platform) even when fields contain hyphens.
job = _job(target="datadog_checks_base", environment="py3.13-18", platform="linux")
assert split_artifact_name(job.artifact_name()) == ("datadog_checks_base", "py3.13-18", "linux")


def test_split_artifact_name_rejects_unexpected_shape() -> None:
with pytest.raises(ValueError):
split_artifact_name("not-a-valid-artifact-name")


@pytest.mark.parametrize("field", ["name", "runner", "unit_tests", "e2e_tests"])
def test_artifact_name_ignores_non_identifying_fields(field: str) -> None:
# name / runner / unit_tests / e2e_tests are not part of the artifact name.
changed = {"name": "other-job", "runner": "windows-latest", "unit_tests": False, "e2e_tests": True}[field]
assert _job(**{field: changed}).artifact_name() == _job().artifact_name()


@pytest.mark.parametrize(
("field", "value"),
[("target", "kafka"), ("environment", "py3.12"), ("platform", "windows")],
)
def test_artifact_name_varies_with_identifying_fields(field: str, value: str) -> None:
assert _job(**{field: value}).artifact_name() != _job().artifact_name()


def test_artifact_name_sanitizes_disallowed_characters() -> None:
name = _job(target='a/b:c*d?e|f"g<h>i\\j', environment="x\r\ny").artifact_name()
assert ARTIFACT_NAME_DISALLOWED.search(name) is None
Loading
Loading