Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

from zenml.image_builders import BaseImageBuilderConfig, BaseImageBuilderFlavor
from zenml.integrations.kaniko import KANIKO_IMAGE_BUILDER_FLAVOR

from zenml.models import ServiceConnectorRequirements
from zenml.constants import KUBERNETES_CLUSTER_RESOURCE_TYPE
if TYPE_CHECKING:
from zenml.integrations.kaniko.image_builders import KanikoImageBuilder

Expand Down Expand Up @@ -156,3 +157,11 @@ def implementation_class(self) -> Type["KanikoImageBuilder"]:
from zenml.integrations.kaniko.image_builders import KanikoImageBuilder

return KanikoImageBuilder
@property
def service_connector_requirements(self) -> Optional[ServiceConnectorRequirements]:
"""Service connector requirements.

Returns:
The service connector requirements.
"""
return ServiceConnectorRequirements(resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE)
106 changes: 91 additions & 15 deletions src/zenml/integrations/kaniko/image_builders/kaniko_image_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
from zenml.logger import get_logger
from zenml.stack import StackValidator
from zenml.utils.archivable import ArchiveType
from zenml.service_connectors.service_connector import ServiceConnector
import kubernetes.client
import kubernetes
from kubernetes.client.rest import ApiException

if TYPE_CHECKING:
from zenml.container_registries import BaseContainerRegistry
Expand Down Expand Up @@ -202,9 +206,7 @@ def _generate_spec_overrides(

optional_spec_args: Dict[str, Any] = {}
if self.config.service_account_name:
optional_spec_args["serviceAccountName"] = (
self.config.service_account_name
)
optional_spec_args["serviceAccountName"] = self.config.service_account_name

return {
"apiVersion": "v1",
Expand All @@ -231,6 +233,88 @@ def _run_kaniko_build(
pod_name: str,
spec_overrides: Dict[str, Any],
build_context: "BuildContext",
) -> None:
"""Runs the kaniko build in kubernetes with provided connectors.
Args:
pod_name: Name of the Pod that should be created to run the build.
spec_overrides: Pod spec override values.
build_context: The build context.
Raises:
RuntimeError: If the process running the Kaniko build failed.
"""
connector = self.get_connector()
if connector:
logger.info("Using kubernetes connector to run the kaniko build.")
api_client = connector.connect()

if not isinstance(api_client, kubernetes.client.api_client.ApiClient):
raise RuntimeError(f"Expected ApiClient, got {type(api_client)}")
core_api = kubernetes.client.CoreV1Api(api_client)

# Second step define the kaniko pod spec
container = kubernetes.client.V1Container(
name=pod_name,
image=self.config.excutor_image,
args=[
f"--destination={self.config.target_image}",
"--container=tar://stdin",
"--dockerfile=DockerFile",
"--verbosity=info",
],
vloume_mounts=self.config.volume_mounts,
env=self.config.env,
)
pod_spec = kubernetes.client.V1PodSpec(
containers=[container],
restart_policy="Never",
service_account_name=self.config.service_account_name
)
pod = kubernetes.client.V1Pod(metadata=kubernetes.client.V1ObjectMeta(name = pod_name),
spec = pod_spec)

# now lets create the pod
try:
core_api.create_namespace(
namespace=self.config.kubernetes_namespace,
body=pod
)
logger.info(f"Kaniko pod {pod_name} created")
except ApiException as e:
raise RuntimeError(f"Failed to create the pod {pod_name}: {e}")
# Stream the context if needed
if not self.config.store_context_in_artifact_store:
logger.info("streaming build context into kaniko pod.")
kubernetes.k8s_utils.stream_file_to_pod(
core_api,
namespcae=self.config.kubernetes_namespace,
pod_name=pod_name,
container_name="kaniko",
source_path=build_context,
destination_parh="/workspace"
)
# wait for the pod completion
kubernetes.k8s_utils.wait_pod(
core_api,
pod_name=pod_name,
namespace=self.config.kubernetes_namespace,
exit_condition_lambda = kubernetes.k8s_utils.pod_is_done,
timeout_sec = self.config.pod_running_timeout
)

#cleanup the pod
logger.info(f"Deleting the pod {pod_name}")
core_api.delete_namespaced_pod(name=pod_name,
namespace=self.config.kubernetes_namespace
)
else:
logger.info("Connector not found continuing build with kubectl")
self._run_kaniko_build_kubectl(pod_name, spec_overrides, build_context)

def _run_kaniko_build_kubectl(
self,
pod_name: str,
spec_overrides: Dict[str, Any],
build_context: "BuildContext",
) -> None:
"""Runs the Kaniko build in Kubernetes.

Expand Down Expand Up @@ -267,9 +351,7 @@ def _run_kaniko_build(
stdin=subprocess.PIPE,
) as p:
if not self.config.store_context_in_artifact_store:
self._write_build_context(
process=p, build_context=build_context
)
self._write_build_context(process=p, build_context=build_context)

try:
return_code = p.wait()
Expand All @@ -284,9 +366,7 @@ def _run_kaniko_build(
)

@staticmethod
def _write_build_context(
process: BytePopen, build_context: "BuildContext"
) -> None:
def _write_build_context(process: BytePopen, build_context: "BuildContext") -> None:
"""Writes the build context to the process stdin.

Args:
Expand Down Expand Up @@ -376,14 +456,10 @@ def _check_prerequisites() -> None:
RuntimeError: If any of the prerequisites are not installed.
"""
if not shutil.which("kubectl"):
raise RuntimeError(
"`kubectl` is required to run the Kaniko image builder."
)
raise RuntimeError("`kubectl` is required to run the Kaniko image builder.")

@staticmethod
def _verify_image_name(
image_name_with_tag: str, image_name_with_sha: str
) -> None:
def _verify_image_name(image_name_with_tag: str, image_name_with_sha: str) -> None:
"""Verifies the name/sha of the pushed image.

Args:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import base64
import datetime
import json
import os
import re
from typing import Any, Dict, List, Optional, Tuple, cast
from pydantic import Field

from zenml.constants import (
DOCKER_REGISTRY_RESOURCE_TYPE,
KUBERNETES_CLUSTER_RESOURCE_TYPE,
)

from zenml.logger import get_logger
from zenml.exceptions import AuthorizationException

from zenml.models import (
AuthenticationMethodModel,
ResourceTypeModel,
ServiceConnectorTypeModel,
)
from zenml.service_connectors.docker_service_connector import (
DockerAuthenticationMethods,
DockerConfiguration,
DockerServiceConnector,
)
from zenml.service_connectors.service_connector import (
AuthenticationConfig,
ServiceConnector,
)

class KanikoConnectorConfig(AuthenticationConfig):
"""Kubernetes connection configuration for Kaniko."""

api_token: str = Field(
description="Kubernetes API token for authentication.",
title="API token",
secret=True,
default=None
)

service_account_name: str = Field(
description="Kubernetes service account name for authentication.",
title="Service Account Name",
default=None,
)
kubeconfig: Optional[str] = Field(
description="Content of the kubecofig file,",
title="Kubeconfig",
secret=True,
default=None,
)

class KubernetesKanikoServiceConnector(ServiceConnector):
"""Kubernetes Service Connector for Kaniko."""

config: KanikoConnectorConfig

@classmethod
def _get_connector_type(cls):
return ServiceConnectorTypeModel(
name="kubernetes-kaniko",
type="kubernetes",
description="Kubernetes Service Connector for Kaniko.",
)
Loading