From 37a2a0fc0f159759d07fc5c46150a7b10b64fa95 Mon Sep 17 00:00:00 2001 From: robbertuittenbroek Date: Fri, 26 Jun 2026 20:35:48 +0200 Subject: [PATCH] fix(deploy): prevent concurrent project mutations from half-deleting deployments Two sibling deployment deletes of the same project ran concurrently, collided on the shared project file during git push, hit an unrecoverable rebase conflict, and left the deployment half-deleted (toets-hn7/pr-36). The same bug class produced a durable orphan that nothing detected (toets-hn7/pr-32). - Serialize project-file-mutating tasks per project in the claim guard, so deploys/updates/deletes of one project no longer race on projects/

.yaml and the per-project ArgoCD kustomization. Backups/restores stay per-deployment. - Self-heal the project-file push: on an unmergeable rebase, reset to the current remote, re-apply the intended change, and retry instead of failing terminally (new typed GitPushConflictError + reapply hook, wired into the delete path). - Add a read-only GET /api/v2/admin/deployments/drift report that flags deployments live in ArgoCD but absent from the project file. Verified with a real Postgres (claim serialization), real git (push self-heal convergence), and against live production data (drift report flags pr-32 only). --- features/deployment-delete-race-prevention.md | 134 +++++++++++++++++ .../python/opi/api/admin_router.py | 35 +++++ .../python/opi/connectors/git.py | 81 +++++++++- .../python/opi/core/async_task_service.py | 47 +++++- .../python/opi/jobs/deployment_drift.py | 116 +++++++++++++++ .../opi/manager/delete_project_manager.py | 20 ++- .../test_async_task_claim_serialization_db.py | 139 ++++++++++++++++++ .../python/tests/test_async_task_service.py | 43 ++++++ .../python/tests/test_deployment_drift.py | 102 +++++++++++++ .../python/tests/test_git_push_conflict.py | 63 ++++++++ .../test_git_push_conflict_integration.py | 82 +++++++++++ 11 files changed, 851 insertions(+), 11 deletions(-) create mode 100644 features/deployment-delete-race-prevention.md create mode 100644 operations-manager/python/opi/jobs/deployment_drift.py create mode 100644 operations-manager/python/tests/test_async_task_claim_serialization_db.py create mode 100644 operations-manager/python/tests/test_deployment_drift.py create mode 100644 operations-manager/python/tests/test_git_push_conflict.py create mode 100644 operations-manager/python/tests/test_git_push_conflict_integration.py diff --git a/features/deployment-delete-race-prevention.md b/features/deployment-delete-race-prevention.md new file mode 100644 index 00000000..11f7db89 --- /dev/null +++ b/features/deployment-delete-race-prevention.md @@ -0,0 +1,134 @@ +# Deployment delete: race prevention and orphan detection + +## What it is + +Hardening for concurrent deployment mutations after the **toets-hn7/pr-36** +incident (2026-06-24): two sibling deployment deletes of the same project ran +concurrently, collided on the shared project file during `git push`, hit an +unrecoverable rebase conflict, and left the deployment half-deleted. The same +bug class produced the durable **toets-hn7/pr-32** orphan (a deployment removed +from the project file on 2026-06-12 whose ArgoCD Application and pods kept +running for 12 days, undetected). + +Three changes address it, plus a remediation runbook for existing orphans. + +## Background: why it happened + +Every deployment of a project writes the **same** shared git files: +`projects/.yaml` and the per-project ArgoCD `kustomization.yaml`. The +async task worker ran two deletes for the same project concurrently because the +in-flight guard keyed on `project_name` **and** `deployment_name`. A long +ArgoCD-prune wait (3.5 min for pr-36) widened the window for the sibling delete +to land on `main`, so pr-36's rebase could not auto-merge the overlapping list +edit. The push path aborted on conflict and failed the task terminally; only the +blind task auto-retry recovered it. When the retry does not recover (pr-32), the +orphan is permanent because nothing reconciles deployment-level drift. + +## The changes + +### 1a. Serialize project-file-mutating tasks per project + +`AsyncTaskService.claim_next_task` no longer starts a pending task while another +**project-file-mutating** task for the same project is in flight, regardless of +deployment. The set is `PROJECT_FILE_MUTATING_TASK_TYPES` (upsert/update-image/ +delete deployment, refresh, create-project, add-component(-to-deployment), +add-service). Clone/backup/restore are excluded so a slow restore never blocks +deploys. Non-mutating tasks keep the original per-deployment behavior. + +This removes the pr-36/pr-37 collision at the source. Cross-project work still +runs in parallel; only same-project mutations serialize. + +### 1b. Self-heal the project-file push on a true rebase conflict + +`GitConnector.push_changes` / `commit_and_push` accept an optional `reapply` +async callback. On a rebase conflict (non-fast-forward then unmergeable): + +- With `reapply`: hard-reset to the current remote, invoke `reapply` to + re-apply the intended change on top of it, re-commit, and retry the push. This + converges on a textual conflict that is semantically trivial. +- Without `reapply`: raise the new typed `GitPushConflictError`. + +The deployment-delete path (step 8, removing the deployment from the project +file) passes a `reapply` that re-reads the project file fresh and re-removes the +deployment. This covers residual races that 1a cannot (notably the resource +auto-tuner, which commits to project files outside the task system). + +### 2a. Deployment drift report (read-only) + +`GET /api/v2/admin/deployments/drift` (admin API key) compares deployments +declared in project files against live ArgoCD Applications and reports: + +- `orphaned_deployments`: live application with no project-file entry (the pr-32 + case), +- `missing_deployments`: declared but no live application. + +It performs **zero** mutations (mirrors the service-orphan sweep's "no +auto-delete from a scan" rule). Logic lives in `opi/jobs/deployment_drift.py` +(`classify_deployment_drift`, pure and unit-tested); the endpoint lists +Applications via kubectl using the `project` label so it does not depend on +ArgoCD RBAC. + +Project-infrastructure apps (`{project}-infrastructure`, which manage per-project +infra such as PostgreSQL clusters) carry the same `project` label but are not +deployments and never appear in `deployments[]`. They are explicitly excluded so +they are not mis-reported as orphans. Validated against live production (93 +ArgoCD apps, 36 project files): the report returns exactly the one real orphan +(`toets-hn7-pr-32`), zero false positives, zero missing. + +```bash +curl -X GET "https:///api/v2/admin/deployments/drift" \ + -H "X-API-Key: " +``` + +## Remediating an existing orphan (e.g. pr-32) + +Drift is report-only; cleanup is deliberate. Prefer driving the delete through +OPI once this build is rolled out: re-add the deployment to its project file, +let OPI adopt it on reprocess, then `DELETE /api/v2/projects//` +for a full, idempotent teardown (services, namespace, manifests, app). + +Manual Git fallback (two phases, matching OPI's own ordering to avoid a +finalizer deadlock): + +1. In the ArgoCD-applications repo: delete + `odcn-production//--argocd-application.yaml` and remove + its line from `odcn-production//kustomization.yaml`. Commit + push. + Wait for ArgoCD to prune the Application (its `resources-finalizer` removes + the pods). +2. After the app is gone, in the deployments repo: delete the + `odcn-production///` manifest folder. Commit + push. + +Also clear any now-stale `marked_for_deletion` row (a delete that timed out on +the ArgoCD wait marks `deployment_manifests` for deferred cleanup; a later retry +that deletes the manifests can leave the row behind): + +```bash +curl -X GET "https:///api/v2/admin/marked-for-deletion?project_name=" -H "X-API-Key: " +curl -X DELETE "https:///api/v2/admin/marked-for-deletion/" -H "X-API-Key: " +``` + +## Files + +- `opi/core/async_task_service.py` - `PROJECT_FILE_MUTATING_TASK_TYPES`, per-project claim guard +- `opi/connectors/git.py` - `GitPushConflictError`, `reapply` hook, `_reset_to_remote` +- `opi/manager/delete_project_manager.py` - step 8 passes a `reapply` closure +- `opi/jobs/deployment_drift.py` - drift classification +- `opi/api/admin_router.py` - `GET /api/v2/admin/deployments/drift` + +### Tests + +- `tests/test_async_task_service.py` - claim-guard query wiring (mocked) +- `tests/test_async_task_claim_serialization_db.py` - real-Postgres proof of 1a + (the pr-36/pr-37 scenario, backups not over-serialized, cross-project + parallelism). Marked `requires_infra`; run with `TEST_DATABASE_DSN` set against + an ephemeral Postgres (see the module docstring). +- `tests/test_git_push_conflict.py` - 1b control flow (mocked git) +- `tests/test_git_push_conflict_integration.py` - real-git end-to-end: forces an + actual rebase conflict and proves the reapply path converges while preserving + the concurrent writer's change. +- `tests/test_deployment_drift.py` - drift classification incl. infra-app exclusion + +## Dependencies + +None new. Uses the existing async task queue, GitConnector, kubectl connector, +and admin API-key auth. diff --git a/operations-manager/python/opi/api/admin_router.py b/operations-manager/python/opi/api/admin_router.py index 6601b3aa..14c17ca3 100644 --- a/operations-manager/python/opi/api/admin_router.py +++ b/operations-manager/python/opi/api/admin_router.py @@ -8,6 +8,7 @@ POST /api/v2/admin/cleanup/trigger - Trigger cleanup (purge expired) POST /api/v2/admin/reconciliation/trigger - Trigger full reconciliation DELETE /api/v2/admin/marked-for-deletion/{mark_id} - Remove a specific mark + GET /api/v2/admin/deployments/drift - Report live-but-undeclared deployments """ import logging @@ -215,6 +216,40 @@ async def orphan_sweep_report(request: Request) -> JSONResponse: return JSONResponse(content=report, status_code=200) +@admin_router.get("/deployments/drift") +@validate_admin_api_key +async def deployment_drift_report(request: Request) -> JSONResponse: + """Report deployments that are live on the cluster but no longer declared. + + Compares the deployments in the project files against the live ArgoCD + Application resources. Surfaces ``orphaned_deployments`` (a live application + with no project-file entry, the toets-hn7/pr-36-class durable failure where a + terminal delete left the app/manifests/pods running) and + ``missing_deployments`` (declared but no live application). Performs ZERO + mutations; remediation is a deliberate operator step. + + Example: + curl -X GET "http://localhost:9595/api/v2/admin/deployments/drift" \\ + -H "X-API-Key: your-admin-api-key" + """ + from opi.connectors.kubectl import create_kubectl_connector + from opi.core.cluster_config import get_argo_namespace + from opi.jobs.deployment_drift import classify_deployment_drift + from opi.services.project_service import get_project_service + + cluster = settings.CLUSTER_MANAGER + all_projects = get_project_service().get_all_projects() + project_yamls: list[dict[str, Any]] = [p.data for p in all_projects.values() if p.data] + + kubectl = create_kubectl_connector() + # OPI labels every Application it creates with `project`; selecting on the + # label's existence lists all OPI-managed apps without depending on ArgoCD RBAC. + argo_apps = await kubectl.get_resources_by_label("applications.argoproj.io", get_argo_namespace(cluster), "project") + + report = classify_deployment_drift(project_yamls, cluster, argo_apps) + return JSONResponse(content=report, status_code=200) + + @admin_router.post("/orphans/confirm") @validate_admin_api_key async def confirm_orphans(request: Request) -> JSONResponse: diff --git a/operations-manager/python/opi/connectors/git.py b/operations-manager/python/opi/connectors/git.py index 892e0c5e..56f16f56 100644 --- a/operations-manager/python/opi/connectors/git.py +++ b/operations-manager/python/opi/connectors/git.py @@ -46,6 +46,16 @@ def _obfuscate_git_command(cmd_str: str) -> str: return obfuscated +class GitPushConflictError(RuntimeError): + """Raised when a push fails because a rebase on the remote branch hit a + content conflict that git cannot resolve automatically. + + Distinct from generic push failures so callers can react specifically (for + example by re-reading the remote state and re-applying their intended change + instead of failing terminally). See push_changes(reapply=...). + """ + + class GitConnector: """Connector for interacting with Git repositories using GitPython.""" @@ -825,6 +835,23 @@ async def _rebase_on_remote(self, branch: str | None = None) -> bool: logger.error(f"Failed to rebase on {server_info}: {e}") raise + async def _reset_to_remote(self, branch: str | None = None) -> None: + """Discard local commits and working-tree changes, hard-resetting to + origin/. + + Used to recover from an unresolvable rebase conflict before re-applying an + intended change on top of the current remote state. + """ + target_branch = branch or self.branch + fetch_cmd = ["fetch", "origin", target_branch] + _, stderr, code = await self._run_git_command(fetch_cmd, cwd=self.__working_dir) + self._check_git_command_result(code, stderr, f"fetch origin/{target_branch}") + + reset_cmd = ["reset", "--hard", f"origin/{target_branch}"] + _, stderr, code = await self._run_git_command(reset_cmd, cwd=self.__working_dir) + self._check_git_command_result(code, stderr, f"reset --hard origin/{target_branch}") + logger.info(f"Hard-reset working tree to origin/{target_branch}") + async def file_changed_between_commits(self, file_path: str, old_commit: str, new_commit: str) -> bool: """ Check if a specific file was changed between commits using git diff. @@ -1098,7 +1125,11 @@ def _abort_if_plaintext_secrets_present(self) -> None: "Dit zou secrets in platte tekst naar git committen; operatie afgebroken." ) - async def commit_and_push(self, message: str) -> None: + async def commit_and_push( + self, + message: str, + reapply: Callable[[], Coroutine[Any, Any, None]] | None = None, + ) -> None: """ Commit all changes in the working directory and push to remote repository. This stages all changes (including new files, modifications, and deletions) before committing. @@ -1106,6 +1137,10 @@ async def commit_and_push(self, message: str) -> None: Args: message: Commit message + reapply: Optional async callback that re-applies the intended change to + the working tree if the push hits an unresolvable rebase conflict. + See push_changes for the recovery contract. When omitted, a rebase + conflict raises GitPushConflictError. """ logger.debug(f"Committing and pushing all changes: {message}") @@ -1122,7 +1157,7 @@ async def commit_and_push(self, message: str) -> None: # Commit and push the changes await self.commit_changes(message) - await self.push_changes() + await self.push_changes(reapply=reapply, commit_message=message) logger.info(f"Successfully committed and pushed all changes: {message}") @@ -1333,19 +1368,42 @@ async def commit_changes(self, message: str) -> None: logger.debug(f"Successfully committed changes: {message}") # TODO: update push changes to handle rebase, and if rebase fails, commit and push to temporary branch - async def push_changes(self, branch: str | None = None, max_retries: int = 5) -> None: + async def push_changes( + self, + branch: str | None = None, + max_retries: int = 5, + reapply: Callable[[], Coroutine[Any, Any, None]] | None = None, + commit_message: str | None = None, + ) -> None: """ Push committed changes to remote repository. If the push fails due to non-fast-forward (remote has newer commits), this method will automatically fetch, rebase, and retry the push. + If a non-fast-forward push then hits a content conflict during rebase + (concurrent writers touching the same file region, e.g. two sibling + deployment deletes editing the same project file), behaviour depends on + ``reapply``: + + - When ``reapply`` is provided, the local commit is discarded, the working + tree is hard-reset to the current remote state, ``reapply`` re-applies the + intended change on top of it, and the push is retried. This converges + instead of failing on a textual conflict that is semantically trivial. + - When ``reapply`` is None (default), a ``GitPushConflictError`` is raised. + Args: branch: Branch to push to (defaults to configured branch) max_retries: Maximum number of push attempts after rebase (default: 5) + reapply: Optional async callback that re-applies the caller's intended + change to the working tree (files only; staging and commit are + handled here). Invoked after a hard reset to the current remote. + commit_message: Commit message used when re-committing a re-applied + change. Required for the re-apply path to produce a clean commit. Raises: - RuntimeError: If push fails after all retries or if rebase has conflicts + GitPushConflictError: If rebase hits a conflict and no ``reapply`` is given. + RuntimeError: If push fails after all retries for other reasons. """ await self.ensure_repo_cloned() @@ -1380,8 +1438,21 @@ async def push_changes(self, branch: str | None = None, max_retries: int = 5) -> rebase_success = await self._rebase_on_remote(target_branch) if not rebase_success: + if reapply is not None: + logger.warning( + f"Rebase conflict on {target_branch}; resetting to current remote " + f"and re-applying the intended change before retrying push" + ) + await self._reset_to_remote(target_branch) + await reapply() + # Stage and commit the re-applied change on top of the + # now-current remote state, then retry the push. + await self._run_git_command(["add", "-A"], cwd=self.__working_dir) + await self.commit_changes(commit_message or "Re-apply change after rebase conflict") + continue + server_info = self._get_server_context() - raise RuntimeError( + raise GitPushConflictError( f"Cannot push to {target_branch} on {server_info}: " f"Remote has conflicting changes that cannot be automatically merged. " f"Manual intervention required." diff --git a/operations-manager/python/opi/core/async_task_service.py b/operations-manager/python/opi/core/async_task_service.py index d6a8e5b4..3ac6ff6f 100644 --- a/operations-manager/python/opi/core/async_task_service.py +++ b/operations-manager/python/opi/core/async_task_service.py @@ -31,6 +31,28 @@ class TaskType(StrEnum): RESTORE = "restore" +# Task types that mutate a project's shared git files (projects/.yaml and +# the per-project ArgoCD kustomization.yaml). Two of these running concurrently for +# the same project race on those shared files and can hit an unrecoverable rebase +# conflict on push. They are serialized per project at claim time (see +# claim_next_task). Clone/backup/restore are intentionally excluded: they touch +# per-deployment data only, so serializing a slow restore behind project edits would +# add latency without preventing any conflict. +PROJECT_FILE_MUTATING_TASK_TYPES: frozenset[str] = frozenset( + { + TaskType.UPSERT_DEPLOYMENT, + TaskType.UPDATE_IMAGE, + TaskType.DELETE_DEPLOYMENT, + TaskType.REFRESH_DEPLOYMENT, + TaskType.REFRESH_PROJECT, + TaskType.CREATE_PROJECT, + TaskType.ADD_COMPONENT, + TaskType.ADD_COMPONENT_TO_DEPLOYMENT, + TaskType.ADD_SERVICE, + } +) + + class AsyncTaskStatus(StrEnum): """Task lifecycle: pending → claimed → running → completed/failed. @@ -223,6 +245,10 @@ async def claim_next_task( # Build optional per-type concurrency filter type_limit_sql = "" params: list[Any] = [cluster] + # $2: the set of project-file-mutating task types, used by the + # in-flight guard below to serialize them per project. + params.append([str(t) for t in PROJECT_FILE_MUTATING_TASK_TYPES]) + mutating_idx = len(params) if type_concurrency_limits: # For each limited type, add a condition that skips pending # tasks of that type when the limit is already reached. @@ -243,10 +269,15 @@ async def claim_next_task( params.extend([task_type, max_concurrent]) type_limit_sql = "AND " + " AND ".join(conditions) - # Claim the next pending task, but skip tasks where another - # task for the same project/deployment is already in-flight. - # This prevents concurrent processing of the same deployment - # which could cause race conditions in git/ArgoCD operations. + # Claim the next pending task, but skip tasks where a conflicting + # task is already in-flight. Two cases are excluded: + # 1. Same project AND same deployment - never process one + # deployment twice at once (git/ArgoCD race). + # 2. Both tasks mutate the project's shared git files - serialize + # these per project (any deployment), because they edit the + # same projects/.yaml and ArgoCD kustomization.yaml + # and a concurrent push can hit an unrecoverable rebase + # conflict (e.g. two sibling deployment deletes). row = await conn.fetchrow( f""" SELECT id FROM async_tasks t @@ -254,9 +285,15 @@ async def claim_next_task( AND NOT EXISTS ( SELECT 1 FROM async_tasks running WHERE running.project_name = t.project_name - AND running.deployment_name IS NOT DISTINCT FROM t.deployment_name AND running.status IN ('claimed', 'running') AND running.id != t.id + AND ( + running.deployment_name IS NOT DISTINCT FROM t.deployment_name + OR ( + t.task_type = ANY(${mutating_idx}::text[]) + AND running.task_type = ANY(${mutating_idx}::text[]) + ) + ) ) {type_limit_sql} ORDER BY t.created_at ASC diff --git a/operations-manager/python/opi/jobs/deployment_drift.py b/operations-manager/python/opi/jobs/deployment_drift.py new file mode 100644 index 00000000..f66ad9bd --- /dev/null +++ b/operations-manager/python/opi/jobs/deployment_drift.py @@ -0,0 +1,116 @@ +"""Read-only deployment drift detection. + +Compares the deployments declared in project files against the ArgoCD +Application resources actually live on the cluster, surfacing deployments that +are live but no longer declared. This is the durable failure mode a per-deployment +delete can leave behind when a step fails terminally: the toets-hn7/pr-32 case, +where the deployment was removed from the project file but its ArgoCD Application, +manifests and pods kept running for days with nothing detecting it. + +Report-only: this module never mutates anything. Remediation is a deliberate, +operator-driven step (mirrors the service-orphan sweep's "no auto-delete from a +scan" safety rule). +""" + +from typing import Any + +from opi.utils.naming import generate_argocd_application_name, generate_infrastructure_application_name + + +def _declared_app_names(project_yamls: list[dict[str, Any]], cluster: str) -> dict[str, tuple[str, str]]: + """Map expected ArgoCD application name -> (project, deployment) for a cluster.""" + expected: dict[str, tuple[str, str]] = {} + for project in project_yamls: + project_name = project.get("name", "") + if not project_name: + continue + for deployment in project.get("deployments", []): + if deployment.get("cluster") != cluster: + continue + deployment_name = deployment.get("name", "") + if not deployment_name: + continue + app_name = generate_argocd_application_name(project_name, deployment_name) + expected[app_name] = (project_name, deployment_name) + return expected + + +def classify_deployment_drift( + project_yamls: list[dict[str, Any]], + cluster: str, + argo_apps: list[dict[str, Any]], +) -> dict[str, Any]: + """Classify drift between declared deployments and live ArgoCD applications. + + Args: + project_yamls: Parsed project files (each a dict with name + deployments). + cluster: The cluster this OPI manages (CLUSTER_MANAGER). + argo_apps: Live ArgoCD Application resources (kubectl JSON ``items``), each + expected to carry a ``project`` metadata label (set by OPI's template). + + Returns: + A report dict with an ``orphaned_deployments`` list (live but not declared, + the pr-32 case) and a ``missing_deployments`` list (declared but no live + application). No deletions are performed or scheduled. + """ + expected = _declared_app_names(project_yamls, cluster) + expected_names = set(expected) + live_names: set[str] = set() + orphaned: list[dict[str, Any]] = [] + + for app in argo_apps: + meta = app.get("metadata", {}) or {} + name = meta.get("name", "") + if not name: + continue + + labels = meta.get("labels", {}) or {} + project_label = labels.get("project", "") + + # Project-infrastructure apps ("{project}-infrastructure") are not + # deployments; they manage per-project infra (e.g. PostgreSQL clusters) + # and are never listed in deployments[]. Skip them so they are not + # mis-reported as orphaned deployments. + if project_label and name == generate_infrastructure_application_name(project_label): + continue + + live_names.add(name) + if name in expected_names: + continue + + # app name is "{project}-{deployment}"; strip the known project prefix. + deployment = name[len(project_label) + 1 :] if project_label and name.startswith(f"{project_label}-") else None + status = app.get("status", {}) or {} + orphaned.append( + { + "argocd_app": name, + "project": project_label or None, + "deployment": deployment, + "sync": (status.get("sync") or {}).get("status"), + "health": (status.get("health") or {}).get("status"), + "reason": "live ArgoCD application not declared in any project file for this cluster", + } + ) + + missing = [ + { + "argocd_app": app_name, + "project": project_name, + "deployment": deployment_name, + "reason": "declared deployment has no live ArgoCD application", + } + for app_name, (project_name, deployment_name) in expected.items() + if app_name not in live_names + ] + + return { + "cluster": cluster, + "summary": { + "declared": len(expected_names), + "live": len(live_names), + "orphaned": len(orphaned), + "missing": len(missing), + }, + "orphaned_deployments": orphaned, + "missing_deployments": missing, + } diff --git a/operations-manager/python/opi/manager/delete_project_manager.py b/operations-manager/python/opi/manager/delete_project_manager.py index 754687d3..7fa76dbd 100644 --- a/operations-manager/python/opi/manager/delete_project_manager.py +++ b/operations-manager/python/opi/manager/delete_project_manager.py @@ -28,6 +28,7 @@ generate_infrastructure_argocd_folder_path, get_output_filename_from_template, ) +from opi.utils.yaml_util import load_yaml_from_path, save_yaml_to_path logger = logging.getLogger(__name__) @@ -1631,9 +1632,26 @@ async def delete_deployment(self, project_name: str, deployment_name: str, force await self.project_manager.save_project_data() + async def _reapply_remove_deployment() -> None: + # Recovery path for a rebase conflict on the shared project file: + # a concurrent writer (e.g. a sibling deployment delete) edited the + # same deployments list. Re-read the file as it now exists on the + # freshly reset working tree and re-remove this deployment, so the + # other change is preserved instead of conflicting. Reads/writes the + # file directly to bypass the cached in-memory project data. + project_file_path = await self.project_manager.get_project_full_file_path() + fresh_data = load_yaml_from_path(project_file_path) + if fresh_data is None: + return + fresh_data["deployments"] = [ + dep for dep in fresh_data.get("deployments", []) if dep.get("name") != deployment_name + ] + save_yaml_to_path(project_file_path, fresh_data) + git_connector = await self.project_manager.get_git_connector_for_project_files() await git_connector.commit_and_push( - f"Delete deployment '{deployment_name}' from project {project_name}" + f"Delete deployment '{deployment_name}' from project {project_name}", + reapply=_reapply_remove_deployment, ) deletion_results["operations"].append( diff --git a/operations-manager/python/tests/test_async_task_claim_serialization_db.py b/operations-manager/python/tests/test_async_task_claim_serialization_db.py new file mode 100644 index 00000000..4bd5b2dc --- /dev/null +++ b/operations-manager/python/tests/test_async_task_claim_serialization_db.py @@ -0,0 +1,139 @@ +"""Real-Postgres integration tests for per-project claim serialization (1a). + +Proves the actual SQL semantics of AsyncTaskService.claim_next_task against a +live database, not just the query string. Reproduces the toets-hn7/pr-36 +scenario: two sibling deployment deletes of the same project must NOT both be +claimed, while unrelated work (other projects, non-mutating task types) must +still be claimable. + +Run against an ephemeral Postgres: + + docker run -d --rm -e POSTGRES_PASSWORD=pw -p 55432:5432 postgres:16 + TEST_DATABASE_DSN=postgresql://postgres:pw@localhost:55432/postgres \\ + uv run pytest tests/test_async_task_claim_serialization_db.py -m requires_infra -q +""" + +import os +from typing import TYPE_CHECKING, Any + +import asyncpg +import pytest + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator +from opi.core.async_task_schema import ASYNC_TASKS_TABLE_SQL +from opi.core.async_task_service import AsyncTaskService + +pytestmark = [pytest.mark.requires_infra] + +CLUSTER = "odcn-production" +DSN = os.environ.get("TEST_DATABASE_DSN") + + +# asyncpg.Pool is duck-compatible with the DatabasePool interface AsyncTaskService +# uses (acquire/release + connection transaction/fetchrow/execute); typed as Any +# so the real pool can be passed directly. +@pytest.fixture +async def pool() -> AsyncGenerator[Any]: + if not DSN: + pytest.skip("TEST_DATABASE_DSN not set") + pool = await asyncpg.create_pool(DSN) + async with pool.acquire() as conn: + await conn.execute(ASYNC_TASKS_TABLE_SQL) + await conn.execute("TRUNCATE async_tasks") + try: + yield pool + finally: + async with pool.acquire() as conn: + await conn.execute("TRUNCATE async_tasks") + await pool.close() + + +async def _insert(pool: Any, task_type: str, project: str, deployment: str | None) -> str: + row = await pool.fetchrow( + """ + INSERT INTO async_tasks (task_type, project_name, deployment_name, cluster, status) + VALUES ($1, $2, $3, $4, 'pending') + RETURNING id + """, + task_type, + project, + deployment, + CLUSTER, + ) + return str(row["id"]) + + +async def _claim_all(service: AsyncTaskService) -> list[tuple[str, str | None]]: + """Claim repeatedly until none remain; return (task_type, deployment) claimed. + + Claimed tasks stay in status 'claimed' (as in production, until the task + finishes), so this mirrors what a single worker loop would be allowed to run + concurrently. + """ + claimed: list[tuple[str, str | None]] = [] + while True: + task = await service.claim_next_task(cluster=CLUSTER) + if task is None: + return claimed + claimed.append((task["task_type"], task["deployment_name"])) + + +async def test_two_sibling_deletes_are_serialized(pool: Any) -> None: + """Two deletes of the same project (different deployments) -> only one claimable.""" + service = AsyncTaskService(pool=pool, cluster=CLUSTER) + await _insert(pool, "delete_deployment", "toets-hn7", "pr-36") + await _insert(pool, "delete_deployment", "toets-hn7", "pr-37") + + claimed = await _claim_all(service) + + assert len(claimed) == 1, f"only one sibling delete should be claimable, got {claimed}" + assert claimed[0][0] == "delete_deployment" + + +async def test_mutating_pair_serialized_across_types(pool: Any) -> None: + """A delete and an upsert on the same project also serialize (both mutate the + shared project file), even with different deployments.""" + service = AsyncTaskService(pool=pool, cluster=CLUSTER) + await _insert(pool, "delete_deployment", "toets-hn7", "pr-36") + await _insert(pool, "upsert_deployment", "toets-hn7", "pr-40") + + claimed = await _claim_all(service) + + assert len(claimed) == 1, f"mutating tasks on one project must serialize, got {claimed}" + + +async def test_backup_not_blocked_by_mutating_task(pool: Any) -> None: + """A backup (non-mutating) on the same project is NOT blocked by a running + delete, so slow restores/backups never queue behind deploys.""" + service = AsyncTaskService(pool=pool, cluster=CLUSTER) + await _insert(pool, "delete_deployment", "toets-hn7", "pr-36") + await _insert(pool, "backup", "toets-hn7", "pr-31") + + claimed = await _claim_all(service) + + types = sorted(t for t, _ in claimed) + assert types == ["backup", "delete_deployment"], f"backup must remain claimable, got {claimed}" + + +async def test_different_projects_run_in_parallel(pool: Any) -> None: + """Mutating tasks on different projects are not serialized against each other.""" + service = AsyncTaskService(pool=pool, cluster=CLUSTER) + await _insert(pool, "delete_deployment", "toets-hn7", "pr-36") + await _insert(pool, "delete_deployment", "regel-k4c", "pr-99") + + claimed = await _claim_all(service) + + assert len(claimed) == 2, f"cross-project deletes must both be claimable, got {claimed}" + + +async def test_same_deployment_still_serialized(pool: Any) -> None: + """The original guard still holds: two tasks on the exact same deployment + (even non-mutating) do not both run.""" + service = AsyncTaskService(pool=pool, cluster=CLUSTER) + await _insert(pool, "backup", "toets-hn7", "pr-31") + await _insert(pool, "restore", "toets-hn7", "pr-31") + + claimed = await _claim_all(service) + + assert len(claimed) == 1, f"same-deployment tasks must serialize, got {claimed}" diff --git a/operations-manager/python/tests/test_async_task_service.py b/operations-manager/python/tests/test_async_task_service.py index e2ee38c7..7eb4b462 100644 --- a/operations-manager/python/tests/test_async_task_service.py +++ b/operations-manager/python/tests/test_async_task_service.py @@ -204,6 +204,49 @@ async def test_claim_next_task_empty( mock_pool.release.assert_awaited_once_with(mock_connection) +async def test_claim_next_task_serializes_mutating_tasks_per_project( + task_service: AsyncTaskService, + mock_pool: MagicMock, + mock_connection: AsyncMock, +) -> None: + """The claim query serializes project-file-mutating tasks per project. + + Two sibling deployment deletes (same project, different deployment) edit the + same projects/.yaml and ArgoCD kustomization.yaml; running them + concurrently caused the toets-hn7/pr-36 rebase-conflict incident. The guard + must exclude any in-flight mutating task for the same project, regardless of + deployment_name, while leaving non-mutating tasks (backup/restore) on the + per-deployment behavior. + """ + from opi.core.async_task_service import PROJECT_FILE_MUTATING_TASK_TYPES + + mock_pool.acquire.return_value = mock_connection + mock_connection.fetchrow = AsyncMock(return_value=None) + tx_ctx = MagicMock() + tx_ctx.__aenter__ = AsyncMock() + tx_ctx.__aexit__ = AsyncMock(return_value=False) + mock_connection.transaction = MagicMock(return_value=tx_ctx) + + await task_service.claim_next_task(cluster="test-cluster") + + select_sql, params = mock_connection.fetchrow.await_args.args[0], mock_connection.fetchrow.await_args.args[1:] + + # The mutating-types set is passed as a query parameter and used to serialize + # mutating-vs-mutating pairs per project (any deployment). + mutating_param = next(p for p in params if isinstance(p, list)) + assert "delete_deployment" in mutating_param + assert "upsert_deployment" in mutating_param + # Backups/restores must NOT serialize per project. + assert "backup" not in mutating_param + assert "restore" not in mutating_param + assert set(mutating_param) == {str(t) for t in PROJECT_FILE_MUTATING_TASK_TYPES} + + # The per-deployment guard is retained, and the per-project mutating guard is added. + assert "running.deployment_name IS NOT DISTINCT FROM t.deployment_name" in select_sql + assert "t.task_type = ANY(" in select_sql + assert "running.task_type = ANY(" in select_sql + + async def test_start_task( task_service: AsyncTaskService, mock_pool: MagicMock, diff --git a/operations-manager/python/tests/test_deployment_drift.py b/operations-manager/python/tests/test_deployment_drift.py new file mode 100644 index 00000000..f8d8469a --- /dev/null +++ b/operations-manager/python/tests/test_deployment_drift.py @@ -0,0 +1,102 @@ +"""Tests for read-only deployment drift detection (the pr-32 orphan case).""" + +from opi.jobs.deployment_drift import classify_deployment_drift + +CLUSTER = "odcn-production" + + +def _app(name: str, project: str, sync: str = "Synced", health: str = "Healthy") -> dict: + return { + "metadata": {"name": name, "labels": {"project": project}}, + "status": {"sync": {"status": sync}, "health": {"status": health}}, + } + + +def _project(name: str, deployments: list[str], cluster: str = CLUSTER) -> dict: + return {"name": name, "deployments": [{"name": d, "cluster": cluster} for d in deployments]} + + +def test_detects_live_but_undeclared_deployment() -> None: + """A live ArgoCD app with no project-file entry is reported as orphaned. + + Mirrors toets-hn7/pr-32: removed from the project file on delete, but its + Application kept running. + """ + project_yamls = [_project("toets-hn7", ["production", "pr-31", "pr-38"])] + argo_apps = [ + _app("toets-hn7-production", "toets-hn7"), + _app("toets-hn7-pr-31", "toets-hn7"), + _app("toets-hn7-pr-38", "toets-hn7"), + _app("toets-hn7-pr-32", "toets-hn7", health="Healthy"), # orphan + ] + + report = classify_deployment_drift(project_yamls, CLUSTER, argo_apps) + + assert report["summary"]["orphaned"] == 1 + orphan = report["orphaned_deployments"][0] + assert orphan["argocd_app"] == "toets-hn7-pr-32" + assert orphan["project"] == "toets-hn7" + assert orphan["deployment"] == "pr-32" + assert orphan["health"] == "Healthy" + + +def test_no_drift_when_all_declared() -> None: + """When every live app is declared, nothing is flagged.""" + project_yamls = [_project("toets-hn7", ["production", "pr-31"])] + argo_apps = [ + _app("toets-hn7-production", "toets-hn7"), + _app("toets-hn7-pr-31", "toets-hn7"), + ] + + report = classify_deployment_drift(project_yamls, CLUSTER, argo_apps) + + assert report["summary"]["orphaned"] == 0 + assert report["summary"]["missing"] == 0 + + +def test_declared_but_no_live_app_is_missing_not_orphan() -> None: + """A declared deployment with no live application is reported as missing.""" + project_yamls = [_project("toets-hn7", ["production", "pr-99"])] + argo_apps = [_app("toets-hn7-production", "toets-hn7")] + + report = classify_deployment_drift(project_yamls, CLUSTER, argo_apps) + + assert report["summary"]["orphaned"] == 0 + assert report["summary"]["missing"] == 1 + assert report["missing_deployments"][0]["deployment"] == "pr-99" + + +def test_infrastructure_apps_are_not_reported_as_orphans() -> None: + """`{project}-infrastructure` apps manage per-project infra, not deployments, + so they must not be flagged as orphaned even though they carry a project label.""" + project_yamls = [_project("algor-odc", ["production"])] + argo_apps = [ + _app("algor-odc-production", "algor-odc"), + _app("algor-odc-infrastructure", "algor-odc"), # infra app, not a deployment + ] + + report = classify_deployment_drift(project_yamls, CLUSTER, argo_apps) + + assert report["summary"]["orphaned"] == 0 + assert report["summary"]["live"] == 1 # only the deployment counts as live + + +def test_other_cluster_deployments_are_ignored() -> None: + """Deployments declared on another cluster don't count toward this cluster.""" + project_yamls = [ + { + "name": "toets-hn7", + "deployments": [ + {"name": "production", "cluster": CLUSTER}, + {"name": "pr-50", "cluster": "some-other-cluster"}, + ], + } + ] + # pr-50 lives only on the other cluster, so it is not a live app here. + argo_apps = [_app("toets-hn7-production", "toets-hn7")] + + report = classify_deployment_drift(project_yamls, CLUSTER, argo_apps) + + assert report["summary"]["declared"] == 1 + assert report["summary"]["orphaned"] == 0 + assert report["summary"]["missing"] == 0 diff --git a/operations-manager/python/tests/test_git_push_conflict.py b/operations-manager/python/tests/test_git_push_conflict.py new file mode 100644 index 00000000..b4d0675b --- /dev/null +++ b/operations-manager/python/tests/test_git_push_conflict.py @@ -0,0 +1,63 @@ +"""Tests for self-healing project-file pushes on rebase conflicts. + +Reproduces the toets-hn7/pr-36 incident at the GitConnector level: a push is +rejected non-fast-forward, the rebase onto the remote then hits a content +conflict (a concurrent writer touched the same file region). With a ``reapply`` +callback the connector must reset to the current remote, re-apply the intended +change, and converge; without one it must raise a typed GitPushConflictError. +""" + +from unittest.mock import AsyncMock + +import pytest +from opi.connectors.git import GitConnector, GitPushConflictError + + +def _make_connector() -> GitConnector: + connector = GitConnector(repo_url="ssh://git@example.com/repo.git") + # Avoid any real git/network work; the push control flow is what we exercise. + connector.ensure_repo_cloned = AsyncMock() + connector.commit_changes = AsyncMock() + connector._reset_to_remote = AsyncMock() + return connector + + +async def test_push_reapplies_intent_on_rebase_conflict() -> None: + """On a rebase conflict with a reapply callback, the connector resets to the + remote, re-applies the change, and the retried push succeeds.""" + connector = _make_connector() + + # First push is rejected non-fast-forward; the retry (after re-apply) succeeds. + push_outcomes = [("", "! [rejected] (non-fast-forward)", 1), ("", "", 0)] + + async def fake_run(cmd: list[str], cwd: str | None = None) -> tuple[str, str, int]: + if cmd and cmd[0] == "push": + return push_outcomes.pop(0) + return ("", "", 0) + + connector._run_git_command = AsyncMock(side_effect=fake_run) + # The rebase cannot auto-merge the concurrent change. + connector._rebase_on_remote = AsyncMock(return_value=False) + + reapply = AsyncMock() + + await connector.push_changes(reapply=reapply, commit_message="Delete deployment 'pr-36'") + + reapply.assert_awaited_once() + connector._reset_to_remote.assert_awaited_once() + # The re-applied change is committed before the retry push. + connector.commit_changes.assert_awaited_once_with("Delete deployment 'pr-36'") + assert not push_outcomes, "both push attempts should have been consumed" + + +async def test_push_raises_typed_conflict_without_reapply() -> None: + """Without a reapply callback, a rebase conflict raises GitPushConflictError + (not a bare RuntimeError), so callers can react specifically.""" + connector = _make_connector() + connector._run_git_command = AsyncMock(return_value=("", "! [rejected] (non-fast-forward)", 1)) + connector._rebase_on_remote = AsyncMock(return_value=False) + + with pytest.raises(GitPushConflictError): + await connector.push_changes(commit_message="Delete deployment 'pr-36'") + + connector._reset_to_remote.assert_not_awaited() diff --git a/operations-manager/python/tests/test_git_push_conflict_integration.py b/operations-manager/python/tests/test_git_push_conflict_integration.py new file mode 100644 index 00000000..ffbce9bf --- /dev/null +++ b/operations-manager/python/tests/test_git_push_conflict_integration.py @@ -0,0 +1,82 @@ +"""Real-git end-to-end test for the project-file push self-heal (1b). + +Unlike test_git_push_conflict.py (which mocks the git layer to check control +flow), this drives actual git against a local bare remote: it forces a real +rebase conflict and proves the reapply path resets to the remote, re-applies the +intended change on fresh content, and converges, while preserving the concurrent +writer's independent change. + +The GitConnector only understands ssh/http(s)/git URLs, so the test pre-clones +the bare remote into the connector's working_dir and skips the connector's own +clone step. The push/fetch/rebase/reset code under test operates on the working +tree's configured ``origin`` remote, exactly as in production. +""" + +import subprocess +from typing import TYPE_CHECKING + +from opi.connectors.git import GitConnector + +if TYPE_CHECKING: + from pathlib import Path + + +def _git(*args: str, cwd: Path) -> None: + subprocess.run(["git", *args], cwd=cwd, check=True, capture_output=True) + + +def _commit_push(repo: Path, content: str, message: str) -> None: + (repo / "data.txt").write_text(content) + _git("add", "-A", cwd=repo) + _git("commit", "-m", message, cwd=repo) + _git("push", "origin", "main", cwd=repo) + + +def _set_identity(repo: Path) -> None: + _git("config", "user.email", "test@test", cwd=repo) + _git("config", "user.name", "test", cwd=repo) + + +async def test_self_heal_converges_and_preserves_concurrent_change(tmp_path: Path) -> None: + remote = tmp_path / "remote.git" + _git("init", "--bare", "-b", "main", str(remote), cwd=tmp_path) + + # Seed the remote with two independent lines. + seed = tmp_path / "seed" + _git("clone", str(remote), str(seed), cwd=tmp_path) + _set_identity(seed) + _commit_push(seed, "a=base\nb=base\n", "seed") + + # The connector's working tree: a separate clone of the same remote. + working = tmp_path / "working" + _git("clone", str(remote), str(working), cwd=tmp_path) + _set_identity(working) + + connector = GitConnector(repo_url="https://example.com/repo.git", branch="main", working_dir=str(working)) + # Skip the connector's own clone/fetch; the working tree is already set up. + connector._repo_cloned = True + connector._fetched_in_session = True + + # Stage our intended change in the working tree: a=ours. + (working / "data.txt").write_text("a=ours\nb=base\n") + + # A concurrent writer changes BOTH lines and pushes first, so our push is + # rejected and the rebase of "a=ours" onto "a=remote" conflicts on line a. + _commit_push(seed, "a=remote\nb=remote\n", "concurrent writer") + + async def reapply() -> None: + # Re-read whatever is now on disk (the reset put a=remote, b=remote there) + # and re-apply only our intent (a=ours), leaving b untouched. + lines = (working / "data.txt").read_text().splitlines() + rewritten = ["a=ours" if line.startswith("a=") else line for line in lines] + (working / "data.txt").write_text("\n".join(rewritten) + "\n") + + # Must not raise: the conflict is recovered via reset + reapply + retry. + await connector.commit_and_push("set a=ours", reapply=reapply) + + # Verify the remote ended in the converged state. + verify = tmp_path / "verify" + _git("clone", str(remote), str(verify), cwd=tmp_path) + final = (verify / "data.txt").read_text() + assert "a=ours" in final, f"our intent must win on the conflicting line: {final!r}" + assert "b=remote" in final, f"concurrent independent change must be preserved: {final!r}"