Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,5 @@ The 'dockerhub' Docker Service Connector connector was used to successfully conf

The Docker Service Connector can be used by all Container Registry stack component flavors to authenticate to a remote Docker/OCI container registry. This allows container images to be built and published to private container registries without the need to configure explicit Docker credentials in the target environment or the Stack Component.

{% hint style="warning" %}
ZenML does not yet support automatically configuring Docker credentials in container runtimes such as Kubernetes clusters (i.e. via imagePullSecrets) to allow container images to be pulled from the private container registries. This will be added in a future release.
{% endhint %}

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
33 changes: 33 additions & 0 deletions src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,39 @@ def create_or_update_secret(
update_secret(core_api, namespace, secret_name, data)


def create_or_update_secret_from_manifest(
core_api: k8s_client.CoreV1Api,
secret_manifest: Dict[str, Any],
) -> None:
"""Create or update a Kubernetes secret from a complete manifest.

Args:
core_api: Client of Core V1 API of Kubernetes API.
secret_manifest: Complete Kubernetes secret manifest dict.

Raises:
ApiException: If the secret creation failed for any reason other than
the secret already existing.
"""
namespace = secret_manifest["metadata"]["namespace"]
secret_name = secret_manifest["metadata"]["name"]

try:
core_api.create_namespaced_secret(
namespace=namespace,
body=secret_manifest,
)
except ApiException as e:
if e.status == 409: # Already exists, update it
core_api.patch_namespaced_secret(
name=secret_name,
namespace=namespace,
body=secret_manifest,
)
else:
raise


def delete_secret(
core_api: k8s_client.CoreV1Api,
namespace: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def submit_pipeline(
"schedule. Use `Schedule(cron_expression=...)` instead."
)
cron_expression = deployment.schedule.cron_expression
cron_job_manifest = build_cron_job_manifest(
cron_job_manifest, secret_manifests = build_cron_job_manifest(
cron_expression=cron_expression,
run_name=orchestrator_run_name,
pod_name=pod_name,
Expand All @@ -533,8 +533,16 @@ def submit_pipeline(
successful_jobs_history_limit=settings.successful_jobs_history_limit,
failed_jobs_history_limit=settings.failed_jobs_history_limit,
ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
namespace=self.config.kubernetes_namespace,
)

# Create imagePullSecrets first
for secret_manifest in secret_manifests:
kube_utils.create_or_update_secret_from_manifest(
core_api=self._k8s_core_api,
secret_manifest=secret_manifest,
)

self._k8s_batch_api.create_namespaced_cron_job(
body=cron_job_manifest,
namespace=self.config.kubernetes_namespace,
Expand All @@ -546,7 +554,7 @@ def submit_pipeline(
return None
else:
# Create and run the orchestrator pod.
pod_manifest = build_pod_manifest(
pod_manifest, secret_manifests = build_pod_manifest(
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
Expand All @@ -558,8 +566,16 @@ def submit_pipeline(
service_account_name=service_account_name,
env=environment,
mount_local_stores=self.config.is_local,
namespace=self.config.kubernetes_namespace,
)

# Create imagePullSecrets first
for secret_manifest in secret_manifests:
kube_utils.create_or_update_secret_from_manifest(
core_api=self._k8s_core_api,
secret_manifest=secret_manifest,
)

kube_utils.create_and_wait_for_pod_to_start(
core_api=self._k8s_core_api,
pod_display_name="Kubernetes orchestrator pod",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ def run_step_on_kubernetes(step_name: str) -> None:
}
)

# Define Kubernetes pod manifest.
pod_manifest = build_pod_manifest(
# Define Kubernetes pod manifest and any required secrets.
pod_manifest, secret_manifests = build_pod_manifest(
pod_name=pod_name,
run_name=args.run_name,
pipeline_name=deployment.pipeline_configuration.name,
Expand All @@ -207,8 +207,39 @@ def run_step_on_kubernetes(step_name: str) -> None:
or settings.service_account_name,
mount_local_stores=mount_local_stores,
owner_references=owner_references,
namespace=args.kubernetes_namespace,
)

# Step pods should reuse secrets created by the orchestrator pod
# Only create secrets if they don't already exist
for secret_manifest in secret_manifests:
secret_name = secret_manifest["metadata"]["name"]
try:
# Check if secret already exists
core_api.read_namespaced_secret(
name=secret_name, namespace=args.kubernetes_namespace
)
logger.debug(
f"imagePullSecret {secret_name} already exists, reusing it"
)
except k8s_client.rest.ApiException as e:
if e.status == 404:
# Secret doesn't exist, create it
try:
kube_utils.create_or_update_secret_from_manifest(
core_api=core_api,
secret_manifest=secret_manifest,
)
logger.debug(f"Created imagePullSecret {secret_name}")
except Exception as create_e:
logger.warning(
f"Failed to create imagePullSecret {secret_name}: {create_e}"
)
else:
logger.warning(
f"Failed to check for existing secret {secret_name}: {e}"
)

kube_utils.create_and_wait_for_pod_to_start(
core_api=core_api,
pod_display_name=f"pod for step `{step_name}`",
Expand Down
Loading