Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 237 additions & 0 deletions helpers/mirror/merge_mirror_sets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
Merge oc-mirror generated IDMS/ITMS manifests with existing cluster resources.

The merge strategy is keyed by `source` and preserves existing mirror order:
- Existing mirrors remain first.
- New mirrors from the current manifest are appended if missing.
"""

import argparse
import json
import subprocess
import sys
import time

DEFAULT_SUBPROCESS_TIMEOUT = 30


def _format_cmd(argv):
return " ".join(argv)


def _runtime_error_from_process(argv, exc=None, result=None, timeout=None):
if exc is not None and isinstance(exc, subprocess.TimeoutExpired):
stdout = (exc.stdout or "").strip()
stderr = (exc.stderr or "").strip()
return RuntimeError(
"command timed out: "
f"cmd={_format_cmd(argv)!r}, timeout={timeout}s, "
f"stdout={stdout!r}, stderr={stderr!r}"
)
if exc is not None and isinstance(exc, subprocess.CalledProcessError):
stdout = (exc.stdout or "").strip()
stderr = (exc.stderr or "").strip()
return RuntimeError(
"command failed: "
f"cmd={_format_cmd(argv)!r}, exit={exc.returncode}, "
f"stdout={stdout!r}, stderr={stderr!r}"
)
if result is not None:
stdout = (result.stdout or "").strip()
stderr = (result.stderr or "").strip()
return RuntimeError(
"command failed: "
f"cmd={_format_cmd(argv)!r}, exit={result.returncode}, "
f"stdout={stdout!r}, stderr={stderr!r}"
)
return RuntimeError(f"command failed: cmd={_format_cmd(argv)!r}")


def run_command(argv, check=False, timeout=DEFAULT_SUBPROCESS_TIMEOUT):
try:
return subprocess.run(
argv,
capture_output=True,
text=True,
check=check,
timeout=timeout,
)
except subprocess.TimeoutExpired as exc:
raise _runtime_error_from_process(argv, exc=exc, timeout=timeout) from exc
except subprocess.CalledProcessError as exc:
raise _runtime_error_from_process(argv, exc=exc) from exc


def run_json(argv, check=False):
result = run_command(argv, check=check)
if not result.stdout:
return {}
try:
return json.loads(result.stdout)
except json.JSONDecodeError as exc:
raise RuntimeError(
"failed to decode JSON output: "
f"cmd={_format_cmd(argv)!r}, error={exc!r}, stdout={result.stdout!r}"
) from exc


def merge_by_source(existing_items, new_items):
merged = {}
order = []

for item in existing_items or []:
if not isinstance(item, dict):
continue
source = item.get("source")
if not source:
continue
if source not in merged:
merged[source] = []
order.append(source)
for mirror in item.get("mirrors") or []:
if mirror not in merged[source]:
merged[source].append(mirror)

for item in new_items or []:
if not isinstance(item, dict):
continue
source = item.get("source")
if not source:
continue
if source not in merged:
merged[source] = []
order.append(source)
for mirror in item.get("mirrors") or []:
if mirror not in merged[source]:
merged[source].append(mirror)

return [{"source": source, "mirrors": merged[source]} for source in order]


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--oc-bin", required=True)
parser.add_argument("--input-manifest", required=True)
parser.add_argument("--output-manifest", required=True)
parser.add_argument("--api-kind", required=True)
parser.add_argument("--oc-resource", required=True)
parser.add_argument("--spec-key", required=True)
parser.add_argument("--apply", action="store_true")
parser.add_argument("--max-attempts", type=int, default=5)
parser.add_argument("--retry-delay", type=float, default=1.0)
return parser.parse_args()


def get_existing_resource(args, name):
current = run_command(
[args.oc_bin, "get", args.oc_resource, name, "-o", "json"],
check=False,
)
if current.returncode != 0:
error_output = f"{current.stderr or ''}\n{current.stdout or ''}".lower()
if "notfound" in error_output or "not found" in error_output:
return {}
raise _runtime_error_from_process(
[args.oc_bin, "get", args.oc_resource, name, "-o", "json"],
result=current,
)
return json.loads(current.stdout) if current.stdout else {}


def build_merged_docs(args, docs):
merged_docs = []
expected_specs = {}

for doc in docs:
if not isinstance(doc, dict):
merged_docs.append(doc)
continue
if doc.get("kind") != args.api_kind:
merged_docs.append(doc)
continue

name = (doc.get("metadata") or {}).get("name")
if not name:
merged_docs.append(doc)
continue

existing = get_existing_resource(args, name)
existing_list = (existing.get("spec") or {}).get(args.spec_key) or []
new_list = (doc.get("spec") or {}).get(args.spec_key) or []
merged_spec = merge_by_source(existing_list, new_list)
doc.setdefault("spec", {})[args.spec_key] = merged_spec
merged_docs.append(doc)
expected_specs[name] = merged_spec

return merged_docs, expected_specs


def apply_payload(args):
run_command(
[args.oc_bin, "apply", "-f", args.output_manifest],
check=True,
)


def live_matches_expected(args, expected_specs):
mismatches = []
for name, expected_list in expected_specs.items():
current = get_existing_resource(args, name)
live_list = (current.get("spec") or {}).get(args.spec_key) or []
if live_list != expected_list:
mismatches.append(name)
return mismatches


def main():
args = parse_args()
if args.max_attempts < 1:
raise RuntimeError("--max-attempts must be at least 1")

# Render the input manifest to a list of objects
rendered = run_json(
[
args.oc_bin,
"create",
"--dry-run=client",
"-f",
args.input_manifest,
"-o",
"json",
],
check=True,
)
docs = rendered.get("items", []) if rendered.get("kind") == "List" else [rendered]

unmatched_resources = []
for attempt in range(1, args.max_attempts + 1):
merged_docs, expected_specs = build_merged_docs(args, docs)
payload = {"apiVersion": "v1", "kind": "List", "items": merged_docs}
with open(args.output_manifest, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2)

if not args.apply:
return

apply_payload(args)
unmatched_resources = live_matches_expected(args, expected_specs)
if not unmatched_resources:
return

if attempt < args.max_attempts:
time.sleep(args.retry_delay * attempt)

raise RuntimeError(
"concurrent update detected while applying merged mirrors for "
f"{args.oc_resource}: {', '.join(unmatched_resources)} "
f"(exhausted {args.max_attempts} attempts)"
)


if __name__ == "__main__":
try:
main()
except Exception as exc:
print(f"merge_mirror_sets.py failed: {exc}", file=sys.stderr)
sys.exit(1)
73 changes: 58 additions & 15 deletions operators/quay-operator/quay_disconnected_mirrors.yaml
Original file line number Diff line number Diff line change
@@ -1,48 +1,91 @@
---
# Quay disconnected: apply IDMS/ITMS from oc-mirror and trust Quay registry CA for image pulls.
# Included from quay_disconnected.yaml.
# Copy to idms-oc-mirror-internal.yaml / itms-oc-mirror-internal.yaml, rename the resource inside,
# then apply so they coexist with the landing-zone IDMS/ITMS (idms-oc-mirror, itms-oc-mirror).
# The cluster then has both: landing-zone mirrors (primary) and internal Quay mirrors (fallback).
# Copy to a target manifest name based on mode:
# - default: idms/itms *-internal (core flow)
# - plugin merge mode: idms/itms *-plugins (shared for all plugins)
# Then rename the resources inside to avoid conflict with landing-zone IDMS/ITMS.

- name: Set target mirror suffix and base directory
ansible.builtin.set_fact:
quay_mirror_suffix: "{{ 'plugins' if (quay_merge_plugin_mirrors | default(false) | bool) else 'internal' }}"
quay_cluster_resources_dir: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources"

- name: Set target mirror manifest paths
ansible.builtin.set_fact:
quay_idms_internal_manifest: "{{ quay_cluster_resources_dir }}/idms-oc-mirror-{{ quay_mirror_suffix }}.yaml"
quay_itms_internal_manifest: "{{ quay_cluster_resources_dir }}/itms-oc-mirror-{{ quay_mirror_suffix }}.yaml"
quay_idms_merged_manifest: "{{ quay_cluster_resources_dir }}/idms-oc-mirror-{{ quay_mirror_suffix }}-merged.json"
quay_itms_merged_manifest: "{{ quay_cluster_resources_dir }}/itms-oc-mirror-{{ quay_mirror_suffix }}-merged.json"

- name: Stat IDMS source from oc-mirror workspace
ansible.builtin.stat:
path: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/idms-oc-mirror.yaml"
register: quay_idms_src_stat

- name: Copy IDMS to idms-oc-mirror-internal.yaml
- name: Copy IDMS to target mirror manifest
ansible.builtin.copy:
src: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/idms-oc-mirror.yaml"
dest: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/idms-oc-mirror-internal.yaml"
dest: "{{ quay_idms_internal_manifest }}"
remote_src: true
when: quay_idms_src_stat.stat.exists

# oc-mirror v2 uses names like idms-release-0, idms-operator-0 (not idms-oc-mirror); rename all with '-internal' suffix.
# oc-mirror v2 uses names like idms-release-0, idms-operator-0 (not idms-oc-mirror).
- name: Rename ImageDigestMirrorSet resources in IDMS manifest to avoid conflict with landing-zone
ansible.builtin.replace:
path: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/idms-oc-mirror-internal.yaml"
path: "{{ quay_idms_internal_manifest }}"
regexp: '^(\s*name:\s*)idms-([^\s]+)\s*$'
replace: '\1idms-\2-internal'
replace: '\1idms-\2-{{ quay_mirror_suffix }}'
when: quay_idms_src_stat.stat.exists


- name: Stat ITMS source from oc-mirror workspace
ansible.builtin.stat:
path: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/itms-oc-mirror.yaml"
register: quay_itms_src_stat

- name: Copy ITMS to itms-oc-mirror-internal.yaml
- name: Copy ITMS to target mirror manifest
ansible.builtin.copy:
src: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/itms-oc-mirror.yaml"
dest: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/itms-oc-mirror-internal.yaml"
dest: "{{ quay_itms_internal_manifest }}"
remote_src: true
when: quay_itms_src_stat.stat.exists

# oc-mirror v2 uses names like itms-release-0, itms-operator-0; rename all with -internal suffix.
# oc-mirror v2 uses names like itms-release-0, itms-operator-0.
- name: Rename ImageTagMirrorSet resources in ITMS manifest to avoid conflict with landing-zone
ansible.builtin.replace:
path: "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/itms-oc-mirror-internal.yaml"
path: "{{ quay_itms_internal_manifest }}"
regexp: '^(\s*name:\s*)itms-([^\s]+)\s*$'
replace: '\1itms-\2-internal'
replace: '\1itms-\2-{{ quay_mirror_suffix }}'
when: quay_itms_src_stat.stat.exists

- name: Merge plugin mirror sources into existing internal resources
ansible.builtin.include_tasks:
file: "{{ playbook_dir }}/tasks/merge_mirror_manifests.yaml"
vars:
merge_mirror_enabled: "{{ quay_merge_plugin_mirrors | default(false) | bool }}"
merge_mirror_items:
- api_kind: ImageDigestMirrorSet
oc_resource: imagedigestmirrorset
spec_key: imageDigestMirrors
input_manifest: "{{ quay_idms_internal_manifest }}"
output_manifest: "{{ quay_idms_merged_manifest }}"
source_exists: "{{ quay_idms_src_stat.stat.exists }}"
- api_kind: ImageTagMirrorSet
oc_resource: imagetagmirrorset
spec_key: imageTagMirrors
input_manifest: "{{ quay_itms_internal_manifest }}"
output_manifest: "{{ quay_itms_merged_manifest }}"
source_exists: "{{ quay_itms_src_stat.stat.exists }}"

- name: Set IDMS apply manifest path
ansible.builtin.set_fact:
quay_idms_apply_manifest: "{{ quay_idms_merged_manifest if (quay_merge_plugin_mirrors | default(false) | bool) else quay_idms_internal_manifest }}"
when: quay_idms_src_stat.stat.exists

- name: Set ITMS apply manifest path
ansible.builtin.set_fact:
quay_itms_apply_manifest: "{{ quay_itms_merged_manifest if (quay_merge_plugin_mirrors | default(false) | bool) else quay_itms_internal_manifest }}"
when: quay_itms_src_stat.stat.exists

- name: Apply Quay IDMS to cluster (fallback mirrors to internal Quay)
Expand All @@ -51,7 +94,7 @@
- "{{ workingDir }}/bin/oc"
- apply
- -f
- "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/idms-oc-mirror-internal.yaml"
- "{{ quay_idms_apply_manifest }}"
environment:
KUBECONFIG: "{{ workingDir }}/ocp-cluster/auth/kubeconfig"
when: quay_idms_src_stat.stat.exists
Expand All @@ -62,7 +105,7 @@
- "{{ workingDir }}/bin/oc"
- apply
- -f
- "{{ workingDir }}/config/oc-mirror-workspace-quay/working-dir/cluster-resources/itms-oc-mirror-internal.yaml"
- "{{ quay_itms_apply_manifest }}"
environment:
KUBECONFIG: "{{ workingDir }}/ocp-cluster/auth/kubeconfig"
when: quay_itms_src_stat.stat.exists
Expand Down
2 changes: 2 additions & 0 deletions playbooks/tasks/deploy_plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
- name: Mirror plugin
ansible.builtin.include_tasks:
file: mirror_plugin.yaml
vars:
quay_merge_plugin_mirrors: true
when:
- plugin.mirror | default('none') == 'plugin'
- disconnected | default(true) | bool
Expand Down
29 changes: 29 additions & 0 deletions playbooks/tasks/merge_mirror_manifests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
- name: Merge mirror sources into existing resources
ansible.builtin.command:
argv:
- python3
- "{{ playbook_dir }}/../helpers/mirror/merge_mirror_sets.py"
- --oc-bin
- "{{ workingDir }}/bin/oc"
- --input-manifest
- "{{ item.input_manifest }}"
- --output-manifest
- "{{ item.output_manifest }}"
- --api-kind
- "{{ item.api_kind }}"
- --oc-resource
- "{{ item.oc_resource }}"
- --spec-key
- "{{ item.spec_key }}"
- --apply
- --max-attempts
- "{{ merge_mirror_max_attempts | default(5) }}"
- --retry-delay
- "{{ merge_mirror_retry_delay | default(1.0) }}"
changed_when: false
environment:
KUBECONFIG: "{{ workingDir }}/ocp-cluster/auth/kubeconfig"
loop: "{{ merge_mirror_items }}"
when:
- merge_mirror_enabled | bool
- item.source_exists | bool
Loading
Loading