diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..2f41707 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.ipynb filter=strip-notebook-output diff --git a/bring-your-own-model/sklearn-end2end.ipynb b/bring-your-own-model/sklearn-end2end.ipynb new file mode 100644 index 0000000..2e914ef --- /dev/null +++ b/bring-your-own-model/sklearn-end2end.ipynb @@ -0,0 +1,777 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Targeting Direct Marketing with Amazon SageMaker and Scikit-Learn\n", + "\n", + "---\n", + "\n", + "## Background\n", + "Direct marketing, either through mail, email, phone, etc., is a common tactic to acquire customers. Because resources and a customer's attention are limited, the goal is to only target the subset of prospects who are likely to engage with a specific offer. Predicting those potential customers based on readily available information like demographics, past interactions, and environmental factors is a common machine learning problem.\n", + "\n", + "This notebook presents an example problem to predict if a customer will enroll for a term deposit at a bank, after one or more phone calls. The steps include:\n", + "\n", + "* Preparing your Amazon SageMaker notebook\n", + "* Downloading data from the internet into Amazon SageMaker\n", + "* Investigating and transforming the data so that it can be fed to Amazon SageMaker algorithms\n", + "* Estimating a model using the Random Forest algorithm from Scikit-Learn\n", + "* Evaluating the effectiveness of the model\n", + "* Setting the model up to make on-going predictions\n", + "\n", + "---\n", + "\n", + "## Preparation\n", + "\n", + "_This notebook was created and tested on an ml.m4.xlarge notebook instance._\n", + "\n", + "Let's start by specifying:\n", + "\n", + "- The S3 bucket and prefix that you want to use for training and model data. 
This should be within the same region as the Notebook Instance, training, and hosting.\n", + "- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these. Note, if more than one role is required for notebook instances, training, and/or hosting, please replace the `get_execution_role()` call with the appropriate full IAM role arn string(s)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "isConfigCell": true + }, + "outputs": [], + "source": [ + "# cell 01\n", + "import sagemaker\n", + "bucket=sagemaker.Session().default_bucket()\n", + "prefix = 'sagemaker/sklearn-end-2end-immday'\n", + " \n", + "# Define IAM role\n", + "import boto3\n", + "import re\n", + "from sagemaker import get_execution_role\n", + "\n", + "role = get_execution_role()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now let's bring in the Python libraries that we'll use throughout the analysis" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 02\n", + "import numpy as np # For matrix operations and numerical processing\n", + "import pandas as pd # For munging tabular data\n", + "import matplotlib.pyplot as plt # For charts and visualizations\n", + "from IPython.display import Image # For displaying images in the notebook\n", + "from IPython.display import display # For displaying outputs in the notebook\n", + "from time import gmtime, strftime # For labeling SageMaker models, endpoints, etc.\n", + "import sys # For writing outputs to notebook\n", + "import math # For ceiling function\n", + "import json # For parsing hosting outputs\n", + "import os # For manipulating filepath names\n", + "import sagemaker # Amazon SageMaker's Python SDK provides many helper functions\n", + "import zipfile # For unpacking the downloaded dataset archive" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 03\n", 
+ "pd.__version__" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Make sure pandas version is set to 1.2.4 or later. If it is not the case, restart the kernel before going further" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Data\n", + "Let's start by downloading the [direct marketing dataset](https://sagemaker-sample-data-us-west-2.s3-us-west-2.amazonaws.com/autopilot/direct_marketing/bank-additional.zip) from the sample data s3 bucket. \n", + "\n", + "\\[Moro et al., 2014\\] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 04\n", + "!wget -O bank-additional.zip https://sagemaker-sample-data-us-west-2.s3-us-west-2.amazonaws.com/autopilot/direct_marketing/bank-additional.zip\n", + "# TLS verification stays on, and -O overwrites the archive so re-running the cell never extracts a stale copy\n", + "with zipfile.ZipFile('bank-additional.zip', 'r') as zip_ref:\n", + " zip_ref.extractall('.')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now let's read this into a Pandas data frame and take a look." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 05\n", + "data = pd.read_csv('./bank-additional/bank-additional-full.csv')\n", + "pd.set_option('display.max_columns', 500) # Make sure we can see all of the columns\n", + "pd.set_option('display.max_rows', 20) # Keep the output on one page\n", + "data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We will store this natively in S3 to then process it with SageMaker Processing." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 06\n", + "from sagemaker import Session\n", + "\n", + "sess = Session()\n", + "input_source = sess.upload_data('./bank-additional/bank-additional-full.csv', bucket=bucket, key_prefix=f'{prefix}/input_data')\n", + "input_source" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Feature Engineering with Amazon SageMaker Processing\n", + "\n", + "Amazon SageMaker Processing allows you to run steps for data pre- or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs accept data from Amazon S3 as input and store data into Amazon S3 as output.\n", + "\n", + "![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)\n", + "\n", + "Here, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances. SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks. \n", + "\n", + "To use SageMaker Processing, simply supply a Python data preprocessing script as shown below. For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data. There are few limitations on what kinds of code and operations you can run, and only a minimal contract: input and output data must be placed in specified directories. 
If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile preprocessing.py\n", + "import pandas as pd\n", + "import numpy as np\n", + "import argparse\n", + "import os\n", + "from sklearn.preprocessing import OrdinalEncoder\n", + "\n", + "def _parse_args():\n", + "    \"\"\"Parse the known command-line arguments; returns (namespace, list of unrecognised args).\"\"\"\n", + "    parser = argparse.ArgumentParser()\n", + "    # Input and output locations inside the processing container\n", + "    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')\n", + "    parser.add_argument('--filename', type=str, default='bank-additional-full.csv')\n", + "    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')\n", + "    parser.add_argument('--categorical_features', type=str, default='y, job, marital, education, default, housing, loan, contact, month, day_of_week, poutcome')  # NOTE: accepted but not read below; get_dummies encodes every string column\n", + "\n", + "    return parser.parse_known_args()\n", + "\n", + "if __name__==\"__main__\":\n", + "    # Process arguments\n", + "    args, _ = _parse_args()\n", + "    # Load data\n", + "    df = pd.read_csv(os.path.join(args.filepath, args.filename))\n", 
+ "    # In string values, change '.' into '_' and then strip a trailing '_'\n", + "    df = df.replace(regex=r'\\.', value='_')\n", + "    df = df.replace(regex=r'\\_$', value='')\n", + "    # Add two new indicators\n", + "    df[\"no_previous_contact\"] = (df[\"pdays\"] == 999).astype(int)\n", + "    df[\"not_working\"] = df[\"job\"].isin([\"student\", \"retired\", \"unemployed\"]).astype(int)\n", + "    df = df.drop(['duration', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed'], axis=1)\n", + "    # Encode the categorical features (dtype=int keeps the indicator columns as 0/1 in the CSVs on every pandas version)\n", + "    df = pd.get_dummies(df, dtype=int)\n", + "    # Train, test, validation split\n", + "    train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=42), [int(0.7 * len(df)), int(0.9 * len(df))]) # Randomly sort the data then split out first 70%, second 20%, and last 10%\n", + "    # Local store: create the output sub-directories first so the script also runs outside SageMaker\n", + "    for subdir in ('train', 'validation', 'test'):\n", + "        os.makedirs(os.path.join(args.outputpath, subdir), exist_ok=True)\n", + "    pd.concat([train_data['y_yes'], train_data.drop(['y_yes','y_no'], axis=1)], axis=1).to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)\n", + "    pd.concat([validation_data['y_yes'], validation_data.drop(['y_yes','y_no'], axis=1)], axis=1).to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)\n", + "    test_data['y_yes'].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)\n", + "    test_data.drop(['y_yes','y_no'], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)\n", + "    print(\"## Processing complete. Exiting.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object. This object allows you to specify the instance type to use in the job, as well as how many instances. The direct marketing dataset is quite small, so a single instance is enough here; for larger datasets, increase `instance_count` to spin up a cluster for SageMaker Processing. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 08\n", + "train_path = f\"s3://{bucket}/{prefix}/train\"\n", + "validation_path = f\"s3://{bucket}/{prefix}/validation\"\n", + "test_path = f\"s3://{bucket}/{prefix}/test\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 09\n", + "from sagemaker.sklearn.processing import SKLearnProcessor\n", + "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", + "from sagemaker import get_execution_role\n", + "\n", + "\n", + "sklearn_processor = SKLearnProcessor(\n", + " framework_version=\"0.23-1\",\n", + " role=get_execution_role(),\n", + " instance_type=\"ml.m5.large\",\n", + " instance_count=1, \n", + " base_job_name='sm-immday-skprocessing'\n", + ")\n", + "\n", + "sklearn_processor.run(\n", + " code='preprocessing.py',\n", + " inputs=[\n", + " ProcessingInput(\n", + " source=input_source, \n", + " destination=\"/opt/ml/processing/input\",\n", + " s3_input_mode=\"File\",\n", + " s3_data_distribution_type=\"ShardedByS3Key\"\n", + " )\n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(\n", + " output_name=\"train_data\", \n", + " source=\"/opt/ml/processing/output/train\",\n", + " destination=train_path,\n", + " ),\n", + " ProcessingOutput(output_name=\"validation_data\", source=\"/opt/ml/processing/output/validation\", destination=validation_path),\n", + " ProcessingOutput(output_name=\"test_data\", source=\"/opt/ml/processing/output/test\", destination=test_path),\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## End of Lab 1\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Training\n", + "Now we know that most of our features have skewed distributions, some are highly correlated with one another, and some appear to have non-linear 
relationships with our target variable. Also, for targeting future prospects, good predictive accuracy is preferred to being able to explain why that prospect was targeted. Taken together, these aspects make gradient boosted trees a good candidate algorithm.\n", + "\n", + "There are several intricacies to understanding the algorithm, but at a high level, gradient boosted trees works by combining predictions from many simple models, each of which tries to address the weaknesses of the previous models. By doing this the collection of simple models can actually outperform large, complex models. Other Amazon SageMaker notebooks elaborate on gradient boosting trees further and how they differ from similar algorithms.\n", + "\n", + "In this notebook we show how to use Amazon SageMaker to develop, train, tune and deploy a Scikit-Learn based ML model (Random Forest). More info on Scikit-Learn can be found [here](https://scikit-learn.org/stable/index.html)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 10\n", + "s3_input_train = sagemaker.inputs.TrainingInput(s3_data=train_path.format(bucket, prefix), content_type='csv')\n", + "s3_input_validation = sagemaker.inputs.TrainingInput(s3_data=validation_path.format(bucket, prefix), content_type='csv')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The below script contains both training and inference functionality and can run both in SageMaker Training hardware or locally (desktop, SageMaker notebook, on prem, etc). 
Detailed guidance here https://sagemaker.readthedocs.io/en/stable/using_sklearn.html#preparing-the-scikit-learn-training-script" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile sklearn-train.py\n", + "from sklearn.ensemble import RandomForestClassifier\n", + "from sklearn.metrics import accuracy_score\n", + "from joblib import dump, load\n", + "import pandas as pd, numpy as np, os, argparse\n", + "\n", + "# inference function - tells SageMaker how to load the model\n", + "def model_fn(model_dir):\n", + "    \"\"\"Load and return the classifier saved as model.joblib inside model_dir.\"\"\"\n", + "    return load(os.path.join(model_dir, \"model.joblib\"))\n", + "\n", + "# Argument parser\n", + "def _parse_args():\n", + "    \"\"\"Parse hyperparameters and data/model locations; returns (namespace, list of unrecognised args).\"\"\"\n", + "    parser = argparse.ArgumentParser()\n", + "    # Hyperparameters\n", + "    parser.add_argument(\"--n-estimators\", type=int, default=10)\n", + "    parser.add_argument(\"--min-samples-leaf\", type=int, default=3)\n", + "    # Data, model, and output directories\n", + "    parser.add_argument(\"--model-dir\", type=str, default=os.environ.get(\"SM_MODEL_DIR\"))\n", + "    parser.add_argument(\"--train\", type=str, default=os.environ.get(\"SM_CHANNEL_TRAIN\"))\n", + "    parser.add_argument(\"--test\", type=str, default=os.environ.get(\"SM_CHANNEL_TEST\"))\n", + "    parser.add_argument(\"--train-file\", type=str, default=\"train.csv\")\n", + "    parser.add_argument(\"--test-file\", type=str, default=\"test.csv\")\n", + "    return parser.parse_known_args()\n", + "\n", + "# Main Training Loop\n", + "if __name__==\"__main__\":\n", + "    # Process arguments\n", + "    args, _ = _parse_args()\n", + "    # Load the dataset; the CSVs are written without a header row, so header=None keeps the first record as data\n", + "    train_df = pd.read_csv(os.path.join(args.train, args.train_file), header=None)\n", + "    test_df = pd.read_csv(os.path.join(args.test, args.test_file), header=None)\n", 
+ "    # Separate X and y (the target is the first column)\n", + "    X_train, y_train = train_df.drop(train_df.columns[0], axis=1), train_df[train_df.columns[0]]\n", + "    X_test, y_test = test_df.drop(test_df.columns[0], axis=1), test_df[test_df.columns[0]]\n", + "    # Define the model and train it\n", + "    model = RandomForestClassifier(\n", + "        n_estimators=args.n_estimators, min_samples_leaf=args.min_samples_leaf, n_jobs=-1\n", + "    )\n", + "    model.fit(X_train, y_train)\n", + "    # Evaluate the model performances\n", + "    print(f'Model Accuracy: {accuracy_score(y_test, model.predict(X_test))}')\n", + "    dump(model, os.path.join(args.model_dir, 'model.joblib'))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 12\n", + "# We use the Estimator from the SageMaker Python SDK\n", + "from sagemaker import get_execution_role\n", + "from sagemaker.sklearn.estimator import SKLearn\n", + "\n", + "FRAMEWORK_VERSION = \"0.23-1\"\n", + "\n", + "# Define the Estimator from SageMaker (Script Mode)\n", + "sklearn_estimator = SKLearn(\n", + " entry_point=\"sklearn-train.py\",\n", + " role=get_execution_role(),\n", + " instance_count=1,\n", + " instance_type=\"ml.c5.xlarge\",\n", + " framework_version=FRAMEWORK_VERSION,\n", + " base_job_name=\"rf-scikit\",\n", + " metric_definitions=[{\"Name\": \"model_accuracy\", \"Regex\": \"Model Accuracy: ([0-9.]+).*$\"}],\n", + " hyperparameters={\n", + " \"n-estimators\": 100,\n", + " \"min-samples-leaf\": 3,\n", + " \"test-file\": \"validation.csv\"\n", + " },\n", + ")\n", + "\n", + "# Train the model (~5 minutes)\n", + "sklearn_estimator.fit({\"train\": s3_input_train, \"test\": s3_input_validation})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Hosting\n", + "Now that we've trained the algorithm on our data, let's deploy a model that's hosted behind a real-time endpoint." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 13\n", + "sklearn_predictor = sklearn_estimator.deploy(initial_instance_count=1,\n", + " instance_type='ml.m4.xlarge')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Evaluation\n", + "There are many ways to compare the performance of a machine learning model, but let's start by simply comparing actual to predicted values. In this case, we're simply predicting whether the customer subscribed to a term deposit (`1`) or not (`0`), which produces a simple confusion matrix.\n", + "\n", + "First we'll need to determine how we pass data into and receive data from our endpoint. Our data is currently stored as NumPy arrays in memory of our notebook instance. To send it in an HTTP POST request, we'll serialize it as a CSV string and then decode the resulting CSV.\n", + "\n", + "*Note: For inference with CSV format, our Scikit-Learn model requires that the data does NOT include the target variable.*" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 14\n", + "sklearn_predictor.serializer = sagemaker.serializers.CSVSerializer()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, we'll use a simple function to:\n", + "1. Loop over our test dataset\n", + "1. Split it into mini-batches of rows \n", + "1. Convert those mini-batches to CSV string payloads (notice, we drop the target variable from our dataset first)\n", + "1. Retrieve mini-batch predictions by invoking the Scikit-Learn endpoint\n", + "1. 
Collect predictions and convert from the CSV output our model provides into a NumPy array" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 15\n", + "!aws s3 cp $test_path/test_x.csv /tmp/test_x.csv\n", + "!aws s3 cp $test_path/test_y.csv /tmp/test_y.csv" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 16\n", + "test_x = pd.read_csv('/tmp/test_x.csv', names=[f'{i}' for i in range(59)])\n", + "test_y = pd.read_csv('/tmp/test_y.csv', names=['y'])\n", + "predictions = sklearn_predictor.predict(test_x.values)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we'll check our confusion matrix to see how well we predicted versus actuals." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 17\n", + "pd.crosstab(index=test_y['y'].values, columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "So, of the ~4000 potential customers, we predicted 136 would subscribe and 94 of them actually did. We also had 389 subscribers who subscribed that we did not predict would. This is less than desirable, but the model can (and should) be tuned to improve this. Most importantly, note that with minimal effort, our model produced accuracies similar to those published [here](http://media.salford-systems.com/video/tutorial/2015/targeted_marketing.pdf).\n", + "\n", + "_Note that because there is some element of randomness in the algorithm's subsample, your results may differ slightly from the text written above._" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Get Inferences for an Entire Dataset with Batch Transform\n", + "\n", + "To get inferences for an entire dataset, use batch transform. 
With batch transform, you create a batch transform job using a trained model and the dataset, which must be stored in Amazon S3. Amazon SageMaker saves the inferences in an S3 bucket that you specify when you create the batch transform job. Batch transform manages all of the compute resources required to get inferences. This includes launching instances and deleting them after the batch transform job has completed. Batch transform manages interactions between the data and the model with an object within the instance node called an agent.\n", + "\n", + "Use batch transform when you:\n", + "\n", + "- Want to get inferences for an entire dataset and index them to serve inferences in real time\n", + "- Don't need a persistent endpoint that applications (for example, web or mobile apps) can call to get inferences\n", + "- Don't need the subsecond latency that SageMaker hosted endpoints provide\n", + "- You can also use batch transform to preprocess your data before using it to train a new model or generate inferences.\n", + "\n", + "The following diagram shows the workflow of a batch transform job:\n", + "\n", + "![batch_transform](https://docs.aws.amazon.com/sagemaker/latest/dg/images/batch-transform-v2.png)\n", + "\n", + "To perform a batch transform, create a batch transform job using either the SageMaker console or the API. Provide the following:\n", + "\n", + "- The path to the S3 bucket where you've stored the data that you want to transform.\n", + "- The compute resources that you want SageMaker to use for the transform job. Compute resources are machine learning (ML) compute instances that are managed by SageMaker.\n", + "- The path to the S3 bucket where you want to store the output of the job.\n", + "- The name of the SageMaker model that you want to use to create inferences. 
You must use a model that you have already created either with the [CreateModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateModel.html) operation or the console.\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 18\n", + "transformer_output_path = f\"s3://{bucket}/{prefix}/transformer-output\"\n", + "\n", + "sklearn_transformer = sklearn_estimator.transformer(\n", + " instance_count=1,\n", + " instance_type='ml.m5.large',\n", + " output_path=transformer_output_path\n", + ")\n", + "\n", + "sklearn_transformer.transform(\n", + " data=f'{test_path}/test_x.csv',\n", + " data_type='S3Prefix',\n", + " content_type='text/csv'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 19\n", + "!aws s3 cp $transformer_output_path/test_x.csv.out /tmp/predictions.txt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 20\n", + "import json\n", + "\n", + "# The transform output is expected to be a single JSON array of labels; parse it rather than slicing the text by hand\n", + "with open('/tmp/predictions.txt', 'r') as r:\n", + "    predictions = np.asarray(json.load(r))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 21\n", + "!aws s3 cp $test_path/test_y.csv /tmp/test_y.csv\n", + "test_y = pd.read_csv('/tmp/test_y.csv', names=['y'])\n", + "pd.crosstab(index=test_y['y'].values, columns=predictions, rownames=['actuals'], colnames=['predictions'])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Automatic model Tuning (optional)\n", + "Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. 
It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose.\n", + "For example, suppose that you want to solve a binary classification problem on this marketing dataset. Your goal is to maximize the area under the curve (auc) metric of the algorithm by training an XGBoost Algorithm model. You don't know which values of the eta, alpha, min_child_weight, and max_depth hyperparameters to use to train the best model. To find the best values for these hyperparameters, you can specify ranges of values that Amazon SageMaker hyperparameter tuning searches to find the combination of values that results in the training job that performs the best as measured by the objective metric that you chose. Hyperparameter tuning launches training jobs that use hyperparameter values in the ranges that you specified, and returns the training job with highest auc.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 22\n", + "from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner\n", + "hyperparameter_ranges = {\"n-estimators\": IntegerParameter(50, 250), \"min-samples-leaf\": IntegerParameter(1, 10)}\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 23\n", + "objective_metric_name = 'model_accuracy'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 24\n", + "tuner = HyperparameterTuner(sklearn_estimator,\n", + " objective_metric_name,\n", + " hyperparameter_ranges,\n", + " metric_definitions=[{\"Name\": \"model_accuracy\", \"Regex\": \"Model Accuracy: ([0-9.]+).*$\"}],\n", + " objective_type='Maximize',\n", + " max_jobs=9,\n", + " max_parallel_jobs=3)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], 
+ "source": [ + "# cell 25\n", + "tuner.fit({'train': s3_input_train, 'test': s3_input_validation})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 26\n", + "boto3.client('sagemaker').describe_hyper_parameter_tuning_job(\n", + "HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['HyperParameterTuningJobStatus']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 27\n", + "# return the best training job name\n", + "tuner.best_training_job()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 28\n", + "# Deploy the best trained or user specified model to an Amazon SageMaker endpoint\n", + "tuner_predictor = tuner.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')\n", + "tuner_predictor.serializer = sagemaker.serializers.CSVSerializer()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 29\n", + "# Predict with the endpoint deployed from the tuning job and compare against the actuals\n", + "predictions = tuner_predictor.predict(test_x.values)\n", + "pd.crosstab(index=test_y['y'].values, columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Extensions\n", + "\n", + "This example analyzed a relatively small dataset, but utilized Amazon SageMaker features such as distributed, managed training and real-time model hosting, which could easily be applied to much larger problems. In order to improve predictive accuracy further, we could tweak the value at which we threshold our predictions to alter the mix of false-positives and false-negatives, or we could explore techniques like hyperparameter tuning. 
In a real-world scenario, we would also spend more time engineering features by hand and would likely look for additional datasets to include which contain customer information not available in our initial dataset." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### (Optional) Clean-up\n", + "\n", + "If you are done with this notebook, please run the cell below. This will remove the hosted endpoint you created and avoid any charges from a stray instance being left on." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 30\n", + "sklearn_predictor.delete_endpoint(delete_endpoint_config=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 31\n", + "tuner_predictor.delete_endpoint(delete_endpoint_config=True)" + ] + } + ], + "metadata": { + "celltoolbar": "Tags", + "kernelspec": { + "display_name": "Python 3 (Data Science)", + "language": "python", + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:eu-west-1:470317259841:image/datascience-1.0" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + }, + "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." 
+ }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/feature-engineering-and-training/.gitattributes b/feature-engineering-and-training/.gitattributes new file mode 100644 index 0000000..2f41707 --- /dev/null +++ b/feature-engineering-and-training/.gitattributes @@ -0,0 +1 @@ +*.ipynb filter=strip-notebook-output diff --git a/feature-engineering-and-training/.ipynb_checkpoints/numpy_xgboost_direct_marketing_sagemaker-checkpoint.ipynb b/feature-engineering-and-training/.ipynb_checkpoints/numpy_xgboost_direct_marketing_sagemaker-checkpoint.ipynb new file mode 100644 index 0000000..57d76f3 --- /dev/null +++ b/feature-engineering-and-training/.ipynb_checkpoints/numpy_xgboost_direct_marketing_sagemaker-checkpoint.ipynb @@ -0,0 +1,1113 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Targeting Direct Marketing with Amazon SageMaker XGBoost\n", + "_**Supervised Learning with Gradient Boosted Trees: A Binary Prediction Problem With Unbalanced Classes**_\n", + "\n", + "---\n", + "\n", + "---\n", + "\n", + "## Contents\n", + "\n", + "1. [Background](#Background)\n", + "1. [Preparation](#Preparation)\n", + "1. [Data](#Data)\n", + " 1. [Exploration](#Exploration)\n", + " 1. [Transformation](#Transformation)\n", + "1. [Training](#Training)\n", + "1. [Hosting](#Hosting)\n", + "1. [Evaluation](#Evaluation)\n", + "1. [Extensions](#Extensions)\n", + "\n", + "---\n", + "\n", + "## Background\n", + "Direct marketing, either through mail, email, phone, etc., is a common tactic to acquire customers. Because resources and a customer's attention are limited, the goal is to only target the subset of prospects who are likely to engage with a specific offer. 
Predicting those potential customers based on readily available information like demographics, past interactions, and environmental factors is a common machine learning problem.\n", + "\n", + "This notebook presents an example problem to predict if a customer will enroll for a term deposit at a bank, after one or more phone calls. The steps include:\n", + "\n", + "* Preparing your Amazon SageMaker notebook\n", + "* Downloading data from the internet into Amazon SageMaker\n", + "* Investigating and transforming the data so that it can be fed to Amazon SageMaker algorithms\n", + "* Estimating a model using the Gradient Boosting algorithm\n", + "* Evaluating the effectiveness of the model\n", + "* Setting the model up to make on-going predictions\n", + "\n", + "---\n", + "\n", + "## Preparation\n", + "\n", + "_This notebook was created and tested on an ml.m5.xlarge notebook instance._\n", + "\n", + "Let's start by specifying:\n", + "\n", + "- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.\n", + "- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these. Note, if more than one role is required for notebook instances, training, and/or hosting, please replace the `get_execution_role()` call with the appropriate full IAM role arn string(s)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "isConfigCell": true, + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 02\n", + "import sagemaker\n", + "bucket=sagemaker.Session().default_bucket()\n", + "prefix = 'sagemaker/DEMO-xgboost-dm'\n", + " \n", + "# Define IAM role\n", + "import boto3\n", + "import re\n", + "from sagemaker import get_execution_role\n", + "\n", + "role = get_execution_role()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now let's bring in the Python libraries that we'll use throughout the analysis" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 03\n", + "import numpy as np # For matrix operations and numerical processing\n", + "import pandas as pd # For munging tabular data\n", + "import matplotlib.pyplot as plt # For charts and visualizations\n", + "from IPython.display import Image # For displaying images in the notebook\n", + "from IPython.display import display # For displaying outputs in the notebook\n", + "from time import gmtime, strftime # For labeling SageMaker models, endpoints, etc.\n", + "import sys # For writing outputs to notebook\n", + "import math # For ceiling function\n", + "import json # For parsing hosting outputs\n", + "import os # For manipulating filepath names\n", + "import sagemaker \n", + "import zipfile # Amazon SageMaker's Python SDK provides many helper functions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 04\n", + "pd.__version__" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Make sure pandas version is set to 1.2.4 or later. 
If it is not the case, restart the kernel before going further" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Data\n", + "Let's start by downloading the [direct marketing dataset](https://sagemaker-sample-data-us-west-2.s3-us-west-2.amazonaws.com/autopilot/direct_marketing/bank-additional.zip) from the sample data s3 bucket. \n", + "\n", + "\\[Moro et al., 2014\\] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 06\n", + "data = pd.read_csv('./bank-additional/bank-additional-full.csv')\n", + "pd.set_option('display.max_columns', 500) # Make sure we can see all of the columns\n", + "pd.set_option('display.max_rows', 20) # Keep the output on one page\n", + "data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's talk about the data. At a high level, we can see:\n", + "\n", + "* We have a little over 40K customer records, and 20 features for each customer\n", + "* The features are mixed; some numeric, some categorical\n", + "* The data appears to be sorted, at least by `time` and `contact`, maybe more\n", + "\n", + "_**Specifics on each of the features:**_\n", + "\n", + "*Demographics:*\n", + "* `age`: Customer's age (numeric)\n", + "* `job`: Type of job (categorical: 'admin.', 'services', ...)\n", + "* `marital`: Marital status (categorical: 'married', 'single', ...)\n", + "* `education`: Level of education (categorical: 'basic.4y', 'high.school', ...)\n", + "\n", + "*Past customer events:*\n", + "* `default`: Has credit in default? (categorical: 'no', 'unknown', ...)\n", + "* `housing`: Has housing loan? (categorical: 'no', 'yes', ...)\n", + "* `loan`: Has personal loan? 
(categorical: 'no', 'yes', ...)\n", + "\n", + "*Past direct marketing contacts:*\n", + "* `contact`: Contact communication type (categorical: 'cellular', 'telephone', ...)\n", + "* `month`: Last contact month of year (categorical: 'may', 'nov', ...)\n", + "* `day_of_week`: Last contact day of the week (categorical: 'mon', 'fri', ...)\n", + "* `duration`: Last contact duration, in seconds (numeric). Important note: If duration = 0 then `y` = 'no'.\n", + " \n", + "*Campaign information:*\n", + "* `campaign`: Number of contacts performed during this campaign and for this client (numeric, includes last contact)\n", + "* `pdays`: Number of days that passed by after the client was last contacted from a previous campaign (numeric)\n", + "* `previous`: Number of contacts performed before this campaign and for this client (numeric)\n", + "* `poutcome`: Outcome of the previous marketing campaign (categorical: 'nonexistent','success', ...)\n", + "\n", + "*External environment factors:*\n", + "* `emp.var.rate`: Employment variation rate - quarterly indicator (numeric)\n", + "* `cons.price.idx`: Consumer price index - monthly indicator (numeric)\n", + "* `cons.conf.idx`: Consumer confidence index - monthly indicator (numeric)\n", + "* `euribor3m`: Euribor 3 month rate - daily indicator (numeric)\n", + "* `nr.employed`: Number of employees - quarterly indicator (numeric)\n", + "\n", + "*Target variable:*\n", + "* `y`: Has the client subscribed a term deposit? (binary: 'yes','no')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Exploration\n", + "Let's start exploring the data in our data prep widget. First, let's understand how the features are distributed." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Notice that:\n", + "\n", + "* Almost 90% of the values for our target variable `y` are \"no\", so most customers did not subscribe to a term deposit.\n", + "* Many of the predictive features take on values of \"unknown\". Some are more common than others. We should think carefully as to what causes a value of \"unknown\" (are these customers non-representative in some way?) and how that should be handled.\n", + " * Even if \"unknown\" is included as its own distinct category, what does it mean given that, in reality, those observations likely fall within one of the other categories of that feature?\n", + "* Many of the predictive features have categories with very few observations in them. If we find a small category to be highly predictive of our target outcome, do we have enough evidence to make a generalization about that?\n", + "* Contact timing is particularly skewed. Almost a third in May and less than 1% in December. What does this mean for predicting our target variable next December?\n", + "* There are no missing values in our numeric features. Or missing values have already been imputed.\n", + " * `pdays` takes a value near 1000 for almost all customers. Likely a placeholder value signifying no previous contact.\n", + "* Several numeric features have a very long tail. Do we need to handle these few observations with extremely large values differently?\n", + "* Several numeric features (particularly the macroeconomic ones) occur in distinct buckets. Should these be treated as categorical?\n", + "\n", + "Next, let's look at how our features relate to the target that we are attempting to predict." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 07\n", + "for column in data.select_dtypes(include=['object']).columns:\n", + " if column != 'y':\n", + " print(pd.crosstab(index=data[column], columns=data['y'], normalize='columns'))\n", + "\n", + "for column in data.select_dtypes(exclude=['object']).columns:\n", + " print(column)\n", + " hist = data[[column, 'y']].hist(by='y', bins=30)\n", + " plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Notice that:\n", + "\n", + "* Customers who are-- \"blue-collar\", \"married\", \"unknown\" default status, contacted by \"telephone\", and/or in \"may\" are a substantially lower portion of \"yes\" than \"no\" for subscribing.\n", + "* Distributions for numeric variables are different across \"yes\" and \"no\" subscribing groups, but the relationships may not be straightforward or obvious.\n", + "\n", + "Now let's look at how our features relate to one another." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 08\n", + "print(data.corr(numeric_only=True))\n", + "pd.plotting.scatter_matrix(data, figsize=(12, 12))\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Notice that:\n", + "* Features vary widely in their relationship with one another. Some with highly negative correlation, others with highly positive correlation.\n", + "* Relationships between features are non-linear and discrete in many cases." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Transformation\n", + "\n", + "Cleaning up data is part of nearly every machine learning project. It arguably presents the biggest risk if done incorrectly and is one of the more subjective aspects in the process. 
Several common techniques include:\n", + "\n", + "* Handling missing values: Some machine learning algorithms are capable of handling missing values, but most would rather not. Options include:\n", + " * Removing observations with missing values: This works well if only a very small fraction of observations have incomplete information.\n", + " * Removing features with missing values: This works well if there are a small number of features which have a large number of missing values.\n", + " * Imputing missing values: Entire [books](https://www.amazon.com/Flexible-Imputation-Missing-Interdisciplinary-Statistics/dp/1439868247) have been written on this topic, but common choices are replacing the missing value with the mode or mean of that column's non-missing values.\n", + "* Converting categorical to numeric: The most common method is one hot encoding, which for each feature maps every distinct value of that column to its own feature which takes a value of 1 when the categorical feature is equal to that value, and 0 otherwise.\n", + "* Oddly distributed data: Although for non-linear models like Gradient Boosted Trees, this has very limited implications, parametric models like regression can produce wildly inaccurate estimates when fed highly skewed data. In some cases, simply taking the natural log of the features is sufficient to produce more normally distributed data. In others, bucketing values into discrete ranges is helpful. These buckets can then be treated as categorical variables and included in the model when one hot encoded.\n", + "* Handling more complicated data types: Manipulating images, text, or data at varying grains is left for other notebook templates.\n", + "\n", + "Luckily, some of these aspects have already been handled for us, and the algorithm we are showcasing tends to do well at handling sparse or oddly distributed data. Therefore, let's keep pre-processing simple." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 09\n", + "\n", + "# Note: These transformations can be done through the graphical widget that we generated above. The data prep widget will automatically generate code for transformations that you do.\n", + "data['no_previous_contact'] = np.where(data['pdays'] == 999, 1, 0) # Indicator variable to capture when pdays takes a value of 999\n", + "data['not_working'] = np.where(data['job'].isin(['student', 'retired', 'unemployed']), 1, 0) # Indicator for individuals not actively employed (Series.isin replaces np.in1d, which is deprecated in NumPy 2.0)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 10\n", + "model_data = pd.get_dummies(data, dtype=float) # Convert categorical variables to sets of indicators" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Another question to ask yourself before building a model is whether certain features will add value in your final use case. For example, if your goal is to deliver the best prediction, then will you have access to that data at the moment of prediction? Knowing it's raining is highly predictive for umbrella sales, but forecasting weather far enough out to plan inventory on umbrellas is probably just as difficult as forecasting umbrella sales without knowledge of the weather. So, including this in your model may give you a false sense of precision.\n", + "\n", + "Following this logic, let's remove the economic features and `duration` from our data as they would need to be forecasted with high precision to use as inputs in future predictions.\n", + "\n", + "Even if we were to use values of the economic indicators from the previous quarter, this value is likely not as relevant for prospects contacted early in the next quarter as those contacted later on." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 11\n", + "model_data = model_data.drop(['duration', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed'], axis=1)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "When building a model whose primary goal is to predict a target value on new data, it is important to understand overfitting. Supervised learning models are designed to minimize error between their predictions of the target value and actuals, in the data they are given. This last part is key, as frequently in their quest for greater accuracy, machine learning models bias themselves toward picking up on minor idiosyncrasies within the data they are shown. These idiosyncrasies then don't repeat themselves in subsequent data, meaning those predictions can actually be made less accurate, at the expense of more accurate predictions in the training phase.\n", + "\n", + "The most common way of preventing this is to build models with the concept that a model shouldn't only be judged on its fit to the data it was trained on, but also on \"new\" data. There are several different ways of operationalizing this, holdout validation, cross-validation, leave-one-out validation, etc. For our purposes, we'll simply randomly split the data into 3 uneven groups. The model will be trained on 70% of data, it will then be evaluated on 20% of data to give us an estimate of the accuracy we hope to have on \"new\" data, and 10% will be held back as a final testing dataset which will be used later on." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 12\n", + "# Randomly sort the data, then split out the first 70%, the next 20%, and the last 10%.\n", + "# Positional slicing is used instead of np.split, which relies on DataFrame.swapaxes (deprecated in pandas 2.1).\n", + "shuffled_data = model_data.sample(frac=1, random_state=1729)\n", + "train_end = int(0.7 * len(shuffled_data))\n", + "validation_end = int(0.9 * len(shuffled_data))\n", + "train_data = shuffled_data.iloc[:train_end]\n", + "validation_data = shuffled_data.iloc[train_end:validation_end]\n", + "test_data = shuffled_data.iloc[validation_end:]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Amazon SageMaker's XGBoost container expects data in the libSVM or CSV data format. For this example, we'll stick to CSV. Note that the first column must be the target variable and the CSV should not include headers. Also, notice that although repetitive it's easiest to do this after the train|validation|test split rather than before. This avoids any misalignment issues due to random reordering." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 13\n", + "pd.concat([train_data['y_yes'], train_data.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('train.csv', index=False, header=False)\n", + "pd.concat([validation_data['y_yes'], validation_data.drop(['y_no', 'y_yes'], axis=1)], axis=1).to_csv('validation.csv', index=False, header=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we'll copy the file to S3 for Amazon SageMaker's managed training to pickup." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# cell 14\n", + "boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')\n", + "boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation.csv')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## End of Lab 1\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Training\n", + "Now we know that most of our features have skewed distributions, some are highly correlated with one another, and some appear to have non-linear relationships with our target variable. Also, for targeting future prospects, good predictive accuracy is preferred to being able to explain why that prospect was targeted. Taken together, these aspects make gradient boosted trees a good candidate algorithm.\n", + "\n", + "There are several intricacies to understanding the algorithm, but at a high level, gradient boosted trees works by combining predictions from many simple models, each of which tries to address the weaknesses of the previous models. By doing this the collection of simple models can actually outperform large, complex models. Other Amazon SageMaker notebooks elaborate on gradient boosting trees further and how they differ from similar algorithms.\n", + "\n", + "`xgboost` is an extremely popular, open-source package for gradient boosted trees. It is computationally powerful, fully featured, and has been successfully used in many machine learning competitions. Let's start with a simple `xgboost` model, trained using Amazon SageMaker's managed, distributed training framework.\n", + "\n", + "First we'll need to specify the ECR container location for Amazon SageMaker's implementation of XGBoost." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 15\n", + "container = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='xgboost', version='latest')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then, because we're training with the CSV file format, we'll create `TrainingInput`s that our training function can use as a pointer to the files in S3, which also specify that the content type is CSV." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 16\n", + "s3_input_train = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')\n", + "s3_input_validation = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/validation/'.format(bucket, prefix), content_type='csv')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First we'll need to specify training parameters to the estimator. This includes:\n", + "1. The `xgboost` algorithm container\n", + "1. The IAM role to use\n", + "1. Training instance type and count\n", + "1. S3 location for output data\n", + "1. Algorithm hyperparameters\n", + "\n", + "And then a `.fit()` function which specifies:\n", + "1. S3 location for input data. In this case we have both a training and validation set which are passed in." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 17\n", + "sess = sagemaker.Session()\n", + "\n", + "xgb = sagemaker.estimator.Estimator(container,\n", + " role, \n", + " instance_count=1, \n", + " instance_type='ml.m5.xlarge',\n", + " output_path='s3://{}/{}/output'.format(bucket, prefix),\n", + " sagemaker_session=sess)\n", + "xgb.set_hyperparameters(max_depth=5,\n", + " eta=0.2,\n", + " gamma=4,\n", + " min_child_weight=6,\n", + " subsample=0.8,\n", + " silent=0,\n", + " objective='binary:logistic',\n", + " num_round=100)\n", + "\n", + "xgb.fit({'train': s3_input_train, 'validation': s3_input_validation}, wait=False) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "xgb.logs()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Hosting\n", + "Now that we've trained the `xgboost` algorithm on our data, let's deploy a model that's hosted behind a real-time endpoint." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 18\n", + "xgb_predictor = xgb.deploy(initial_instance_count=1,\n", + " instance_type='ml.m5.xlarge')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Evaluation\n", + "\n", + "First we'll need to determine how we pass data into and receive data from our endpoint. Our data is currently stored as NumPy arrays in memory of our notebook instance. 
To send it in an HTTP POST request, we'll serialize it as a CSV string and then decode the resulting CSV.\n", + "\n", + "*Note: For inference with CSV format, SageMaker XGBoost requires that the data does NOT include the target variable.*" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 19\n", + "xgb_predictor.serializer = sagemaker.serializers.CSVSerializer()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's take a look at the test dataset:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We extract the first test data, drop label y_no and y_yes, and convert to list:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "i = 0\n", + "\n", + "sample_data = test_data.iloc[i].drop(['y_no', 'y_yes']).tolist()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then we invoke SageMaker real-time endpoint with the sample data for prediction (inference):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result = xgb_predictor.predict(sample_data).decode('utf-8')\n", + "\n", + "predict = 'Yes' if float(result) > 0.5 else 'No'\n", + "actual = 'Yes' if test_data.iloc[i]['y_yes'] == 1 else 'No'\n", + "\n", + "print(f\"Does the sample client subscribe to term deposit?\")\n", + "print(f\"Prediction: {predict}\")\n", + "print(f\"Actual: {actual}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can also invoke SageMaker endpoint via boto3:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "\n", + "endpoint_name = xgb_predictor.endpoint_name\n", + "\n", + 
"client = boto3.client('sagemaker-runtime', region_name=boto3.Session().region_name)\n", + "\n", + "payload = ','.join(str(e) for e in sample_data).encode(\"utf-8\")\n", + "content_type = 'text/csv'\n", + "response = client.invoke_endpoint(EndpointName=endpoint_name, Body=payload, ContentType=content_type)\n", + "\n", + "result = response['Body'].read().decode('utf-8')\n", + "\n", + "predict = 'Yes' if float(result) > 0.5 else 'No'\n", + "actual = 'Yes' if test_data.iloc[i]['y_yes'] == 1 else 'No'\n", + "\n", + "print(f\"Does the sample client subscribe to term deposit?\")\n", + "print(f\"Prediction: {predict}\")\n", + "print(f\"Actual: {actual}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Clean Up\n", + "Delete any resources you created in this notebook that you no longer wish to use." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 22\n", + "xgb_predictor.delete_endpoint(delete_endpoint_config=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "--\n", + "## Batch Transform \n", + "Apart from deploying SageMaker endpoint for real-time inference, SageMaker also supports batch inference for a list of input data. 
\n", + "\n", + "First, let's drop the labels `y_no` and `y_yes` from the test dataset, then save it as a CSV file and upload it to S3:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_data.drop(['y_no', 'y_yes'], axis=1).to_csv('test.csv', index=False, header=False)\n", + "boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'test/test.csv')).upload_file('test.csv')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then we launch a batch inference job to do prediction for the set of test data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_path = f\"s3://{bucket}/{prefix}/test/test.csv\"\n", + "transformer_output_path = f\"s3://{bucket}/{prefix}/transformer-output\"\n", + "\n", + "xgb_transformer = xgb.transformer(\n", + " instance_count=1,\n", + " instance_type='ml.m5.large',\n", + " output_path=transformer_output_path\n", + ")\n", + "\n", + "xgb_transformer.transform(\n", + " data=test_path,\n", + " data_type='S3Prefix',\n", + " content_type='text/csv'\n", + ")\n", + "\n", + "print(sagemaker.s3.S3Downloader.read_file(f\"{transformer_output_path}/test.csv.out\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Download the batch inference result." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sagemaker.s3.S3Downloader.download(f\"{transformer_output_path}/test.csv.out\", \"batch_result\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Serverless Deployment (Optional)\n", + "After training the model, retrieve the model artifacts so that we can deploy the model to an endpoint." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Setup clients\n", + "import boto3\n", + "\n", + "client = boto3.client(service_name=\"sagemaker\")\n", + "runtime = boto3.client(service_name=\"sagemaker-runtime\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Retrieve model data from training job\n", + "model_artifacts = xgb.model_data\n", + "model_artifacts" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Model Creation\n", + "Create a model by providing your model artifacts, the container image URI, environment variables for the container (if applicable), a model name, and the SageMaker IAM role." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from time import gmtime, strftime\n", + "\n", + "model_name = \"xgboost-serverless\" + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", + "print(\"Model name: \" + model_name)\n", + "\n", + "# dummy environment variables\n", + "byo_container_env_vars = {\"SAGEMAKER_CONTAINER_LOG_LEVEL\": \"20\", \"SOME_ENV_VAR\": \"myEnvVar\"}\n", + "\n", + "create_model_response = client.create_model(\n", + " ModelName=model_name,\n", + " Containers=[\n", + " {\n", + " \"Image\": container,\n", + " \"Mode\": \"SingleModel\",\n", + " \"ModelDataUrl\": model_artifacts,\n", + " \"Environment\": byo_container_env_vars,\n", + " }\n", + " ],\n", + " ExecutionRoleArn=role,\n", + ")\n", + "\n", + "print(\"Model Arn: \" + create_model_response[\"ModelArn\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Endpoint Configuration Creation\n", + "This is where you can adjust the Serverless Configuration for your endpoint. 
The current max concurrent invocations for a single endpoint, known as MaxConcurrency, can be any value from 1 to 200, and MemorySize can be any of the following: 1024 MB, 2048 MB, 3072 MB, 4096 MB, 5120 MB, or 6144 MB." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "xgboost_epc_name = \"xgboost-serverless-epc\" + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", + "\n", + "endpoint_config_response = client.create_endpoint_config(\n", + " EndpointConfigName=xgboost_epc_name,\n", + " ProductionVariants=[\n", + " {\n", + " \"VariantName\": \"byoVariant\",\n", + " \"ModelName\": model_name,\n", + " \"ServerlessConfig\": {\n", + " \"MemorySizeInMB\": 4096,\n", + " \"MaxConcurrency\": 1,\n", + " },\n", + " },\n", + " ],\n", + ")\n", + "\n", + "print(\"Endpoint Configuration Arn: \" + endpoint_config_response[\"EndpointConfigArn\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Serverless Endpoint Creation\n", + "Now that we have an endpoint configuration, we can create a serverless endpoint and deploy our model to it. When creating the endpoint, provide the name of your endpoint configuration and a name for the new endpoint." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "endpoint_name = \"xgboost-serverless-ep\" + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", + "\n", + "create_endpoint_response = client.create_endpoint(\n", + " EndpointName=endpoint_name,\n", + " EndpointConfigName=xgboost_epc_name,\n", + ")\n", + "\n", + "print(\"Endpoint Arn: \" + create_endpoint_response[\"EndpointArn\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Wait until the endpoint status is InService before invoking the endpoint." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# wait for endpoint to reach a terminal state (InService) using describe endpoint\n", + "import time\n", + "\n", + "describe_endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)\n", + "\n", + "while describe_endpoint_response[\"EndpointStatus\"] == \"Creating\":\n", + " describe_endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)\n", + " print(describe_endpoint_response[\"EndpointStatus\"])\n", + " time.sleep(15)\n", + "\n", + "describe_endpoint_response" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Endpoint Invocation\n", + "Invoke the endpoint by sending a request to it. The following is a sample data point grabbed from the CSV file downloaded from the Direct Marketing dataset." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "payload =\"29,2,999,0,1,0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0\"\n", + "response = runtime.invoke_endpoint(\n", + " EndpointName=endpoint_name,\n", + " Body=payload,\n", + " ContentType=\"text/csv\",\n", + ")\n", + "\n", + "print(response[\"Body\"].read())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Clean Up\n", + "Delete any resources you created in this notebook that you no longer wish to use." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.delete_model(ModelName=model_name)\n", + "client.delete_endpoint_config(EndpointConfigName=xgboost_epc_name)\n", + "client.delete_endpoint(EndpointName=endpoint_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Automatic model Tuning (optional)\n", + "Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose.\n", + "For example, suppose that you want to solve a binary classification problem on this marketing dataset. Your goal is to maximize the area under the curve (auc) metric of the algorithm by training an XGBoost Algorithm model. You don't know which values of the eta, alpha, min_child_weight, and max_depth hyperparameters to use to train the best model. To find the best values for these hyperparameters, you can specify ranges of values that Amazon SageMaker hyperparameter tuning searches to find the combination of values that results in the training job that performs the best as measured by the objective metric that you chose. 
Hyperparameter tuning launches training jobs that use hyperparameter values in the ranges that you specified, and returns the training job with the highest auc.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 22\n", + "from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner\n", + "\n", + "hyperparameter_ranges = {'eta': ContinuousParameter(0, 1),\n", + " 'min_child_weight': ContinuousParameter(1, 10),\n", + " 'alpha': ContinuousParameter(0, 2),\n", + " 'max_depth': IntegerParameter(1, 10)}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 23\n", + "objective_metric_name = 'validation:auc'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 24\n", + "tuner = HyperparameterTuner(xgb,\n", + " objective_metric_name,\n", + " hyperparameter_ranges,\n", + " max_jobs=9,\n", + " max_parallel_jobs=3)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 25\n", + "tuner.fit({'train': s3_input_train, 'validation': s3_input_validation})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 26\n", + "boto3.client('sagemaker').describe_hyper_parameter_tuning_job(\n", + "HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['HyperParameterTuningJobStatus']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 27\n", + "# return the best training job name\n", + "tuner.best_training_job()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 28\n", + "# Deploy the best trained or user specified model to an Amazon SageMaker endpoint\n", +
"tuner_predictor = tuner.deploy(initial_instance_count=1,\n", + " instance_type='ml.m5.xlarge')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 29\n", + "# Create a serializer\n", + "tuner_predictor.serializer = sagemaker.serializers.CSVSerializer()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 30\n", + "# Predict\n", + "i = 0\n", + "\n", + "sample_data = test_data.iloc[i].drop(['y_no', 'y_yes']).tolist()\n", + "\n", + "result = tuner_predictor.predict(sample_data).decode('utf-8')\n", + "\n", + "predict = 'Yes' if float(result) > 0.5 else 'No'\n", + "actual = 'Yes' if test_data.iloc[i]['y_yes'] == 1 else 'No'\n", + "\n", + "print(f\"Does the sample client subscribe to term deposit?\")\n", + "print(f\"Prediction: {predict}\")\n", + "print(f\"Actual: {actual}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Extensions\n", + "\n", + "This example analyzed a relatively small dataset, but utilized Amazon SageMaker features such as distributed, managed training and real-time model hosting, which could easily be applied to much larger problems. In order to improve predictive accuracy further, we could tweak the value we threshold our predictions at to alter the mix of false-positives and false-negatives, or we could explore techniques like hyperparameter tuning. In a real-world scenario, we would also spend more time engineering features by hand and would likely look for additional datasets to include which contain customer information not available in our initial dataset." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Clean-up\n", + "\n", + "If you are done with this notebook, please run the cell below. This will remove the hosted endpoint you created and avoid any charges from a stray instance being left on."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cell 33\n", + "tuner_predictor.delete_endpoint(delete_endpoint_config=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "celltoolbar": "Tags", + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + }, + "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." 
+ }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/feature-engineering-and-training/numpy_xgboost_direct_marketing_sagemaker.ipynb b/feature-engineering-and-training/numpy_xgboost_direct_marketing_sagemaker.ipynb index 99a00b0..57d76f3 100644 --- a/feature-engineering-and-training/numpy_xgboost_direct_marketing_sagemaker.ipynb +++ b/feature-engineering-and-training/numpy_xgboost_direct_marketing_sagemaker.ipynb @@ -41,7 +41,7 @@ "\n", "## Preparation\n", "\n", - "_This notebook was created and tested on an ml.m4.xlarge notebook instance._\n", + "_This notebook was created and tested on an ml.m5.xlarge notebook instance._\n", "\n", "Let's start by specifying:\n", "\n", @@ -482,7 +482,7 @@ "xgb = sagemaker.estimator.Estimator(container,\n", " role, \n", " instance_count=1, \n", - " instance_type='ml.m4.xlarge',\n", + " instance_type='ml.m5.xlarge',\n", " output_path='s3://{}/{}/output'.format(bucket, prefix),\n", " sagemaker_session=sess)\n", "xgb.set_hyperparameters(max_depth=5,\n", @@ -494,7 +494,16 @@ " objective='binary:logistic',\n", " num_round=100)\n", "\n", - "xgb.fit({'train': s3_input_train, 'validation': s3_input_validation}) " + "xgb.fit({'train': s3_input_train, 'validation': s3_input_validation}, wait=False) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "xgb.logs()" ] }, { @@ -515,7 +524,7 @@ "source": [ "# cell 18\n", "xgb_predictor = xgb.deploy(initial_instance_count=1,\n", - " instance_type='ml.m4.xlarge')" + " instance_type='ml.m5.xlarge')" ] }, { @@ -525,7 +534,6 @@ "---\n", "\n", "## Evaluation\n", - "There are many ways to compare the performance of a machine learning model, but let's start by simply comparing actual to predicted values. 
In this case, we're simply predicting whether the customer subscribed to a term deposit (`1`) or not (`0`), which produces a simple confusion matrix.\n", "\n", "First we'll need to determine how we pass data into and receive data from our endpoint. Our data is currently stored as NumPy arrays in memory of our notebook instance. To send it in an HTTP POST request, we'll serialize it as a CSV string and then decode the resulting CSV.\n", "\n", @@ -546,12 +554,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Now, we'll use a simple function to:\n", - "1. Loop over our test dataset\n", - "1. Split it into mini-batches of rows \n", - "1. Convert those mini-batches to CSV string payloads (notice, we drop the target variable from our dataset first)\n", - "1. Retrieve mini-batch predictions by invoking the XGBoost endpoint\n", - "1. Collect predictions and convert from the CSV output our model provides into a NumPy array" + "Let's take a look at the test dataset:" ] }, { @@ -560,23 +563,32 @@ "metadata": {}, "outputs": [], "source": [ - "# cell 20\n", - "def predict(data, predictor, rows=500 ):\n", - " split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))\n", - " predictions = ''\n", - " for array in split_array:\n", - " predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])\n", - "\n", - " return np.fromstring(predictions[1:], sep=',')\n", + "test_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We extract the first test record, drop the label columns y_no and y_yes, and convert it to a list:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "i = 0\n", "\n", - "predictions = predict(test_data.drop(['y_no', 'y_yes'], axis=1).to_numpy(), xgb_predictor)" + "sample_data = test_data.iloc[i].drop(['y_no', 'y_yes']).tolist()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Now we'll check our confusion matrix to see how well we
predicted versus actuals." + "Then we invoke SageMaker real-time endpoint with the sample data for prediction (inference):" ] }, { @@ -585,17 +597,47 @@ "metadata": {}, "outputs": [], "source": [ - "# cell 21\n", - "pd.crosstab(index=test_data['y_yes'], columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])" + "result = xgb_predictor.predict(sample_data).decode('utf-8')\n", + "\n", + "predict = 'Yes' if float(result) > 0.5 else 'No'\n", + "actual = 'Yes' if test_data.iloc[i]['y_yes'] == 1 else 'No'\n", + "\n", + "print(f\"Does the sample client subscribe to term deposit?\")\n", + "print(f\"Prediction: {predict}\")\n", + "print(f\"Actual: {actual}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "So, of the ~4000 potential customers, we predicted 136 would subscribe and 94 of them actually did. We also had 389 subscribers who subscribed that we did not predict would. This is less than desirable, but the model can (and should) be tuned to improve this. 
Most importantly, note that with minimal effort, our model produced accuracies similar to those published [here](https://core.ac.uk/download/pdf/55631291.pdf).\n", + "You can also invoke the SageMaker endpoint via boto3:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "\n", + "endpoint_name = xgb_predictor.endpoint_name\n", + "\n", + "client = boto3.client('sagemaker-runtime', region_name=boto3.Session().region_name)\n", + "\n", + "payload = ','.join(str(e) for e in sample_data).encode(\"utf-8\")\n", + "content_type = 'text/csv'\n", + "response = client.invoke_endpoint(EndpointName=endpoint_name, Body=payload, ContentType=content_type)\n", + "\n", + "result = response['Body'].read().decode('utf-8')\n", + "\n", + "predict = 'Yes' if float(result) > 0.5 else 'No'\n", + "actual = 'Yes' if test_data.iloc[i]['y_yes'] == 1 else 'No'\n", "\n", - "_Note that because there is some element of randomness in the algorithm's subsample, your results may differ slightly from the text written above._" + "print(f\"Does the sample client subscribe to term deposit?\")\n", + "print(f\"Prediction: {predict}\")\n", + "print(f\"Actual: {actual}\")" ] }, { @@ -616,6 +658,74 @@ "xgb_predictor.delete_endpoint(delete_endpoint_config=True)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Batch Transform \n", + "Apart from deploying a SageMaker endpoint for real-time inference, SageMaker also supports batch inference for a list of input data.
\n", + "\n", + "First, let's drop the labels y_no and y_yes from the test dataset, then save it as a CSV file and upload it to S3:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_data.drop(['y_no', 'y_yes'], axis=1).to_csv('test.csv', index=False, header=False)\n", + "boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'test/test.csv')).upload_file('test.csv')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then we launch a batch inference job to do prediction for the set of test data:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_path = f\"s3://{bucket}/{prefix}/test/test.csv\"\n", + "transformer_output_path = f\"s3://{bucket}/{prefix}/transformer-output\"\n", + "\n", + "xgb_transformer = xgb.transformer(\n", + " instance_count=1,\n", + " instance_type='ml.m5.large',\n", + " output_path=transformer_output_path\n", + ")\n", + "\n", + "xgb_transformer.transform(\n", + " data=test_path,\n", + " data_type='S3Prefix',\n", + " content_type='text/csv'\n", + ")\n", + "\n", + "print(sagemaker.s3.S3Downloader.read_file(f\"{transformer_output_path}/test.csv.out\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Download the batch inference result."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sagemaker.s3.S3Downloader.download(f\"{transformer_output_path}/test.csv.out\", \"batch_result\")" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -830,10 +940,11 @@ "source": [ "# cell 22\n", "from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner\n", + "\n", "hyperparameter_ranges = {'eta': ContinuousParameter(0, 1),\n", - " 'min_child_weight': ContinuousParameter(1, 10),\n", - " 'alpha': ContinuousParameter(0, 2),\n", - " 'max_depth': IntegerParameter(1, 10)}\n" + " 'min_child_weight': ContinuousParameter(1, 10),\n", + " 'alpha': ContinuousParameter(0, 2),\n", + " 'max_depth': IntegerParameter(1, 10)}" ] }, { @@ -856,8 +967,8 @@ "tuner = HyperparameterTuner(xgb,\n", " objective_metric_name,\n", " hyperparameter_ranges,\n", - " max_jobs=20,\n", - " max_parallel_jobs=3)\n" + " max_jobs=9,\n", + " max_parallel_jobs=3)" ] }, { @@ -901,7 +1012,7 @@ "# cell 28\n", "# Deploy the best trained or user specified model to an Amazon SageMaker endpoint\n", "tuner_predictor = tuner.deploy(initial_instance_count=1,\n", - " instance_type='ml.m4.xlarge')" + " instance_type='ml.m5.xlarge')" ] }, { @@ -923,18 +1034,18 @@ "source": [ "# cell 30\n", "# Predict\n", - "predictions = predict(test_data.drop(['y_no', 'y_yes'], axis=1).to_numpy(),tuner_predictor)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# cell 31\n", - "# Collect predictions and convert from the CSV output our model provides into a NumPy array\n", - "pd.crosstab(index=test_data['y_yes'], columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])" + "i = 0\n", + "\n", + "sample_data = test_data.iloc[i].drop(['y_no', 'y_yes']).tolist()\n", + "\n", + "result = tuner_predictor.predict(sample_data).decode('utf-8')\n", + "\n", + "predict = 'Yes' if float(result) 
> 0.5 else 'No'\n", + "actual = 'Yes' if test_data.iloc[i]['y_yes'] == 1 else 'No'\n", + "\n", + "print(f\"Does the sample client subscribe to term deposit?\")\n", + "print(f\"Prediction: {predict}\")\n", + "print(f\"Actual: {actual}\")" ] }, { @@ -993,7 +1104,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.11.11" }, "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, diff --git a/mlops/fm-evaluation-at-scale-main/notebooks/.gitattributes b/mlops/fm-evaluation-at-scale-main/notebooks/.gitattributes new file mode 100644 index 0000000..2f41707 --- /dev/null +++ b/mlops/fm-evaluation-at-scale-main/notebooks/.gitattributes @@ -0,0 +1 @@ +*.ipynb filter=strip-notebook-output diff --git a/mlops/fm-evaluation-at-scale-main/notebooks/.ipynb_checkpoints/sagemaker-pipelines-train-pipeline-checkpoint.ipynb b/mlops/fm-evaluation-at-scale-main/notebooks/.ipynb_checkpoints/sagemaker-pipelines-train-pipeline-checkpoint.ipynb new file mode 100644 index 0000000..5be6f1c --- /dev/null +++ b/mlops/fm-evaluation-at-scale-main/notebooks/.ipynb_checkpoints/sagemaker-pipelines-train-pipeline-checkpoint.ipynb @@ -0,0 +1,649 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ce11b1f7-803e-4623-8499-97478594f1e9", + "metadata": {}, + "source": [ + "# Option 1. 
Train Pipeline (SageMaker Pipelines)" + ] + }, + { + "cell_type": "markdown", + "id": "67f8f66b-5391-484d-af30-273f6039bb8d", + "metadata": {}, + "source": [ + "- [Overview](#overview)\n", + "- [Build the pipeline components](#build-the-pipeline-components)\n", + " 1. [Import statements and declare parameters and constants](#import-statements-and-declare-parameters-and-constants)\n", + " 2. [Collect and prepare data](#collect-and-prepare-data)\n", + " 3. [Define Processing Step](#define-processing-step)\n", + " 4. [Define HyperParameter Tuning Step](#define-hyperparameter-tuning-step)\n", + " 5. [Define the evaluation script and model evaluation step](#define-the-evaluation-script-and-model-evaluation-step)\n", + " 6. [Define a register model step](#define-a-register-model-step)\n", + " 7. [Define a condition step to check AUC score](#define-a-condition-step-to-check-auc-score)\n", + "- [Build and Trigger the pipeline run](#build-and-trigger-the-pipeline-run)" + ] + }, + { + "cell_type": "markdown", + "id": "189b06ac-9c5d-49ca-8df6-dc53283494be", + "metadata": {}, + "source": [ + "## Overview" + ] + }, + { + "cell_type": "markdown", + "id": "2f4cd6d8-b36d-474a-af41-027c26cbff06", + "metadata": {}, + "source": [ + "The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train the model.\n", + "\n", + "![](images/Train_Pipeline_Pattern.png)\n", + "\n", + "Train Pipeline consists of the following steps:\n", + "\n", + "1. Preprocess data to build features required and split data into train, validation, and test datasets.\n", + "2. Apply hyperparameter tuning based on the ranges provided with the SageMaker XGBoost framework to give the best model, which is determined based on AUC score.\n", + "3. Evaluate the trained model using the test dataset and check if the AUC score is above a predefined threshold.\n", + "4. 
Check if the AUC score is greater than the threshold; if so, register the model in the SageMaker Model Registry." + ] + }, + { + "cell_type": "markdown", + "id": "ad0c0074-467f-46e9-a57b-091e8db6d171", + "metadata": {}, + "source": [ + "## Build the pipeline components" + ] + }, + { + "cell_type": "markdown", + "id": "c4d89f34-3526-41df-a964-5f2a7112f48c", + "metadata": {}, + "source": [ + "### Step 1: Import statements and declare parameters and constants" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "70816378-1c97-4511-931c-b1dbd3d91e4c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import boto3 \n", + "import pandas as pd \n", + "import sagemaker \n", + "from sagemaker.workflow.pipeline_context import PipelineSession \n", + "\n", + "s3_client = boto3.resource('s3') \n", + "pipeline_name = f\"sagemaker-immersion-train-pipeline\" \n", + "sagemaker_session = sagemaker.session.Session() \n", + "region = sagemaker_session.boto_region_name \n", + "role = sagemaker.get_execution_role() \n", + "pipeline_session = PipelineSession() \n", + "default_bucket = sagemaker_session.default_bucket() \n", + "model_package_group_name = f\"ChurnModelPackageGroup\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7c0d50be-8cef-45e6-9f7c-827c05a98c27", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker.workflow.parameters import ( \n", + " ParameterInteger, \n", + " ParameterString, \n", + " ParameterFloat) \n", + "\n", + "auc_score_threshold = 0.75 \n", + "base_job_prefix = \"churn-example\"\n", + "processing_instance_count = ParameterInteger(name=\"ProcessingInstanceCount\", default_value=1)\n", + "processing_instance_type = ParameterString( name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\") \n", + "training_instance_type = ParameterString( name=\"TrainingInstanceType\", default_value=\"ml.m5.xlarge\") \n", + "input_data = \"storedata_total.csv\" \n", +
"model_approval_status = ParameterString( name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "a5dfc86a-7e27-40f3-aa27-0748be17fc6a", + "metadata": {}, + "source": [ + "### Step 2: Collect and prepare data" + ] + }, + { + "cell_type": "markdown", + "id": "d0f50d77-c9c9-48c1-a960-e88fe27d6b3f", + "metadata": {}, + "source": [ + "To follow along with this lab, you need to download and save the [_sample dataset_](https://www.kaggle.com/uttamp/store-data) into the project directly within the SageMaker Studio environment." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8782a087", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install openpyxl" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8f3fbb77-805e-4c7f-8c0c-a80f596c7178", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# convert the store_data file into csv format \n", + "store_data = pd.read_excel(\"storedata_total.xlsx\") \n", + "store_data.to_csv(\"storedata_total.csv\")" + ] + }, + { + "cell_type": "markdown", + "id": "fa42ce11-837e-4d3c-bcba-ea37e9e43bed", + "metadata": {}, + "source": [ + "### Step 3: Define Processing Step" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e85b6816-3ef0-43ec-a5eb-615a8f804917", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "%%writefile \"churn_preprocess.py\"\n", + "import os\n", + "import tempfile\n", + "import numpy as np\n", + "import pandas as pd\n", + "import datetime as dt\n", + "if __name__ == \"__main__\":\n", + " base_dir = \"/opt/ml/processing\"\n", + " #Read Data\n", + " df = pd.read_csv(\n", + " f\"{base_dir}/input/storedata_total.csv\"\n", + " )\n", + " # convert created column to datetime\n", + " df[\"created\"] = pd.to_datetime(df[\"created\"])\n", + " #Convert firstorder and lastorder to datetime datatype\n", + " df[\"firstorder\"] = 
pd.to_datetime(df[\"firstorder\"],errors='coerce')\n", + " df[\"lastorder\"] = pd.to_datetime(df[\"lastorder\"],errors='coerce')\n", + " #Drop Rows with Null Values\n", + " df = df.dropna()\n", + " #Create column which gives the days between the last order and the first order\n", + " df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days\n", + " #Create column which gives the days between the customer record was created and the first order\n", + " df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days\n", + " #Drop columns\n", + " df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)\n", + " #Apply one hot encoding on favday and city columns\n", + " df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])\n", + " # Split into train, validation and test datasets\n", + " y = df.pop(\"retained\")\n", + " X_pre = df\n", + " y_pre = y.to_numpy().reshape(len(y), 1)\n", + " X = np.concatenate((y_pre, X_pre), axis=1)\n", + " np.random.shuffle(X)\n", + " # Split in Train, Test and Validation Datasets\n", + " train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])\n", + " train_rows = np.shape(train)[0]\n", + " validation_rows = np.shape(validation)[0]\n", + " test_rows = np.shape(test)[0]\n", + " train = pd.DataFrame(train)\n", + " test = pd.DataFrame(test)\n", + " validation = pd.DataFrame(validation)\n", + " # Convert the label column to integer\n", + " train[0] = train[0].astype(int)\n", + " test[0] = test[0].astype(int)\n", + " validation[0] = validation[0].astype(int)\n", + " # Save the Dataframes as csv files\n", + " train.to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", + " validation.to_csv(f\"{base_dir}/validation/validation.csv\", header=False, index=False)\n", + " test.to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"f53f8523-f2d1-4d75-b18e-7bcb5fd2c1b4", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Define Processing Step for Feature Engineering\n", + "from sagemaker.sklearn.processing import SKLearnProcessor\n", + "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", + "from sagemaker.workflow.steps import ProcessingStep\n", + "\n", + "framework_version = \"1.0-1\"\n", + "sklearn_processor = SKLearnProcessor(\n", + " framework_version=framework_version,\n", + " instance_type=\"ml.m5.xlarge\",\n", + " instance_count=processing_instance_count,\n", + " base_job_name=\"sklearn-churn-process\",\n", + " role=role,\n", + " sagemaker_session=pipeline_session,\n", + ")\n", + "processor_args = sklearn_processor.run(\n", + " inputs=[\n", + " ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"), \n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\",\\\n", + " destination=f\"s3://{default_bucket}/output/train\" ),\n", + " ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\",\\\n", + " destination=f\"s3://{default_bucket}/output/validation\"),\n", + " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\",\\\n", + " destination=f\"s3://{default_bucket}/output/test\")\n", + " ],\n", + " code=f\"churn_preprocess.py\",\n", + ")\n", + "step_process = ProcessingStep(name=\"ChurnModelProcess\", step_args=processor_args)" + ] + }, + { + "cell_type": "markdown", + "id": "966e4ef6-2077-4281-a769-d492e156b7f6", + "metadata": {}, + "source": [ + "### Step 4: Define HyperParameter Tuning Step" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6fe5621-bc56-479f-a4ca-f7636866c0b2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker.estimator import Estimator\n", + "from sagemaker.inputs import TrainingInput\n", + "from sagemaker.tuner import (\n", + " IntegerParameter,\n", 
+ " CategoricalParameter,\n", + " ContinuousParameter,\n", + " HyperparameterTuner,\n", + ")\n", + "from sagemaker.workflow.steps import TuningStep\n", + "\n", + "# training step for generating model artifacts\n", + "model_path = f\"s3://{default_bucket}/output\"\n", + "image_uri = sagemaker.image_uris.retrieve(\n", + " framework=\"xgboost\",\n", + " region=region,\n", + " version=\"1.0-1\",\n", + " py_version=\"py3\",\n", + " instance_type=training_instance_type,\n", + ")\n", + "fixed_hyperparameters = {\n", + "\"eval_metric\":\"auc\",\n", + "\"objective\":\"binary:logistic\",\n", + "\"num_round\":\"100\",\n", + "\"rate_drop\":\"0.3\",\n", + "\"tweedie_variance_power\":\"1.4\"\n", + "}\n", + "xgb_train = Estimator(\n", + " image_uri=image_uri,\n", + " instance_type=training_instance_type,\n", + " instance_count=1,\n", + " hyperparameters=fixed_hyperparameters,\n", + " output_path=model_path,\n", + " base_job_name=f\"churn-train\",\n", + " sagemaker_session=pipeline_session,\n", + " role=role,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9fd27af1-28c8-4af5-a80e-08314ecd9088", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "hyperparameter_ranges = {\n", + "\"eta\": ContinuousParameter(0, 1),\n", + "\"min_child_weight\": ContinuousParameter(1, 10),\n", + "\"alpha\": ContinuousParameter(0, 2),\n", + "\"max_depth\": IntegerParameter(1, 10),\n", + "}\n", + "objective_metric_name = \"validation:auc\"\n", + "\n", + "tuner = HyperparameterTuner(\n", + " xgb_train,\n", + " objective_metric_name,\n", + " hyperparameter_ranges,\n", + " max_jobs=2,\n", + " max_parallel_jobs=2,\n", + ")\n", + "\n", + "hpo_args = tuner.fit(\n", + " inputs={\n", + " \"train\": TrainingInput(\n", + " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri,\n", + " content_type=\"text/csv\",\n", + " ),\n", + " \"validation\": TrainingInput(\n", + " 
s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", + " \"validation\"\n", + " ].S3Output.S3Uri,\n", + " content_type=\"text/csv\",\n", + " ),\n", + " }\n", + ")\n", + "\n", + "step_tuning = TuningStep(\n", + " name=\"ChurnHyperParameterTuning\",\n", + " step_args=hpo_args,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "99223372-35b0-424f-982a-2d9e4f442373", + "metadata": {}, + "source": [ + "### Step 5: Define the evaluation script and model evaluation step" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "175e88f4-6894-42ec-9fda-e7ffc9f5ba41", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "%%writefile \"churn_evaluate.py\"\n", + "import json\n", + "import pathlib\n", + "import pickle\n", + "import tarfile\n", + "import joblib\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xgboost\n", + "import datetime as dt\n", + "from sklearn.metrics import roc_curve,auc\n", + "if __name__ == \"__main__\": \n", + " #Read Model Tar File\n", + " model_path = f\"/opt/ml/processing/model/model.tar.gz\"\n", + " with tarfile.open(model_path) as tar:\n", + " tar.extractall(path=\".\")\n", + " model = pickle.load(open(\"xgboost-model\", \"rb\"))\n", + " #Read Test Data using which we evaluate the model\n", + " test_path = \"/opt/ml/processing/test/test.csv\"\n", + " df = pd.read_csv(test_path, header=None)\n", + " y_test = df.iloc[:, 0].to_numpy()\n", + " df.drop(df.columns[0], axis=1, inplace=True)\n", + " X_test = xgboost.DMatrix(df.values)\n", + " #Run Predictions\n", + " predictions = model.predict(X_test)\n", + " #Evaluate Predictions\n", + " fpr, tpr, thresholds = roc_curve(y_test, predictions)\n", + " auc_score = auc(fpr, tpr)\n", + " report_dict = {\n", + " \"classification_metrics\": {\n", + " \"auc_score\": {\n", + " \"value\": auc_score,\n", + " },\n", + " },\n", + " }\n", + " #Save Evaluation Report\n", + " output_dir = \"/opt/ml/processing/evaluation\"\n", + " 
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n", + " evaluation_path = f\"{output_dir}/evaluation.json\"\n", + " with open(evaluation_path, \"w\") as f:\n", + " f.write(json.dumps(report_dict))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7cedbe3a-9127-44d7-835d-4aa57a06b061", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# define model evaluation step to evaluate the trained model\n", + "from sagemaker.processing import ScriptProcessor\n", + "script_eval = ScriptProcessor(\n", + " image_uri=image_uri,\n", + " command=[\"python3\"],\n", + " instance_type=processing_instance_type,\n", + " instance_count=1,\n", + " base_job_name=\"script-churn-eval\",\n", + " role=role,\n", + " sagemaker_session=pipeline_session,\n", + ")\n", + "eval_args = script_eval.run(\n", + " inputs=[\n", + " ProcessingInput(\n", + " source=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix=\"output\"),\n", + " destination=\"/opt/ml/processing/model\"\n", + " ),\n", + " ProcessingInput(\n", + " source=step_process.properties.ProcessingOutputConfig.Outputs[\n", + " \"test\"\n", + " ].S3Output.S3Uri,\n", + " destination=\"/opt/ml/processing/test\"\n", + " )\n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\",\\\n", + " destination=f\"s3://{default_bucket}/output/evaluation\"),\n", + " ],\n", + " code=f\"churn_evaluate.py\",\n", + ")\n", + "from sagemaker.workflow.properties import PropertyFile\n", + "\n", + "evaluation_report = PropertyFile(\n", + " name=\"ChurnEvaluationReport\", output_name=\"evaluation\", path=\"evaluation.json\"\n", + ")\n", + "step_eval = ProcessingStep(\n", + " name=\"ChurnEvalModel\",\n", + " step_args=eval_args,\n", + " property_files=[evaluation_report],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "37520bf7-21fb-44e9-b741-6c195e2803df", + "metadata": {}, + "source": [ + "### Step 6: Define a register 
model step" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f797e1ed-af79-491e-aac5-3d15101e8ec7", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker import Model\n", + "from sagemaker.workflow.model_step import ModelStep\n", + "\n", + "model = Model(\n", + " image_uri=image_uri,\n", + " model_data=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix=\"output\"),\n", + " sagemaker_session=pipeline_session,\n", + " role=role,\n", + ")\n", + "from sagemaker.model_metrics import MetricsSource, ModelMetrics\n", + "\n", + "model_metrics = ModelMetrics(\n", + " model_statistics=MetricsSource(\n", + " s3_uri=\"{}/evaluation.json\".format(\n", + " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", + " ),\n", + " content_type=\"application/json\",\n", + " )\n", + ")\n", + "register_args = model.register(\n", + " content_types=[\"text/csv\"],\n", + " response_types=[\"text/csv\"],\n", + " inference_instances=[\"ml.t2.medium\", \"ml.m5.xlarge\"],\n", + " transform_instances=[\"ml.m5.xlarge\"],\n", + " model_package_group_name=model_package_group_name,\n", + " approval_status=model_approval_status,\n", + " model_metrics=model_metrics,\n", + ")\n", + "step_register = ModelStep(name=\"ChurnRegisterModel\", step_args=register_args)" + ] + }, + { + "cell_type": "markdown", + "id": "9e90a2b3-6de9-4e57-8b95-dc2e4c16100b", + "metadata": {}, + "source": [ + "### Step 7: Define a condition step to check AUC score" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "362a1c48-25c4-4c33-bdbe-13779a9efba4", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker.workflow.conditions import ConditionGreaterThan\n", + "from sagemaker.workflow.condition_step import ConditionStep\n", + "from sagemaker.workflow.functions import JsonGet\n", + "cond_lte = ConditionGreaterThan(\n", + " left=JsonGet(\n", + " step_name=step_eval.name,\n", 
+ " property_file=evaluation_report,\n", + " json_path=\"classification_metrics.auc_score.value\",\n", + " ),\n", + " right=auc_score_threshold,\n", + ")\n", + "step_cond = ConditionStep(\n", + " name=\"CheckAUCScoreChurnEvaluation\",\n", + " conditions=[cond_lte],\n", + " if_steps=[step_register],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "cdafd7f0-907b-4b55-9c43-e67888e26cc6", + "metadata": {}, + "source": [ + "## Build and Trigger the pipeline run" + ] + }, + { + "cell_type": "markdown", + "id": "d648ad67-80e5-404f-b47e-27df019c6d81", + "metadata": {}, + "source": [ + "After defining all of the component steps, you can assemble them into a Pipeline object. You don’t need to specify the order of the steps because SageMaker Pipelines automatically infers the execution sequence from the dependencies between the steps." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3123803e-f286-4acc-b016-4620eb2017a7", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import json\n", + "from sagemaker.workflow.pipeline import Pipeline\n", + "\n", + "pipeline = Pipeline(\n", + " name=pipeline_name,\n", + " parameters=[\n", + " processing_instance_count,\n", + " processing_instance_type,\n", + " training_instance_type,\n", + " model_approval_status,\n", + " input_data,\n", + " auc_score_threshold,\n", + " ],\n", + " steps=[step_process, step_tuning, step_eval, step_cond],\n", + ") \n", + "definition = json.loads(pipeline.definition())\n", + "print(definition)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ee7f3338-f9d3-49b5-8d70-9bfdb7676770", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create a new or update existing Pipeline\n", + "pipeline.upsert(role_arn=role)\n", + "# start Pipeline execution\n", + "pipeline.start()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6a3c03a9-b445-44c7-90b1-0cc8e451b429", + "metadata": {}, + "outputs": [], + "source": [] + }
+ ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/mlops/fm-evaluation-at-scale-main/notebooks/sagemaker-pipelines-inference-pipeline.ipynb b/mlops/fm-evaluation-at-scale-main/notebooks/sagemaker-pipelines-inference-pipeline.ipynb new file mode 100644 index 0000000..90d4c82 --- /dev/null +++ b/mlops/fm-evaluation-at-scale-main/notebooks/sagemaker-pipelines-inference-pipeline.ipynb @@ -0,0 +1,556 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "8079f1d5-633a-4142-9f30-e35dd7e293bf", + "metadata": {}, + "source": [ + "# Option 2: Batch Inference Pipeline (SageMaker Pipelines)" + ] + }, + { + "cell_type": "markdown", + "id": "1c8698d9-a7ee-4094-8a7d-f68d2f3a4fea", + "metadata": {}, + "source": [ + "- [Overview](#overview)\n", + "- [Register model into SageMaker Model Registry](#register-model-into-sagemaker-model-registry)\n", + " 1. [Upload Model Artifact to S3 Bucket](#upload-model-artifact-to-s3-bucket)\n", + " 2. [Create Model Group](#create-model-group)\n", + " 3. [Register Model in Model Registry](#register-model-in-model-registry)\n", + " 4. [Approve Model in Model Registry](#approve-model-in-model-registry)\n", + "- [Build the pipeline components](#build-the-pipeline-components)\n", + " 1. [Import statements and declare parameters and constants](#import-statements-and-declare-parameters-and-constants)\n", + " 2. [Generate Data for Inferences](#generate-data-for-inferences)\n", + " 3. [Upload Inferences Data to S3 Bucket](#upload-inferences-data-to-s3-bucket)\n", + " 4. 
[Info about the Trained Model (An Approved ModelPackage in SageMaker Model Registry)](#info-about-the-trained-model)\n", + " 5. [Define create model step](#define-create-model-step)\n", + " 6. [Define Transform Step to Perform Batch Transformation](#define-transform-step-to-perform-batch-transformation)\n", + "- [Build and Trigger the pipeline run](#build-and-trigger-the-pipeline-run)" + ] + }, + { + "cell_type": "markdown", + "id": "e23a3b9e-7668-48d9-be37-14fba386a3ea", + "metadata": {}, + "source": [ + "## Overview" + ] + }, + { + "cell_type": "markdown", + "id": "0f09cae1-4a09-4169-8a2b-5d668823fcac", + "metadata": {}, + "source": [ + "The following diagram illustrates the high-level architecture of the ML workflow with the different steps to generate inferences using the trained model artifacts.\n", + "\n", + "![](images/Batch_Inference_Pipeline.png)\n", + "\n", + "Inference Pipeline consists of the following steps:\n", + "\n", + "1. Create a model in SageMaker using the latest approved model from SageMaker Model Registry.\n", + "2. Generate Inferences using the trained model artifacts." + ] + }, + { + "cell_type": "markdown", + "id": "1e2e5e88-932a-4be3-87c5-c271d12c5d5a", + "metadata": {}, + "source": [ + "## Register model into SageMaker Model Registry" + ] + }, + { + "cell_type": "markdown", + "id": "a5e31f86-f906-46ca-b901-4502bd8b4067", + "metadata": {}, + "source": [ + "If running through this lab independently, go through the optional step of uploading the model artifact [customer-retention-model.tar.gz](https://github.com/aws-samples/amazon-sagemaker-immersion-day/blob/master/model/customer-retention-model.tar.gz) into S3 Bucket, registering the model into SageMaker Model Registry and approving the model." 
+ ] + }, + { + "cell_type": "markdown", + "id": "9b72edfa-c990-4028-ad49-55c92ccf0580", + "metadata": {}, + "source": [ + "### Upload Model Artifact to S3 Bucket" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "ef277d34-efe2-47b7-89bc-907a6b21a39b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import boto3\n", + "import sagemaker \n", + "\n", + "sagemaker_session = sagemaker.session.Session() \n", + "default_bucket = sagemaker_session.default_bucket() \n", + "s3_client = boto3.resource('s3') \n", + "s3_client.Bucket(default_bucket).upload_file(\"model/customer-retention-model.tar.gz\",\"churn/model_artifacts/customer-retention-model.tar.gz\")" + ] + }, + { + "cell_type": "markdown", + "id": "f324de94-ff99-4ea0-933a-c8b7162c6e02", + "metadata": {}, + "source": [ + "### Create Model Group" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "d73a9582-5f5d-4978-9167-04cd58b340af", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import time\n", + "import os\n", + "\n", + "model_package_group_name = f\"ChurnModelPackageGroup\"\n", + "region = sagemaker_session.boto_region_name\n", + "sm_client = boto3.client('sagemaker', region_name=region)\n", + "model_package_group_input_dict = {\n", + " \"ModelPackageGroupName\" : model_package_group_name,\n", + "}\n", + "\n", + "create_model_package_group_response = sm_client.create_model_package_group(**model_package_group_input_dict)\n", + "print('ModelPackageGroup Arn : {}'.format(create_model_package_group_response['ModelPackageGroupArn']))" + ] + }, + { + "cell_type": "markdown", + "id": "da47100f-5cf5-40fc-a759-8a318d0e6791", + "metadata": {}, + "source": [ + "### Register Model in Model Registry" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "19594e56-6bfa-4880-bd95-a1d491636bfe", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# retrieve the image uri used to train model\n", + "image_uri = 
sagemaker.image_uris.retrieve(\n", + " framework=\"xgboost\",\n", + " region=region,\n", + " version=\"1.0-1\",\n", + " py_version=\"py3\"\n", + ")\n", + "\n", + "# Specify the model source\n", + "model_url = f\"s3://{default_bucket}/churn/model_artifacts/customer-retention-model.tar.gz\"\n", + "\n", + "modelpackage_inference_specification = {\n", + " \"InferenceSpecification\": {\n", + " \"Containers\": [\n", + " {\n", + " \"Image\": image_uri,\n", + " \"ModelDataUrl\": model_url\n", + " }\n", + " ],\n", + " \"SupportedContentTypes\": [ \"text/csv\" ],\n", + " \"SupportedResponseMIMETypes\": [ \"text/csv\" ],\n", + " }\n", + " }\n", + "\n", + "# Alternatively, you can specify the model source like this:\n", + "# modelpackage_inference_specification[\"InferenceSpecification\"][\"Containers\"][0][\"ModelDataUrl\"]=model_url\n", + "\n", + "create_model_package_input_dict = {\n", + " \"ModelPackageGroupName\" : model_package_group_name,\n", + " \"ModelPackageDescription\" : \"XGBoost model that predicts customer retention (churn) from store order data\",\n", + " \"ModelApprovalStatus\" : \"PendingManualApproval\"\n", + "}\n", + "create_model_package_input_dict.update(modelpackage_inference_specification)\n", + "\n", + "create_model_package_response = sm_client.create_model_package(**create_model_package_input_dict)\n", + "model_package_arn = create_model_package_response[\"ModelPackageArn\"]\n", + "print('ModelPackage Version ARN : {}'.format(model_package_arn))" + ] + }, + { + "cell_type": "markdown", + "id": "3d83079d-a7cc-43b6-9f13-5f0ac00ad70b", + "metadata": {}, + "source": [ + "### Approve Model in Model Registry" + ] + }, + { + "cell_type": "markdown", + "id": "e951008e-9d23-4f5f-b952-ccfd70ce8442", + "metadata": { + "tags": [] + }, + "source": [ + "The model registered within model registry can be checked by going to the home screen and choosing the *Models → Model Registry.*\n", + "\n", + "![](images/image1.png)\n", + "\n", + "You can click on the Update
Status tab and manually approve the model.\n", + "\n", + "![](images/image2.png)\n", + "\n", + "![](images/image3.png)" + ] + }, + { + "cell_type": "markdown", + "id": "8f300dea-8931-489b-8bda-9f354434ceb3", + "metadata": {}, + "source": [ + "## Build the pipeline components" + ] + }, + { + "cell_type": "markdown", + "id": "c2bf34a2-6166-47ed-8dcb-8e149864356b", + "metadata": {}, + "source": [ + "### Step 1: Import statements and declare parameters and constants" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "be7536c9-9072-4600-9c69-935e07c447a1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import boto3 \n", + "import pandas as pd \n", + "import sagemaker \n", + "from sagemaker.workflow.pipeline_context import PipelineSession \n", + "\n", + "s3_client = boto3.resource('s3') \n", + "pipeline_name = f\"sagemaker-immersion-inference-pipeline\" \n", + "sagemaker_session = sagemaker.session.Session() \n", + "region = sagemaker_session.boto_region_name \n", + "role = sagemaker.get_execution_role() \n", + "pipeline_session = PipelineSession() \n", + "default_bucket = sagemaker_session.default_bucket() \n", + "model_package_group_name = f\"ChurnModelPackageGroup\"" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "d42fe2cb-7206-4e6d-8474-f3f3b021ed39", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker.workflow.parameters import ( \n", + " ParameterInteger, \n", + " ParameterString, \n", + " ParameterFloat) \n", + "\n", + "base_job_prefix = \"churn-example\"\n", + "processing_instance_count = ParameterInteger(name=\"ProcessingInstanceCount\", default_value=1)\n", + "processing_instance_type = ParameterString( name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\")\n", + "transform_instance_type = ParameterString(name=\"TransformInstanceType\", default_value=\"ml.m5.xlarge\")\n", + "transform_instance_count = ParameterInteger(name=\"TransformInstanceCount\", 
default_value=1)\n", + "batch_data_path = \"s3://{}/data/batch/batch.csv\".format(default_bucket)\n", + "model_approval_status = ParameterString( name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\")" + ] + }, + { + "cell_type": "markdown", + "id": "c11dfcf9-b27c-4dd4-bbcb-a72c1aebacf2", + "metadata": {}, + "source": [ + "### Step 2: Generate Data for Inferences" + ] + }, + { + "cell_type": "markdown", + "id": "6a31a24b-3197-410b-91a7-3f168df1b010", + "metadata": { + "tags": [] + }, + "source": [ + "If you are doing this lab independently, you need to download and save the [_sample dataset_](https://www.kaggle.com/uttamp/store-data) into the project directory within the SageMaker Studio environment." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "8165a7f1-d78e-49f9-9276-44b74e1422e5", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def preprocess_batch_data(file_path):\n", + " df = pd.read_csv(file_path)\n", + " ## Convert to datetime columns\n", + " df[\"firstorder\"]=pd.to_datetime(df[\"firstorder\"],errors='coerce')\n", + " df[\"lastorder\"] = pd.to_datetime(df[\"lastorder\"],errors='coerce')\n", + " ## Drop Rows with null values\n", + " df = df.dropna()\n", + " ## Create Column which gives the days between the last order and the first order\n", + " df[\"first_last_days_diff\"] = (df['lastorder']-df['firstorder']).dt.days\n", + " ## Create Column which gives the days between when the customer record was created and the first order\n", + " df['created'] = pd.to_datetime(df['created'])\n", + " df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days\n", + " ## Drop Columns\n", + " df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)\n", + " ## Apply one hot encoding on favday and city columns\n", + " df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])\n", + " return df\n", + " \n", + "# convert the store_data file into csv format\n", + "store_data =
pd.read_excel(\"storedata_total.xlsx\")\n", + "store_data.to_csv(\"storedata_total.csv\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "97cc8374-583e-4518-b073-58b46325cc5d", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# preprocess batch data and save into the data folder\n", + "batch_data = preprocess_batch_data(\"storedata_total.csv\")\n", + "batch_data.pop(\"retained\")\n", + "batch_sample = batch_data.sample(frac=0.2)\n", + "pd.DataFrame(batch_sample).to_csv(\"batch.csv\",header=False,index=False)" + ] + }, + { + "cell_type": "markdown", + "id": "d5dfc3e3-279c-49db-81c0-f5812e84a5c9", + "metadata": {}, + "source": [ + "### Step 3: Upload Inferences Data to S3 Bucket" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "a2215572-0e83-4c35-b4ce-8171a83ae361", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "s3_client.Bucket(default_bucket).upload_file(\"batch.csv\",\"data/batch/batch.csv\")" + ] + }, + { + "cell_type": "markdown", + "id": "792d5996-9e9c-41cd-9706-2efc7b0b19ef", + "metadata": {}, + "source": [ + "### Step 4: Info about the Trained Model" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "dc16c8c6-e677-4dbe-b145-c6df4393a7c6", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "sm_client = boto3.client(\"sagemaker\") \n", + "\n", + "# get a list of approved model packages from the model package group you specified earlier\n", + "approved_model_packages = sm_client.list_model_packages(\n", + " ModelApprovalStatus='Approved',\n", + " ModelPackageGroupName=model_package_group_name,\n", + " SortBy='CreationTime',\n", + " SortOrder='Descending'\n", + " )\n", + "\n", + "# find the latest approved model package\n", + "try:\n", + " latest_approved_model_package_arn = approved_model_packages['ModelPackageSummaryList'][0]['ModelPackageArn']\n", + "except Exception as e:\n", + " print(\"Failed to retrieve an approved model package:\", e)\n", 
+ " raise\n", + " \n", + "print(latest_approved_model_package_arn) \n", + "\n", + " # retrieve required information about the model\n", + "latest_approved_model_package_descr = sm_client.describe_model_package(ModelPackageName = latest_approved_model_package_arn)\n", + "\n", + "# model artifact uri (tar.gz file)\n", + "model_artifact_uri = latest_approved_model_package_descr['InferenceSpecification']['Containers'][0]['ModelDataUrl']\n", + "# sagemaker image in ecr\n", + "image_uri = latest_approved_model_package_descr['InferenceSpecification']['Containers'][0]['Image']" + ] + }, + { + "cell_type": "markdown", + "id": "abfe078c-f1b4-4619-aa3f-88806814e227", + "metadata": {}, + "source": [ + "### Step 5: Define create model step" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "62dbe26b-fdd5-446c-8712-3d05f992a65c", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline_context.py:261: UserWarning: Running within a PipelineSession, there will be No Wait, No Logs, and No Job being started.\n", + " UserWarning,\n" + ] + } + ], + "source": [ + "from sagemaker import Model\n", + "from sagemaker.inputs import CreateModelInput\n", + "from sagemaker.workflow.model_step import ModelStep\n", + "\n", + "model = Model(\n", + "image_uri=image_uri,\n", + "model_data=model_artifact_uri,\n", + "sagemaker_session=pipeline_session,\n", + "role=role\n", + ")\n", + "\n", + "step_create_model = ModelStep(\n", + "name=\"ChurnCreateModel\",\n", + "step_args=model.create(instance_type=\"ml.m5.large\"),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "6f7fd0a9-38a7-4fac-b2fc-0493c6508a3d", + "metadata": { + "tags": [] + }, + "source": [ + "### Step 6: Define Transform Step to Perform Batch Transformation" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "0980c5f7-c0f7-4c6c-bb7e-2bca34891858", + "metadata": { + "tags": [] + }, +
"outputs": [], + "source": [ + "from sagemaker.transformer import Transformer\n", + "from sagemaker.inputs import TransformInput\n", + "from sagemaker.workflow.steps import TransformStep\n", + "\n", + "transformer = Transformer(\n", + " model_name=step_create_model.properties.ModelName,\n", + " instance_type=\"ml.m5.xlarge\",\n", + " instance_count=1,\n", + " output_path=f\"s3://{default_bucket}/ChurnTransform\",\n", + " sagemaker_session=pipeline_session\n", + ")\n", + " \n", + "step_transform = TransformStep(\n", + " name=\"ChurnTransform\", \n", + " step_args=transformer.transform(\n", + " data=batch_data_path,\n", + " content_type=\"text/csv\"\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "95d93c2d-f4e2-4bd0-af39-0e9a359b095f", + "metadata": {}, + "source": [ + "## Build and Trigger the pipeline run" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "22b01d03-2f8b-42b5-ae28-286040e89dcb", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker.workflow.pipeline import Pipeline\n", + "\n", + "pipeline = Pipeline(\n", + " name=pipeline_name,\n", + " parameters=[\n", + " processing_instance_type,\n", + " processing_instance_count,\n", + " transform_instance_type,\n", + " transform_instance_count,\n", + " ],\n", + " steps=[step_create_model,step_transform],\n", + ") " + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "e7545be1-c39c-4f8e-bdc5-407da51b6fed", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create a new or update existing Pipeline\n", + "pipeline.upsert(role_arn=role)\n", + "# start Pipeline execution\n", + "pipeline.start()" + ] + } + ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3 (Data Science)", + "language": "python", + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-2:236514542706:image/datascience-1.0" + }, + "language_info": { + "codemirror_mode": { +
"name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/mlops/fm-evaluation-at-scale-main/notebooks/sagemaker-pipelines-train-pipeline.ipynb b/mlops/fm-evaluation-at-scale-main/notebooks/sagemaker-pipelines-train-pipeline.ipynb new file mode 100644 index 0000000..5be6f1c --- /dev/null +++ b/mlops/fm-evaluation-at-scale-main/notebooks/sagemaker-pipelines-train-pipeline.ipynb @@ -0,0 +1,649 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ce11b1f7-803e-4623-8499-97478594f1e9", + "metadata": {}, + "source": [ + "# Option 1. Train Pipeline (SageMaker Pipelines)" + ] + }, + { + "cell_type": "markdown", + "id": "67f8f66b-5391-484d-af30-273f6039bb8d", + "metadata": {}, + "source": [ + "- [Overview](#overview)\n", + "- [Build the pipeline components](#build-the-pipeline-components)\n", + " 1. [Import statements and declare parameters and constants](#import-statements-and-declare-parameters-and-constants)\n", + " 2. [Collect and prepare data](#collect-and-prepare-data)\n", + " 3. [Define Processing Step](#define-processing-step)\n", + " 4. [Define HyperParameter Tuning Step](#define-hyperparameter-tuning-step)\n", + " 5. [Define the evaluation script and model evaluation step](#define-the-evaluation-script-and-model-evaluation-step)\n", + " 6. [Define a register model step](#define-a-register-model-step)\n", + " 7. 
[Define a condition step to check AUC score](#define-a-condition-step-to-check-auc-score)\n", + "- [Build and Trigger the pipeline run](#build-and-trigger-the-pipeline-run)" + ] + }, + { + "cell_type": "markdown", + "id": "189b06ac-9c5d-49ca-8df6-dc53283494be", + "metadata": {}, + "source": [ + "## Overview" + ] + }, + { + "cell_type": "markdown", + "id": "2f4cd6d8-b36d-474a-af41-027c26cbff06", + "metadata": {}, + "source": [ + "The following diagram illustrates the high-level architecture of the ML workflow with the different steps to train the model.\n", + "\n", + "![](images/Train_Pipeline_Pattern.png)\n", + "\n", + "Train Pipeline consists of the following steps:\n", + "\n", + "1. Preprocess data to build features required and split data into train, validation, and test datasets.\n", + "2. Apply hyperparameter tuning based on the ranges provided with the SageMaker XGBoost framework to give the best model, which is determined based on AUC score.\n", + "3. Evaluate the trained model using the test dataset and check if the AUC score is above a predefined threshold.\n", + "4. Check if the AUC score is greater than the threshold, if true register the model into SageMaker model registry." 
+ ] + }, + { + "cell_type": "markdown", + "id": "ad0c0074-467f-46e9-a57b-091e8db6d171", + "metadata": {}, + "source": [ + "## Build the pipeline components" + ] + }, + { + "cell_type": "markdown", + "id": "c4d89f34-3526-41df-a964-5f2a7112f48c", + "metadata": {}, + "source": [ + "### Step 1: Import statements and declare parameters and constants" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "70816378-1c97-4511-931c-b1dbd3d91e4c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import boto3 \n", + "import pandas as pd \n", + "import sagemaker \n", + "from sagemaker.workflow.pipeline_context import PipelineSession \n", + "\n", + "s3_client = boto3.resource('s3') \n", + "pipeline_name = f\"sagemaker-immersion-train-pipeline\" \n", + "sagemaker_session = sagemaker.session.Session() \n", + "region = sagemaker_session.boto_region_name \n", + "role = sagemaker.get_execution_role() \n", + "pipeline_session = PipelineSession() \n", + "default_bucket = sagemaker_session.default_bucket() \n", + "model_package_group_name = f\"ChurnModelPackageGroup\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7c0d50be-8cef-45e6-9f7c-827c05a98c27", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker.workflow.parameters import ( \n", + " ParameterInteger, \n", + " ParameterString, \n", + " ParameterFloat) \n", + "\n", + "auc_score_threshold = 0.75 \n", + "base_job_prefix = \"churn-example\"\n", + "processing_instance_count = ParameterInteger(name=\"ProcessingInstanceCount\", default_value=1)\n", + "processing_instance_type = ParameterString( name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\") \n", + "training_instance_type = ParameterString( name=\"TrainingInstanceType\", default_value=\"ml.m5.xlarge\") \n", + "input_data = \"storedata_total.csv\" \n", + "model_approval_status = ParameterString( name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\")\n" + ] + }, + 
{ + "cell_type": "markdown", + "id": "a5dfc86a-7e27-40f3-aa27-0748be17fc6a", + "metadata": {}, + "source": [ + "### Step 2: Collect and prepare data" + ] + }, + { + "cell_type": "markdown", + "id": "d0f50d77-c9c9-48c1-a960-e88fe27d6b3f", + "metadata": {}, + "source": [ + "To follow along with this lab, you need to download and save the [_sample dataset_](https://www.kaggle.com/uttamp/store-data) into the project directory within the SageMaker Studio environment." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8782a087", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install openpyxl" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8f3fbb77-805e-4c7f-8c0c-a80f596c7178", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# convert the store_data file into csv format \n", + "store_data = pd.read_excel(\"storedata_total.xlsx\") \n", + "store_data.to_csv(\"storedata_total.csv\")" + ] + }, + { + "cell_type": "markdown", + "id": "fa42ce11-837e-4d3c-bcba-ea37e9e43bed", + "metadata": {}, + "source": [ + "### Step 3: Define Processing Step" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e85b6816-3ef0-43ec-a5eb-615a8f804917", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "%%writefile \"churn_preprocess.py\"\n", + "import os\n", + "import tempfile\n", + "import numpy as np\n", + "import pandas as pd\n", + "import datetime as dt\n", + "if __name__ == \"__main__\":\n", + " base_dir = \"/opt/ml/processing\"\n", + " #Read Data\n", + " df = pd.read_csv(\n", + " f\"{base_dir}/input/storedata_total.csv\"\n", + " )\n", + " # convert created column to datetime\n", + " df[\"created\"] = pd.to_datetime(df[\"created\"])\n", + " #Convert firstorder and lastorder to datetime datatype\n", + " df[\"firstorder\"] = pd.to_datetime(df[\"firstorder\"],errors='coerce')\n", + " df[\"lastorder\"] = pd.to_datetime(df[\"lastorder\"],errors='coerce')\n", + " #Drop Rows with Null
Values\n", + " df = df.dropna()\n", + " #Create column which gives the days between the last order and the first order\n", + " df['first_last_days_diff'] = (df['lastorder'] - df['firstorder']).dt.days\n", + " #Create column which gives the days between the customer record was created and the first order\n", + " df['created_first_days_diff'] = (df['created'] - df['firstorder']).dt.days\n", + " #Drop columns\n", + " df.drop(['custid', 'created','firstorder','lastorder'], axis=1, inplace=True)\n", + " #Apply one hot encoding on favday and city columns\n", + " df = pd.get_dummies(df, prefix=['favday', 'city'], columns=['favday', 'city'])\n", + " # Split into train, validation and test datasets\n", + " y = df.pop(\"retained\")\n", + " X_pre = df\n", + " y_pre = y.to_numpy().reshape(len(y), 1)\n", + " X = np.concatenate((y_pre, X_pre), axis=1)\n", + " np.random.shuffle(X)\n", + " # Split in Train, Test and Validation Datasets\n", + " train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])\n", + " train_rows = np.shape(train)[0]\n", + " validation_rows = np.shape(validation)[0]\n", + " test_rows = np.shape(test)[0]\n", + " train = pd.DataFrame(train)\n", + " test = pd.DataFrame(test)\n", + " validation = pd.DataFrame(validation)\n", + " # Convert the label column to integer\n", + " train[0] = train[0].astype(int)\n", + " test[0] = test[0].astype(int)\n", + " validation[0] = validation[0].astype(int)\n", + " # Save the Dataframes as csv files\n", + " train.to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", + " validation.to_csv(f\"{base_dir}/validation/validation.csv\", header=False, index=False)\n", + " test.to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f53f8523-f2d1-4d75-b18e-7bcb5fd2c1b4", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Define Processing Step for Feature Engineering\n", + "from 
sagemaker.sklearn.processing import SKLearnProcessor\n", + "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", + "from sagemaker.workflow.steps import ProcessingStep\n", + "\n", + "framework_version = \"1.0-1\"\n", + "sklearn_processor = SKLearnProcessor(\n", + " framework_version=framework_version,\n", + " instance_type=\"ml.m5.xlarge\",\n", + " instance_count=processing_instance_count,\n", + " base_job_name=\"sklearn-churn-process\",\n", + " role=role,\n", + " sagemaker_session=pipeline_session,\n", + ")\n", + "processor_args = sklearn_processor.run(\n", + " inputs=[\n", + " ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"), \n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\",\\\n", + " destination=f\"s3://{default_bucket}/output/train\" ),\n", + " ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\",\\\n", + " destination=f\"s3://{default_bucket}/output/validation\"),\n", + " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\",\\\n", + " destination=f\"s3://{default_bucket}/output/test\")\n", + " ],\n", + " code=f\"churn_preprocess.py\",\n", + ")\n", + "step_process = ProcessingStep(name=\"ChurnModelProcess\", step_args=processor_args)" + ] + }, + { + "cell_type": "markdown", + "id": "966e4ef6-2077-4281-a769-d492e156b7f6", + "metadata": {}, + "source": [ + "### Step 4: Define HyperParameter Tuning Step" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6fe5621-bc56-479f-a4ca-f7636866c0b2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "from sagemaker.estimator import Estimator\n", + "from sagemaker.inputs import TrainingInput\n", + "from sagemaker.tuner import (\n", + " IntegerParameter,\n", + " CategoricalParameter,\n", + " ContinuousParameter,\n", + " HyperparameterTuner,\n", + ")\n", + "from sagemaker.workflow.steps import TuningStep\n", + "\n", + "# 
training step for generating model artifacts\n",
+ "# S3 location passed to the Estimator below as output_path.\n",
+ "model_path = f\"s3://{default_bucket}/output\"\n",
+ "# Look up the XGBoost 1.0-1 container image URI for this region.\n",
+ "image_uri = sagemaker.image_uris.retrieve(\n",
+ " framework=\"xgboost\",\n",
+ " region=region,\n",
+ " version=\"1.0-1\",\n",
+ " py_version=\"py3\",\n",
+ " instance_type=training_instance_type,\n",
+ ")\n",
+ "# Hyperparameters given to the Estimator directly; they are not part of\n",
+ "# hyperparameter_ranges, so the tuner does not vary them.\n",
+ "# NOTE(review): rate_drop and tweedie_variance_power do not look related to\n",
+ "# the binary:logistic objective used here - confirm against the XGBoost\n",
+ "# parameter documentation and drop them if they have no effect.\n",
+ "fixed_hyperparameters = {\n",
+ "\"eval_metric\":\"auc\",\n",
+ "\"objective\":\"binary:logistic\",\n",
+ "\"num_round\":\"100\",\n",
+ "\"rate_drop\":\"0.3\",\n",
+ "\"tweedie_variance_power\":\"1.4\"\n",
+ "}\n",
+ "# Estimator used as the template for every tuning job; one instance per job.\n",
+ "xgb_train = Estimator(\n",
+ " image_uri=image_uri,\n",
+ " instance_type=training_instance_type,\n",
+ " instance_count=1,\n",
+ " hyperparameters=fixed_hyperparameters,\n",
+ " output_path=model_path,\n",
+ " base_job_name=f\"churn-train\",\n",
+ " sagemaker_session=pipeline_session,\n",
+ " role=role,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9fd27af1-28c8-4af5-a80e-08314ecd9088",
+ "metadata": {
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# Ranges the tuner searches over.\n",
+ "hyperparameter_ranges = {\n",
+ "\"eta\": ContinuousParameter(0, 1),\n",
+ "\"min_child_weight\": ContinuousParameter(1, 10),\n",
+ "\"alpha\": ContinuousParameter(0, 2),\n",
+ "\"max_depth\": IntegerParameter(1, 10),\n",
+ "}\n",
+ "# Metric name handed to the tuner as its objective.\n",
+ "objective_metric_name = \"validation:auc\"\n",
+ "\n",
+ "# max_jobs=2 with max_parallel_jobs=2: two training jobs in total, both\n",
+ "# allowed to run at the same time. No objective_type is passed, so the SDK\n",
+ "# default direction applies - presumably Maximize; confirm.\n",
+ "tuner = HyperparameterTuner(\n",
+ " xgb_train,\n",
+ " objective_metric_name,\n",
+ " hyperparameter_ranges,\n",
+ " max_jobs=2,\n",
+ " max_parallel_jobs=2,\n",
+ ")\n",
+ "\n",
+ "# Feed the train and validation outputs of the processing step to the\n",
+ "# tuning jobs as CSV channels.\n",
+ "hpo_args = tuner.fit(\n",
+ " inputs={\n",
+ " \"train\": TrainingInput(\n",
+ " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri,\n",
+ " content_type=\"text/csv\",\n",
+ " ),\n",
+ " \"validation\": TrainingInput(\n",
+ " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n",
+ " \"validation\"\n",
+ " ].S3Output.S3Uri,\n",
+ " content_type=\"text/csv\",\n",
+ " ),\n",
+ " }\n",
+ ")\n",
+ "\n",
+ "# Pipeline step that wraps the tuning arguments built above.\n",
+ "step_tuning = TuningStep(\n",
+ " 
name=\"ChurnHyperParameterTuning\",\n",
+ " step_args=hpo_args,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "99223372-35b0-424f-982a-2d9e4f442373",
+ "metadata": {},
+ "source": [
+ "### Step 5: Define the evaluation script and model evaluation step"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "175e88f4-6894-42ec-9fda-e7ffc9f5ba41",
+ "metadata": {
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "%%writefile \"churn_evaluate.py\"\n",
+ "import json\n",
+ "import pathlib\n",
+ "import pickle\n",
+ "import tarfile\n",
+ "import joblib\n",
+ "import numpy as np\n",
+ "import pandas as pd\n",
+ "import xgboost\n",
+ "import datetime as dt\n",
+ "from sklearn.metrics import roc_curve,auc\n",
+ "if __name__ == \"__main__\":\n",
+ "    # Unpack the model archive into the working directory.\n",
+ "    model_path = \"/opt/ml/processing/model/model.tar.gz\"\n",
+ "    with tarfile.open(model_path) as tar:\n",
+ "        tar.extractall(path=\".\")\n",
+ "    # pickle runs code from the file it loads; only feed it artifacts produced\n",
+ "    # by this pipeline's own tuning step. The context manager makes sure the\n",
+ "    # file handle is closed once the model has been loaded.\n",
+ "    with open(\"xgboost-model\", \"rb\") as model_file:\n",
+ "        model = pickle.load(model_file)\n",
+ "    # Read the test split: column 0 is the label, the remaining columns are features.\n",
+ "    test_path = \"/opt/ml/processing/test/test.csv\"\n",
+ "    df = pd.read_csv(test_path, header=None)\n",
+ "    y_test = df.iloc[:, 0].to_numpy()\n",
+ "    df.drop(df.columns[0], axis=1, inplace=True)\n",
+ "    X_test = xgboost.DMatrix(df.values)\n",
+ "    # Run predictions\n",
+ "    predictions = model.predict(X_test)\n",
+ "    # Evaluate predictions: area under the ROC curve\n",
+ "    fpr, tpr, thresholds = roc_curve(y_test, predictions)\n",
+ "    auc_score = auc(fpr, tpr)\n",
+ "    # The nesting of this dict is what the pipeline condition reads through\n",
+ "    # the JSON path classification_metrics.auc_score.value.\n",
+ "    report_dict = {\n",
+ "        \"classification_metrics\": {\n",
+ "            \"auc_score\": {\n",
+ "                \"value\": auc_score,\n",
+ "            },\n",
+ "        },\n",
+ "    }\n",
+ "    # Save the evaluation report where the processing job collects its output.\n",
+ "    output_dir = \"/opt/ml/processing/evaluation\"\n",
+ "    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n",
+ "    evaluation_path = f\"{output_dir}/evaluation.json\"\n",
+ "    with open(evaluation_path, \"w\") as f:\n",
+ "        f.write(json.dumps(report_dict))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "7cedbe3a-9127-44d7-835d-4aa57a06b061",
+ "metadata": {
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# define model evaluation step to evaluate the trained model\n",
+ "from sagemaker.processing import ScriptProcessor\n",
+ "from sagemaker.workflow.properties import PropertyFile\n",
+ "\n",
+ "# Runs churn_evaluate.py with python3 inside the same image used for training.\n",
+ "script_eval = ScriptProcessor(\n",
+ "    image_uri=image_uri,\n",
+ "    command=[\"python3\"],\n",
+ "    instance_type=processing_instance_type,\n",
+ "    instance_count=1,\n",
+ "    base_job_name=\"script-churn-eval\",\n",
+ "    role=role,\n",
+ "    sagemaker_session=pipeline_session,\n",
+ ")\n",
+ "# Inputs: the model artifact selected from the tuning step and the test split\n",
+ "# written by the processing step.\n",
+ "# NOTE(review): top_k=0 presumably selects the best-ranked training job, and\n",
+ "# the prefix presumably has to match the key prefix of the Estimator\n",
+ "# output_path - confirm against the TuningStep.get_top_model_s3_uri docs.\n",
+ "eval_args = script_eval.run(\n",
+ "    inputs=[\n",
+ "        ProcessingInput(\n",
+ "            source=step_tuning.get_top_model_s3_uri(\n",
+ "                top_k=0, s3_bucket=default_bucket, prefix=\"output\"\n",
+ "            ),\n",
+ "            destination=\"/opt/ml/processing/model\",\n",
+ "        ),\n",
+ "        ProcessingInput(\n",
+ "            source=step_process.properties.ProcessingOutputConfig.Outputs[\n",
+ "                \"test\"\n",
+ "            ].S3Output.S3Uri,\n",
+ "            destination=\"/opt/ml/processing/test\",\n",
+ "        ),\n",
+ "    ],\n",
+ "    outputs=[\n",
+ "        ProcessingOutput(\n",
+ "            output_name=\"evaluation\",\n",
+ "            source=\"/opt/ml/processing/evaluation\",\n",
+ "            destination=f\"s3://{default_bucket}/output/evaluation\",\n",
+ "        ),\n",
+ "    ],\n",
+ "    code=\"churn_evaluate.py\",\n",
+ ")\n",
+ "\n",
+ "# Names evaluation.json inside the evaluation output so that later steps can\n",
+ "# read values from it.\n",
+ "evaluation_report = PropertyFile(\n",
+ "    name=\"ChurnEvaluationReport\", output_name=\"evaluation\", path=\"evaluation.json\"\n",
+ ")\n",
+ "step_eval = ProcessingStep(\n",
+ "    name=\"ChurnEvalModel\",\n",
+ "    step_args=eval_args,\n",
+ "    property_files=[evaluation_report],\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "37520bf7-21fb-44e9-b741-6c195e2803df",
+ "metadata": {},
+ "source": [
+ "### Step 6: Define a register model step"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "f797e1ed-af79-491e-aac5-3d15101e8ec7",
+ "metadata": {
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "from sagemaker import Model\n",
+ "from 
sagemaker.workflow.model_step import ModelStep\n",
+ "\n",
+ "# Model backed by the same tuning-step artifact that the evaluation step scores.\n",
+ "model = Model(\n",
+ " image_uri=image_uri,\n",
+ " model_data=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix=\"output\"),\n",
+ " sagemaker_session=pipeline_session,\n",
+ " role=role,\n",
+ ")\n",
+ "from sagemaker.model_metrics import MetricsSource, ModelMetrics\n",
+ "\n",
+ "# Attach evaluation.json as the model statistics. The URI is built from the\n",
+ "# S3 location of the evaluation step's first (index 0) processing output.\n",
+ "model_metrics = ModelMetrics(\n",
+ " model_statistics=MetricsSource(\n",
+ " s3_uri=\"{}/evaluation.json\".format(\n",
+ " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n",
+ " ),\n",
+ " content_type=\"application/json\",\n",
+ " )\n",
+ ")\n",
+ "# Registration arguments: CSV request and response types, the instance types\n",
+ "# listed for inference and batch transform, and the target model package group.\n",
+ "register_args = model.register(\n",
+ " content_types=[\"text/csv\"],\n",
+ " response_types=[\"text/csv\"],\n",
+ " inference_instances=[\"ml.t2.medium\", \"ml.m5.xlarge\"],\n",
+ " transform_instances=[\"ml.m5.xlarge\"],\n",
+ " model_package_group_name=model_package_group_name,\n",
+ " approval_status=model_approval_status,\n",
+ " model_metrics=model_metrics,\n",
+ ")\n",
+ "# Pipeline step that wraps the registration arguments built above.\n",
+ "step_register = ModelStep(name=\"ChurnRegisterModel\", step_args=register_args)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "9e90a2b3-6de9-4e57-8b95-dc2e4c16100b",
+ "metadata": {},
+ "source": [
+ "### Step 7: Define a condition step to check AUC score"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "362a1c48-25c4-4c33-bdbe-13779a9efba4",
+ "metadata": {
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "from sagemaker.workflow.conditions import ConditionGreaterThan\n",
+ "from sagemaker.workflow.condition_step import ConditionStep\n",
+ "from sagemaker.workflow.functions import JsonGet\n",
+ "# NOTE: despite the cond_lte name this is a greater-than check. It holds when\n",
+ "# the value at classification_metrics.auc_score.value in the evaluation\n",
+ "# report is greater than auc_score_threshold.\n",
+ "cond_lte = ConditionGreaterThan(\n",
+ " left=JsonGet(\n",
+ " step_name=step_eval.name,\n",
+ " property_file=evaluation_report,\n",
+ " json_path=\"classification_metrics.auc_score.value\",\n",
+ " ),\n",
+ " right=auc_score_threshold,\n",
+ ")\n",
+ "# Gate that runs the registration step only when the condition holds.\n",
+ "step_cond = ConditionStep(\n",
+ " 
name=\"CheckAUCScoreChurnEvaluation\",\n",
+ " conditions=[cond_lte],\n",
+ " if_steps=[step_register],\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "cdafd7f0-907b-4b55-9c43-e67888e26cc6",
+ "metadata": {},
+ "source": [
+ "## Build and Trigger the pipeline run"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d648ad67-80e5-404f-b47e-27df019c6d81",
+ "metadata": {},
+ "source": [
+ "After defining all of the component steps, you can assemble them into a Pipelines object. You don’t need to specify the order of pipeline because Pipelines automatically infers the order sequence based on the dependencies between the steps."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "3123803e-f286-4acc-b016-4620eb2017a7",
+ "metadata": {
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "import json\n",
+ "from sagemaker.workflow.pipeline import Pipeline\n",
+ "\n",
+ "# step_register is not listed in steps; it is attached through the if_steps\n",
+ "# of step_cond.\n",
+ "pipeline = Pipeline(\n",
+ " name=pipeline_name,\n",
+ " parameters=[\n",
+ " processing_instance_count,\n",
+ " processing_instance_type,\n",
+ " training_instance_type,\n",
+ " model_approval_status,\n",
+ " input_data,\n",
+ " auc_score_threshold,\n",
+ " ],\n",
+ " steps=[step_process, step_tuning, step_eval, step_cond],\n",
+ ") \n",
+ "# Parse the generated definition JSON into a dict and print it for inspection.\n",
+ "definition = json.loads(pipeline.definition())\n",
+ "print(definition)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "ee7f3338-f9d3-49b5-8d70-9bfdb7676770",
+ "metadata": {
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# Create a new or update existing Pipeline\n",
+ "pipeline.upsert(role_arn=role)\n",
+ "# start Pipeline execution\n",
+ "pipeline.start()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "6a3c03a9-b445-44c7-90b1-0cc8e451b429",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "instance_type": "ml.t3.medium",
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ 
"codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/mlops/fm-evaluation-at-scale-main/notebooks/storedata_total.xlsx b/mlops/fm-evaluation-at-scale-main/notebooks/storedata_total.xlsx new file mode 100644 index 0000000..15e839e Binary files /dev/null and b/mlops/fm-evaluation-at-scale-main/notebooks/storedata_total.xlsx differ