diff --git a/.github/workflows/run_train_deploy_pipeline.yml b/.github/workflows/run_train_deploy_pipeline.yml new file mode 100644 index 00000000..85577ff3 --- /dev/null +++ b/.github/workflows/run_train_deploy_pipeline.yml @@ -0,0 +1,55 @@ +name: Staging Trigger Train and Deploy Pipeline +on: + pull_request: + types: [opened, synchronize] + branches: [staging, main] + paths: + - 'train_and_deploy/**' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + run-staging-workflow: + runs-on: ubuntu-latest + env: + ZENML_STORE_URL: ${{ secrets.ZENML_BENTO_PROJECTS_HOST }} + ZENML_STORE_API_KEY: ${{ secrets.ZENML_BENTO_PROJECTS_API_KEY }} + ZENML_STAGING_STACK : 281f82f3-6bdb-4951-bbdd-b85b57b463cc # Set this to your staging stack ID + ZENML_GITHUB_SHA: ${{ github.event.pull_request.head.sha }} + ZENML_GITHUB_URL_PR: ${{ github.event.pull_request._links.html.href }} + ZENML_DEBUG: true + ZENML_ANALYTICS_OPT_IN: false + ZENML_LOGGING_VERBOSITY: INFO + ZENML_DISABLE_CLIENT_SERVER_MISMATCH_WARNING: True + + steps: + - name: Check out repository code + uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install requirements + working-directory: ./train_and_deploy + run: | + sudo apt install socat + pip3 install -r requirements.txt + zenml integration install bentoml skypilot_kubernetes s3 aws evidently --uv -y + + - name: Connect to ZenML server + working-directory: ./train_and_deploy + run: | + zenml init + + - name: Set stack (Staging) + working-directory: ./train_and_deploy + run: | + zenml stack set ${{ env.ZENML_STAGING_STACK }} + + - name: Run pipeline (Staging) + working-directory: ./train_and_deploy + run: | + python run.py --training \ No newline at end of file diff --git a/train_and_deploy/README.md b/train_and_deploy/README.md index ffa4dad0..4eae458e 100644 --- a/train_and_deploy/README.md +++ b/train_and_deploy/README.md @@ -7,7 +7,7 @@ classification 
datasets provided by the scikit-learn library. The project was generated from the [E2E Batch ZenML project template](https://github.com/zenml-io/template-e2e-batch) with the following properties: - Project name: ZenML E2E project -- Technical Name: e2e_use_case +- Technical Name: secret_detection - Version: `0.0.1` - Licensed with apache to ZenML GmbH<> - Deployment environment: `staging` diff --git a/train_and_deploy/RUN.md b/train_and_deploy/RUN.md new file mode 100644 index 00000000..6c43eef2 --- /dev/null +++ b/train_and_deploy/RUN.md @@ -0,0 +1,97 @@ +# Train and Deploy ML Project + +This README provides step-by-step instructions for running the training and deployment pipeline using ZenML. + +## Prerequisites + +- Git installed +- Python environment set up +- ZenML installed +- Access to the ZenML project repository + +## Project Setup + +1. Clone the repository and check out the feature branch: +```bash +git clone git@github.com:zenml-io/zenml-projects.git +cd zenml-projects && git checkout feature/update-train-deploy +``` + +2. Navigate to the project directory: +```bash +cd train_and_deploy +``` + +3. Initialize ZenML in the project: +```bash +zenml init +``` + +## Running the Pipeline + +### Training + +You have two options for running the training pipeline: + +#### Option 1: Automatic via CI +Push a change under `train_and_deploy/` to a branch with an open pull request targeting `staging` or `main`. This automatically triggers the CI pipeline that launches training in SkyPilot. + +#### Option 2: Manual Execution +1. First, set up your stack. You can choose between: + - Local stack (uses local orchestrator): + ```bash + zenml stack set LocalGitGuardian + ``` + - Remote stack (uses SkyPilot orchestrator): + ```bash + zenml stack set RemoteGitGuardian + ``` + +2. Run the training pipeline: +```bash +python run.py --training +``` + +### Model Deployment + +1. 
After training completes, deploy the model: +```bash +python run.py --deployment +``` + +Note: At this stage, the model version marked as "staging" (configured via `target_env`) is deployed locally using BentoML. + +2. Test the deployed model: +```bash +python run.py --inference +``` + +### Production Deployment + +If the staging model performs well and you want to proceed with production deployment: + +1. Deploy to Kubernetes: +```bash +python run.py --production +``` +This pipeline will: +- Build a Docker image from the BentoML service +- Deploy it to Kubernetes + +## Additional Resources + +- [ZenML Projects Tenant Dashboard](https://cloud.zenml.io/organizations/fc992c14-d960-4db7-812e-8f070c99c6f0/tenants/12ec0fd2-ed02-4479-8ff9-ecbfbaae3285) +- [Example GitHub Actions Pipeline](https://github.com/zenml-io/zenml-projects/actions/runs/12075854945/job/33676323427) + +## Pipeline Flow Overview + +1. Training → Creates and trains the model +2. Deployment → Deploys model to staging environment (local BentoML) +3. Inference → Tests the deployed model +4. Production → Deploys to production Kubernetes environment + +## Notes + +- The deployment configurations are controlled by the `target_env` setting in the configs +- Make sure you have the necessary permissions and access rights before running the pipelines +- Monitor the CI/CD pipeline in GitHub Actions when using automatic deployment \ No newline at end of file diff --git a/train_and_deploy/configs/deploy_production.yaml b/train_and_deploy/configs/deploy_production.yaml new file mode 100644 index 00000000..808eb93b --- /dev/null +++ b/train_and_deploy/configs/deploy_production.yaml @@ -0,0 +1,45 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2024. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# environment configuration +settings: + docker: + python_package_installer: uv + required_integrations: + - aws + - sklearn + - bentoml + + +# configuration of steps +steps: + notify_on_success: + parameters: + notify_on_success: False + +# configuration of the Model Control Plane +model: + name: secret_detection + version: staging + +# pipeline level extra configurations +extra: + notify_on_failure: True + + +parameters: + target_env: staging diff --git a/train_and_deploy/configs/deployer_config.yaml b/train_and_deploy/configs/deployer_config.yaml index aae54437..808eb93b 100644 --- a/train_and_deploy/configs/deployer_config.yaml +++ b/train_and_deploy/configs/deployer_config.yaml @@ -18,14 +18,13 @@ # environment configuration settings: docker: + python_package_installer: uv required_integrations: - aws - - evidently - - mlflow - sklearn - - slack - bentoml + # configuration of steps steps: notify_on_success: @@ -34,10 +33,13 @@ steps: # configuration of the Model Control Plane model: - name: e2e_use_case - version: production + name: secret_detection + version: staging # pipeline level extra configurations extra: notify_on_failure: True + +parameters: + target_env: staging diff --git a/train_and_deploy/configs/inference_config.yaml b/train_and_deploy/configs/inference_config.yaml index 5c79f2d7..2611d300 100644 --- a/train_and_deploy/configs/inference_config.yaml +++ b/train_and_deploy/configs/inference_config.yaml @@ -18,10 +18,8 @@ # environment configuration settings: docker: + python_package_installer: uv required_integrations: - - gcp - - 
evidently - - mlflow - sklearn - slack - bentoml @@ -34,10 +32,12 @@ steps: # configuration of the Model Control Plane model: - name: e2e_use_case + name: secret_detection version: staging # pipeline level extra configurations extra: notify_on_failure: True +parameters: + target_env: staging \ No newline at end of file diff --git a/train_and_deploy/configs/train_config.yaml b/train_and_deploy/configs/train_config.yaml index 674bbf66..249b7bac 100644 --- a/train_and_deploy/configs/train_config.yaml +++ b/train_and_deploy/configs/train_config.yaml @@ -18,31 +18,32 @@ # environment configuration settings: docker: + python_package_installer: uv required_integrations: - - gcp - - evidently - - mlflow - sklearn - slack - bentoml + orchestrator.vm_kubernetes: + down: True + idle_minutes_to_autostop: 2 # configuration of steps steps: model_trainer: parameters: - name: e2e_use_case + name: secret_detection promote_with_metric_compare: parameters: - mlflow_model_name: e2e_use_case + mlflow_model_name: secret_detection notify_on_success: parameters: notify_on_success: False # configuration of the Model Control Plane model: - name: e2e_use_case + name: secret_detection license: apache - description: e2e_use_case E2E Batch Use Case + description: secret_detection E2E Batch Use Case audience: All ZenML users use_cases: | The ZenML E2E project project demonstrates how the most important steps of @@ -61,10 +62,10 @@ model: extra: notify_on_failure: True # pipeline level parameters +# Updated train_config.yaml + parameters: target_env: staging - # This set contains all the model configurations that you want - # to evaluate during hyperparameter tuning stage. 
model_search_space: random_forest: model_package: sklearn.ensemble @@ -80,15 +81,20 @@ parameters: - 8 - 10 - 12 + - None # Allow unlimited depth min_samples_leaf: range: start: 1 - end: 10 + end: 15 n_estimators: range: start: 50 - end: 500 - step: 25 + end: 1000 + step: 50 + max_features: + - auto + - sqrt + - log2 decision_tree: model_package: sklearn.tree model_class: DecisionTreeClassifier @@ -103,7 +109,29 @@ parameters: - 8 - 10 - 12 + - None min_samples_leaf: range: start: 1 - end: 10 \ No newline at end of file + end: 15 + gradient_boosting: + model_package: sklearn.ensemble + model_class: GradientBoostingClassifier + search_grid: + learning_rate: + - 0.01 + - 0.1 + - 0.2 + n_estimators: + range: + start: 50 + end: 500 + step: 50 + max_depth: + - 3 + - 5 + - 7 + subsample: + - 0.6 + - 0.8 + - 1.0 \ No newline at end of file diff --git a/train_and_deploy/gitguarden.yaml b/train_and_deploy/gitguarden.yaml new file mode 100644 index 00000000..c2fb92c0 --- /dev/null +++ b/train_and_deploy/gitguarden.yaml @@ -0,0 +1,35 @@ +components: + artifact_store: + configuration: + path: s3://zenml-dev + flavor: s3 + name: s3 + type: artifact_store + container_registry: + configuration: + uri: 339712793861.dkr.ecr.eu-central-1.amazonaws.com + flavor: aws + name: ecr + type: container_registry + data_validator: + configuration: {} + flavor: evidently + name: evidently_data_validator + type: data_validator + image_builder: + configuration: {} + flavor: local + name: local_builder + type: image_builder + model_deployer: + configuration: {} + flavor: bentoml + name: jayesh_bento + type: model_deployer + orchestrator: + configuration: {} + flavor: local + name: default + type: orchestrator +stack_name: secret_detection +zenml_version: 0.70.0 diff --git a/train_and_deploy/pipelines/__init__.py b/train_and_deploy/pipelines/__init__.py index 634503d1..cd019ac0 100644 --- a/train_and_deploy/pipelines/__init__.py +++ b/train_and_deploy/pipelines/__init__.py @@ -16,6 +16,7 @@ # 
-from .batch_inference import e2e_use_case_batch_inference -from .training import e2e_use_case_training -from .deployment import e2e_use_case_deployment +from .batch_inference import secret_detection_batch_inference +from .training import secret_detection_training +from .local_deployment import secret_detection_local_deployment +from .deploy_production import secret_detection_production_deployment diff --git a/train_and_deploy/pipelines/batch_inference.py b/train_and_deploy/pipelines/batch_inference.py index fbc77227..f891c291 100644 --- a/train_and_deploy/pipelines/batch_inference.py +++ b/train_and_deploy/pipelines/batch_inference.py @@ -33,7 +33,9 @@ @pipeline(on_failure=notify_on_failure) -def e2e_use_case_batch_inference(): +def secret_detection_batch_inference( + target_env: str, +): """ Model batch inference pipeline. @@ -66,6 +68,7 @@ def e2e_use_case_batch_inference(): ########## Inference stage ########## inference_predict( dataset_inf=df_inference, + target_env=target_env, after=["drift_quality_gate"], ) diff --git a/train_and_deploy/pipelines/deploy_production.py b/train_and_deploy/pipelines/deploy_production.py new file mode 100644 index 00000000..86cb911b --- /dev/null +++ b/train_and_deploy/pipelines/deploy_production.py @@ -0,0 +1,40 @@ +# Apache Software License 2.0 +# +# Copyright (c) ZenML GmbH 2024. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from steps import dockerize_bento_model, notify_on_failure, notify_on_success, deploy_model_to_k8s + +from zenml import pipeline + + +@pipeline(on_failure=notify_on_failure, enable_cache=False) +def secret_detection_production_deployment( + target_env: str, +): + """Model deployment pipeline. + + This is a pipeline deploys trained model for future inference. + """ + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + # Link all the steps together by calling them and passing the output + # of one step as the input of the next step. + ########## Deployment stage ########## + # Get the production model artifact + bento_model_image = dockerize_bento_model(target_env=target_env) + deploy_model_to_k8s(bento_model_image) + + notify_on_success(after=["deploy_model_to_k8s"]) + ### YOUR CODE ENDS HERE ### diff --git a/train_and_deploy/pipelines/deployment.py b/train_and_deploy/pipelines/local_deployment.py similarity index 89% rename from train_and_deploy/pipelines/deployment.py rename to train_and_deploy/pipelines/local_deployment.py index 7d0875a6..4fce7bc2 100644 --- a/train_and_deploy/pipelines/deployment.py +++ b/train_and_deploy/pipelines/local_deployment.py @@ -21,9 +21,10 @@ @pipeline(on_failure=notify_on_failure, enable_cache=False) -def e2e_use_case_deployment(): - """ - Model deployment pipeline. +def secret_detection_local_deployment( + target_env: str, +): + """Model deployment pipeline. This is a pipeline deploys trained model for future inference. 
""" @@ -33,7 +34,7 @@ def e2e_use_case_deployment(): ########## Deployment stage ########## # Get the production model artifact bento = bento_builder() - deployment_deploy(bento=bento) + deployment_deploy(bento=bento, target_env=target_env) notify_on_success(after=["deployment_deploy"]) ### YOUR CODE ENDS HERE ### diff --git a/train_and_deploy/pipelines/training.py b/train_and_deploy/pipelines/training.py index ba9a2f75..f0c9e9e4 100644 --- a/train_and_deploy/pipelines/training.py +++ b/train_and_deploy/pipelines/training.py @@ -32,7 +32,6 @@ train_data_preprocessor, train_data_splitter, ) - from zenml import pipeline from zenml.logger import get_logger @@ -40,7 +39,7 @@ @pipeline(on_failure=notify_on_failure) -def e2e_use_case_training( +def secret_detection_training( model_search_space: Dict[str, Any], target_env: str, test_size: float = 0.2, @@ -51,8 +50,7 @@ def e2e_use_case_training( min_test_accuracy: float = 0.0, fail_on_accuracy_quality_gates: bool = False, ): - """ - Model training pipeline. + """Model training pipeline. 
This is a pipeline that loads the data, processes it and splits it into train and test sets, then search for best hyperparameters, diff --git a/train_and_deploy/requirements.txt b/train_and_deploy/requirements.txt index aab392b5..055d50a7 100644 --- a/train_and_deploy/requirements.txt +++ b/train_and_deploy/requirements.txt @@ -1,16 +1,4 @@ -zenml[server]==0.56.3 +zenml[server]==0.70.0 bentoml>=1.0.10 scikit-learn -mlflow>=2.1.1,<=2.12.1 -mlserver>=1.3.3 -mlserver-mlflow>=1.3.3 -python-rapidjson<1.15 -kfp==1.8.22 -gcsfs -google-cloud-secret-manager -google-cloud-container>=2.21.0 -google-cloud-storage>=2.9.0 -google-cloud-aiplatform>=1.34.0 -google-cloud-build>=3.11.0 -kubernetes -evidently>0.2.6,<0.4.5 \ No newline at end of file +kubernetes \ No newline at end of file diff --git a/train_and_deploy/run.py b/train_and_deploy/run.py index 5a9866c2..4f1d18db 100644 --- a/train_and_deploy/run.py +++ b/train_and_deploy/run.py @@ -21,11 +21,11 @@ import click from pipelines import ( - e2e_use_case_batch_inference, - e2e_use_case_deployment, - e2e_use_case_training, + secret_detection_batch_inference, + secret_detection_local_deployment, + secret_detection_production_deployment, + secret_detection_training, ) - from zenml.logger import get_logger logger = get_logger(__name__) @@ -134,6 +134,12 @@ default=False, help="Whether to run the inference pipeline.", ) +@click.option( + "--production", + is_flag=True, + default=False, + help="Whether to run the production pipeline.", +) def main( no_cache: bool = False, no_drop_na: bool = False, @@ -146,6 +152,7 @@ def main( training: bool = True, deployment: bool = False, inference: bool = False, + production: bool = False, ): """Main entry point for the pipeline execution. @@ -167,8 +174,8 @@ def main( thresholds are violated - the pipeline will fail. If `False` thresholds will not affect the pipeline. only_inference: If `True` only inference pipeline will be triggered. 
+ production: If `True` only production pipeline will be triggered. """ - # Run a pipeline with the required parameters. This executes # all steps in the pipeline in the correct order using the orchestrator # stack component that is configured in your active ZenML stack. @@ -195,9 +202,9 @@ def main( "train_config.yaml", ) pipeline_args["run_name"] = ( - f"e2e_use_case_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + f"secret_detection_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" ) - e2e_use_case_training.with_options(**pipeline_args)(**run_args_train) + secret_detection_training.with_options(**pipeline_args)(**run_args_train) logger.info("Training pipeline finished successfully!") if deployment: @@ -209,9 +216,9 @@ def main( "deployer_config.yaml", ) pipeline_args["run_name"] = ( - f"e2e_use_case_deployment_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + f"secret_detection_local_deployment_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" ) - e2e_use_case_deployment.with_options(**pipeline_args)(**run_args_inference) + secret_detection_local_deployment.with_options(**pipeline_args)(**run_args_inference) if inference: # Execute Batch Inference Pipeline @@ -222,11 +229,25 @@ def main( "inference_config.yaml", ) pipeline_args["run_name"] = ( - f"e2e_use_case_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + f"secret_detection_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" ) - e2e_use_case_batch_inference.with_options(**pipeline_args)( + secret_detection_batch_inference.with_options(**pipeline_args)( **run_args_inference ) + if production: + # Execute Production Pipeline + run_args_production = {} + pipeline_args["config_path"] = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "configs", + "deploy_production.yaml", + ) + pipeline_args["run_name"] = ( + f"secret_detection_production_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + ) + secret_detection_production_deployment.with_options(**pipeline_args)( + 
**run_args_production + ) if __name__ == "__main__": diff --git a/train_and_deploy/service.py b/train_and_deploy/service.py index 0ac94c25..6bc59dcf 100644 --- a/train_and_deploy/service.py +++ b/train_and_deploy/service.py @@ -1,14 +1,32 @@ +import bentoml +import numpy as np +from bentoml.validators import Shape +from typing_extensions import Annotated -import bentoml -from bentoml.io import NumpyNdarray +@bentoml.service +class GitGuarden: + """ + A simple service using a sklearn model + """ -e2e_use_case_runner = bentoml.sklearn.get("e2e_use_case").to_runner() + # Load in the class scope to declare the model as a dependency of the service + iris_model = bentoml.models.get("secret_detection:latest") -svc = bentoml.Service(name="e2e_use_case_service", runners=[e2e_use_case_runner]) + def __init__(self): + """ + Initialize the service by loading the model from the model store + """ + import joblib -input_spec = NumpyNdarray(dtype="float", shape=(-1, 30)) + self.model = joblib.load(self.iris_model.path_of("saved_model.pkl")) -@svc.api(input=input_spec, output=NumpyNdarray()) -async def predict(input_arr): - return await e2e_use_case_runner.predict.async_run(input_arr) + @bentoml.api + def predict( + self, + input_series: Annotated[np.ndarray, Shape((-1, 30))], + ) -> np.ndarray: + """ + Define API with preprocessing and model inference logic + """ + return self.model.predict(input_series) \ No newline at end of file diff --git a/train_and_deploy/steps/__init__.py b/train_and_deploy/steps/__init__.py index 9e546c08..1ed982a2 100644 --- a/train_and_deploy/steps/__init__.py +++ b/train_and_deploy/steps/__init__.py @@ -31,4 +31,4 @@ promote_with_metric_compare, ) from .training import model_evaluator, model_trainer -from .deployment import deployment_deploy, bento_builder +from .deployment import deployment_deploy, bento_builder, dockerize_bento_model, deploy_model_to_k8s diff --git a/train_and_deploy/steps/deployment/__init__.py 
b/train_and_deploy/steps/deployment/__init__.py index efe4755f..2b2389ef 100644 --- a/train_and_deploy/steps/deployment/__init__.py +++ b/train_and_deploy/steps/deployment/__init__.py @@ -18,3 +18,5 @@ from .deployment_deploy import deployment_deploy from .bento_builder import bento_builder +from .dockerize_bento import dockerize_bento_model +from .deploy_to_k8s import deploy_model_to_k8s diff --git a/train_and_deploy/steps/deployment/bento_builder.py b/train_and_deploy/steps/deployment/bento_builder.py index c53c7b7f..983e28e1 100644 --- a/train_and_deploy/steps/deployment/bento_builder.py +++ b/train_and_deploy/steps/deployment/bento_builder.py @@ -35,7 +35,7 @@ def bento_builder() -> ( Annotated[ Optional[bento.Bento], - ArtifactConfig(name="mlflow_deployment", is_model_artifact=True), + ArtifactConfig(name="bentoml_deployment", is_model_artifact=True), ] ): """Predictions step. @@ -63,7 +63,7 @@ def bento_builder() -> ( bento_model = bentoml.sklearn.save_model(model.name, model.load_artifact(name="model")) # Build the BentoML bundle bento = bentos.build( - service="service.py:svc", + service="service.py:GitGuarden", labels={ "zenml_version": zenml_version, "model_name": model.name, @@ -72,6 +72,14 @@ def bento_builder() -> ( "bento_uri": os.path.join(get_step_context().get_output_artifact_uri(), DEFAULT_BENTO_FILENAME), }, build_ctx=source_utils.get_source_root(), + python={ + "packages": [ + "scikit-learn", + "pandas", + "numpy", + "zenml" + ], + }, ) else: logger.warning("Skipping deployment as the orchestrator is not local.") diff --git a/train_and_deploy/steps/deployment/deploy_to_k8s.py b/train_and_deploy/steps/deployment/deploy_to_k8s.py new file mode 100644 index 00000000..b479963c --- /dev/null +++ b/train_and_deploy/steps/deployment/deploy_to_k8s.py @@ -0,0 +1,166 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +from pathlib import Path +from typing import Dict, Optional +import re +import yaml +from kubernetes import client, config +from kubernetes.client.rest import ApiException +from zenml import get_step_context, step +from zenml.client import Client +from zenml.logger import get_logger + +logger = get_logger(__name__) + +def apply_kubernetes_configuration(k8s_configs: list) -> None: + """Apply Kubernetes configurations using the K8s Python client. 
+ + Args: + k8s_configs: List of Kubernetes configuration dictionaries + """ + # Load Kubernetes configuration + try: + config.load_kube_config() + except: + config.load_incluster_config() # For in-cluster deployment + + # Initialize API clients + k8s_apps_v1 = client.AppsV1Api() + k8s_core_v1 = client.CoreV1Api() + + for k8s_config in k8s_configs: + kind = k8s_config["kind"] + name = k8s_config["metadata"]["name"] + namespace = k8s_config["metadata"].get("namespace", "default") + + try: + if kind == "Deployment": + # Check if deployment exists + try: + k8s_apps_v1.read_namespaced_deployment(name, namespace) + # Update existing deployment + k8s_apps_v1.patch_namespaced_deployment( + name=name, + namespace=namespace, + body=k8s_config + ) + logger.info(f"Updated existing deployment: {name}") + except ApiException as e: + if e.status == 404: + # Create new deployment + k8s_apps_v1.create_namespaced_deployment( + namespace=namespace, + body=k8s_config + ) + logger.info(f"Created new deployment: {name}") + else: + raise e + + elif kind == "Service": + # Check if service exists + try: + k8s_core_v1.read_namespaced_service(name, namespace) + # Update existing service + k8s_core_v1.patch_namespaced_service( + name=name, + namespace=namespace, + body=k8s_config + ) + logger.info(f"Updated existing service: {name}") + except ApiException as e: + if e.status == 404: + # Create new service + k8s_core_v1.create_namespaced_service( + namespace=namespace, + body=k8s_config + ) + logger.info(f"Created new service: {name}") + else: + raise e + + except ApiException as e: + logger.error(f"Error applying {kind} {name}: {e}") + raise e + +@step +def deploy_model_to_k8s( + docker_image_tag: str, + namespace: str = "default" +) -> Dict: + # Get the raw model name + raw_model_name = get_step_context().model.name + # Sanitize the model name + model_name = sanitize_name(raw_model_name) + + # Read the K8s template + template_path = Path(__file__).parent / "k8s_template.yaml" + with 
open(template_path, "r") as f: + k8s_configs = list(yaml.safe_load_all(f)) + + # Update configurations with sanitized names + for config in k8s_configs: + # Add namespace + config["metadata"]["namespace"] = namespace + + # Update metadata labels and name + config["metadata"]["labels"]["app"] = model_name + config["metadata"]["name"] = model_name + + if config["kind"] == "Service": + # Update service selector + config["spec"]["selector"]["app"] = model_name + + elif config["kind"] == "Deployment": + # Update deployment selector and template + config["spec"]["selector"]["matchLabels"]["app"] = model_name + config["spec"]["template"]["metadata"]["labels"]["app"] = model_name + + # Update the container image and name + containers = config["spec"]["template"]["spec"]["containers"] + for container in containers: + container["name"] = model_name + container["image"] = docker_image_tag + + # Apply the configurations + try: + apply_kubernetes_configuration(k8s_configs) + deployment_status = "success" + logger.info(f"Successfully deployed model {model_name} with image: {docker_image_tag}") + except Exception as e: + deployment_status = "failed" + logger.error(f"Failed to deploy model {model_name}: {str(e)}") + raise e + + # Return deployment information + deployment_info = { + "model_name": model_name, + "docker_image": docker_image_tag, + "namespace": namespace, + "status": deployment_status, + "service_port": 3000, + "configurations": k8s_configs + } + + return deployment_info + + + +def sanitize_name(name: str) -> str: + # Convert to lowercase and replace invalid characters with '-' + sanitized = re.sub(r"[^a-z0-9-]", "-", name.lower()) + # Trim to a maximum length of 63 characters and strip leading/trailing '-' + sanitized = sanitized[:63].strip("-") + # Ensure the name doesn't start or end with '-' + sanitized = sanitized.strip("-") + return sanitized \ No newline at end of file diff --git a/train_and_deploy/steps/deployment/deployment_deploy.py 
b/train_and_deploy/steps/deployment/deployment_deploy.py index 90cc82ed..3cd109c9 100644 --- a/train_and_deploy/steps/deployment/deployment_deploy.py +++ b/train_and_deploy/steps/deployment/deployment_deploy.py @@ -1,92 +1,60 @@ -# Apache Software License 2.0 +# Copyright (c) ZenML GmbH 2022. All Rights Reserved. # -# Copyright (c) ZenML GmbH 2024. All rights reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# https://www.apache.org/licenses/LICENSE-2.0 # - +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
@step
def deployment_deploy(
    bento: bento.Bento,
    target_env: str,
) -> Optional[BentoMLLocalDeploymentService]:
    """Deploy a bento through the active stack's model deployer.

    Builds a ``BentoMLLocalDeploymentConfig`` from the current step context
    and the given bento, then asks the stack's model deployer to deploy it
    as a ``BentoMLLocalDeploymentService``.

    Args:
        bento: The bento to deploy. Its tag and its ``model_uri`` /
            ``bento_uri`` labels are forwarded to the deployment config.
        target_env: Passed as ``model_version`` of the deployment config.

    Returns:
        The service object returned by the model deployer.

    Raises:
        RuntimeError: If the active stack has no model deployer.
    """
    zenml_client = Client()
    step_context = get_step_context()

    model_deployer = zenml_client.active_stack.model_deployer
    if model_deployer is None:
        # Fail with an actionable message instead of an AttributeError on
        # ``None`` further down.
        raise RuntimeError(
            "The active stack has no model deployer; register a BentoML "
            "model deployer in the stack to run this step."
        )

    bentoml_deployment_config = BentoMLLocalDeploymentConfig(
        model_name=step_context.model.name,
        model_version=target_env,
        description="An example of deploying a model using the BentoML Model Deployer",
        pipeline_name=step_context.pipeline.name,
        pipeline_step_name=step_context.step_run.name,
        # The URIs are read from the bento's labels; ``.get`` yields None
        # when a label is absent.
        model_uri=bento.info.labels.get("model_uri"),
        bento_tag=str(bento.tag),
        bento_uri=bento.info.labels.get("bento_uri"),
        working_dir=source_utils.get_source_root(),
        timeout=1500,
    )

    service = model_deployer.deploy_model(
        config=bentoml_deployment_config,
        service_type=BentoMLLocalDeploymentService.SERVICE_TYPE,
    )
    logger.info(
        f"The deployed service info: {model_deployer.get_model_server_info(service)}"
    )
    return service
@step
def dockerize_bento_model(
    target_env: str,
) -> Annotated[
    str,
    ArtifactConfig(name="bentoml_model_image"),
]:
    """Containerize the bento of a model version and push the image.

    Looks up the ``bentoml_deployment`` artifact of the model version named
    by ``target_env``, rebuilds the bento tag from that artifact's run
    metadata, builds a Docker image for it with BentoML and pushes the image
    to the active stack's container registry.

    Args:
        target_env: Version of the current step's model whose bento should
            be containerized.

    Returns:
        The full image name, ``<registry uri>/<bento tag name>:<version>``.

    Raises:
        RuntimeError: If the active stack has no container registry.
        Exception: Any error raised by ``bentoml.container.build`` is logged
            and re-raised unchanged.
    """
    model = get_step_context().model
    version_to_deploy = Model(name=model.name, version=target_env)
    bentoml_deployment = version_to_deploy.get_model_artifact(
        name="bentoml_deployment"
    )
    metadata = bentoml_deployment.run_metadata
    bento_tag = f'{metadata["bento_tag_name"]}:{metadata["bento_info_version"]}'

    container_registry = Client().active_stack.container_registry
    if not container_registry:
        # Explicit check instead of ``assert``: assertions are removed when
        # Python runs with -O, which would hide this misconfiguration.
        raise RuntimeError("Container registry is not configured.")

    image_name = f"{container_registry.config.uri}/{bento_tag}"
    try:
        bentoml.container.build(
            bento_tag=bento_tag,
            backend="docker",  # hardcoding docker since container service only supports docker
            image_tag=(image_name,),  # passed as a one-element tuple
        )
    except Exception as e:
        logger.error(f"Error containerizing the bento: {e}")
        raise  # bare raise keeps the original traceback intact

    container_registry.push_image(image_name)
    return image_name
b/train_and_deploy/steps/hp_tuning/hp_tuning_select_best_model.py index 7d5a6bc3..65e524ec 100644 --- a/train_and_deploy/steps/hp_tuning/hp_tuning_select_best_model.py +++ b/train_and_deploy/steps/hp_tuning/hp_tuning_select_best_model.py @@ -50,7 +50,7 @@ def hp_tuning_select_best_model( hp_output = model.get_data_artifact("hp_result") model_: ClassifierMixin = hp_output.load() # fetch metadata we attached earlier - metric = float(hp_output.run_metadata["metric"].value) + metric = float(hp_output.run_metadata["metric"]) if best_model is None or best_metric < metric: best_model = model_ ### YOUR CODE ENDS HERE ### diff --git a/train_and_deploy/steps/inference/inference_predict.py b/train_and_deploy/steps/inference/inference_predict.py index f8242b5c..76bb017f 100644 --- a/train_and_deploy/steps/inference/inference_predict.py +++ b/train_and_deploy/steps/inference/inference_predict.py @@ -16,14 +16,13 @@ # -from typing import Optional - +from typing import Optional, cast +from zenml.client import Client import pandas as pd from typing_extensions import Annotated - from zenml import get_step_context, step -from zenml.integrations.bentoml.services.bentoml_deployment import ( - BentoMLDeploymentService, +from zenml.integrations.bentoml.services.bentoml_local_deployment import ( + BentoMLLocalDeploymentService, ) from zenml.logger import get_logger @@ -33,6 +32,7 @@ @step def inference_predict( dataset_inf: pd.DataFrame, + target_env: str, ) -> Annotated[pd.Series, "predictions"]: """Predictions step. 
@@ -56,12 +56,18 @@ def inference_predict( model = get_step_context().model # get predictor - predictor_service: Optional[BentoMLDeploymentService] = model.load_artifact( - "bentomldeployment" + zenml_client = Client() + model_deployer = zenml_client.active_stack.model_deployer + + # fetch existing services with same pipeline name, step name and model name + existing_services = model_deployer.find_model_server( + model_name=model.name, + model_version=target_env, ) + predictor_service = cast(BentoMLLocalDeploymentService, existing_services[0]) if predictor_service is not None: # run prediction from service - predictions = predictor_service.predict(request=dataset_inf) + predictions = predictor_service.predict(api_endpoint="predict",data=dataset_inf) else: logger.warning( "Predicting from loaded model instead of deployment service " diff --git a/train_and_deploy/steps/promotion/compute_performance_metrics_on_current_data.py b/train_and_deploy/steps/promotion/compute_performance_metrics_on_current_data.py index 7fabfb30..83f08978 100644 --- a/train_and_deploy/steps/promotion/compute_performance_metrics_on_current_data.py +++ b/train_and_deploy/steps/promotion/compute_performance_metrics_on_current_data.py @@ -20,8 +20,8 @@ import pandas as pd from sklearn.metrics import accuracy_score from typing_extensions import Annotated - from zenml import Model, get_step_context, step +from zenml.enums import ModelStages from zenml.logger import get_logger logger = get_logger(__name__) @@ -53,7 +53,6 @@ def compute_performance_metrics_on_current_data( Returns: Latest version and current version metric values on a test set. 
""" - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### X = dataset_tst.drop(columns=["target"]) y = dataset_tst["target"].to_numpy() @@ -61,15 +60,19 @@ def compute_performance_metrics_on_current_data( # Get model version numbers from Model Control Plane latest_version = get_step_context().model - current_version = Model(name=latest_version.name, version=target_env) latest_version_number = latest_version.number - current_version_number = current_version.number + current_version_number = None + try: + current_version = Model(name=latest_version.name, version=ModelStages.STAGING) + except Exception: + pass if current_version_number is None: current_version_number = -1 metrics = {latest_version_number: 1.0, current_version_number: 0.0} else: + current_version_number = current_version.number # Get predictors predictors = { latest_version_number: latest_version.load_artifact("model"), diff --git a/train_and_deploy/steps/promotion/promote_with_metric_compare.py b/train_and_deploy/steps/promotion/promote_with_metric_compare.py index 48d23594..22ac2f3e 100644 --- a/train_and_deploy/steps/promotion/promote_with_metric_compare.py +++ b/train_and_deploy/steps/promotion/promote_with_metric_compare.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from zenml import Model, get_step_context, step +from zenml.enums import ModelStages from zenml.logger import get_logger logger = get_logger(__name__) @@ -49,16 +50,17 @@ def promote_with_metric_compare( latest_metric: Recently trained model metric results. current_metric: Previously promoted model metric results. 
""" - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### should_promote = True # Get model version numbers from Model Control Plane latest_version = get_step_context().model - current_version = Model(name=latest_version.name, version=target_env) - - current_version_number = current_version.number - + current_version_number = None + try: + current_version = Model(name=latest_version.name, version=ModelStages.STAGING) + current_version_number = current_version.number + except Exception: + pass if current_version_number is None: logger.info("No current model version found - promoting latest") else: diff --git a/train_and_deploy/steps/training/model_evaluator.py b/train_and_deploy/steps/training/model_evaluator.py index 7a4656e5..2e61a172 100644 --- a/train_and_deploy/steps/training/model_evaluator.py +++ b/train_and_deploy/steps/training/model_evaluator.py @@ -16,7 +16,6 @@ # -import mlflow import pandas as pd from sklearn.base import ClassifierMixin @@ -26,10 +25,10 @@ logger = get_logger(__name__) -experiment_tracker = Client().active_stack.experiment_tracker +#experiment_tracker = Client().active_stack.experiment_tracker -@step(experiment_tracker=experiment_tracker.name) +@step#(experiment_tracker=experiment_tracker.name) def model_evaluator( model: ClassifierMixin, dataset_trn: pd.DataFrame, @@ -88,7 +87,7 @@ def model_evaluator( dataset_tst[target], ) logger.info(f"Test accuracy={tst_acc*100:.2f}%") - mlflow.log_metric("testing_accuracy_score", tst_acc) + #mlflow.log_metric("testing_accuracy_score", tst_acc) messages = [] if trn_acc < min_train_accuracy: diff --git a/train_and_deploy/steps/training/model_trainer.py b/train_and_deploy/steps/training/model_trainer.py index 8d2fdb2d..f369948a 100644 --- a/train_and_deploy/steps/training/model_trainer.py +++ b/train_and_deploy/steps/training/model_trainer.py @@ -15,37 +15,28 @@ # limitations under the License. 
# -import mlflow import pandas as pd from sklearn.base import ClassifierMixin from typing_extensions import Annotated -from zenml import log_model_metadata -from zenml.metadata.metadata_types import Uri - -from zenml import ArtifactConfig, get_step_context, step +from zenml import ArtifactConfig, get_step_context, log_model_metadata, step from zenml.client import Client -from zenml.integrations.mlflow.experiment_trackers import ( - MLFlowExperimentTracker, -) -from zenml.integrations.mlflow.steps.mlflow_registry import ( - mlflow_register_model_step, -) from zenml.logger import get_logger +from zenml.metadata.metadata_types import Uri logger = get_logger(__name__) -experiment_tracker = Client().active_stack.experiment_tracker +#experiment_tracker = Client().active_stack.experiment_tracker -if not experiment_tracker or not isinstance( - experiment_tracker, MLFlowExperimentTracker -): - raise RuntimeError( - "Your active stack needs to contain a MLFlow experiment tracker for " - "this example to work." - ) +#if not experiment_tracker or not isinstance( +# experiment_tracker, MLFlowExperimentTracker +#): + #raise RuntimeError( + # "Your active stack needs to contain a MLFlow experiment tracker for " + # "this example to work." + #) -@step(experiment_tracker=experiment_tracker.name) +@step#(experiment_tracker=experiment_tracker.name) def model_trainer( dataset_trn: pd.DataFrame, model: ClassifierMixin, @@ -84,27 +75,26 @@ def model_trainer( Returns: The trained model artifact. """ - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### # Initialize the model with the hyperparameters indicated in the step # parameters and train it on the training set. 
logger.info(f"Training model {model}...") - mlflow.sklearn.autolog() + #mlflow.sklearn.autolog() model.fit( dataset_trn.drop(columns=[target]), dataset_trn[target], ) - log_model_metadata( - metadata={ - "experiment_tracker": { - "experiment_tracker_url": Uri( - experiment_tracker.get_tracking_uri() - ), - "experiment_tracker_run_id": mlflow.last_active_run().info.run_id, - "experiment_tracker_run_name": mlflow.active_run().info.run_name, - "experiment_tracker_experiment_id": mlflow.active_run().info.experiment_id, - }} - ) + #log_model_metadata( + # metadata={ + # "experiment_tracker": { + # "experiment_tracker_url": Uri( + # experiment_tracker.get_tracking_uri() + # ), + # "experiment_tracker_run_id": mlflow.last_active_run().info.run_id, + # "experiment_tracker_run_name": mlflow.active_run().info.run_name, + # "experiment_tracker_experiment_id": mlflow.active_run().info.experiment_id, + # }} + #) return model