diff --git a/src/dagster_ray/_base/cluster_sharing_lock.py b/src/dagster_ray/_base/cluster_sharing_lock.py index 552f868..dbaf814 100644 --- a/src/dagster_ray/_base/cluster_sharing_lock.py +++ b/src/dagster_ray/_base/cluster_sharing_lock.py @@ -18,6 +18,10 @@ class ClusterSharingLock(BaseModel): def identifier(self) -> str: return f"{self.run_id}-{self.step_key}" + @property + def tag(self) -> str: + return f"dagster/lock-{self.identifier}"[:63] # 63 is the limit for k8s + @property def expired_at(self) -> datetime: return self.created_at + timedelta(seconds=self.ttl_seconds) diff --git a/src/dagster_ray/kuberay/resources/raycluster.py b/src/dagster_ray/kuberay/resources/raycluster.py index 096573b..91e3555 100644 --- a/src/dagster_ray/kuberay/resources/raycluster.py +++ b/src/dagster_ray/kuberay/resources/raycluster.py @@ -266,7 +266,7 @@ def get_sharing_lock_annotations(self, context: AnyDagsterContext) -> dict[str, ttl_seconds=self.cluster_sharing.ttl_seconds, created_at=datetime.now(), ) - annotations[f"dagster/lock-{lock.identifier}"] = lock.model_dump_json() + annotations[lock.tag] = lock.model_dump_json() return annotations def get_cluster_sharing_alive_locks(self, context: AnyDagsterContext) -> Sequence[ClusterSharingLock]: diff --git a/src/dagster_ray/kuberay/utils.py b/src/dagster_ray/kuberay/utils.py index ad96af5..d9f6966 100644 --- a/src/dagster_ray/kuberay/utils.py +++ b/src/dagster_ray/kuberay/utils.py @@ -21,7 +21,7 @@ def normalize_k8s_label_values(labels: dict[str, str]) -> dict[str, str]: - Max 63 characters - Empty string is valid - Additionally, key starting with `dagster/` are replaced with `dagster.io/` + Additionally, key starting with `dagster.io/` are replaced with `dagster/` """ normalized = {}