Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 4 additions & 35 deletions dagster_uc/manage_user_code_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@
import logging
import os
import pprint
import subprocess
import time
from dataclasses import asdict
from typing import Annotated, cast

import kr8s
import typer
from kr8s.objects import (
ConfigMap,
Pod,
)

from dagster_uc.config import UserCodeDeploymentsConfig, load_config
from dagster_uc.log import logger
from dagster_uc.uc_handler import DagsterUserCodeHandler
from dagster_uc.utils import BuildTool, build_and_push, gen_tag
from dagster_uc.utils import BuildTool, build_and_push, gen_tag, is_command_available

app = typer.Typer(invoke_without_command=True)
deployment_app = typer.Typer(
Expand Down Expand Up @@ -79,7 +77,6 @@ def default(
kr8s_api = kr8s.api(context=f"{config.kubernetes_context}", namespace=config.namespace)

handler = DagsterUserCodeHandler(config, kr8s_api)
handler._ensure_dagster_version_match()
handler.maybe_create_user_deployments_configmap()
logger.debug(f"Done: Switched kubernetes context to {config.environment}")

Expand Down Expand Up @@ -253,18 +250,7 @@ def deployment_delete(
) -> None:
if delete_all:
handler.remove_all_deployments()
handler.delete_k8s_resources(
label_selector="app.kubernetes.io/name=dagster-user-deployments",
)
handler.delete_k8s_resources(label_selector="app=dagster-user-deployments")
for item in handler.api.get(
ConfigMap,
namespace=config.namespace,
label_selector="app=dagster-user-deployments",
):
item.delete() # type: ignore
handler.delete_k8s_resources(label_selector="dagster/code-location")
handler.deploy_to_k8s()
handler.deploy_to_k8s(reload_dagster=True)
typer.echo("\033[1mDeleted all deployments\033[0m")
else:
if not name:
Expand All @@ -276,10 +262,6 @@ def deployment_delete(
# In case the UI name separator of the deployment is passed
name = name.replace(":", "--")
handler.remove_user_deployment_from_configmap(name)
handler.delete_k8s_resources_for_user_deployment(
name,
delete_deployments=True,
)
handler.deploy_to_k8s(reload_dagster=True)
typer.echo(f"Deleted deployment \033[1m{name}\033[0m")

Expand Down Expand Up @@ -373,18 +355,7 @@ def deployment_deploy(
),
] = False,
):
def is_command_available(command: str) -> bool:
    """Report whether ``command`` can be executed on this machine.

    The probe runs ``<command> --version`` with its output captured and
    treats a zero exit status as "available".

    Args:
        command: Executable name or path to probe.

    Returns:
        ``True`` if the probe exits with status 0; ``False`` if it exits
        with a non-zero status or the executable cannot be started at all.
    """
    try:
        subprocess.run(
            [command, "--version"],
            capture_output=True,
            # check=True turns a non-zero exit status into CalledProcessError.
            check=True,
        )
    except subprocess.CalledProcessError:
        # The executable started but exited with a non-zero status.
        return False
    except OSError:
        # FileNotFoundError (nothing by that name) is a subclass of OSError;
        # catching the base class also covers targets that exist but cannot
        # be executed (e.g. PermissionError), which should count as
        # "not available" rather than crash the caller.
        return False
    return True
handler._ensure_dagster_version_match()

count = 0
while not handler.acquire_semaphore(reset_lock):
Expand Down Expand Up @@ -451,6 +422,7 @@ def is_command_available(command: str) -> bool:
logger.info(
f"Deployment with name '{deployment_name}' exists in '{config.environment}'. Updating deployment in configmap",
)
# TODO(ion): make this into a single operation (replace)
handler.remove_user_deployment_from_configmap(deployment_name)
handler.add_user_deployment_to_configmap(
handler.gen_new_deployment_yaml(
Expand All @@ -460,19 +432,16 @@ def is_command_available(command: str) -> bool:
),
)
if config.cicd or force:
handler.delete_k8s_resources_for_user_deployment(deployment_name)
handler.deploy_to_k8s()
elif not handler.check_if_code_pod_exists(label=deployment_name):
logger.info(
"Code deployment present in configmap but pod not found, triggering full deploy...",
)
handler.delete_k8s_resources_for_user_deployment(deployment_name, True)
handler.deploy_to_k8s() # Something went wrong - redeploy yamls and reload webserver
else:
logger.info(
"Code deployment present in configmap and pod found...",
)
handler.delete_k8s_resources_for_user_deployment(deployment_name, False)
handler.deploy_to_k8s(reload_dagster=False)
finally:
handler.release_semaphore()
Expand Down
114 changes: 35 additions & 79 deletions dagster_uc/uc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import NamedTuple

import kr8s
import pyhelm3
import yaml
from kr8s.objects import (
ConfigMap,
Expand Down Expand Up @@ -39,20 +40,21 @@ def __init__(self, config: UserCodeDeploymentsConfig, kr8s_api: kr8s.Api) -> Non

def maybe_create_user_deployments_configmap(self) -> None:
"""Creates a user deployments_configmap if it doesn't exist yet."""
from copy import deepcopy

dagster_user_deployments_values_yaml_configmap = deepcopy(BASE_CONFIGMAP)
dagster_user_deployments_values_yaml_configmap["metadata"]["name"] = (
self.config.user_code_deployments_configmap_name
)
dagster_user_deployments_values_yaml_configmap["data"]["yaml"] = yaml.dump(
BASE_CONFIGMAP_DATA,
)
try:
self._read_namespaced_config_map(
self.config.user_code_deployments_configmap_name,
)
except kr8s.NotFoundError:
from copy import deepcopy

dagster_user_deployments_values_yaml_configmap = deepcopy(BASE_CONFIGMAP)
dagster_user_deployments_values_yaml_configmap["metadata"]["name"] = (
self.config.user_code_deployments_configmap_name
)
dagster_user_deployments_values_yaml_configmap["data"]["yaml"] = yaml.dump(
BASE_CONFIGMAP_DATA,
)

ConfigMap(
resource=dagster_user_deployments_values_yaml_configmap,
namespace=self.config.namespace,
Expand Down Expand Up @@ -180,36 +182,43 @@ def deploy_to_k8s(
self.update_dagster_workspace_yaml()

loop = asyncio.new_event_loop()
helm_client = Client()
RELEASE_NAME = "dagster-user-code" # noqa
helm_client = Client(kubecontext=self.config.kubernetes_context)

chart = loop.run_until_complete(
helm_client.get_chart(
chart_ref="dagster-user-deployments",
repo="https://dagster-io.github.io/helm",
version=self.config.dagster_version,
),
)
helm_templates = [
*loop.run_until_complete(
helm_client.template_resources(
chart,
"dagster",
values_dict,
namespace=self.config.namespace,
),
logger.info(
"Upgrading helm release '%s'...",
RELEASE_NAME,
)
installed = loop.run_until_complete(
helm_client.install_or_upgrade_release(
RELEASE_NAME,
chart,
values_dict,
namespace=self.config.namespace,
wait=True,
),
]
)
if installed.status == pyhelm3.ReleaseRevisionStatus.FAILED:
logger.error(
"Dagster-usercode helm release install or upgrade failed, rolling back now..",
)
from pyhelm3 import Release

# Update user code deployments in k8s (akin to kubectl apply -f)
for obj in helm_templates:
k8s_obj = eval(obj["kind"])(obj, api=self.api)
try:
k8s_obj.patch(obj)
except kr8s.NotFoundError:
k8s_obj.create()
release = Release(name=RELEASE_NAME, namespace=self.config.namespace)
loop.run_until_complete(release.rollback())
raise Exception("Helm user-code deployment failed, had to rollback.")

if reload_dagster:
for deployment_name in ["dagster-daemon", "dagster-dagster-webserver"]:
deployment = Deployment.get(deployment_name, namespace=self.config.namespace)

reload_patch = {
"spec": {
"template": {
Expand All @@ -225,38 +234,6 @@ def deploy_to_k8s(
}
deployment.patch(reload_patch)

def delete_k8s_resources_for_user_deployment(
    self,
    label: str,
    delete_deployments: bool = True,
) -> None:
    """Delete the k8s resources related to a specific user code deployment.

    Pods labelled ``dagster/code-location=<label>`` whose phase is
    ``Succeeded`` are always deleted. When ``delete_deployments`` is true,
    the Deployment selected by ``deployment=<label>`` and the Deployment
    selected by ``dagster/code-location=<label>`` are deleted as well; a
    Deployment that does not exist is skipped.

    Args:
        label: Name of the user code deployment, used as the label value.
        delete_deployments: Whether to also delete the Deployment objects.

    Returns:
        None. The method does not report whether any pod was found.
    """
    for pod in self.api.get(
        Pod,
        label_selector=f"dagster/code-location={label}",
        field_selector="status.phase=Succeeded",
        namespace=self.config.namespace,
    ):
        logger.info(f"Deleting pod {pod.name}")
        pod.delete()

    if not delete_deployments:
        return

    import contextlib

    for selector in (f"deployment={label}", f"dagster/code-location={label}"):
        # Suppress NotFoundError per lookup: with a single shared block, a
        # missing Deployment for the first selector would skip the deletion
        # attempt for the second selector entirely.
        with contextlib.suppress(kr8s.NotFoundError):
            Deployment.get(
                namespace=self.config.namespace,
                label_selector=selector,
                api=self.api,
            ).delete()

def gen_new_deployment_yaml(
self,
name: str,
Expand Down Expand Up @@ -480,27 +457,6 @@ def check_if_code_pod_exists(self, label: str) -> bool:
)
return len(running_pods) > 0

def delete_k8s_resources(self, label_selector: str):
    """Delete every object of the kinds listed below that matches ``label_selector``.

    Each kind is queried in the configured namespace, in the order given,
    and every returned object is deleted before the next kind is queried.
    """
    kinds = (
        "Pod",
        "ReplicationController",
        "Service",
        "DaemonSet",
        "Deployment",
        "ReplicaSet",
        "StatefulSet",
        "HorizontalPodAutoscaler",
        "CronJob",
        "Job",
    )
    for kind in kinds:
        matches = self.api.get(
            kind,
            namespace=self.config.namespace,
            label_selector=label_selector,
        )
        for match in matches:
            match.delete()

def acquire_semaphore(self, reset_lock: bool = False) -> bool:
"""Acquires a semaphore by creating a configmap"""
if reset_lock:
Expand Down
15 changes: 15 additions & 0 deletions dagster_uc/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,18 @@ def build_and_push(
cmd = ["sudo"] + cmd
exception_on_failed_subprocess(subprocess.run(cmd, capture_output=False))
os.chdir(previous_dir)


def is_command_available(command: str) -> bool:
    """Checks if command is available.

    The probe runs ``<command> --version`` with its output captured and
    treats a zero exit status as "available".

    Args:
        command: Executable name or path to probe.

    Returns:
        ``True`` if the probe exits with status 0; ``False`` if it exits
        with a non-zero status or the executable cannot be started at all.
    """
    try:
        subprocess.run(
            [command, "--version"],
            capture_output=True,
            # check=True turns a non-zero exit status into CalledProcessError.
            check=True,
        )
    except subprocess.CalledProcessError:
        # The executable started but exited with a non-zero status.
        return False
    except OSError:
        # FileNotFoundError (nothing by that name) is a subclass of OSError;
        # catching the base class also covers targets that exist but cannot
        # be executed (e.g. PermissionError), which should count as
        # "not available" rather than crash the caller.
        return False
    return True
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dagster-uc"
version = "0.3.5"
version = "0.4.0"
authors = [
{name = "Stefan Verbruggen"},
{name = "Ion Koutsouris"},
Expand Down