Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions scripts/python/tasks/managed/collect_registry_token_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""Collect the registry token secret name from the release data file."""

from __future__ import annotations

from pathlib import Path
from typing import Any

import file
import tekton
from logger import logger

PROG = "collect_registry_token_secret.py"


def _is_public(value: Any) -> bool:
"""Return True when a mapping ``public`` flag is enabled."""
if value is True:
return True
return str(value).lower() == "true"


def is_secret_required(data: dict[str, Any]) -> bool:
"""Return True when defaults or any component require making repos public.

Mirrors the bash task: ``mapping.defaults.public`` or any
``mapping.components[*].public`` set to true.
"""
mapping = data.get("mapping") or {}
defaults = mapping.get("defaults") or {}
if _is_public(defaults.get("public", False)):
return True

for component in mapping.get("components") or []:
if isinstance(component, dict) and _is_public(component.get("public", False)):
return True

return False


def collect_registry_token_secret(data: dict[str, Any]) -> str:
"""Return the registry secret name, or an empty string when not required.

Raises:
ValueError: When a secret is required but ``mapping.registrySecret``
is absent from the data file.

"""
if not is_secret_required(data):
logger.info("No repos to make public, so no secret is required. Exiting...")
return ""

mapping = data.get("mapping") or {}
if "registrySecret" not in mapping:
raise ValueError("Registry secret missing in data JSON file")

secret = mapping["registrySecret"]
if secret is None:
raise ValueError("Registry secret missing in data JSON file")

return str(secret)


def run(data_file: Path) -> str:
"""Load *data_file* and return the registry secret name (or empty)."""
if not data_file.is_file():
raise FileNotFoundError("No valid data file was provided.")

data = file.load_json_dict(data_file)
return collect_registry_token_secret(data)


def main() -> int:
"""Read Tekton env vars, resolve the secret name, and write the result."""
data_dir = Path(tekton.require_env("PARAM_DATA_DIR"))
data_path = Path(tekton.require_env("PARAM_DATA_PATH"))
(result_path,) = tekton.result_paths_from_env("RESULT_REGISTRY_SECRET")

secret = run(data_dir / data_path)
result_path.write_text(secret, encoding="utf-8")
return 0


if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())
175 changes: 175 additions & 0 deletions scripts/python/tasks/managed/test_collect_registry_token_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Tests for collect_registry_token_secret."""

from __future__ import annotations

import json
from pathlib import Path

import pytest

from collect_registry_token_secret import (
collect_registry_token_secret,
is_secret_required,
main,
run,
)


def test_is_secret_required_when_defaults_public() -> None:
"""defaults.public=true requires a registry secret."""
data = {"mapping": {"defaults": {"public": True}, "components": []}}
assert is_secret_required(data) is True


def test_is_secret_required_when_component_public() -> None:
"""Any component with public=true requires a registry secret."""
data = {
"mapping": {
"defaults": {},
"components": [{"name": "a"}, {"name": "b", "public": True}],
}
}
assert is_secret_required(data) is True


def test_is_secret_required_false_when_nothing_public() -> None:
"""No secret is required when nothing is marked public."""
data = {
"mapping": {
"defaults": {},
"components": [{"name": "a"}, {"name": "b", "public": False}],
}
}
assert is_secret_required(data) is False


def test_collect_returns_secret_when_required() -> None:
"""Return mapping.registrySecret when public repos need a token."""
data = {
"mapping": {
"defaults": {},
"components": [{"name": "c", "public": True}],
"registrySecret": "mysecret",
}
}
assert collect_registry_token_secret(data) == "mysecret"


def test_collect_returns_empty_when_not_required() -> None:
"""Return an empty string when no repos are public."""
data = {
"mapping": {
"defaults": {},
"components": [{"name": "c"}],
"registrySecret": "mysecret",
}
}
assert collect_registry_token_secret(data) == ""


def test_collect_raises_when_secret_missing() -> None:
"""Raise when public=true but mapping.registrySecret is absent."""
data = {
"mapping": {
"defaults": {"public": True},
"components": [],
}
}
with pytest.raises(ValueError, match="Registry secret missing"):
collect_registry_token_secret(data)


def test_run_raises_when_data_file_missing(tmp_path: Path) -> None:
"""Raise FileNotFoundError when the data file does not exist."""
with pytest.raises(FileNotFoundError, match="No valid data file"):
run(tmp_path / "missing.json")


def test_run_reads_file(tmp_path: Path) -> None:
"""Load JSON from disk and return the registry secret name."""
data_file = tmp_path / "data.json"
data_file.write_text(
json.dumps(
{
"mapping": {
"components": [{"name": "c", "public": True}],
"defaults": {},
"registrySecret": "token-secret",
}
}
),
encoding="utf-8",
)
assert run(data_file) == "token-secret"


def test_main_writes_secret(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""Write the secret name to the Tekton result file."""
data_dir = tmp_path / "release"
data_dir.mkdir()
(data_dir / "data.json").write_text(
json.dumps(
{
"mapping": {
"components": [{"name": "c", "public": True}],
"defaults": {},
"registrySecret": "mysecret",
}
}
),
encoding="utf-8",
)
result_file = tmp_path / "registrySecret"
monkeypatch.setenv("PARAM_DATA_DIR", str(data_dir))
monkeypatch.setenv("PARAM_DATA_PATH", "data.json")
monkeypatch.setenv("RESULT_REGISTRY_SECRET", str(result_file))

assert main() == 0
assert result_file.read_text(encoding="utf-8") == "mysecret"


def test_main_writes_empty_when_not_required(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Write an empty result when no secret is required."""
data_dir = tmp_path / "release"
data_dir.mkdir()
(data_dir / "data.json").write_text(
json.dumps({"mapping": {"components": [{"name": "c"}], "defaults": {}}}),
encoding="utf-8",
)
result_file = tmp_path / "registrySecret"
monkeypatch.setenv("PARAM_DATA_DIR", str(data_dir))
monkeypatch.setenv("PARAM_DATA_PATH", "data.json")
monkeypatch.setenv("RESULT_REGISTRY_SECRET", str(result_file))

assert main() == 0
assert result_file.read_text(encoding="utf-8") == ""


def test_main_raises_when_secret_missing(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Propagate ValueError when registrySecret is missing but required."""
data_dir = tmp_path / "release"
data_dir.mkdir()
(data_dir / "data.json").write_text(
json.dumps(
{
"mapping": {
"components": [],
"defaults": {"public": True},
}
}
),
encoding="utf-8",
)
result_file = tmp_path / "registrySecret"
monkeypatch.setenv("PARAM_DATA_DIR", str(data_dir))
monkeypatch.setenv("PARAM_DATA_PATH", "data.json")
monkeypatch.setenv("RESULT_REGISTRY_SECRET", str(result_file))

with pytest.raises(ValueError, match="Registry secret missing"):
main()

assert not result_file.exists()
Loading