-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkratix_sdk.py
More file actions
209 lines (167 loc) · 7.31 KB
/
kratix_sdk.py
File metadata and controls
209 lines (167 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import os
from pathlib import Path
import yaml
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from .promise import Promise
from .resource import Resource
from .status import Status
from .types import DestinationSelector
# Default workflow directories. They are module-level globals so that the
# set_*_dir() functions below can rebind them.
# INPUT_DIR: where object.yaml is read from.
INPUT_DIR = Path("/kratix/input")
# OUTPUT_DIR: root under which write_output() places files.
OUTPUT_DIR = Path("/kratix/output")
# METADATA_DIR: holds status.yaml, destination-selectors.yaml and
# workflow-control.yaml.
METADATA_DIR = Path("/kratix/metadata")
def get_input_dir() -> Path:
    """Return the directory currently configured for workflow input files."""
    return INPUT_DIR
def get_output_dir() -> Path:
    """Return the directory currently configured for workflow output files."""
    return OUTPUT_DIR
def get_metadata_dir() -> Path:
    """Return the directory currently configured for workflow metadata files."""
    return METADATA_DIR
def set_input_dir(path: Path | str) -> None:
global INPUT_DIR
INPUT_DIR = Path(path)
def set_output_dir(path: Path | str) -> None:
global OUTPUT_DIR
OUTPUT_DIR = Path(path)
def set_metadata_dir(path: Path | str) -> None:
global METADATA_DIR
METADATA_DIR = Path(path)
class KratixSDK:
    """Helpers for code running inside a Kratix workflow container.

    Reads workflow inputs and metadata from, and writes outputs and metadata
    to, the module-level ``INPUT_DIR`` / ``OUTPUT_DIR`` / ``METADATA_DIR``
    directories. The directories are looked up at call time, so changes made
    through the ``set_*_dir`` functions take effect immediately.
    """

    @staticmethod
    def _load_yaml(path: Path, default: dict | list) -> dict | list:
        """Parse the YAML file at ``path``; return ``default`` if it is empty."""
        # Explicit UTF-8 keeps parsing independent of the container's locale.
        with path.open(encoding="utf-8") as f:
            return yaml.safe_load(f) or default

    @staticmethod
    def _dump_yaml(path: Path, data: dict | list) -> None:
        """Serialise ``data`` as YAML to ``path``, creating parent directories."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", encoding="utf-8") as f:
            yaml.safe_dump(data, f)

    def read_resource_input(self) -> Resource:
        """Read ``object.yaml`` from the input directory and return a Resource.

        Can be used in a Resource configure workflow.

        Raises:
            OSError: if the file cannot be opened (e.g. it does not exist).
        """
        return Resource(self._load_yaml(INPUT_DIR / "object.yaml", {}))

    def read_promise_input(self) -> Promise:
        """Read ``object.yaml`` from the input directory and return a Promise.

        Can be used in a Promise configure workflow.

        Raises:
            OSError: if the file cannot be opened (e.g. it does not exist).
        """
        return Promise(self._load_yaml(INPUT_DIR / "object.yaml", {}))

    def read_status(self) -> Status:
        """Read ``status.yaml`` from the metadata directory and return a Status.

        Raises:
            OSError: if the file cannot be opened (e.g. it does not exist).
        """
        return Status(self._load_yaml(METADATA_DIR / "status.yaml", {}))

    def read_destination_selectors(self) -> list[DestinationSelector]:
        """Read ``destination-selectors.yaml`` from the metadata directory.

        Returns:
            One DestinationSelector per entry in the file. A missing
            ``directory`` becomes ``""`` and a missing or null
            ``matchLabels`` becomes ``{}``.

        Raises:
            OSError: if the file cannot be opened (e.g. it does not exist).
        """
        raw = self._load_yaml(METADATA_DIR / "destination-selectors.yaml", [])
        return [
            DestinationSelector(
                directory=item.get("directory", ""),
                match_labels=item.get("matchLabels", {}) or {},
            )
            for item in raw
        ]

    def write_output(self, relative_path: str, content: bytes) -> None:
        """Write ``content`` to ``relative_path`` under the output directory.

        Missing parent directories are created; an existing file is
        overwritten.
        """
        dest = OUTPUT_DIR / relative_path
        dest.parent.mkdir(parents=True, exist_ok=True)
        with dest.open("wb") as f:
            f.write(content)

    def write_status(self, status: Status) -> None:
        """Write ``status`` to ``status.yaml`` in the metadata directory."""
        self._dump_yaml(METADATA_DIR / "status.yaml", status.to_dict())

    def write_destination_selectors(self, selectors: list[DestinationSelector]) -> None:
        """Write ``selectors`` to ``destination-selectors.yaml`` in the metadata
        directory."""
        data = [
            {
                "directory": s.directory or "",  # directory is optional
                "matchLabels": s.match_labels or {},
            }
            for s in selectors
        ]
        self._dump_yaml(METADATA_DIR / "destination-selectors.yaml", data)

    def workflow_action(self) -> str:
        """Return the KRATIX_WORKFLOW_ACTION environment variable ("" if unset)."""
        return os.getenv("KRATIX_WORKFLOW_ACTION", "")

    def workflow_type(self) -> str:
        """Return the KRATIX_WORKFLOW_TYPE environment variable ("" if unset)."""
        return os.getenv("KRATIX_WORKFLOW_TYPE", "")

    def promise_name(self) -> str:
        """Return the KRATIX_PROMISE_NAME environment variable ("" if unset)."""
        return os.getenv("KRATIX_PROMISE_NAME", "")

    def pipeline_name(self) -> str:
        """Return the KRATIX_PIPELINE_NAME environment variable ("" if unset)."""
        return os.getenv("KRATIX_PIPELINE_NAME", "")

    def publish_status(self, resource: Resource, status: Status) -> None:
        """Update the status of a Resource through the Kubernetes API.

        Patches the status subresource of the Custom Resource directly.
        The update is instant and does not change the
        /kratix/metadata/status.yaml file.

        Raises:
            RuntimeError: if the KRATIX_CRD_PLURAL environment variable is
                unset or empty.
        """
        try:
            k8s_config.load_incluster_config()
        except k8s_config.ConfigException:
            # Not running inside a cluster: fall back to the local kubeconfig.
            # Only the "no in-cluster config" error is caught so that other
            # failures are not silently masked by the fallback.
            k8s_config.load_kube_config()

        gvk = resource.get_group_version_kind()
        plural = os.getenv("KRATIX_CRD_PLURAL")
        if not plural:
            raise RuntimeError("KRATIX_CRD_PLURAL environment variable is not set")

        api = k8s_client.CustomObjectsApi()
        # Send the body as a JSON merge patch.
        api.api_client.set_default_header(
            "Content-Type", "application/merge-patch+json"
        )
        api.patch_namespaced_custom_object_status(
            group=gvk.group,
            version=gvk.version,
            namespace=resource.get_namespace(),
            plural=plural,
            name=resource.get_name(),
            body={"status": status.to_dict()},
        )

    def is_promise_workflow(self) -> bool:
        """Return True if the workflow type is "promise"."""
        return self.workflow_type() == "promise"

    def is_resource_workflow(self) -> bool:
        """Return True if the workflow type is "resource"."""
        return self.workflow_type() == "resource"

    def is_configure_action(self) -> bool:
        """Return True if the workflow action is "configure"."""
        return self.workflow_action() == "configure"

    def is_delete_action(self) -> bool:
        """Return True if the workflow action is "delete"."""
        return self.workflow_action() == "delete"

    def suspend(self, message: str = "") -> None:
        """Suspend the pipeline by writing workflow-control.yaml with suspend: true.

        Kratix will stop further pipeline execution and set the workflow
        phase to Suspended. If a message is provided, it will be surfaced in
        the object's status.
        """
        data: dict = {"suspend": True}
        if message:
            data["message"] = message
        self._write_workflow_control(data)

    def retry_after(self, duration: str, message: str = "") -> None:
        """Configure the pipeline to be retried after a given duration.

        The duration must be a valid Go duration string (e.g. "5m", "1h30m",
        "300ms"). Kratix will requeue the pipeline after the specified
        duration and increment the attempt counter in the object's status.
        If a message is provided, it will be surfaced in the object's status.

        Raises:
            ValueError: if ``duration`` is empty.
        """
        if not duration:
            raise ValueError("duration must be a non-empty string")
        data: dict = {"retryAfter": duration}
        if message:
            data["message"] = message
        self._write_workflow_control(data)

    def _write_workflow_control(self, data: dict) -> None:
        """Write ``data`` to workflow-control.yaml in the metadata directory."""
        self._dump_yaml(METADATA_DIR / "workflow-control.yaml", data)