-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathadmin_router.py
More file actions
339 lines (274 loc) · 13.1 KB
/
Copy pathadmin_router.py
File metadata and controls
339 lines (274 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""Admin API router for cleanup, reconciliation, and maintenance operations.
Provides endpoints for managing resource lifecycle operations that are
separate from the normal project API. Authenticated via ADMIN_API_KEY.

Endpoints:
    GET /api/v2/admin/marked-for-deletion - List marked resources
    POST /api/v2/admin/cleanup/trigger - Trigger cleanup (purge expired)
    POST /api/v2/admin/reconciliation/trigger - Trigger full reconciliation
    DELETE /api/v2/admin/marked-for-deletion/{mark_id} - Remove a specific mark
    GET /api/v2/admin/orphans/report - Run the read-only service-orphan sweep
    POST /api/v2/admin/orphans/confirm - Mark confirmed orphan candidates for deletion
    GET /api/v2/admin/deployments/drift - Report live-but-undeclared deployments
"""
import logging
from typing import Any
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import JSONResponse
from opi.api.endpoint_util import validate_admin_api_key
from opi.core.config import settings
from opi.core.database_pools import get_database_pool
from opi.services.marked_for_deletion_service import MarkedForDeletionService
logger = logging.getLogger(__name__)
# Router object that the admin endpoint functions in this module register on
# through the @admin_router.<method>(...) decorators. Every route path given
# to those decorators is relative to the "/api/v2/admin" prefix set here.
admin_router: APIRouter = APIRouter(
    prefix="/api/v2/admin",
    tags=["admin"],
    # Descriptions attached to the error status codes for routes on this router.
    responses={
        401: {"description": "Unauthorized - invalid or missing ADMIN_API_KEY"},
        501: {"description": "ADMIN_API_KEY not configured"},
        500: {"description": "Internal server error"},
    },
    default_response_class=JSONResponse,
)
def _get_marked_for_deletion_service() -> MarkedForDeletionService:
    """Build a MarkedForDeletionService bound to the "main" database pool."""
    return MarkedForDeletionService(get_database_pool("main"))
@admin_router.get("/marked-for-deletion")
@validate_admin_api_key
async def list_marked_for_deletion(
    request: Request,
    project_name: str | None = Query(None, description="Filter by project name"),
) -> JSONResponse:
    """Return the resources currently marked for deletion.

    Without a filter, every row of the marked_for_deletion table is returned.
    When ``project_name`` is supplied (and non-empty) only the marks recorded
    for that project are returned; this works even if the project itself no
    longer exists.

    The response body contains ``marks``, their count as ``total`` and the
    applied ``filter``.

    Example:
        curl -X GET "http://localhost:9595/api/v2/admin/marked-for-deletion?project_name=my-project" \\
            -H "X-API-Key: your-admin-api-key"
    """
    mark_service = _get_marked_for_deletion_service()

    # An empty project_name is treated the same as no filter at all.
    marks = (
        await mark_service.get_marks_for_project(project_name)
        if project_name
        else await mark_service.get_all_marks()
    )

    payload = {
        "marks": marks,
        "total": len(marks),
        "filter": {"project_name": project_name},
    }
    return JSONResponse(content=payload, status_code=200)
@admin_router.post("/cleanup/trigger")
@validate_admin_api_key
async def trigger_cleanup(
    request: Request,
    project_name: str = Query(..., description="Project name to clean up (required)"),
    dry_run: bool = Query(True, description="Preview actions without executing (default: true)"),
    grace_period_days: int | None = Query(None, description="Override grace period in days"),
) -> JSONResponse:
    """Purge expired deletion marks for one project.

    Only resources that are marked for deletion AND past the grace period are
    purged. The project is identified by the project_name stored in the
    marked_for_deletion table, so the call works even when the project no
    longer exists in the system.

    IMPORTANT: dry_run defaults to true. Pass dry_run=false to actually purge
    resources.

    Example:
        curl -X POST "http://localhost:9595/api/v2/admin/cleanup/trigger?project_name=my-project&dry_run=false" \\
            -H "X-API-Key: your-admin-api-key"
    """
    from opi.jobs.reconciliation import cleanup_project

    results = await cleanup_project(
        pool=get_database_pool("main"),
        project_name=project_name,
        grace_period_days=grace_period_days,
        dry_run=dry_run,
    )

    # Nothing purged and nothing failed: add a message saying so.
    nothing_to_report = not (results["purged"] or results["errors"])
    if nothing_to_report:
        results["message"] = f"No expired marks found for project '{project_name}'"

    return JSONResponse(content=results, status_code=200)
@admin_router.post("/reconciliation/trigger")
@validate_admin_api_key
async def trigger_reconciliation(
    request: Request,
    dry_run: bool = Query(True, description="Preview actions without executing (default: true)"),
    grace_period_days: int | None = Query(None, description="Override grace period in days"),
) -> JSONResponse:
    """Trigger a full reconciliation run.

    Reconciliation performs three operations:
    1. Unmarks resources that reappeared in project YAMLs (git revert recovery).
    2. Purges resources that are marked AND past the grace period.
    3. (Future) Detects newly orphaned resources.

    Uses all currently loaded project YAML definitions as the source of truth.

    IMPORTANT: dry_run defaults to true. Set dry_run=false to actually purge resources.

    The response echoes ``dry_run``, the number of ``projects_evaluated`` and
    the ``grace_period_days`` in effect, followed by the keys returned by the
    reconciliation job (which take precedence on a name clash).

    Example:
        curl -X POST "http://localhost:9595/api/v2/admin/reconciliation/trigger?dry_run=false" \\
            -H "X-API-Key: your-admin-api-key"
    """
    from opi.jobs.reconciliation import reconcile
    from opi.services.project_service import get_project_service

    pool = get_database_pool("main")
    project_service = get_project_service()

    # Build project YAML list from all loaded projects (projects without data are skipped)
    all_projects = project_service.get_all_projects()
    project_yamls: list[dict[str, Any]] = [p.data for p in all_projects.values() if p.data]

    results = await reconcile(
        pool=pool,
        project_yamls=project_yamls,
        grace_period_days=grace_period_days,
        dry_run=dry_run,
    )

    # Report the override exactly as it was passed to reconcile(). A plain
    # `or` would treat an explicit 0 as "not provided" and report the
    # configured default instead of the value the caller asked for.
    reported_grace_period = (
        grace_period_days if grace_period_days is not None else settings.DELETION_GRACE_PERIOD_DAYS
    )

    return JSONResponse(
        content={
            "message": "Reconciliation completed",
            "projects_evaluated": len(project_yamls),
            "dry_run": dry_run,
            "grace_period_days": reported_grace_period,
            **results,
        },
        status_code=200,
    )
@admin_router.delete("/marked-for-deletion/{mark_id}")
@validate_admin_api_key
async def delete_mark(
    request: Request,
    mark_id: str,
) -> JSONResponse:
    """Cancel a scheduled deletion by removing its mark.

    Only the mark is removed; the resource it refers to is NOT deleted.
    Responds with 404 when no mark with the given id could be removed.

    Example:
        curl -X DELETE "http://localhost:9595/api/v2/admin/marked-for-deletion/some-uuid" \\
            -H "X-API-Key: your-admin-api-key"
    """
    was_removed = await _get_marked_for_deletion_service().delete_mark(mark_id)

    if was_removed:
        return JSONResponse(
            content={"message": f"Mark '{mark_id}' removed successfully"},
            status_code=200,
        )

    raise HTTPException(
        status_code=404,
        detail=f"Mark '{mark_id}' not found",
    )
@admin_router.get("/orphans/report")
@validate_admin_api_key
async def orphan_sweep_report(request: Request) -> JSONResponse:
    """Return the report of the read-only service-orphan sweep.

    The sweep inventories PostgreSQL databases, Keycloak realms/clients and
    MinIO buckets and classifies them against the live project files. It
    performs ZERO mutations. Deleting anything requires POST /orphans/confirm
    with an explicit item list, followed by the normal grace-period purge.

    Example:
        curl -X GET "http://localhost:9595/api/v2/admin/orphans/report" \\
            -H "X-API-Key: your-admin-api-key"
    """
    from opi.jobs.service_orphan_sweep import sweep
    from opi.services.project_service import get_project_service

    pool = get_database_pool("main")

    # Collect the data of every loaded project, skipping projects without data.
    loaded_projects = get_project_service().get_all_projects()
    project_yamls: list[dict[str, Any]] = []
    for project in loaded_projects.values():
        if project.data:
            project_yamls.append(project.data)

    sweep_report = await sweep(pool, project_yamls, cluster=settings.CLUSTER_MANAGER)
    return JSONResponse(content=sweep_report, status_code=200)
@admin_router.get("/deployments/drift")
@validate_admin_api_key
async def deployment_drift_report(request: Request) -> JSONResponse:
    """Report drift between declared deployments and live ArgoCD applications.

    The deployments in the project files are compared against the live ArgoCD
    Application resources. The report surfaces ``orphaned_deployments`` (a
    live application with no project-file entry, the toets-hn7/pr-36-class
    durable failure where a terminal delete left the app/manifests/pods
    running) and ``missing_deployments`` (declared but no live application).
    Performs ZERO mutations; remediation is a deliberate operator step.

    Example:
        curl -X GET "http://localhost:9595/api/v2/admin/deployments/drift" \\
            -H "X-API-Key: your-admin-api-key"
    """
    from opi.connectors.kubectl import create_kubectl_connector
    from opi.core.cluster_config import get_argo_namespace
    from opi.jobs.deployment_drift import classify_deployment_drift
    from opi.services.project_service import get_project_service

    cluster = settings.CLUSTER_MANAGER

    # Declared side: data of every loaded project that has any.
    loaded_projects = get_project_service().get_all_projects()
    project_yamls: list[dict[str, Any]] = []
    for project in loaded_projects.values():
        if project.data:
            project_yamls.append(project.data)

    # Live side: OPI labels every Application it creates with `project`;
    # selecting on the label's existence lists all OPI-managed apps without
    # depending on ArgoCD RBAC.
    kubectl = create_kubectl_connector()
    argo_apps = await kubectl.get_resources_by_label(
        "applications.argoproj.io",
        get_argo_namespace(cluster),
        "project",
    )

    drift = classify_deployment_drift(project_yamls, cluster, argo_apps)
    return JSONResponse(content=drift, status_code=200)
@admin_router.post("/orphans/confirm")
@validate_admin_api_key
async def confirm_orphans(request: Request) -> JSONResponse:
    """Mark confirmed orphan candidates for grace-period deletion.

    Body: {"items": [{"type": "...", "name": "...", "realm": "..."}]}
    with type one of postgresql_database, postgresql_user, minio_bucket,
    keycloak_client (keycloak_client requires "realm").

    Safety: the sweep is re-run server-side and each submitted item must
    still be classified ``orphan_candidate`` in the fresh report. Items
    that are expected, system, in_use_anomaly or unknown are rejected.
    Accepted items are marked in marked_for_deletion; actual deletion
    happens via the normal reconciliation purge after the grace period.

    Malformed items (not a JSON object, or non-string type/name/realm) are
    reported in ``rejected`` instead of failing the whole request.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or does not contain a non-empty ``items`` list.

    Example:
        curl -X POST "http://localhost:9595/api/v2/admin/orphans/confirm" \\
            -H "X-API-Key: your-admin-api-key" -H "Content-Type: application/json" \\
            -d '{"items": [{"type": "postgresql_database", "name": "regel_k4c_pr104"}]}'
    """
    from opi.jobs.service_orphan_sweep import CONFIRMABLE, sweep
    from opi.services.project_service import get_project_service

    # A body that cannot be parsed is a client error, not an internal one.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Body must be valid JSON") from exc

    # Valid JSON may still be a list/string/number; only an object has "items".
    items = body.get("items") if isinstance(body, dict) else None
    if not isinstance(items, list) or not items:
        raise HTTPException(status_code=400, detail="Body must contain a non-empty 'items' list")

    pool = get_database_pool("main")
    all_projects = get_project_service().get_all_projects()
    project_yamls: list[dict[str, Any]] = [p.data for p in all_projects.values() if p.data]
    cluster = settings.CLUSTER_MANAGER
    report = await sweep(pool, project_yamls, cluster=cluster)

    # Index the fresh report by (type, name[, realm]) -> report entry.
    # Each database entry is indexed under both postgresql_database and
    # postgresql_user using the same name.
    candidates: dict[tuple, dict[str, Any]] = {}
    for entry in report["databases"]:
        candidates[("postgresql_database", entry["name"])] = entry
        candidates[("postgresql_user", entry["name"])] = entry
    for entry in report["minio_buckets"]:
        candidates[("minio_bucket", entry["name"])] = entry
    for entry in report["keycloak_clients"]:
        candidates[("keycloak_client", entry["client_id"], entry["realm"])] = entry

    service = _get_marked_for_deletion_service()
    accepted: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []

    for item in items:
        # Items come straight from the request body, so validate their shape
        # before calling dict methods on them.
        if not isinstance(item, dict):
            rejected.append({"item": item, "reason": "item must be a JSON object"})
            continue

        itype = item.get("type", "")
        name = item.get("name", "")
        realm = item.get("realm", "")

        # The values become part of a dict key below; a list or object here
        # would be unhashable and crash the lookup.
        if not all(isinstance(value, str) for value in (itype, name, realm)):
            rejected.append({**item, "reason": "'type', 'name' and 'realm' must be strings"})
            continue

        key = ("keycloak_client", name, realm) if itype == "keycloak_client" else (itype, name)
        entry = candidates.get(key)
        if entry is None:
            rejected.append({**item, "reason": "not present in the current sweep report"})
            continue
        if entry["classification"] != CONFIRMABLE:
            rejected.append(
                {**item, "reason": f"classified '{entry['classification']}' - only orphan_candidate is confirmable"}
            )
            continue

        metadata: dict[str, Any] = {"confirmed_via": "orphans/confirm", "sweep_reason": entry["reason"]}
        if itype == "keycloak_client":
            metadata["realm"] = realm

        await service.mark_resource(
            resource_type=itype,
            resource_name=name,
            project_name=item.get("project_name", ""),
            deployment_name=item.get("deployment_name", ""),
            cluster=cluster,
            metadata=metadata,
        )
        accepted.append(item)

    return JSONResponse(
        content={
            "message": f"{len(accepted)} item(s) marked for deletion, {len(rejected)} rejected",
            "grace_period_days": settings.DELETION_GRACE_PERIOD_DAYS,
            "accepted": accepted,
            "rejected": rejected,
        },
        status_code=200,
    )