diff --git a/kratix_sdk/__init__.py b/kratix_sdk/__init__.py index 935df7c..db326cc 100644 --- a/kratix_sdk/__init__.py +++ b/kratix_sdk/__init__.py @@ -8,6 +8,7 @@ set_input_dir, set_metadata_dir, set_output_dir, + timedelta_to_go_duration, ) from .promise import Promise from .resource import Resource @@ -34,5 +35,6 @@ "get_output_dir", "get_metadata_dir", "set_metadata_dir", + "timedelta_to_go_duration", "__version__", ] diff --git a/kratix_sdk/kratix_sdk.py b/kratix_sdk/kratix_sdk.py index ccd3eec..ee14bc1 100644 --- a/kratix_sdk/kratix_sdk.py +++ b/kratix_sdk/kratix_sdk.py @@ -1,4 +1,5 @@ import os +from datetime import timedelta from pathlib import Path import yaml @@ -42,6 +43,32 @@ def set_metadata_dir(path: Path | str) -> None: METADATA_DIR = Path(path) +def timedelta_to_go_duration(td: timedelta) -> str: + """Converts a Python timedelta to a Go duration string (e.g. "1h30m5s300ms").""" + total_microseconds = (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds + if total_microseconds <= 0: + raise ValueError("duration must be positive") + + hours, remainder = divmod(total_microseconds, 3_600_000_000) + minutes, remainder = divmod(remainder, 60_000_000) + seconds, remainder = divmod(remainder, 1_000_000) + milliseconds, microseconds = divmod(remainder, 1_000) + + parts = [] + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + if seconds: + parts.append(f"{seconds}s") + if milliseconds: + parts.append(f"{milliseconds}ms") + if microseconds: + parts.append(f"{microseconds}us") + + return "".join(parts) + + class KratixSDK: def read_resource_input(self) -> Resource: """Reads the file in /kratix/input/object.yaml and returns a Resource. @@ -174,3 +201,33 @@ def is_configure_action(self) -> bool: def is_delete_action(self) -> bool: """Returns true if the workflow is a delete action.""" return self.workflow_action() == "delete" + + def write_suspend(self, message: str = "") -> None: + """Writes workflow-control.yaml with suspend: true. + + Kratix will stop further pipeline execution and set the workflow phase to + Suspended. + + If a message is provided, it will be surfaced in the object's status.""" + data: dict = {"suspend": True} + if message: + data["message"] = message + self._write_workflow_control(data) + + def write_retry_after(self, duration: timedelta, message: str = "") -> None: + """Writes workflow-control.yaml with retryAfter set to the given duration. + + Kratix will requeue the pipeline after the specified duration and increment + the attempt counter in the object's status. + + If a message is provided, it will be surfaced in the object's status.""" + data: dict = {"retryAfter": timedelta_to_go_duration(duration)} + if message: + data["message"] = message + self._write_workflow_control(data) + + def _write_workflow_control(self, data: dict) -> None: + path = METADATA_DIR / "workflow-control.yaml" + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w") as f: + yaml.safe_dump(data, f) diff --git a/tests/test_kratix_sdk.py b/tests/test_kratix_sdk.py index e78795c..ac73b51 100644 --- a/tests/test_kratix_sdk.py +++ b/tests/test_kratix_sdk.py @@ -1,3 +1,4 @@ +from datetime import timedelta from pathlib import Path import pytest @@ -235,6 +236,118 @@ def test_is_delete_action(monkeypatch): assert sdk.is_delete_action() is False +# ---------- timedelta_to_go_duration Tests ---------- + + +@pytest.mark.parametrize( + "td,expected", + [ + (timedelta(hours=1, minutes=30, seconds=5, milliseconds=300), "1h30m5s300ms"), + (timedelta(hours=1), "1h"), + (timedelta(minutes=30), "30m"), + (timedelta(seconds=5), "5s"), + (timedelta(milliseconds=300), "300ms"), + (timedelta(microseconds=500), "500us"), + (timedelta(days=1), "24h"), + (timedelta(hours=2, microseconds=1), "2h1us"), + (timedelta(milliseconds=1, microseconds=500), "1ms500us"), + ( + timedelta( + hours=1, minutes=30, seconds=5, milliseconds=300, microseconds=123 + ), + "1h30m5s300ms123us", + ), + ], +) +def test_timedelta_to_go_duration(td, expected): + assert ks.timedelta_to_go_duration(td) == expected + + +def test_timedelta_to_go_duration_raises_on_zero(): + with pytest.raises(ValueError): + ks.timedelta_to_go_duration(timedelta(seconds=0)) + + +def test_timedelta_to_go_duration_raises_on_negative(): + with pytest.raises(ValueError): + ks.timedelta_to_go_duration(timedelta(seconds=-1)) + + +# ---------- Suspend / Retry Tests ---------- + + +def test_write_suspend_writes_workflow_control(): + sdk = ks.KratixSDK() + + sdk.write_suspend() + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"suspend": True} + + +def test_write_suspend_with_message(): + sdk = ks.KratixSDK() + + sdk.write_suspend(message="waiting for dependency") + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"suspend": True, "message": "waiting for dependency"} + + +def test_write_retry_after_minutes(): + sdk = ks.KratixSDK() + + sdk.write_retry_after(timedelta(minutes=5, seconds=125)) + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"retryAfter": "7m5s"} + + +def test_write_retry_after_days(): + sdk = ks.KratixSDK() + + sdk.write_retry_after(timedelta(days=1, hours=3, minutes=66)) + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"retryAfter": "28h6m"} + + +def test_write_retry_after_hours(): + sdk = ks.KratixSDK() + + sdk.write_retry_after(timedelta(hours=3, seconds=65)) + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"retryAfter": "3h1m5s"} + + +def test_write_retry_after_with_message(): + sdk = ks.KratixSDK() + + sdk.write_retry_after(timedelta(hours=1, minutes=30), message="configmap not found") + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"retryAfter": "1h30m", "message": "configmap not found"} + + +def test_write_retry_after_zero_duration_raises(): + sdk = ks.KratixSDK() + with pytest.raises(ValueError): + sdk.write_retry_after(timedelta(0)) + + # ---------- Write to Output Tests ----------