-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathproject_manager.py
More file actions
7876 lines (6693 loc) · 377 KB
/
Copy pathproject_manager.py
File metadata and controls
7876 lines (6693 loc) · 377 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
The project manager handles project files. It can read, update, delete, or process them.
Processing means it can create, update, or delete any resources defined in a project file.
"""
import asyncio
import base64
import contextlib
import copy
import glob
import logging
import os
import secrets
import shutil
from collections.abc import Callable # noqa: TC003 - used in an eagerly-evaluated annotation (no future-annotations)
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, TypeVar, cast
from fastapi import HTTPException
from jsonpath_ng.ext import parse as jsonpath_parse
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import LiteralScalarString
from opi.connectors import create_argo_connector
from opi.connectors.chisel_connector import ChiselConnector
from opi.connectors.git import (
GitConnector,
GitPushConflictError,
create_git_connector_for_argocd,
create_git_connector_for_project_files,
create_git_connector_from_repo_config,
)
from opi.connectors.kubectl import KubectlConnector
from opi.connectors.subdomain import SubdomainConnector, ensure_domain_requests
from opi.core.cluster_config import (
get_argo_namespace,
get_backup_namespace,
get_ca_certificate_config,
get_external_dns_target_for_hostname,
get_infrastructure_namespace,
get_ingress_cluster_issuer,
get_ingress_controller_selector,
get_ingress_ip_whitelist,
get_ingress_postfix,
get_ingress_tls_enabled,
get_keycloak_discovery_url,
get_letsencrypt_contact_email,
get_minio_host,
get_minio_port,
get_namespace,
get_prefixed_namespace,
supports_vpa,
uses_capsule,
)
from opi.core.config import settings
from opi.core.project_schema import ProjectIntegrityError, ProjectSchemaError, validate_project_schema
from opi.extensions import load_extensions
from opi.forms.editables.enforcers import DomainConfigEnforcer, FieldWarning
from opi.generation.manifests import ManifestGenerator
from opi.handlers.project_file_handler import (
ProjectFileHandler,
attachment_is_referenced,
extract_service_names_from_component,
find_attachment_data_list,
is_image_pull_disable_reason,
save_project_file,
validate_attachment_couplings,
validate_attachment_references,
)
from opi.handlers.sops import SopsHandler
from opi.manager.revision_manager import RevisionManager
from opi.manager.run_support import resolve_image
from opi.services import ServiceAdapter, ServiceType, ServiceValidationError, VariableDefinition
from opi.services.project_service import ProjectUser, get_project_service
from opi.utils.age import (
decrypt_age_content,
decrypt_password_smart,
decrypt_password_smart_auto,
encrypt_age_content,
get_decoded_project_private_key,
get_project_public_key,
is_age_encrypted,
)
from opi.utils.env_vars import detect_circular_references, extract_variable_references, substitute_variables
# Environment variables are now generated using service definitions
from opi.utils.naming import (
DOMAIN_FORMAT_TEMPLATES,
ROOT_COMPONENT_FORMAT_IDS,
HostnameFormat,
generate_argocd_application_name,
generate_bare_domain_hostname,
generate_external_hostname,
generate_helm_values_filename,
generate_ingress_name_from_path,
generate_issuer_manifest_name,
generate_issuer_name,
generate_issuer_secret_name,
generate_manifest_name,
generate_network_policy_manifest_name,
generate_network_policy_name,
generate_nice_url_root_hostname,
generate_project_realm_name,
generate_public_url,
generate_pvc_name,
generate_registry_secret_name,
generate_storage_name,
generate_tls_secret_name,
generate_unique_name,
get_component_ingress_map,
)
from opi.utils.project_utils import (
ComponentValidationError,
build_component_config,
normalize_container_image,
validate_component_paths,
validate_root_component,
)
from opi.utils.secrets import (
BaseSecret,
DatabaseSecret,
KeycloakSecret,
MetricsAuthSecret,
MinIOSecret,
PlatformSecret,
RedisSecret,
RegistrySecret,
UserSecret,
)
from opi.utils.sops import encrypt_to_sops_files_or_fail
from opi.utils.yaml_util import (
find_value_by_jsonpath,
save_yaml_to_path,
)
if TYPE_CHECKING:
from collections.abc import Iterable
from opi.core.task_manager import TaskProgressManager
from opi.manager.database_manager import DatabaseManager
# TypeVar for generic secret types; bound to BaseSecret so that only BaseSecret
# (sub)classes satisfy it.
T = TypeVar("T", bound=BaseSecret)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def enforce_namespace_pin(project_data: dict[str, Any]) -> None:
    """Force every deployment namespace to equal the project name (in place).

    A deployment whose namespace is missing (or ``None``) gets the project name
    filled in. A deployment that declares any other namespace is rejected.
    When the project has no (truthy) ``name`` nothing is checked or modified.

    Every entry point that turns a project file into namespace/ArgoCD actions
    must call this, otherwise a tenant can label another project's namespace.

    Args:
        project_data: Parsed project file; its ``deployments`` entries are
            mutated in place.

    Raises:
        ValueError: If a deployment declares a namespace that differs from the
            project name.
    """
    project_name = project_data.get("name")
    if project_name:
        for deployment in project_data.get("deployments", []):
            declared_namespace = deployment.get("namespace")
            if declared_namespace == project_name:
                # Already pinned correctly.
                continue
            if declared_namespace is not None:
                deployment_label = deployment.get("name", "<naamloos>")
                raise ValueError(
                    f"Deployment '{deployment_label}' in project '{project_name}' "
                    f"gebruikt namespace '{declared_namespace}', maar de namespace "
                    f"moet gelijk zijn aan de projectnaam '{project_name}'. "
                    f"Een afwijkende namespace is niet toegestaan omdat dit toegang "
                    f"tot de resources van een ander project zou geven."
                )
            # No namespace declared: default it to the project name.
            deployment["namespace"] = project_name
def _deployment_name_for_path(path: str, yaml_doc: dict[str, Any] | None) -> str | None:
"""Resolve a DeepDiff path like ``deployments.7`` to the deployment name in ``yaml_doc``.
Returns None when the path does not point inside a resolvable deployment entry.
"""
parts = path.split(".")
if len(parts) < 2 or parts[0] != "deployments" or not parts[1].isdigit():
return None
deployments = (yaml_doc or {}).get("deployments", [])
index = int(parts[1])
if 0 <= index < len(deployments) and isinstance(deployments[index], dict):
return deployments[index].get("name")
return None
def _deployment_names_for_paths(paths: Iterable[str], yaml_doc: dict[str, Any] | None) -> list[str]:
    """Map DeepDiff paths to deployment names, keeping first-seen order without duplicates.

    A path that cannot be resolved to a (truthy) deployment name is reported as
    the raw path itself.
    """
    ordered: dict[str, None] = {}
    for raw_path in paths:
        resolved = _deployment_name_for_path(raw_path, yaml_doc)
        # dict keys keep insertion order, so this deduplicates stably.
        ordered.setdefault(resolved or raw_path, None)
    return list(ordered)
def _resolve_deployment_filter(deployment_name: str | None, deployment_names: list[str] | None) -> list[str] | None:
"""Normalize the single/plural deployment-filter arguments into one list.
Returns None when no filter is requested (process all deployments). When a
list is returned it is the exhaustive set of deployment names to target -
an empty list therefore means *zero* deployments, not "all". ``deployment_names``
takes precedence over the singular ``deployment_name`` convenience argument.
"""
if deployment_names is not None:
return deployment_names
if deployment_name is not None:
return [deployment_name]
return None
# Filename extensions used for generated per-component manifests. A file is only
# considered for pruning when it ends with one of these (covers plain manifests
# and both pre/post SOPS-encryption secret files).
_COMPONENT_MANIFEST_EXTENSIONS: tuple[str, ...] = (
".sops.yaml",
".to-sops.yaml",
".yaml",
".yml",
)
def _select_obsolete_component_manifests(
directory: str,
component_names: set[str],
generated_files: set[str],
deployment_name: str,
) -> list[str]:
"""Select per-component manifest files that this run did not (re)generate.
Per-component manifests are named with the component reference as the filename
prefix (e.g. ``<component>-deployment.yaml``, ``<component>-service.yaml``,
``<component>-platform-secret.yaml``, ``<component>-user-secret.sops.yaml``,
ingress/pvc/configmap variants), and the authorization-wall cookie secret uses
the deployment-scoped unique name (``<deployment>-<component>-oauth2-cookie-...``).
A per-component file is obsolete when the current run did not generate it
(``generated_files`` is the complete set written this run). That covers both a
removed component (none of its files are regenerated) and a renamed/dropped file of
a surviving component (e.g. an ingress whose path changed, so its filename changed).
Shared/deployment-level files (kustomization, decrypt-sops, issuer-*,
*-network-policy, keycloak/registry secrets, etc.) never start with a component
prefix and are left alone. ``*.marked-for-deletion.yaml`` PVCs are skipped so their
data lifecycle stays governed by the reconciliation grace period.
Args:
directory: Deployment manifest directory to scan (non-recursive).
component_names: All component references owning files here (current + previous).
generated_files: Basenames generated this run (the desired-state set).
deployment_name: Deployment name (for the unique-name oauth2 cookie prefix).
Returns:
Sorted list of basenames to remove.
"""
if not os.path.isdir(directory):
return []
def _owning_component(basename: str) -> str | None:
"""Return the component that a basename belongs to, longest prefix wins.
Considers both the bare ``<component>-`` prefix and the deployment-scoped
``<deployment>-<component>-`` prefix used by the oauth2 cookie secret.
"""
best: str | None = None
best_len = -1
for comp in component_names:
for prefix in (f"{comp}-", f"{deployment_name}-{comp}-"):
if basename.startswith(prefix) and len(prefix) > best_len:
best = comp
best_len = len(prefix)
return best
selected: list[str] = []
for path in glob.glob(os.path.join(directory, "*")):
if not os.path.isfile(path):
continue
basename = os.path.basename(path)
if not basename.endswith(_COMPONENT_MANIFEST_EXTENSIONS):
continue
# PVC manifests for a removed service are renamed to *.marked-for-deletion.yaml
# by pvc_manager.handle_service_removal so ArgoCD keeps the volume alive until
# the reconciliation grace period (recoverable if the removal was a mistake).
# Hard-deleting them here would prune the PVC and its data immediately.
if basename.endswith(".marked-for-deletion.yaml"):
continue
# Generated this run -> part of the desired state, keep it.
if basename in generated_files:
continue
# Only prune files that belong to a component; shared/deployment-level files
# (no component prefix) are never selected.
if _owning_component(basename) is not None:
selected.append(basename)
return sorted(selected)
@dataclass
class DeploymentResult:
    """Result information for a processed deployment.

    Holds the deployment's identifying fields together with the URLs, status
    and error messages recorded for it.
    """

    # Name of the deployment this result belongs to.
    deployment_name: str
    # Cluster recorded for the deployment.
    cluster: str
    # Namespace recorded for the deployment.
    namespace: str
    urls: dict[str, str] = field(default_factory=dict)  # component_name -> public_url
    # Status string; defaults to "success".
    status: str = "success"
    # Error messages recorded for the deployment; a fresh empty list per instance.
    errors: list[str] = field(default_factory=list)
class ProjectManager:
"""Manager for project resources and deployments."""
def __init__(
    self,
    *,
    project_file_relative_path: str | None = None,
    git_connector_for_project_files: GitConnector | None = None,
) -> None:
    """Set up connectors, handlers, per-run tracking state and the service managers.

    Args:
        project_file_relative_path: Stored as-is on the instance for later use.
        git_connector_for_project_files: Optional pre-built connector for the
            project-files repository. When one is passed in, this instance is
            marked as not owning it (see the ownership flag below).
    """
    self.__has_contents = False
    logger.debug("Initializing ProjectManager")
    self._project_file_relative_path = project_file_relative_path
    self._kubectl_connector = KubectlConnector()
    self._sops_handler = SopsHandler(self._kubectl_connector)
    self._manifest_generator = ManifestGenerator()
    self._project_file_handler = ProjectFileHandler()
    self._revision_manager = RevisionManager(self._project_file_handler)
    self.__git_connector_for_project_files = git_connector_for_project_files
    # Track ownership: if connector was injected, we don't own it and shouldn't close it
    self.__owns_git_connector_for_project_files = git_connector_for_project_files is None
    self.__git_connector_for_argocd = None
    # each deployment has a repository, referenced by name
    self.__git_connectors_for_deployments: dict[str, GitConnector] = {}
    # Progress manager for tracking operation status
    self.__progress_manager = None
    # Private map for storing secrets that need to be created
    # Structure: {deployment_name: {secret_type: secret_instance}}
    # Example: {"dev": {"database": DatabaseSecret(...), "keycloak": KeycloakSecret(...)}}
    self._secrets_to_create: dict[str, dict[str, BaseSecret]] = {}
    # Private map for storing aliases collected from all components in a deployment
    # Structure: {deployment_name: {source_type: {service_category: {alias_name: alias_template}}}}
    # Example: {"dev": {"secret": {"database": {"DATABASE_URL": "$HOST:$PORT"}}, "direct": {"web": {"PREVIEW_URL": "https://$PUBLIC_HOST"}}}}
    self._deployment_aliases: dict[str, dict[str, dict[str, dict[str, str]]]] = {}
    # Deployment results collected during processing
    # Structure: {deployment_name: DeploymentResult}
    # Contains URLs, status, and errors for each processed deployment
    self._deployment_results: dict[str, DeploymentResult] = {}
    # Track clones performed during processing
    # Structure: {deployment_name: {service_type: {"generation": int | None, "timestamp": str}}}
    # Example: {"productie": {"postgresql-database": {"generation": None, "timestamp": "..."}}}
    # Services report clones here; project_manager updates clone-from status at the end
    self._clones_performed: dict[str, dict[str, dict[str, Any]]] = {}
    # Runtime force_clone override from API (used by PVC manager and other nested calls)
    self._force_clone_override: bool = False
    # Last processing error message (set when process_project fails)
    self._processing_error: str | None = None
    # Per-component failure details (set when DeploymentHealthError is caught)
    self._component_failures: list[dict[str, Any]] | None = None
    # Change context from YAML diff (populated by process_project_changes)
    # Structure: {"previous_yaml": {...}, "current_yaml": {...}, "changes": {"added": {}, "changed": {}, "deleted": {}}}
    self._project_changes: dict[str, Any] | None = None
    self._closed = False
    # Service managers for handling service-specific operations
    # Import here to avoid circular dependencies
    # TODO: fix me, we don't want this
    from opi.manager.argo_manager import ArgoManager
    from opi.manager.bootstrap_manager import BootstrapManager
    from opi.manager.delete_project_manager import DeleteProjectManager
    from opi.manager.keycloak_manager import KeycloakManager
    from opi.manager.minio_manager import MinioManager
    from opi.manager.pvc_manager import PVCManager
    from opi.manager.redis_manager import RedisManager

    # DatabaseManager will be lazily initialized on first access
    # This allows us to determine the correct database host based on project services
    self._database_manager: DatabaseManager | None = None
    # Each manager below receives this ProjectManager instance as its argument.
    self._minio_manager = MinioManager(self)
    self._keycloak_manager = KeycloakManager(self)
    self._redis_manager = RedisManager(self)
    self._argo_manager = ArgoManager(self)
    self._bootstrap_manager = BootstrapManager(self)
    self._delete_project_manager = DeleteProjectManager(self)
    self._pvc_manager = PVCManager(self)
async def __aenter__(self) -> ProjectManager:
    """Enter the async context; yields this manager itself."""
    return self
async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None:
    """Leave the async context by awaiting ``self.close()``.

    Returns None, so an exception raised inside the ``async with`` body is
    not suppressed.
    """
    await self.close()
async def _ensure_database_manager(self, skip_credential_check: bool = False) -> DatabaseManager:
    """
    Lazily initialize DatabaseManager with correct database host based on project services.

    A previously created manager is reused, except when it was built with the
    placeholder password and real credentials are now requested; in that case
    it is closed and rebuilt.

    Args:
        skip_credential_check: If True, skip checking for superuser credentials.
            Used during infrastructure bootstrapping when credentials
            don't exist yet.

    Returns:
        The cached or newly created DatabaseManager.

    Raises:
        RuntimeError: If the project uses the namespace-specific database,
            credentials are required, and the superuser secret cannot be found.
    """
    if self._database_manager is not None:
        # If we have a cached manager but it was created with placeholder credentials,
        # and now we need real credentials, reinitialize it
        if not skip_credential_check and self._database_manager._admin_password == "placeholder":
            logger.info(
                "DatabaseManager was initialized with placeholder credentials, reinitializing with real credentials"
            )
            await self._database_manager.close()
            self._database_manager = None
        else:
            return self._database_manager
    from opi.core.cluster_config import get_database_cluster_service_endpoint, get_infrastructure_namespace
    from opi.manager.database_manager import DatabaseManager
    from opi.services import ServiceType
    from opi.utils.naming import generate_postgres_superuser_secret_name

    project_data = await self.get_contents()
    project_name = project_data.get("name")
    project_services = project_data.get("services", [])
    # Check if project uses namespace-specific PostgreSQL
    # (a service entry may be a dict containing the service key, or the bare service value)
    uses_namespace_db = any(
        ServiceType.NAMESPACE_POSTGRESQL_DATABASE.value in service
        if isinstance(service, dict)
        else service == ServiceType.NAMESPACE_POSTGRESQL_DATABASE.value
        for service in project_services
    )
    if uses_namespace_db:
        # Namespace-specific database
        db_host = get_database_cluster_service_endpoint(settings.CLUSTER_MANAGER, project_name)
        infrastructure_namespace = get_infrastructure_namespace(settings.CLUSTER_MANAGER, project_name)
        secret_name = generate_postgres_superuser_secret_name(project_name)
        if skip_credential_check:
            # During infrastructure creation, credentials don't exist yet
            # Use placeholder values - they'll be replaced after infrastructure is ready
            admin_username = "postgres"
            admin_password = "placeholder"
            logger.info(
                f"Initializing DatabaseManager for configuration only (credentials not validated): {db_host}"
            )
        else:
            # Normal operation - get credentials from Kubernetes secret
            secret_data = await self._kubectl_connector.get_secret(secret_name, infrastructure_namespace)
            if not secret_data:
                raise RuntimeError(
                    f"Superuser secret '{secret_name}' not found in '{infrastructure_namespace}'. "
                    f"Infrastructure may not be deployed yet."
                )
            admin_username = secret_data.get("username")
            admin_password = secret_data.get("password")
            logger.info(f"Initializing DatabaseManager with namespace-specific PostgreSQL: {db_host}")
    else:
        # Shared database
        db_host = settings.DATABASE_HOST
        admin_username = settings.DATABASE_ADMIN_NAME
        admin_password = settings.DATABASE_ADMIN_PASSWORD
        logger.info(f"Initializing DatabaseManager with shared PostgreSQL: {db_host}")
    self._database_manager = DatabaseManager(
        self, db_host=db_host, admin_username=admin_username, admin_password=admin_password
    )
    return self._database_manager
async def get_name(self) -> str:
    """Return the ``name`` entry of the project contents (KeyError if absent)."""
    return (await self.get_contents())["name"]
async def get_deployments(
    self,
    cluster_filter: bool = True,
    deployment_name: str | None = None,
    deployment_names: list[str] | None = None,
) -> list[dict[str, Any]]:
    """
    Return the project's deployments, optionally narrowed by cluster and name.

    In a Distributed Operations Manager architecture, each operations-manager
    instance manages resources only for its configured CLUSTER_MANAGER cluster.

    Args:
        cluster_filter: When True (default), keep only deployments whose
            ``cluster`` equals the CLUSTER_MANAGER setting.
        deployment_name: Convenience filter for a single deployment name.
        deployment_names: Exact set of deployment names to keep. Overrides
            ``deployment_name``. An empty list yields zero deployments
            (not "all").

    Returns:
        List of deployment configurations matching the filters

    Raises:
        ValueError: If a deployment declares a namespace that does not
            match the project name. See ``enforce_namespace_pin``.
    """
    project_data = await self.get_contents()
    # Tenant-isolation guard: namespaces are pinned to the project name before
    # anything reads them. The git-monitor path shares this guard, so a project
    # committed directly to git cannot bypass the pin.
    enforce_namespace_pin(project_data)
    selected = project_data.get("deployments", [])
    if cluster_filter:
        selected = [entry for entry in selected if entry.get("cluster") == settings.CLUSTER_MANAGER]
    # The resolved filter is compared against None (not tested for truthiness)
    # so that an empty target list correctly selects zero deployments.
    wanted = _resolve_deployment_filter(deployment_name, deployment_names)
    if wanted is None:
        return selected
    wanted_names = set(wanted)
    return [entry for entry in selected if entry.get("name") in wanted_names]
async def get_deployment_by_name(self, deployment_name: str) -> dict[str, Any] | None:
    """
    Look up one deployment by name (respects CLUSTER_MANAGER).

    Args:
        deployment_name: Name of the deployment to find

    Returns:
        The first matching deployment configuration, or None if there is none
    """
    matches = await self.get_deployments(cluster_filter=True, deployment_name=deployment_name)
    return next(iter(matches), None)
def get_deployment_results(self, deployment_name: str | None = None) -> dict[str, DeploymentResult]:
"""
Get deployment results collected during processing.
Results include URLs, cluster info, and status for each processed deployment.
Call this after process_project() or process_project_from_git() to get the results.
Args:
deployment_name: Optional specific deployment name to filter results
Returns:
Dictionary mapping deployment names to DeploymentResult objects
"""
if deployment_name:
if deployment_name in self._deployment_results:
return {deployment_name: self._deployment_results[deployment_name]}
return {}
return self._deployment_results
def get_processing_error(self) -> str | None:
    """
    Get the last processing error message.

    Returns the error message from the most recent process_project() failure,
    or None if processing succeeded.

    Returns:
        The stored error message, or None.
    """
    return self._processing_error
def get_component_failures(self) -> list[dict[str, Any]] | None:
    """Get per-component failure details from the most recent deployment sync.

    Returns:
        The stored list of failure detail dicts, or None when none are stored.
    """
    return self._component_failures
async def _queue_refresh_task(self, task_service: Any, project_name: str, deployment_name: str) -> None:
    """Queue a ``refresh_deployment`` task through the task service.

    When no task service is supplied (falsy), only a warning is logged and
    nothing is queued.
    """
    if not task_service:
        logger.warning(
            "Task service not available, cannot queue refresh for %s/%s",
            project_name,
            deployment_name,
        )
        return
    refresh_payload = {
        "project_name": project_name,
        "deployment_name": deployment_name,
        "force_clone": False,
        # Automated retry after a disable: must not re-enable moving-tag disables.
        "automated_remediation": True,
    }
    await task_service.create_task(
        task_type="refresh_deployment",
        project_name=project_name,
        deployment_name=deployment_name,
        cluster=settings.CLUSTER_MANAGER,
        payload=refresh_payload,
    )
def get_processing_exception(self) -> Exception | None:
"""Get the original exception from the most recent processing failure."""
return getattr(self, "_processing_exception", None)
async def get_repositories(self) -> list[dict[str, Any]]:
    """
    Return every repository defined in the project.

    Returns:
        The ``repositories`` list of the project contents, or an empty list
        when the key is absent
    """
    return (await self.get_contents()).get("repositories", [])
async def get_components(self) -> list[dict[str, Any]]:
    """
    Return every component defined in the project.

    Returns:
        The ``components`` list of the project contents, or an empty list
        when the key is absent
    """
    return (await self.get_contents()).get("components", [])
async def get_git_connector_for_project_files(self) -> GitConnector:
    """Return the git connector for the project files, creating it on first use.

    When no connector is set yet, one is created with
    ``create_git_connector_for_project_files("")`` and its repository is
    ensured to be cloned; an already-set connector is returned as-is.
    """
    if self.__git_connector_for_project_files is None:
        self.__git_connector_for_project_files = await create_git_connector_for_project_files("")
        await self.__git_connector_for_project_files.ensure_repo_cloned()
    return self.__git_connector_for_project_files
async def set_git_connector_for_project_files(self, git_connector: GitConnector) -> None:
if self.__git_connector_for_project_files:
raise Exception("git_connector_for_projectfiles already set")
self.__git_connector_for_project_files = git_connector
# Injected connector is not owned by this instance
self.__owns_git_connector_for_project_files = False
async def close_git_connector_for_project_files(self) -> None:
    """Close the project-files git connector if this instance owns it, then drop the reference."""
    # Only a connector this instance owns is closed.
    if self.__git_connector_for_project_files and self.__owns_git_connector_for_project_files:
        await self.__git_connector_for_project_files.close()
    self.__git_connector_for_project_files = None
def _add_secret_to_create(self, deployment_name: str, secret_type: str, secret_data: BaseSecret) -> None:
    """
    Register a secret in the private secrets map so it can be created later.

    An existing entry for the same deployment and secret type is replaced.

    Args:
        deployment_name: Name of the deployment
        secret_type: Type of secret (e.g., "database", "keycloak", "vault")
        secret_data: Secret instance (BaseSecret subclass) to store
    """
    per_deployment = self._secrets_to_create.setdefault(deployment_name, {})
    per_deployment[secret_type] = secret_data
    logger.debug(f"Added {secret_type} secret for deployment {deployment_name} to secrets map")
def report_clone_performed(
    self,
    deployment_name: str,
    service_type: str,
    generation: int | None = None,
) -> None:
    """
    Record that a service performed a clone operation.

    Services call this after successfully completing a clone. The project
    manager uses the recorded entries to update clone-from status once all
    services have completed. A later report for the same deployment and
    service type replaces the earlier one.

    Args:
        deployment_name: Name of the deployment
        service_type: Type of service (e.g., "postgresql-database", "minio-storage")
        generation: Generation number if applicable (for versioned resources)
    """
    from datetime import UTC, datetime

    per_deployment = self._clones_performed.setdefault(deployment_name, {})
    # The timestamp is the current UTC time in ISO-8601 form.
    per_deployment[service_type] = {
        "generation": generation,
        "timestamp": datetime.now(UTC).isoformat(),
    }
    logger.info(f"Clone reported: {deployment_name}/{service_type} (generation={generation})")
def has_clones_performed(self, deployment_name: str) -> bool:
"""Check if any clones were performed for a deployment."""
return deployment_name in self._clones_performed and len(self._clones_performed[deployment_name]) > 0
def clear_clones_performed(self) -> None:
    """Clear the clones tracking (called at start of processing)."""
    # Rebinds the attribute to a new empty dict; the previous dict object is
    # not modified.
    self._clones_performed = {}
def _get_secret_from_map(
    self, deployment_name: str, secret_type: str, secret_class: type[T] | None = None
) -> T | None:
    """
    Fetch a secret from the private secrets map, optionally checking its type.

    Args:
        deployment_name: Name of the deployment
        secret_type: Type of secret (e.g., "database", "keycloak", "vault")
        secret_class: Expected secret class; when given, the stored secret
            must be an instance of it (optional)

    Returns:
        The stored secret, or None when nothing is stored for this
        deployment/secret type

    Raises:
        ValueError: If ``secret_class`` is given and the stored secret is not
            an instance of it
    """
    per_deployment = self._secrets_to_create.get(deployment_name, {})
    candidate = per_deployment.get(secret_type)
    if candidate is None:
        return None
    # Runtime safety net: with no expected class every stored secret is accepted.
    if secret_class is None or isinstance(candidate, secret_class):
        return cast("T", candidate)
    raise ValueError(
        f"Secret type mismatch for {deployment_name}.{secret_type}: "
        f"expected {secret_class.__name__}, got {type(candidate).__name__}"
    )
def _get_expected_secrets(
    self, deployment_name: str, deployment: dict[str, Any], project_data: dict[str, Any]
) -> dict[str, str]:
    """
    Work out which secrets a deployment should reference, based on the
    services (database, keycloak, minio) used by its components.

    The result is used to build the envFrom list in deployment manifests, so that
    all required secrets are referenced even if they're not in the _secrets_to_create map.

    Args:
        deployment_name: Name of the deployment
        deployment: Deployment configuration
        project_data: Full project configuration

    Returns:
        Dictionary mapping secret_type to secret_name
        Example: {"database": "deployment1-database", "keycloak": "deployment1-keycloak"}
    """
    # Both the shared (postgresql-database) and the dedicated
    # (namespace-postgresql-database) variant require the database secret.
    postgres_variants = (
        ServiceType.POSTGRESQL_DATABASE.value,
        ServiceType.NAMESPACE_POSTGRESQL_DATABASE.value,
    )
    needs_database = False
    needs_minio = False
    needs_keycloak = False
    # A service counts as used when any referenced component uses it.
    for entry in deployment.get("components", []):
        reference = entry.get("reference")
        if not reference:
            continue
        definition = self._project_file_handler._find_component(project_data, reference)
        service_names = extract_service_names_from_component(definition) if definition else []
        if any(variant in service_names for variant in postgres_variants):
            needs_database = True
        if ServiceType.MINIO_STORAGE.value in service_names:
            needs_minio = True
        if ServiceType.KEYCLOAK.value in service_names:
            # TODO: fix this, using keycloak only is sso if the configuration is provided for it
            needs_keycloak = True

    expected_secrets: dict[str, str] = {}
    for is_needed, secret_key, secret_cls, label in (
        (needs_database, "database", DatabaseSecret, "database"),
        (needs_minio, "minio", MinIOSecret, "MinIO"),
        (needs_keycloak, "keycloak", KeycloakSecret, "Keycloak"),
    ):
        if is_needed:
            expected_secrets[secret_key] = secret_cls.get_secret_name(deployment_name)
            logger.debug(f"Deployment {deployment_name} expects {label} secret")
    # Note: User secrets are per-component, not per-deployment
    # They will be added during component processing
    logger.info(f"Expected secrets for deployment {deployment_name}: {list(expected_secrets.keys())}")
    return expected_secrets
def _get_service_category_name(self, service_type: ServiceType) -> str:
    """
    Resolve the alias-resolution category that a service type belongs to.

    Args:
        service_type: The service type enum member to classify

    Returns:
        Category name string (e.g., "database", "redis", "minio", "keycloak",
        "web", "storage", "platform"). A service type without an explicit
        category falls back to its own enum value.
    """
    # Each category lists every service type that folds into it. The shared
    # and namespace-dedicated variants of postgresql/redis deliberately share
    # one category, so an alias referencing e.g. $REDIS_URL lands in the same
    # secret bucket whichever variant the project picked. Otherwise the
    # variable lookup map (keyed by whichever variant is iterated last) would
    # categorise aliases as "namespace-redis" -- a bucket no project consumes
    # -- and the alias would be silently dropped.
    members_by_category = {
        "database": (
            ServiceType.POSTGRESQL_DATABASE,
            ServiceType.NAMESPACE_POSTGRESQL_DATABASE,
        ),
        "redis": (ServiceType.REDIS, ServiceType.NAMESPACE_REDIS),
        "minio": (ServiceType.MINIO_STORAGE,),
        "keycloak": (ServiceType.KEYCLOAK,),
        "web": (ServiceType.PUBLISH_ON_WEB,),
        "storage": (ServiceType.PERSISTENT_STORAGE, ServiceType.TEMP_STORAGE),
        "platform": (ServiceType.PLATFORM,),
    }

    # Invert the table into a direct service_type -> category lookup.
    category_by_type = {
        member: category
        for category, members in members_by_category.items()
        for member in members
    }

    fallback = service_type.value
    return category_by_type.get(service_type, fallback)
def _categorize_alias(self, alias_name: str, alias_template: str) -> tuple[str, str]:
    """
    Determine which service category and source type an alias belongs to.

    The template is scanned for variable references, and every referenced
    variable is looked up among the variables (and their aliases) declared by
    the services in ServiceAdapter.SERVICE_DEFINITIONS.

    Args:
        alias_name: Name of the alias (used in error and log messages)
        alias_template: Template string with variable references

    Returns:
        Tuple of (service_category, source_type) where:
        - service_category: the category name that
          _get_service_category_name yields for the referenced service
        - source_type: the `source` of the referenced variable definitions
          (e.g. 'secret' or 'direct')

    Raises:
        ValueError: If the alias references no variables, references unknown
            variables, references variables from more than one service
            category, or mixes variables with different source types
    """
    # Extract all referenced variables
    referenced_vars = extract_variable_references(alias_template)
    if not referenced_vars:
        raise ValueError(
            f"Alias '{alias_name}' has no variable references. "
            f"Aliases must reference at least one service variable."
        )

    # Build a mapping of variable_name -> (service_type, variable_definition)
    # covering each variable's primary name and all of its aliases. When two
    # services declare the same name, the service iterated last wins.
    var_to_service: dict[str, tuple[ServiceType, VariableDefinition]] = {}
    for service_type in ServiceAdapter.SERVICE_DEFINITIONS:
        service_def = ServiceAdapter.get_service_definition(service_type)
        for var_def in service_def.variables:
            for known_name in (var_def.name, *var_def.aliases):
                var_to_service[known_name] = (service_type, var_def)

    # Check for unknown variables (the mapping's keys are the known names)
    unknown_vars = [var for var in referenced_vars if var not in var_to_service]
    if unknown_vars:
        known_vars_list = ", ".join(sorted(var_to_service))
        raise ValueError(
            f"Alias '{alias_name}' references unknown variables: {', '.join(unknown_vars)}. "
            f"Available variables: {known_vars_list}"
        )

    # Determine which service category(ies) and source type(s) are referenced
    services_referenced: dict[str, ServiceType] = {}
    source_types: set[str] = set()
    for var in referenced_vars:
        service_type, var_def = var_to_service[var]
        service_category = self._get_service_category_name(service_type)
        services_referenced[service_category] = service_type
        source_types.add(var_def.source)

    # Error if multiple services referenced
    if len(services_referenced) > 1:
        raise ValueError(
            f"Alias '{alias_name}' references variables from multiple services: {', '.join(services_referenced)}. "
            f"Each alias must reference variables from only one service."
        )

    # Error if multiple source types referenced (mixing secret and direct variables).
    # Sorted so the message is stable: set iteration order of strings can
    # differ between interpreter runs.
    if len(source_types) > 1:
        raise ValueError(
            f"Alias '{alias_name}' mixes variables from different sources: {', '.join(sorted(source_types))}. "
            f"Each alias must use variables from only one source type (either 'secret' or 'direct')."
        )

    # Exactly one category and one source type remain at this point
    service_category = next(iter(services_referenced))
    source_type = next(iter(source_types))
    logger.debug(f"Alias '{alias_name}' categorized as service='{service_category}', source='{source_type}'")
    return service_category, source_type
async def _rollback_subdomain_if_needed(self) -> bool:
"""
Rollback subdomain registration if one is pending.
This is called when manifest creation fails after subdomain was registered.
Uses the rollback info stored in self._pending_subdomain_rollback.
Returns:
True if rollback was performed, False otherwise
"""
rollback_info = getattr(self, "_pending_subdomain_rollback", None)
if not rollback_info or not rollback_info.get("should_rollback"):
return False
connector = rollback_info.get("connector")
project_name = rollback_info.get("project_name")
deployment_name = rollback_info.get("deployment_name")
subdomain = rollback_info.get("subdomain")
base_domain = rollback_info.get("base_domain")
if not connector:
logger.warning("Cannot rollback subdomain: no connector available")
return False
try:
await connector.delete_by_deployment(project_name, deployment_name)
logger.info(
f"Rolled back subdomain '{subdomain}.{base_domain}' for {project_name}/{deployment_name} "
f"due to deployment failure"
)
self._pending_subdomain_rollback = None
return True
except Exception as e:
logger.error(f"Failed to rollback subdomain '{subdomain}.{base_domain}': {e}")
return False
async def _collect_deployment_aliases(self, deployment_name: str) -> dict[str, dict[str, dict[str, str]]]:
"""
Scan all components in a deployment and collect aliases, categorized by source type and service.
Args:
deployment_name: Name of the deployment
Returns:
Dictionary mapping source type -> service category -> aliases:
{
'direct': {
'web': {alias_name: template, ...},
'storage': {alias_name: template, ...}
},
'secret': {
'database': {alias_name: template, ...},
'minio': {alias_name: template, ...},
'keycloak': {alias_name: template, ...}
}
}
Raises:
ValueError: If any alias has invalid references (multiple services, unknown variables, etc.)
"""
logger.debug(f"Collecting aliases for deployment: {deployment_name}")
# Get project data
project_data = await self.get_contents()
# Find the deployment in project data
deployments = project_data.get("deployments", [])
deployment = next((d for d in deployments if d.get("name") == deployment_name), None)
if not deployment:
logger.warning(f"Deployment '{deployment_name}' not found in project data")
return {"direct": {}, "secret": {}}
# Initialize categorized aliases with two-level structure: source_type -> service_category -> aliases
categorized_aliases: dict[str, dict[str, dict[str, str]]] = {
"direct": {},
"secret": {},
}
# Alias values may hold secrets (e.g. a password) and are stored AGE-encrypted
# like user-env-vars. Decrypt lazily below, only when a value is encrypted, so
# projects without an AGE key and existing plaintext aliases keep working.
project_private_key: str | None = None
# Scan all components
components = deployment.get("components", [])
for component in components:
component_name = component["reference"]
component_definition = await self._get_by_json_path(f"$.components[?@.name=='{component_name}']")
if not component_definition:
logger.warning("Component '%s' referenced in deployment but not found in project", component_name)
continue