From 2739b30ccf11c43e958ac27249af283c013ea315 Mon Sep 17 00:00:00 2001 From: Eugen N Date: Thu, 13 Jun 2024 09:35:06 -0400 Subject: [PATCH 1/2] PLAT-183978 [aep-cmle:datarobot] Adapt codebase to support AWS along Azure as part of DLZ --- conf/config.ini | 4 +- .../datarobot/Week1Notebook_datarobot.ipynb | 17 +- .../datarobot/Week2Notebook_datarobot.ipynb | 221 +++++++----------- .../datarobot/Week3Notebook_datarobot.ipynb | 150 +++++++----- .../datarobot/Week4Notebook_datarobot.ipynb | 160 +++++++------ .../datarobot/Week5Notebook_datarobot.ipynb | 110 ++++++--- 6 files changed, 339 insertions(+), 323 deletions(-) diff --git a/conf/config.ini b/conf/config.ini index 804c04f..884c7ae 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -20,8 +20,8 @@ compression_type=gzip model_name=cmle_propensity_model [DataRobot] -datarobot_key = -datarobot_endpoint = +datarobot_key = +datarobot_endpoint = [AWS] s3_bucket_name= diff --git a/notebooks/datarobot/Week1Notebook_datarobot.ipynb b/notebooks/datarobot/Week1Notebook_datarobot.ipynb index 0f3e566..d7531fc 100644 --- a/notebooks/datarobot/Week1Notebook_datarobot.ipynb +++ b/notebooks/datarobot/Week1Notebook_datarobot.ipynb @@ -71,12 +71,8 @@ "sandbox_name = config.get(\"Platform\", \"sandbox_name\")\n", "client_id = config.get(\"Authentication\", \"client_id\")\n", "client_secret = config.get(\"Authentication\", \"client_secret\")\n", - "private_key_path = config.get(\"Authentication\", \"private_key_path\")\n", - "tech_account_id = config.get(\"Authentication\", \"tech_acct_id\")\n", - "dataset_id = config.get(\"Platform\", \"dataset_id\")\n", - " \n", - "if not os.path.exists(private_key_path):\n", - " raise Exception(f\"Looking for private key file under {private_key_path} but key not found, please verify path\")" + "scopes = config.get(\"Authentication\", \"scopes\")\n", + "dataset_id = config.get(\"Platform\", \"dataset_id\")" ] }, { @@ -214,9 +210,7 @@ "source": [ "We will now need to configure the aepp library and 
setup authentication credentials. For this please setup the following pieces of information. For information about how you can get these, please refer to the `Setup` section of the **Readme**:\n", "- Client ID\n", - "- Client secret\n", - "- Private key\n", - "- Technical account ID" + "- Client secret" ] }, { @@ -239,9 +233,8 @@ "import aepp\n", "aepp.configure(\n", " org_id=ims_org_id,\n", - " tech_id=tech_account_id, \n", " secret=client_secret,\n", - " path_to_key=private_key_path,\n", + " scopes=scopes,\n", " client_id=client_id,\n", " environment=environment,\n", " sandbox=sandbox_name\n", @@ -4794,7 +4787,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6" + "version": "3.12.2" }, "nteract": { "version": "nteract-front-end@1.0.0" diff --git a/notebooks/datarobot/Week2Notebook_datarobot.ipynb b/notebooks/datarobot/Week2Notebook_datarobot.ipynb index 8fc03ec..7e52a4c 100644 --- a/notebooks/datarobot/Week2Notebook_datarobot.ipynb +++ b/notebooks/datarobot/Week2Notebook_datarobot.ipynb @@ -2,7 +2,6 @@ "cells": [ { "cell_type": "markdown", - "id": "ac09ddfb", "metadata": {}, "source": [ "# Scope of Notebook" @@ -10,7 +9,6 @@ }, { "cell_type": "markdown", - "id": "b42d6f83", "metadata": {}, "source": [ "The goal of this notebook is to showcase how you can prepare data for the future goal of consumption by an ML model, and leveraging functionality in the Adobe Experience Platform to generate features at scale and make it available in your choice of cloud storage.\n", @@ -26,7 +24,6 @@ }, { "cell_type": "markdown", - "id": "fc8c97ed", "metadata": {}, "source": [ "# Setup" @@ -34,7 +31,6 @@ }, { "cell_type": "markdown", - "id": "405c4d19", "metadata": {}, "source": [ "This notebook requires some configuration data to properly authenticate to your Adobe Experience Platform instance. 
You should be able to find all the values required above by following the Setup section of the **README**.\n", @@ -45,7 +41,6 @@ { "cell_type": "code", "execution_count": 1, - "id": "e6591794", "metadata": {}, "outputs": [], "source": [ @@ -68,21 +63,16 @@ "environment = config.get(\"Platform\", \"environment\")\n", "client_id = config.get(\"Authentication\", \"client_id\")\n", "client_secret = config.get(\"Authentication\", \"client_secret\")\n", - "private_key_path = config.get(\"Authentication\", \"private_key_path\")\n", - "tech_account_id = config.get(\"Authentication\", \"tech_acct_id\")\n", + "scopes = config.get(\"Authentication\", \"scopes\")\n", "dataset_id = config.get(\"Platform\", \"dataset_id\")\n", "export_path = config.get(\"Cloud\", \"export_path\")\n", "data_format = config.get(\"Cloud\", \"data_format\")\n", "compression_type = config.get(\"Cloud\", \"compression_type\")\n", - "username=os.getlogin()\n", - " \n", - "if not os.path.exists(private_key_path):\n", - " raise Exception(f\"Looking for private key file under {private_key_path} but key not found, please verify path\")" + "username=os.getlogin() " ] }, { "cell_type": "markdown", - "id": "47c11ff7", "metadata": {}, "source": [ "Some utility functions that will be used throughout this notebook:" @@ -91,7 +81,6 @@ { "cell_type": "code", "execution_count": 2, - "id": "a1b3d5c9", "metadata": {}, "outputs": [], "source": [ @@ -105,7 +94,6 @@ }, { "cell_type": "markdown", - "id": "6dda86eb", "metadata": {}, "source": [ "Before we run anything, make sure to install the following required libraries for this notebook. 
They are all publicly available libraries and the latest version should work fine.\n", @@ -120,7 +108,6 @@ { "cell_type": "code", "execution_count": 3, - "id": "bf128411", "metadata": {}, "outputs": [ { @@ -150,27 +137,26 @@ ], "source": [ "!pip install aepp\n", - "!pip install pygresql" + "!pip install pygresql\n", + "!pip install s3fs\n", + "!pip install adlfs\n", + "!pip install fsspec" ] }, { "cell_type": "markdown", - "id": "4de58f29", "metadata": {}, "source": [ "We'll be using the [aepp Python library](https://github.com/pitchmuc/aepp) here to interact with AEP APIs and create a schema and dataset suitable for adding our synthetic data further down the line. This library simply provides a programmatic interface around the REST APIs, but all these steps could be completed similarly using the raw APIs directly or even in the UI. For more information on the underlying APIs please see [the API reference guide](https://developer.adobe.com/experience-platform-apis/).\n", "\n", "Before any calls can take place, we need to configure the library and setup authentication credentials. For this you'll need the following piece of information. For information about how you can get these, please refer to the `Setup` section of the **README**:\n", "- Client ID\n", - "- Client secret\n", - "- Private key\n", - "- Technical account ID" + "- Client secret" ] }, { "cell_type": "code", "execution_count": 3, - "id": "527b93be", "metadata": {}, "outputs": [], "source": [ @@ -178,9 +164,8 @@ "\n", "aepp.configure(\n", " org_id=ims_org_id,\n", - " tech_id=tech_account_id, \n", " secret=client_secret,\n", - " path_to_key=private_key_path,\n", + " scopes=scopes,\n", " client_id=client_id,\n", " sandbox=sandbox_name,\n", " environment=environment\n", @@ -189,7 +174,6 @@ }, { "cell_type": "markdown", - "id": "ffa3ae6c", "metadata": {}, "source": [ "# 1. 
Creating Featurization Query" @@ -197,7 +181,6 @@ }, { "cell_type": "markdown", - "id": "7dc91620", "metadata": {}, "source": [ "In the previous week we created some synthetic data under a dataset in your Adobe Experience Platform instance, and now we're ready to use it to generate features that can then be fed to our ML model. For this purpose we'll be using the [Query Service](https://experienceleague.adobe.com/docs/experience-platform/query/home.html?lang=en#) which lets us access data from any dataset and run queries at scale. The end goal here is to compress this dataset into a small subset of meaningful features that will be relevant to our model." @@ -205,7 +188,6 @@ }, { "cell_type": "markdown", - "id": "49f67fe0", "metadata": {}, "source": [ "Before we can issue queries, we need to find the table name corresponding to our dataset. Please make sure your dataset ID was entered in your configuration as part of the setup, it should be available under the `dataset_id` variable in this notebook.\n", @@ -216,7 +198,6 @@ { "cell_type": "code", "execution_count": 4, - "id": "48136f6c", "metadata": {}, "outputs": [ { @@ -242,7 +223,6 @@ }, { "cell_type": "markdown", - "id": "ca46a9a3", "metadata": {}, "source": [ "And because some of the data we created in the previous week is under a custom field group, we need to fetch your tenant ID since the data will be nested under it. This can be accomplished simply with the code below" @@ -251,7 +231,6 @@ { "cell_type": "code", "execution_count": 5, - "id": "86179fda", "metadata": {}, "outputs": [ { @@ -276,7 +255,6 @@ }, { "cell_type": "markdown", - "id": "a504ec24", "metadata": {}, "source": [ "Now we can use that table to query it via Query Service. 
Every query is able to run at scale leveraging Spark-based distributed computing power in the backend, so the goal is to take this large dataset, extract meaningful features and only keep a smaller subset to feed into a ML model.\n", @@ -287,7 +265,6 @@ { "cell_type": "code", "execution_count": 6, - "id": "645d030b", "metadata": {}, "outputs": [], "source": [ @@ -301,7 +278,6 @@ }, { "cell_type": "markdown", - "id": "3a37cbf7", "metadata": {}, "source": [ "
\n", @@ -311,7 +287,6 @@ }, { "cell_type": "markdown", - "id": "4e7e2589", "metadata": {}, "source": [ "Let's first define our ML problem scientifically:\n", @@ -326,7 +301,6 @@ { "cell_type": "code", "execution_count": 7, - "id": "4bf7db90", "metadata": {}, "outputs": [ { @@ -440,7 +414,6 @@ }, { "cell_type": "markdown", - "id": "091826e0", "metadata": {}, "source": [ "
\n", @@ -452,7 +425,6 @@ }, { "cell_type": "markdown", - "id": "d5f39fa9", "metadata": {}, "source": [ "Now let's look at our negative labels. Because we just want to retain a random row to avoid bias, we need to introduce randomness into our query:" @@ -461,7 +433,6 @@ { "cell_type": "code", "execution_count": 8, - "id": "44bd2b4f", "metadata": {}, "outputs": [ { @@ -575,7 +546,6 @@ }, { "cell_type": "markdown", - "id": "b81c285f", "metadata": {}, "source": [ "Putting it all together, we can query both our positive and negative classes with the following query:" @@ -584,7 +554,6 @@ { "cell_type": "code", "execution_count": 9, - "id": "944198a2", "metadata": {}, "outputs": [ { @@ -705,7 +674,6 @@ }, { "cell_type": "markdown", - "id": "c39ebdb7", "metadata": {}, "source": [ "Now let's think what kind of features make sense for this kind of problem that we would like to eventually feed to an ML model. There's 2 main kinds of features we're interested in:\n", @@ -736,7 +704,6 @@ { "cell_type": "code", "execution_count": 10, - "id": "f97e2ff6", "metadata": {}, "outputs": [ { @@ -976,7 +943,6 @@ }, { "cell_type": "markdown", - "id": "2ba2468b", "metadata": {}, "source": [ "At that point we have defined all our features, and we also have our classes cleanly defined, so we can tie everything together in a final query that will represent our training set to be used later on on our ML model." @@ -985,7 +951,6 @@ { "cell_type": "code", "execution_count": 11, - "id": "fb182816", "metadata": {}, "outputs": [ { @@ -1273,7 +1238,6 @@ }, { "cell_type": "markdown", - "id": "8830a850", "metadata": {}, "source": [ "# 2. Generating Features Incrementally" @@ -1281,7 +1245,6 @@ }, { "cell_type": "markdown", - "id": "3ad9109f", "metadata": {}, "source": [ "Now in a typical ML workload you'll want to use incremental data to feed to your model, or data between some specific dates. 
For that purpose we can use snapshot information that is tracked inside Query Service every time a new batch of data is ingested, using the `history_meta` metadata table. For example, you can access the metadata for each batch of your dataset using the query below:" @@ -1290,7 +1253,6 @@ { "cell_type": "code", "execution_count": 12, - "id": "23cdd703", "metadata": {}, "outputs": [ { @@ -1501,7 +1463,6 @@ }, { "cell_type": "markdown", - "id": "f49ae36b", "metadata": {}, "source": [ "Now let's use that information to transform our featurization query into an incremental version of it. We can use [anonymous blocks](https://experienceleague.adobe.com/docs/experience-platform/query/sql/syntax.html?lang=en#anonymous-block) to create variables used to filter on the snapshots. Anonymous blocks are useful to embed multiple queries at once and do things like defining variables and such. We can then use Query Service's `SNAPSHOT BETWEEN x AND y` functionality to query data incrementally.\n", @@ -1512,7 +1473,6 @@ { "cell_type": "code", "execution_count": 13, - "id": "e5653c48", "metadata": {}, "outputs": [ { @@ -1600,7 +1560,6 @@ }, { "cell_type": "markdown", - "id": "aa3e27b0", "metadata": {}, "source": [ "Note that we're not executing it interactively because this anonymous block is actually multiple queries chained together, and there is no simple way to return multiple result sets using PostgreSQL libraries.\n", @@ -1611,7 +1570,6 @@ { "cell_type": "code", "execution_count": 14, - "id": "0521deb9", "metadata": {}, "outputs": [ { @@ -1711,7 +1669,6 @@ { "cell_type": "code", "execution_count": 15, - "id": "f6c8493f", "metadata": {}, "outputs": [], "source": [ @@ -1736,7 +1693,6 @@ { "cell_type": "code", "execution_count": 16, - "id": "799152a5", "metadata": {}, "outputs": [ { @@ -1760,7 +1716,6 @@ }, { "cell_type": "markdown", - "id": "e0e248ca", "metadata": {}, "source": [ "
\n", @@ -1772,7 +1727,6 @@ }, { "cell_type": "markdown", - "id": "725608e3", "metadata": {}, "source": [ "This `CREATE TABLE x AS` statement actually does several steps:\n", @@ -1789,7 +1743,6 @@ { "cell_type": "code", "execution_count": 17, - "id": "14226fb3", "metadata": {}, "outputs": [ { @@ -1811,7 +1764,6 @@ }, { "cell_type": "markdown", - "id": "1b94dbbe", "metadata": {}, "source": [ "![CTAS](../media/CMLE-Notebooks-Week2-CTAS.png)" @@ -1819,7 +1771,6 @@ }, { "cell_type": "markdown", - "id": "cff84519", "metadata": {}, "source": [ "Now we can just query it to see the structure of the data and verify it matches our query:" @@ -1828,7 +1779,6 @@ { "cell_type": "code", "execution_count": 18, - "id": "36b5a5ff", "metadata": {}, "outputs": [ { @@ -2065,7 +2015,6 @@ }, { "cell_type": "markdown", - "id": "78c11a9b", "metadata": {}, "source": [ "# 3. Templatizing the Featurization Query" @@ -2073,7 +2022,6 @@ }, { "cell_type": "markdown", - "id": "1de6f821", "metadata": {}, "source": [ "Now we've got a complete featurization query that can also be used to generate features incrementally, but we still want to go further:\n", @@ -2086,7 +2034,6 @@ }, { "cell_type": "markdown", - "id": "3dad387a", "metadata": {}, "source": [ "The first step is to make make sure we either create the table if it does not exist, otherwise insert into it. This can be done by checking if the table exists using the `table_exists` function and adding a condition in our anonymous block based on that:" @@ -2095,7 +2042,6 @@ { "cell_type": "code", "execution_count": 19, - "id": "6bca2976", "metadata": {}, "outputs": [ { @@ -2257,7 +2203,6 @@ }, { "cell_type": "markdown", - "id": "5996a390", "metadata": {}, "source": [ "The next step is to make the snapshot time window configurable. 
To do that we can replace the part containing the snapshot boundaries with variables as `$variable` so they can be passed at runtime using Query Service:" @@ -2266,7 +2211,6 @@ { "cell_type": "code", "execution_count": 20, - "id": "a9e2b943", "metadata": {}, "outputs": [], "source": [ @@ -2405,7 +2349,6 @@ }, { "cell_type": "markdown", - "id": "889495c2", "metadata": {}, "source": [ "We're not executing it because it has actual variables in it that will need to be resolved at runtime, so executing it right now would fail. We're ready to turn this into a proper template, which requires the following:\n", @@ -2419,7 +2362,6 @@ { "cell_type": "code", "execution_count": 21, - "id": "e48ae609", "metadata": {}, "outputs": [ { @@ -2444,7 +2386,6 @@ }, { "cell_type": "markdown", - "id": "434e33da", "metadata": {}, "source": [ "![Template](../media/CMLE-Notebooks-Week2-Template.png)" @@ -2452,7 +2393,6 @@ }, { "cell_type": "markdown", - "id": "f03bb751", "metadata": {}, "source": [ "Now that the template is saved, we can refer to it at any time, and passing any kind of values for the snapshots that we want. So for example if you have streaming data coming through your system, you just need to find out the beginning snapshot ID and end snapshot ID, and you can execute this featurization query that will take care of querying between these 2 snapshots.\n", @@ -2463,7 +2403,6 @@ { "cell_type": "code", "execution_count": 22, - "id": "16b4303f", "metadata": {}, "outputs": [ { @@ -2543,7 +2482,6 @@ { "cell_type": "code", "execution_count": 23, - "id": "7e650c17", "metadata": {}, "outputs": [ { @@ -2578,7 +2516,6 @@ }, { "cell_type": "markdown", - "id": "a53dcc26", "metadata": {}, "source": [ "At that point we got our full featurized dataset that is ready to plug into a ML model. 
But it's still in an Adobe Experience Platform dataset so far, and we need to bring it back to our cloud storage account outside of the Experience Platform to use our tool of choice, which will be covered in the next section.\n", @@ -2589,7 +2526,6 @@ { "cell_type": "code", "execution_count": 24, - "id": "30cd28e7", "metadata": {}, "outputs": [ { @@ -2609,7 +2545,6 @@ }, { "cell_type": "markdown", - "id": "8d255052", "metadata": {}, "source": [ "# 4. Exporting the Featurized Dataset to Cloud Storage" @@ -2617,12 +2552,11 @@ }, { "cell_type": "markdown", - "id": "0bd86f26", "metadata": {}, "source": [ "Now that our featurized data is in a dataset, we need to bring it out to an external cloud storage filesystem from which the ML model training and scoring will be performed. \n", "\n", - "For the purposes of this notebook we will be using the [Data Landing Zone (DLZ)](https://experienceleague.adobe.com/docs/experience-platform/sources/api-tutorials/create/cloud-storage/data-landing-zone.html?lang=en) as the filesystem. Every Adobe Experience Platform has a DLZ already setup as an [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs) container. We'll be using that as a delivery mechanism for the featurized data, but this step can be customized to delivery this data to any cloud storage filesystem.\n", + "For the purposes of this notebook we will be using the [Data Landing Zone (DLZ)](https://experienceleague.adobe.com/docs/experience-platform/sources/api-tutorials/create/cloud-storage/data-landing-zone.html?lang=en) as the filesystem. Every Adobe Experience Platform has a DLZ already setup as an [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs) container or [AWS S3 bucket](https://aws.amazon.com/s3/). 
We'll be using that as a delivery mechanism for the featurized data, but this step can be customized to deliver this data to any cloud storage filesystem.\n", "\n", "To setup the delivery pipeline, we'll be using the [Flow Service for Destinations](https://developer.adobe.com/experience-platform-apis/references/destinations/) which will be responsible for picking up the featurized data and dump it into the DLZ. There's a few steps involved:\n", "- Creating a **source connection**.\n", @@ -2635,7 +2569,6 @@ { "cell_type": "code", "execution_count": 25, - "id": "8587ad98", "metadata": {}, "outputs": [], "source": [ @@ -2646,7 +2579,6 @@ }, { "cell_type": "markdown", - "id": "74ac5f2f", "metadata": {}, "source": [ "## 4.1 Creating the Source Connection" @@ -2654,7 +2586,6 @@ }, { "cell_type": "markdown", - "id": "70633ae7", "metadata": {}, "source": [ "The source connection is responsible for connecting to your Adobe Experience Platform dataset so that the resulting flow will know exactly where to look for the data and in what format." @@ -2663,7 +2594,6 @@ { "cell_type": "code", "execution_count": 26, - "id": "da6602d9", "metadata": {}, "outputs": [ { @@ -2689,7 +2619,6 @@ }, { "cell_type": "markdown", - "id": "104fed79", "metadata": {}, "source": [ "## 4.2 Creating the Target Connection" @@ -2697,7 +2626,6 @@ }, { "cell_type": "markdown", - "id": "f909b3ee", "metadata": {}, "source": [ "The target connection is responsible for connecting to the destination filesystem.
In our case, we want to connect to the DLZ and specify in what format the data will be stored, as well as the type of compression.\n", @@ -2719,7 +2647,6 @@ { "cell_type": "code", "execution_count": 27, - "id": "52e1ff2c", "metadata": {}, "outputs": [ { @@ -2750,7 +2677,6 @@ }, { "cell_type": "markdown", - "id": "2be68712", "metadata": {}, "source": [ "With that base connection, we're ready to create the target connection that will tie to our DLZ directory:" @@ -2759,7 +2685,6 @@ { "cell_type": "code", "execution_count": 28, - "id": "abde3f1e", "metadata": {}, "outputs": [ { @@ -2798,7 +2723,6 @@ }, { "cell_type": "markdown", - "id": "fdd18a9e", "metadata": {}, "source": [ "
\n", @@ -2810,7 +2734,6 @@ }, { "cell_type": "markdown", - "id": "5a07842c", "metadata": {}, "source": [ "## 4.3 Creating the Data Flow" @@ -2818,7 +2741,6 @@ }, { "cell_type": "markdown", - "id": "f58199ff", "metadata": {}, "source": [ "Now that we have the source and target connections setup, we can construct the data flow. A data flow is the \"recipe\" that describes where the data comes from and where it should end up. We can also specify how often checks happen to find new data, but it cannot be lower than 3 hours currently for platform stability reasons. A data flow is tied to a flow spec ID which contains the instructions for transfering data in an optimized way between a source and destination.\n", @@ -2844,7 +2766,6 @@ { "cell_type": "code", "execution_count": 30, - "id": "2f10caed", "metadata": {}, "outputs": [ { @@ -2905,7 +2826,6 @@ }, { "cell_type": "markdown", - "id": "ea9dc178", "metadata": {}, "source": [ "
\n", @@ -2917,7 +2837,6 @@ }, { "cell_type": "markdown", - "id": "42ff24ad", "metadata": {}, "source": [ "After you create the data flow, you should be able to see it in the UI to monitor executions, runtimes and its overall lifecycle. You can get the link below and should be able to see it in the UI as shown in the screenshot as well." @@ -2926,7 +2845,6 @@ { "cell_type": "code", "execution_count": 31, - "id": "20a9828b", "metadata": {}, "outputs": [ { @@ -2944,7 +2862,6 @@ }, { "cell_type": "markdown", - "id": "9b66a686", "metadata": {}, "source": [ "![Dataflow](../media/CMLE-Notebooks-Week2-Dataflow.png)" @@ -2952,7 +2869,6 @@ }, { "cell_type": "markdown", - "id": "79a1bd09", "metadata": {}, "source": [ "## 4.4 Executing the Data Flow" @@ -2960,7 +2876,6 @@ }, { "cell_type": "markdown", - "id": "009ca293", "metadata": {}, "source": [ "At this point we've just created our Data Flow, but it has not executed yet. Please follow the instructions for the option you selected in Section 4.3 :\n", @@ -2972,7 +2887,6 @@ }, { "cell_type": "markdown", - "id": "f6ff4e79", "metadata": {}, "source": [ "
\n", @@ -2983,7 +2897,6 @@ { "cell_type": "code", "execution_count": 35, - "id": "d71e029d", "metadata": {}, "outputs": [ { @@ -3032,7 +2945,6 @@ }, { "cell_type": "markdown", - "id": "e8a78b37", "metadata": {}, "source": [ "
\n", @@ -3044,7 +2956,6 @@ }, { "cell_type": "markdown", - "id": "1d6d182b", "metadata": {}, "source": [ "
\n", @@ -3056,7 +2967,6 @@ }, { "cell_type": "markdown", - "id": "a033dee8", "metadata": {}, "source": [ "
\n", @@ -3068,7 +2978,6 @@ }, { "cell_type": "markdown", - "id": "73e36552", "metadata": {}, "source": [ "Now we can check the execution of our Data Flow to make sure it actually executes. You can run the following cell until you can see the run appear." @@ -3077,7 +2986,6 @@ { "cell_type": "code", "execution_count": 39, - "id": "124fe9e2", "metadata": {}, "outputs": [ { @@ -3113,65 +3021,105 @@ }, { "cell_type": "markdown", - "id": "916beb77", "metadata": {}, "source": [ - "Now that a run of our Data Flow has executed successfully, we're all set! We can do a sanity check to verify that the data indeed made its way into the DLZ. For that, we recommend setting up [Azure Storage Explorer](https://azure.microsoft.com/en-us/products/storage/storage-explorer) to connect to your DLZ container using [this guide](https://experienceleague.adobe.com/docs/experience-platform/destinations/catalog/cloud-storage/data-landing-zone.html?lang=en). To get the credentials, you can execute the code below to get the SAS URL needed:" + "Now that a run of our Data Flow has executed successfully, we're all set! We can do a sanity check to verify that the data indeed made its way into the DLZ. Based on whether DLZ was provisioned on AWS or Azure, we will use a generic approach for listing directory structures."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# We will define a few helper methods to allow listing the DLZ\n", + "\n", + "import fsspec\n", + "\n", + "\n", + "def getDLZFSPath(credentials: dict):\n", + " if 'dlzProvider' in credentials.keys() and any(name in credentials['dlzProvider'].lower() for name in ('amazon', 's3')):\n", + " aws_credentials = {\n", + " 'key' : credentials['credentials']['awsAccessKeyId'],\n", + " 'secret' : credentials['credentials']['awsSecretAccessKey'],\n", + " 'token' : credentials['credentials']['awsSessionToken']\n", + " }\n", + " return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName']\n", + " else:\n", + " abs_credentials = {\n", + " 'account_name' : credentials['storageAccountName'],\n", + " 'sas_token' : credentials['SASToken']\n", + " }\n", + " return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName']\n", + "\n", + "def listDLZ(fs, container, prefix):\n", + " entries = fs.ls(container, detail=True)\n", + " entries_sorted = sorted(entries, key=lambda x: x['type']) # Directories first ('directory' sorts before 'file')\n", + " for i, entry in enumerate(entries_sorted):\n", + " entry_name = entry['name'].split('/')[-1]\n", + " if entry['type'] == 'directory':\n", + " entry_name += '/'\n", + " connector = '|-- ' if i < len(entries_sorted) - 1 else '└- '\n", + " print(f\"{prefix}{connector}{entry_name}\")\n", + " if entry['type'] == 'directory':\n", + " new_prefix = prefix + ('| ' if i < len(entries_sorted) - 1 else ' ')\n", + " listDLZ(fs, entry['name'], new_prefix)" ] }, { "cell_type": "code", "execution_count": 40, - "id": "44677122", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "DLZ container: dlz-destination\n", - "DLZ storage account: sndbxdtlndv43vd4kug0dnjn\n", - "DLZ SAS URL:
https://sndbxdtlndv43vd4kug0dnjn.blob.core.windows.net/dlz-destination?sv=2020-10-02&si=dlz-5fe94224-8440-428d-9d34-6c734cf147e4&sr=c&sp=rl&sig=mlsGbwD%2Fyy58IcD9wVHACFDRsW%2Fzv4AolJo5bw36Ofw%3D\n" + "|-- _$azuretmpfolder$/\n", + "└- cmle/\n", + " └- egress/\n", + " └- 66018d8312377d2c68545bac/\n", + " └- exportTime=20240405230601/\n", + " |-- part-00000-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102384-1-c000.gz.parquet\n", + " |-- part-00001-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102385-1-c000.gz.parquet\n", + " |-- part-00002-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102386-1-c000.gz.parquet\n", + " |-- part-00003-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102387-1-c000.gz.parquet\n", + " |-- part-00004-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102388-1-c000.gz.parquet\n", + " |-- part-00005-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102389-1-c000.gz.parquet\n", + " |-- part-00006-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102390-1-c000.gz.parquet\n", + " |-- part-00007-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102391-1-c000.gz.parquet\n", + " |-- part-00008-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102392-1-c000.gz.parquet\n", + " |-- part-00009-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102393-1-c000.gz.parquet\n", + " |-- part-00010-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102394-1-c000.gz.parquet\n", + " |-- part-00011-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102395-1-c000.gz.parquet\n", + " |-- part-00012-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102396-1-c000.gz.parquet\n", + " |-- part-00013-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102397-1-c000.gz.parquet\n", + " |-- part-00014-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102398-1-c000.gz.parquet\n", + " └- 
part-00015-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102399-1-c000.gz.parquet\n" ] } ], "source": [ - "# TODO: use functionality in aepp once released\n", - "from aepp import connector\n", - "\n", - "connector = connector.AdobeRequest(\n", - " config_object=aepp.config.config_object,\n", - " header=aepp.config.header,\n", - " loggingEnabled=False,\n", - " logger=None,\n", - ")\n", + "import aepp\n", + "from aepp import flowservice\n", "\n", - "endpoint = aepp.config.endpoints[\"global\"] + \"/data/foundation/connectors/landingzone/credentials\"\n", + "flow_conn = flowservice.FlowService()\n", + "credentials = flow_conn.getLandingZoneCredential(dlz_type='dlz_destination')\n", "\n", - "dlz_credentials = connector.getData(endpoint=endpoint, params={\n", - " \"type\": \"dlz_destination\"\n", - "})\n", - "dlz_container = dlz_credentials[\"containerName\"]\n", - "dlz_sas_token = dlz_credentials[\"SASToken\"]\n", - "dlz_storage_account = dlz_credentials[\"storageAccountName\"]\n", - "dlz_sas_uri = dlz_credentials[\"SASUri\"]\n", - "print(f\"DLZ container: {dlz_container}\")\n", - "print(f\"DLZ storage account: {dlz_storage_account}\")\n", - "print(f\"DLZ SAS URL: {dlz_sas_uri}\")" + "fs, container = getDLZFSPath(credentials)\n", + "listDLZ(fs, container, '')" ] }, { "cell_type": "markdown", - "id": "7c528cd7", "metadata": {}, "source": [ - "Once setup you should be able to see your featurized data as a set of Parquet files under the following directory structure: `cmle/egress/$DATASETID/exportTime=$TIMESTAMP` - see screenshot below." + "Once setup you should be able to see your featurized data as a set of Parquet files under the following directory structure: `cmle/egress/$DATASETID/exportTime=$TIMESTAMP` - similar to the structure above." 
] }, { "cell_type": "code", "execution_count": 18, - "id": "b73838de", "metadata": {}, "outputs": [ { @@ -3188,15 +3136,6 @@ }, { "cell_type": "markdown", - "id": "5aca50e2", - "metadata": {}, - "source": [ - "![DLZ](../media/CMLE-Notebooks-Week2-ExportedDataset.png)" - ] - }, - { - "cell_type": "markdown", - "id": "88fde872", "metadata": {}, "source": [ "## 4.5 Saving the featurized dataset to the configuration" @@ -3204,7 +3143,6 @@ }, { "cell_type": "markdown", - "id": "53fe62d6", "metadata": {}, "source": [ "Now that we got everything working, we just need to save the `created_dataset_id` variable in the original configuration file, so we can refer to it in the following weekly assignments. To do that, execute the code below:" @@ -3213,7 +3151,6 @@ { "cell_type": "code", "execution_count": 42, - "id": "b41c35a7", "metadata": {}, "outputs": [], "source": [ @@ -3240,7 +3177,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6" + "version": "3.12.2" } }, "nbformat": 4, diff --git a/notebooks/datarobot/Week3Notebook_datarobot.ipynb b/notebooks/datarobot/Week3Notebook_datarobot.ipynb index fcf707e..7fd0002 100644 --- a/notebooks/datarobot/Week3Notebook_datarobot.ipynb +++ b/notebooks/datarobot/Week3Notebook_datarobot.ipynb @@ -95,8 +95,7 @@ "environment = config.get(\"Platform\", \"environment\")\n", "client_id = config.get(\"Authentication\", \"client_id\")\n", "client_secret = config.get(\"Authentication\", \"client_secret\")\n", - "private_key_path = config.get(\"Authentication\", \"private_key_path\")\n", - "tech_account_id = config.get(\"Authentication\", \"tech_acct_id\")\n", + "scopes = config.get(\"Authentication\", \"scopes\")\n", "dataset_id = config.get(\"Platform\", \"dataset_id\")\n", "featurized_dataset_id = config.get(\"Platform\", \"featurized_dataset_id\")\n", "export_path = config.get(\"Cloud\", \"export_path\")\n", @@ -105,12 +104,7 @@ "compression_type = config.get(\"Cloud\", 
\"compression_type\")\n", "model_name = config.get(\"Cloud\", \"model_name\")\n", "datarobot_key = config.get(\"DataRobot\", 'datarobot_key')\n", - "datarobot_endpoint = config.get(\"DataRobot\", 'datarobot_endpoint')\n", - "\n", - "\n", - " \n", - "if not os.path.exists(private_key_path):\n", - " raise Exception(f\"Looking for private key file under {private_key_path} but key not found, please verify path\")" + "datarobot_endpoint = config.get(\"DataRobot\", 'datarobot_endpoint')" ] }, { @@ -511,6 +505,8 @@ "source": [ "!pip install aepp\n", "!pip install adlfs\n", + "!pip install s3fs\n", + "!pip install fsspec\n", "%pip install datarobot" ] }, @@ -528,24 +524,7 @@ "source": [ "Before any calls can take place, we need to configure the library and setup authentication credentials. For this you'll need the following piece of information. For information about how you can get these, please refer to the `Setup` section of the **Readme**:\n", "- Client ID\n", - "- Client secret\n", - "- Private key\n", - "- Technical account ID" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, - "inputWidgets": {}, - "nuid": "42e91d04-203a-4f1b-be0a-873e02e84484", - "showTitle": false, - "title": "" - } - }, - "source": [ - "The private key needs to be accessible on disk from this notebook. We recommend uploading it to DBFS and refering to it with the `/dbfs` prefix. This can be achieved by clicking in the Databricks notebook interface on `File > Upload data to DBFS` and then selecting the **private.key** file you downloaded during the setup, click `Next` and then you should have the option to copy the path. Make sure it starts with `/dbfs/FileStore` - for example if you uploaded your private key into `/FileStore/shared_upload/your_username` then the final path should be `/dbfs/FileStore/shared_uploads/your_username/private.key`. Copy that value into the cell `Private Key Path` at the very top of this notebook." 
+ "- Client secret" ] }, { @@ -571,9 +550,8 @@ " environment=environment,\n", " sandbox=sandbox_name,\n", " org_id=ims_org_id,\n", - " tech_id=tech_account_id, \n", + " scopes=scopes, \n", " secret=client_secret,\n", - " path_to_key=private_key_path,\n", " client_id=client_id\n", ")" ] @@ -640,24 +618,10 @@ }, "outputs": [], "source": [ - "from aepp import connector\n", - "\n", - "connector = connector.AdobeRequest(\n", - " config_object=aepp.config.config_object,\n", - " header=aepp.config.header,\n", - " loggingEnabled=False,\n", - " logger=None,\n", - ")\n", - "\n", - "endpoint = aepp.config.endpoints[\"global\"] + \"/data/foundation/connectors/landingzone/credentials\"\n", + "from aepp import flowservice\n", "\n", - "dlz_credentials = connector.getData(endpoint=endpoint, params={\n", - " \"type\": \"dlz_destination\"\n", - "})\n", - "dlz_container = dlz_credentials[\"containerName\"]\n", - "dlz_sas_token = dlz_credentials[\"SASToken\"]\n", - "dlz_storage_account = dlz_credentials[\"storageAccountName\"]\n", - "dlz_sas_uri = dlz_credentials[\"SASUri\"]" + "flow_conn = flowservice.FlowService()\n", + "credentials = flow_conn.getLandingZoneCredential(dlz_type='dlz_destination')" ] }, { @@ -700,13 +664,34 @@ } ], "source": [ - "from adlfs import AzureBlobFileSystem\n", + "import fsspec\n", "from fsspec import AbstractFileSystem\n", "\n", - "fs = AzureBlobFileSystem(\n", - " account_name=dlz_storage_account,\n", - " sas_token=dlz_sas_token\n", - ")\n", + "def getDLZFSPath(credentials: dict):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " aws_credentials = {\n", + " 'key' : credentials['credentials']['awsAccessKeyId'],\n", + " 'secret' : credentials['credentials']['awsSecretAccessKey'],\n", + " 'token' : credentials['credentials']['awsSessionToken']\n", + " }\n", + " return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName']\n", + " else:\n", + " abs_credentials = {\n", + " 
'account_name' : credentials['storageAccountName'],\n", + " 'sas_token' : credentials['SASToken']\n", + " }\n", + " return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName']\n", + " \n", + "def getDLZDataPath(credentials):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " aws_buket = credentials['dlzPath']['bucketName']\n", + " dlz_folder = credentials['dlzPath']['dlzFolder']\n", + " return f\"s3a://${aws_buket}/{dlz_folder}/\"\n", + " else:\n", + " dlz_storage_account = credentials['storageAccountName']\n", + " dlz_container = credentials['containerName']\n", + " return f\"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/\"\n", + "\n", "\n", "def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, dataset_id: str):\n", " featurized_data_base_path = f\"{container_name}/{base_path}/{dataset_id}\"\n", @@ -721,7 +706,11 @@ " featurized_data_export_time = featurized_data_export_path.strip().split(\"/\")[-1].split(\"=\")[-1]\n", " return featurized_data_export_time\n", "\n", - "export_time = get_export_time(fs, dlz_container, export_path, featurized_dataset_id)\n", + "\n", + "fs, container = getDLZFSPath(credentials)\n", + "\n", + "\n", + "export_time = get_export_time(fs, container, export_path, featurized_dataset_id)\n", "print(f\"Using featurized data export time of {export_time}\")" ] }, @@ -737,12 +726,56 @@ } }, "source": [ - "At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. Spark needs the following properties to be able to authenticate using SAS:\n", + "At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. 
\n", + "Based on the provisioned account Landing Zone could be either configured to use azure or aws, \n", + "in case of azure following properties will be used to authenticate using SAS:\n", "- `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`.\n", "- `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`.\n", "- `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the SAS token retrieved earlier.\n", "\n", - "Let's put that in practice and create a Spark dataframe containing the entire featurized data:" + "in case of aws following properties will be used to access data stored in s3:\n", + "- `fs.s3a.access.key` and `spark.hadoop.fs.s3a.access.key` should be the s3 access key\n", + "- `fs.s3a.secret.key` and `spark.hadoop.fs.s3a.secret.key` should be the s3 secret\n", + "- `fs.s3a.session.token` and `spark.hadoop.fs.s3a.session.token` should be set to s3 session token\n", + "- `fs.s3a.aws.credentials.provider` and `spark.hadoop.fs.s3a.aws.credentials.provider` should be set to `org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider`\n", + "- `fs.s3.impl` and `spark.hadoop.fs.s3.impl` should be set to `org.apache.hadoop.fs.s3a.S3AFileSystem`\n", + "\n", + "\n", + "The above properties are calculated based on the landing zone credentials, following util method will set these up:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def configureSparkSessionAndGetPath(credentials):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " aws_key = credentials['credentials']['awsAccessKeyId']\n", + " aws_secret = credentials['credentials']['awsSecretAccessKey']\n", + " aws_token = credentials['credentials']['awsSessionToken']\n", + " aws_buket = credentials['dlzPath']['bucketName']\n", + " dlz_folder = 
credentials['dlzPath']['dlzFolder']\n", + " spark.conf.set(\"fs.s3a.access.key\", aws_key)\n", + " spark.conf.set(\"fs.s3a.secret.key\", aws_secret)\n", + " spark.conf.set(\"fs.s3a.session.token\", aws_token)\n", + " spark.conf.set(\"fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider\")\n", + " spark.conf.set(\"fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.access.key\", aws_key)\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.secret.key\", aws_secret)\n", + " spark.conf.set(\"fs.s3a.session.token\", aws_token)\n", + " return f\"s3a://${aws_buket}/{dlz_folder}/\"\n", + " else:\n", + " dlz_storage_account = credentials['storageAccountName']\n", + " dlz_sas_token = credentials['SASToken']\n", + " dlz_container = credentials['containerName']\n", + " spark.conf.set(f\"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net\", \"SAS\")\n", + " spark.conf.set(f\"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider\")\n", + " spark.conf.set(f\"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net\", dlz_sas_token)\n", + " return f\"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/\"" ] }, { @@ -790,13 +823,12 @@ } ], "source": [ - "spark.conf.set(f\"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net\", \"SAS\")\n", - "spark.conf.set(f\"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider\")\n", - "spark.conf.set(f\"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net\", dlz_sas_token)\n", + "# init spark 
session for provisioned DLZ and get the base path (s3a://bucket_name/folder or abfss://container@account/)\n", + "cloud_base_path = configureSparkSessionAndGetPath(credentials)\n", "\n", - "protocol = \"abfss\"\n", - "input_path = f\"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{export_path}/{featurized_dataset_id}/exportTime={export_time}/\"\n", + "input_path = cloud_base_path + f\"{export_path}/{featurized_dataset_id}/exportTime={export_time}/\"\n", "\n", + "#Let's put that in practice and create a Spark dataframe containing the entire featurized data:\n", "df = spark.read.parquet(input_path)\n", "df.printSchema()" ] @@ -1191,7 +1223,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6" + "version": "3.12.2" } }, "nbformat": 4, diff --git a/notebooks/datarobot/Week4Notebook_datarobot.ipynb b/notebooks/datarobot/Week4Notebook_datarobot.ipynb index a2e2385..54878ea 100644 --- a/notebooks/datarobot/Week4Notebook_datarobot.ipynb +++ b/notebooks/datarobot/Week4Notebook_datarobot.ipynb @@ -106,8 +106,7 @@ "environment = config.get(\"Platform\", \"environment\")\n", "client_id = config.get(\"Authentication\", \"client_id\")\n", "client_secret = config.get(\"Authentication\", \"client_secret\")\n", - "private_key_path = config.get(\"Authentication\", \"private_key_path\")\n", - "tech_account_id = config.get(\"Authentication\", \"tech_acct_id\")\n", + "scopes = config.get(\"Authentication\", \"scopes\")\n", "dataset_id = config.get(\"Platform\", \"dataset_id\")\n", "featurized_dataset_id = config.get(\"Platform\", \"featurized_dataset_id\")\n", "export_path = config.get(\"Cloud\", \"export_path\")\n", @@ -115,12 +114,10 @@ "data_format = config.get(\"Cloud\", \"data_format\")\n", "compression_type = config.get(\"Cloud\", \"compression_type\")\n", "model_name = config.get(\"Cloud\", \"model_name\")\n", + "\n", "datarobot_key = config.get(\"DataRobot\", 'datarobot_key')\n", "datarobot_endpoint = 
config.get(\"DataRobot\", 'datarobot_endpoint')\n", - "datarobot_deployment_id = config.get(\"DataRobot\", 'datarobot_deployment_id')\n", - " \n", - "if not os.path.exists(private_key_path):\n", - " raise Exception(f\"Looking for private key file under {private_key_path} but key not found, please verify path\")" + "datarobot_deployment_id = config.get(\"DataRobot\", 'datarobot_deployment_id')" ] }, { @@ -878,7 +875,9 @@ "source": [ "!pip install mlflow\n", "!pip install aepp\n", - "!pip install adlfs" + "!pip install adlfs\n", + "!pip install s3fs\n", + "!pip install fsspec" ] }, { @@ -910,24 +909,7 @@ "source": [ "Before any calls can take place, we need to configure the library and setup authentication credentials. For this you'll need the following piece of information. For information about how you can get these, please refer to the `Setup` section of the **Readme**:\n", "- Client ID\n", - "- Client secret\n", - "- Private key\n", - "- Technical account ID" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, - "inputWidgets": {}, - "nuid": "ebba43c4-a1cc-4e49-8ba1-162a6b24ebba", - "showTitle": false, - "title": "" - } - }, - "source": [ - "The private key needs to be accessible on disk from this notebook. We recommend uploading it to DBFS and refering to it with the `/dbfs` prefix. This can be achieved by clicking in the Databricks notebook interface on `File > Upload data to DBFS` and then selecting the **private.key** file you downloaded during the setup, click `Next` and then you should have the option to copy the path. Make sure it starts with `/dbfs/FileStore` - for example if you uploaded your private key into `/FileStore/shared_upload/your_username` then the final path should be `/dbfs/FileStore/shared_uploads/your_username/private.key`. Make sure this value is properly set in the `private_key_path` variable of your configuration file." 
+ "- Client secret" ] }, { @@ -951,9 +933,8 @@ "\n", "aepp.configure(\n", " org_id=ims_org_id,\n", - " tech_id=tech_account_id, \n", + " scopes=scopes, \n", " secret=client_secret,\n", - " path_to_key=private_key_path,\n", " client_id=client_id,\n", " environment=environment,\n", " sandbox=sandbox_name\n", @@ -1038,25 +1019,10 @@ "outputs": [], "source": [ "import aepp\n", - " \n", - "from aepp import connector\n", - " \n", - "connector = connector.AdobeRequest(\n", - " config_object=aepp.config.config_object,\n", - " header=aepp.config.header,\n", - " loggingEnabled=False,\n", - " logger=None,\n", - ")\n", + "from aepp import flowservice\n", "\n", - "endpoint = aepp.config.endpoints[\"global\"] + \"/data/foundation/connectors/landingzone/credentials\"\n", - " \n", - "dlz_credentials = connector.getData(endpoint=endpoint, params={\n", - " \"type\": \"dlz_destination\"\n", - "})\n", - "dlz_container = dlz_credentials[\"containerName\"]\n", - "dlz_sas_token = dlz_credentials[\"SASToken\"]\n", - "dlz_storage_account = dlz_credentials[\"storageAccountName\"]\n", - "dlz_sas_uri = dlz_credentials[\"SASUri\"]" + "flow_conn = flowservice.FlowService()\n", + "credentials = flow_conn.getLandingZoneCredential(dlz_type='dlz_destination')" ] }, { @@ -1099,14 +1065,25 @@ } ], "source": [ - "from adlfs import AzureBlobFileSystem\n", + "import fsspec\n", "from fsspec import AbstractFileSystem\n", "\n", - "fs = AzureBlobFileSystem(\n", - " account_name=dlz_storage_account,\n", - " sas_token=dlz_sas_token\n", - ")\n", + "def getDLZFSPath(credentials: dict):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " aws_credentials = {\n", + " 'key' : credentials['credentials']['awsAccessKeyId'],\n", + " 'secret' : credentials['credentials']['awsSecretAccessKey'],\n", + " 'token' : credentials['credentials']['awsSessionToken']\n", + " }\n", + " return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName']\n", + 
" else:\n", + " abs_credentials = {\n", + " 'account_name' : credentials['storageAccountName'],\n", + " 'sas_token' : credentials['SASToken']\n", + " }\n", + " return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName']\n", "\n", + " \n", "def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, dataset_id: str):\n", " featurized_data_base_path = f\"{container_name}/{base_path}/{dataset_id}\"\n", " featurized_data_export_paths = fs.ls(featurized_data_base_path)\n", @@ -1120,7 +1097,9 @@ " featurized_data_export_time = featurized_data_export_path.strip().split(\"/\")[-1].split(\"=\")[-1]\n", " return featurized_data_export_time\n", "\n", - "export_time = get_export_time(fs, dlz_container, export_path, featurized_dataset_id)\n", + "fs, container = getDLZFSPath(credentials)\n", + "\n", + "export_time = get_export_time(fs, container, export_path, featurized_dataset_id)\n", "print(f\"Using featurized data export time of {export_time}\")" ] }, @@ -1136,12 +1115,56 @@ } }, "source": [ - "At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. Spark needs the following properties to be able to authenticate using SAS:\n", + "At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. 
\n", + "Based on the provisioned account Landing Zone could be either configured to use azure or aws, in case of azure following properties will be used to authenticate using SAS:\n", + "\n", "- `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`.\n", "- `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`.\n", "- `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the SAS token retrieved earlier.\n", "\n", - "Let's put that in practice and create a Spark dataframe containing the entire featurized data:" + "in case of aws following properties will be used to access data stored in s3:\n", + "\n", + "- `fs.s3a.access.key` and `spark.hadoop.fs.s3a.access.key` should be the s3 access key\n", + "- `fs.s3a.secret.key` and `spark.hadoop.fs.s3a.secret.key` should be the s3 secret\n", + "- `fs.s3a.session.token` and `spark.hadoop.fs.s3a.session.token` should be set to s3 session token\n", + "- `fs.s3a.aws.credentials.provider` and `spark.hadoop.fs.s3a.aws.credentials.provider` should be set to `org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider`\n", + "- `fs.s3.impl` and `spark.hadoop.fs.s3.impl` should be set to `org.apache.hadoop.fs.s3a.S3AFileSystem`\n", + "\n", + "The above properties are calculated based on the landing zone credentials, following util method will set these up:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def configureSparkSessionAndGetPath(credentials):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " aws_key = credentials['credentials']['awsAccessKeyId']\n", + " aws_secret = credentials['credentials']['awsSecretAccessKey']\n", + " aws_token = credentials['credentials']['awsSessionToken']\n", + " aws_buket = credentials['dlzPath']['bucketName']\n", + " dlz_folder = 
credentials['dlzPath']['dlzFolder']\n", + " spark.conf.set(\"fs.s3a.access.key\", aws_key)\n", + " spark.conf.set(\"fs.s3a.secret.key\", aws_secret)\n", + " spark.conf.set(\"fs.s3a.session.token\", aws_token)\n", + " spark.conf.set(\"fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider\")\n", + " spark.conf.set(\"fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.access.key\", aws_key)\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.secret.key\", aws_secret)\n", + " spark.conf.set(\"fs.s3a.session.token\", aws_token)\n", + " return f\"s3a://{aws_buket}/{dlz_folder}/\"\n", + " else:\n", + " dlz_storage_account = credentials['storageAccountName']\n", + " dlz_sas_token = credentials['SASToken']\n", + " dlz_container = credentials['containerName'] \n", + " spark.conf.set(f\"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net\", \"SAS\")\n", + " spark.conf.set(f\"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider\")\n", + " spark.conf.set(f\"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net\", dlz_sas_token)\n", + " return f\"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/\"" ] }, { @@ -1189,12 +1212,8 @@ } ], "source": [ - "spark.conf.set(f\"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net\", \"SAS\")\n", - "spark.conf.set(f\"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider\")\n", - "spark.conf.set(f\"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net\", dlz_sas_token)\n", - "\n", - 
"protocol = \"abfss\"\n", - "input_path = f\"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{export_path}/{featurized_dataset_id}/exportTime={export_time}/\"\n", + "cloud_base_path = configureSparkSessionAndGetPath(credentials)\n", + "input_path = cloud_base_path + f\"{export_path}/{featurized_dataset_id}/exportTime={export_time}/\"\n", "\n", "df = spark.read.parquet(input_path)\n", "df.printSchema()" @@ -2039,12 +2058,15 @@ "from aepp import flowservice\n", "\n", "flow_conn = flowservice.FlowService()\n", - "\n", "dlz_credentials = flow_conn.getLandingZoneCredential()\n", - "dlz_container = dlz_credentials[\"containerName\"]\n", - "dlz_sas_token = dlz_credentials[\"SASToken\"]\n", - "dlz_storage_account = dlz_credentials[\"storageAccountName\"]\n", - "dlz_sas_uri = dlz_credentials[\"SASUri\"]\n", + "\n", + "def getDLZPath(credentials: dict):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " return credentials['dlzPath']['bucketName'] + '/' + credentials['dlzPath']['dlzFolder']\n", + " else:\n", + " return credentials['containerName']\n", + "\n", + "dlz_container = getDLZPath(dlz_credentials)\n", "print(dlz_container)" ] }, @@ -2506,9 +2528,8 @@ }, "outputs": [], "source": [ - "spark.conf.set(f\"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net\", \"SAS\")\n", - "spark.conf.set(f\"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider\")\n", - "spark.conf.set(f\"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net\", dlz_sas_token)" + "dlz_path = configureSparkSessionAndGetPath(dlz_credentials)\n", + "dlz_path" ] }, { @@ -2553,9 +2574,8 @@ "source": [ "from datetime import datetime\n", "\n", - "protocol = \"abfss\"\n", "scoring_export_time = datetime.utcnow().strftime('%Y%m%d%H%M%S')\n", - "output_path = 
f\"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{import_path}/{ingestion_dataset_id}/exportTime={scoring_export_time}/\"\n", + "output_path = f\"{dlz_path}{import_path}/{ingestion_dataset_id}/exportTime={scoring_export_time}/\"\n", "output_path" ] }, @@ -2852,7 +2872,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6" + "version": "3.12.2" } }, "nbformat": 4, diff --git a/notebooks/datarobot/Week5Notebook_datarobot.ipynb b/notebooks/datarobot/Week5Notebook_datarobot.ipynb index fa61b93..0cd7c0f 100644 --- a/notebooks/datarobot/Week5Notebook_datarobot.ipynb +++ b/notebooks/datarobot/Week5Notebook_datarobot.ipynb @@ -105,8 +105,7 @@ "environment = config.get(\"Platform\", \"environment\")\n", "client_id = config.get(\"Authentication\", \"client_id\")\n", "client_secret = config.get(\"Authentication\", \"client_secret\")\n", - "private_key_path = config.get(\"Authentication\", \"private_key_path\")\n", - "tech_account_id = config.get(\"Authentication\", \"tech_acct_id\")\n", + "scopes = config.get(\"Authentication\", \"scopes\")\n", "dataset_id = config.get(\"Platform\", \"dataset_id\")\n", "featurized_dataset_id = config.get(\"Platform\", \"featurized_dataset_id\")\n", "scoring_dataset_id = config.get(\"Platform\", \"scoring_dataset_id\")\n", @@ -114,10 +113,7 @@ "import_path = config.get(\"Cloud\", \"import_path\")\n", "data_format = config.get(\"Cloud\", \"data_format\")\n", "compression_type = config.get(\"Cloud\", \"compression_type\")\n", - "model_name = config.get(\"Cloud\", \"model_name\")\n", - "\n", - "if not os.path.exists(private_key_path):\n", - " raise Exception(f\"Looking for private key file under {private_key_path} but key not found, please verify path\")" + "model_name = config.get(\"Cloud\", \"model_name\")\n" ] }, { @@ -557,7 +553,9 @@ ], "source": [ "!pip install aepp\n", - "!pip install adlfs" + "!pip install adlfs\n", + "!pip install s3fs\n", + "!pip install fsspec" 
] }, { @@ -589,9 +587,7 @@ "source": [ "Before any calls can take place, we need to configure the library and setup authentication credentials. For this you'll need the following piece of information. For information about how you can get these, please refer to the `Setup` section of the **Readme**:\n", "- Client ID\n", - "- Client secret\n", - "- Private key\n", - "- Technical account ID" + "- Client secret" ] }, { @@ -630,9 +626,8 @@ "\n", "aepp.configure(\n", " org_id=ims_org_id,\n", - " tech_id=tech_account_id, \n", " secret=client_secret,\n", - " path_to_key=private_key_path,\n", + " scopes=scopes,\n", " client_id=client_id,\n", " environment=environment,\n", " sandbox=sandbox_name\n", @@ -724,16 +719,30 @@ } ], "source": [ + "import fsspec\n", "from aepp import flowservice\n", + "from fsspec import AbstractFileSystem\n", "\n", "flow_conn = flowservice.FlowService()\n", - "\n", "dlz_credentials = flow_conn.getLandingZoneCredential()\n", - "dlz_container = dlz_credentials[\"containerName\"]\n", - "dlz_sas_token = dlz_credentials[\"SASToken\"]\n", - "dlz_storage_account = dlz_credentials[\"storageAccountName\"]\n", - "dlz_sas_uri = dlz_credentials[\"SASUri\"]\n", - "print(dlz_container)" + "\n", + "def getDLZFSPath(credentials: dict):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " aws_credentials = {\n", + " 'key' : credentials['credentials']['awsAccessKeyId'],\n", + " 'secret' : credentials['credentials']['awsSecretAccessKey'],\n", + " 'token' : credentials['credentials']['awsSessionToken']\n", + " }\n", + " return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName']\n", + " else:\n", + " abs_credentials = {\n", + " 'account_name' : credentials['storageAccountName'],\n", + " 'sas_token' : credentials['SASToken']\n", + " }\n", + " return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName']\n", + "\n", + "fs, dlzfs_path = 
getDLZFSPath(dlz_credentials)\n", + "print(dlzfs_path)" ] }, { @@ -762,14 +771,6 @@ } ], "source": [ - "from adlfs import AzureBlobFileSystem\n", - "from fsspec import AbstractFileSystem\n", - "\n", - "fs = AzureBlobFileSystem(\n", - " account_name=dlz_storage_account,\n", - " sas_token=dlz_sas_token\n", - ")\n", - "\n", "def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, dataset_id: str):\n", " featurized_data_base_path = f\"{container_name}/{base_path}/{dataset_id}\"\n", " featurized_data_export_paths = fs.ls(featurized_data_base_path)\n", @@ -783,7 +784,7 @@ " featurized_data_export_time = featurized_data_export_path.strip().split(\"/\")[-1].split(\"=\")[-1]\n", " return featurized_data_export_time\n", "\n", - "export_time = get_export_time(fs, dlz_container, import_path, scoring_dataset_id)\n", + "export_time = get_export_time(fs, dlzfs_path, import_path, scoring_dataset_id)\n", "print(f\"Using featurized data export time of {export_time}\")" ] }, @@ -799,12 +800,21 @@ } }, "source": [ - "At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. Spark needs the following properties to be able to authenticate using SAS:\n", - "- `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`.\n", - "- `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`.\n", - "- `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the SAS token retrieved earlier.\n", + "At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. 
Based on the provisioned account Landing Zone could be either configured to use **azure** or **aws**, in case of azure following properties will be used to authenticate using SAS:\n", + "\n", + " - `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`.\n", + " - `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`.\n", + " - `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the `SAS token` retrieved earlier.\n", + "\n", + "in case of aws following properties will be used to access data stored in s3:\n", "\n", - "Let's put that in practice and create a Spark dataframe containing the entire featurized data:" + " - `fs.s3a.access.key` and `spark.hadoop.fs.s3a.access.key` should be the `s3 access key`\n", + " - `fs.s3a.secret.key` and `spark.hadoop.fs.s3a.secret.key` should be the `s3 secret`\n", + " - `fs.s3a.session.token` and `spark.hadoop.fs.s3a.session.token` should be set to `s3 session token`\n", + " - `fs.s3a.aws.credentials.provider` and `spark.hadoop.fs.s3a.aws.credentials.provider` should be set to `org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider`\n", + " - `fs.s3.impl` and `spark.hadoop.fs.s3.impl` should be set to `org.apache.hadoop.fs.s3a.S3AFileSystem`\n", + "\n", + "The above properties are calculated based on the landing zone credentials, following util method will set these up. 
Let's put that in practice and create a Spark dataframe containing the entire featurized data:" ] }, { @@ -835,12 +845,36 @@ } ], "source": [ - "spark.conf.set(f\"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net\", \"SAS\")\n", - "spark.conf.set(f\"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider\")\n", - "spark.conf.set(f\"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net\", dlz_sas_token)\n", + "def configureSparkSessionAndGetPath(credentials):\n", + " if 'dlzProvider' in credentials.keys() and ['Amazon', 's3'] in credentials['dlzProvider']:\n", + " aws_key = credentials['credentials']['awsAccessKeyId']\n", + " aws_secret = credentials['credentials']['awsSecretAccessKey']\n", + " aws_token = credentials['credentials']['awsSessionToken']\n", + " aws_buket = credentials['dlzPath']['bucketName']\n", + " dlz_folder = credentials['dlzPath']['dlzFolder']\n", + " spark.conf.set(\"fs.s3a.access.key\", aws_key)\n", + " spark.conf.set(\"fs.s3a.secret.key\", aws_secret)\n", + " spark.conf.set(\"fs.s3a.session.token\", aws_token)\n", + " spark.conf.set(\"fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider\")\n", + " spark.conf.set(\"fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider\")\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.access.key\", aws_key)\n", + " spark.conf.set(\"spark.hadoop.fs.s3a.secret.key\", aws_secret)\n", + " spark.conf.set(\"fs.s3a.session.token\", aws_token)\n", + " return f\"s3a://${aws_buket}/{dlz_folder}/\"\n", + " else:\n", + " dlz_storage_account = credentials['storageAccountName']\n", + " dlz_sas_token = credentials['SASToken']\n", + " dlz_container 
= credentials['containerName']\n", + " spark.conf.set(f\"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net\", \"SAS\")\n", + " spark.conf.set(f\"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net\", \"org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider\")\n", + " spark.conf.set(f\"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net\", dlz_sas_token)\n", + " return f\"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/\"\n", + "\n", + "dlz_path = configureSparkSessionAndGetPath(dlz_credentials)\n", "\n", - "protocol = \"abfss\"\n", - "input_path = f\"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{import_path}/{scoring_dataset_id}/exportTime={export_time}/\"\n", + "input_path = f\"{dlz_path}{import_path}/{scoring_dataset_id}/exportTime={export_time}/\"\n", "\n", "df = spark.read.format(\"csv\").option(\"header\", \"true\").load(input_path)\n", "df.printSchema()" @@ -2306,7 +2340,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.6" + "version": "3.12.2" } }, "nbformat": 4, From c84910d43d80de1cd35c7c78afb7316e6fd3b6cb Mon Sep 17 00:00:00 2001 From: Eugen N Date: Thu, 13 Jun 2024 20:05:46 -0400 Subject: [PATCH 2/2] PLAT-183981 [aep-cmle:databricks] Adapt codebase to support AWS along Azure cloud storage providers --- notebooks/databricks/CommonInclude.py | 19 ++-- notebooks/databricks/RunMe.py | 4 +- notebooks/databricks/Week2Notebook.py | 87 +++++++++++------ notebooks/databricks/Week3Notebook.py | 98 +++++++++++++++++--- notebooks/databricks/Week4Notebook.py | 128 +++++++++++++++++++------- notebooks/databricks/Week5Notebook.py | 92 ++++++++++++------ 6 files changed, 313 insertions(+), 115 deletions(-) diff --git a/notebooks/databricks/CommonInclude.py b/notebooks/databricks/CommonInclude.py index 4198219..209acf9 100644 --- a/notebooks/databricks/CommonInclude.py +++ b/notebooks/databricks/CommonInclude.py 
@@ -49,8 +49,7 @@ def getDataPatched( environment = config.get("Platform", "environment") client_id = config.get("Authentication", "client_id") client_secret = config.get("Authentication", "client_secret") -private_key_path = config.get("Authentication", "private_key_path") -tech_account_id = config.get("Authentication", "tech_acct_id") +scopes = config.get("Authentication", "scopes") dataset_id = config.get("Platform", "dataset_id") featurized_dataset_id = config.get("Platform", "featurized_dataset_id") scoring_dataset_id = config.get("Platform", "scoring_dataset_id") @@ -60,9 +59,6 @@ def getDataPatched( compression_type = config.get("Cloud", "compression_type") model_name = config.get("Cloud", "model_name") -if not os.path.exists(private_key_path): - raise Exception(f"Looking for private key file under {private_key_path} but key not found, please verify path") - # COMMAND ---------- @@ -82,9 +78,8 @@ def getDataPatched( aepp.configure( org_id=ims_org_id, - tech_id=tech_account_id, + scopes=scopes, secret=client_secret, - path_to_key=private_key_path, client_id=client_id, environment=environment, sandbox=sandbox_name, @@ -201,7 +196,6 @@ def get_dataset_ids_by_name(cat_conn, name): # COMMAND ---------- -from adlfs import AzureBlobFileSystem from fsspec import AbstractFileSystem def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, dataset_id: str): @@ -219,6 +213,9 @@ def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, # COMMAND ---------- +from aepp import flowservice + +flow_conn = flowservice.FlowService() connector = aepp.connector.AdobeRequest( config_object=aepp.config.config_object, @@ -230,11 +227,7 @@ def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, aepp.config.endpoints["global"] + "/data/foundation/connectors/landingzone/credentials") -dlz_credentials = connector.getData(endpoint=dlz_endpoint, params={"type": "dlz_destination"}) -dlz_container = 
dlz_credentials["containerName"] -dlz_sas_token = dlz_credentials["SASToken"] -dlz_storage_account = dlz_credentials["storageAccountName"] -dlz_sas_uri = dlz_credentials["SASUri"] +dlz_credentials = flow_conn.getLandingZoneCredential(dlz_type='dlz_destination') # COMMAND ---------- diff --git a/notebooks/databricks/RunMe.py b/notebooks/databricks/RunMe.py index 719a7e5..91a2a32 100644 --- a/notebooks/databricks/RunMe.py +++ b/notebooks/databricks/RunMe.py @@ -84,7 +84,9 @@ pypi_packages = [ "PyGreSQL==5.2.5", - "adlfs==2023.8.0", + "adlfs==2023.9.0", + "fsspec==2023.9.0", + "s3fs==2023.9.0", "aepp==0.3.1.post5", "mmh3==4.0.1", "rstr==3.2.1", diff --git a/notebooks/databricks/Week2Notebook.py b/notebooks/databricks/Week2Notebook.py index 3f606cd..c6b6ccb 100644 --- a/notebooks/databricks/Week2Notebook.py +++ b/notebooks/databricks/Week2Notebook.py @@ -1349,37 +1349,75 @@ def get_or_create_query_template(template_spec): # COMMAND ---------- # MAGIC %md -# MAGIC Now that a run of our Data Flow has executed successfully, we're all set! We can do a sanity check to verify that the data indeed made its way into the DLZ. For that, we recommend setting up [Azure Storage Explorer](https://azure.microsoft.com/en-us/products/storage/storage-explorer) to connect to your DLZ container using [this guide](https://experienceleague.adobe.com/docs/experience-platform/destinations/catalog/cloud-storage/data-landing-zone.html?lang=en). To get the credentials, you can execute the code below to get the SAS URL needed: +# MAGIC Now that a run of our Data Flow has executed successfully, we're all set! We can do a sanity check to verify that the data indeed made its way into the DLZ. Based on whether DLZ was provisioned on AWS or Azure, we will use a generic approach for listing directory structures.
# COMMAND ---------- +import aepp +import fsspec +from aepp import flowservice -# TODO: use functionality in aepp once released -from aepp import connector - -connector = connector.AdobeRequest( - config_object=aepp.config.config_object, - header=aepp.config.header, - loggingEnabled=False, - logger=None, -) +def getDLZFSPath(credentials: dict): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in credentials['dlzProvider']: + aws_credentials = { + 'key' : credentials['credentials']['awsAccessKeyId'], + 'secret' : credentials['credentials']['awsSecretAccessKey'], + 'token' : credentials['credentials']['awsSessionToken'] + } + return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName'] + else: + abs_credentials = { + 'account_name' : credentials['storageAccountName'], + 'sas_token' : credentials['SASToken'] + } + return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName'] + +def listDLZ(fs, container, prefix): + entries = fs.ls(container, detail=True) + entries_sorted = sorted(entries, key=lambda x: x['type'], reverse=True) # Directories first + for i, entry in enumerate(entries_sorted): + entry_name = entry['name'].split('/')[-1] + if entry['type'] == 'directory': + entry_name += '/' + connector = '|-- ' if i < len(entries_sorted) - 1 else '└- ' + print(f"{prefix}{connector}{entry_name}") + if entry['type'] == 'directory': + new_prefix = prefix + ('| ' if i < len(entries_sorted) - 1 else ' ') + listDLZ(fs, entry['name'], new_prefix) -endpoint = aepp.config.endpoints["global"] + "/data/foundation/connectors/landingzone/credentials" -dlz_credentials = connector.getData(endpoint=endpoint, params={ - "type": "dlz_destination" -}) -dlz_container = dlz_credentials["containerName"] -dlz_sas_token = dlz_credentials["SASToken"] -dlz_storage_account = dlz_credentials["storageAccountName"] -dlz_sas_uri = dlz_credentials["SASUri"] -print(f"DLZ container: {dlz_container}") -print(f"DLZ storage account: 
{dlz_storage_account}") -print(f"DLZ SAS URL: {dlz_sas_uri}") +flow_conn = flowservice.FlowService() +credentials = flow_conn.getLandingZoneCredential(dlz_type='dlz_destination') +fs, container = getDLZFSPath(credentials) +listDLZ(fs, container, '') + # COMMAND ---------- # MAGIC %md -# MAGIC Once setup you should be able to see your featurized data as a set of Parquet files under the following directory structure: `cmle/egress/$DATASETID/exportTime=$TIMESTAMP` - see screenshot below. +# MAGIC Once setup you should be able to see your featurized data as a set of Parquet files under the following directory structure: `cmle/egress/$DATASETID/exportTime=$TIMESTAMP` - in a tree like structure: +# MAGIC ``` +# MAGIC |-- _$azuretmpfolder$/ +# MAGIC └- cmle/ +# MAGIC └- egress/ +# MAGIC └- 66018d8312377d2c68545bac/ +# MAGIC └- exportTime=20240405230601/ +# MAGIC |-- part-00000-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102384-1-c000.gz.parquet +# MAGIC |-- part-00001-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102385-1-c000.gz.parquet +# MAGIC |-- part-00002-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102386-1-c000.gz.parquet +# MAGIC |-- part-00003-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102387-1-c000.gz.parquet +# MAGIC |-- part-00004-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102388-1-c000.gz.parquet +# MAGIC |-- part-00005-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102389-1-c000.gz.parquet +# MAGIC |-- part-00006-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102390-1-c000.gz.parquet +# MAGIC |-- part-00007-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102391-1-c000.gz.parquet +# MAGIC |-- part-00008-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102392-1-c000.gz.parquet +# MAGIC |-- part-00009-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102393-1-c000.gz.parquet +# MAGIC |-- 
part-00010-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102394-1-c000.gz.parquet +# MAGIC |-- part-00011-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102395-1-c000.gz.parquet +# MAGIC |-- part-00012-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102396-1-c000.gz.parquet +# MAGIC |-- part-00013-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102397-1-c000.gz.parquet +# MAGIC |-- part-00014-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102398-1-c000.gz.parquet +# MAGIC └- part-00015-tid-6351713407229798623-174d2b9b-87e8-4c29-8a76-ec05b444f26a-102399-1-c000.gz.parquet +# MAGIC ``` # COMMAND ---------- @@ -1387,11 +1425,6 @@ def get_or_create_query_template(template_spec): # COMMAND ---------- -# MAGIC %md -# MAGIC ![DLZ](/files/static/7cf4bf44-5482-4426-a3b3-842be2f737b1/media/CMLE-Notebooks-Week2-ExportedDataset.png) - -# COMMAND ---------- - # MAGIC %md # MAGIC ## 4.5 Saving the featurized dataset to the configuration diff --git a/notebooks/databricks/Week3Notebook.py b/notebooks/databricks/Week3Notebook.py index 6b46d6e..1a45d01 100644 --- a/notebooks/databricks/Week3Notebook.py +++ b/notebooks/databricks/Week3Notebook.py @@ -48,35 +48,105 @@ # COMMAND ---------- -from adlfs import AzureBlobFileSystem +import fsspec from fsspec import AbstractFileSystem -abfs = AzureBlobFileSystem(account_name=dlz_storage_account, sas_token=dlz_sas_token) -export_time = get_export_time(abfs, dlz_container, export_path, featurized_dataset_id) +def getDLZFSPath(credentials: dict): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in credentials['dlzProvider']: + aws_credentials = { + 'key' : credentials['credentials']['awsAccessKeyId'], + 'secret' : credentials['credentials']['awsSecretAccessKey'], + 'token' : credentials['credentials']['awsSessionToken'] + } + return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName'] + else: + abs_credentials = { + 'account_name' : 
credentials['storageAccountName'], + 'sas_token' : credentials['SASToken'] + } + return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName'] + + +def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, dataset_id: str): + featurized_data_base_path = f"{container_name}/{base_path}/{dataset_id}" + featurized_data_export_paths = fs.ls(featurized_data_base_path) + + if len(featurized_data_export_paths) == 0: + raise Exception(f"Found no exports for featurized data from dataset ID {dataset_id} under path {featurized_data_base_path}") + elif len(featurized_data_export_paths) > 1: + print(f"Found {len(featurized_data_export_paths)} exports from dataset dataset ID {dataset_id} under path {featurized_data_base_path}, using most recent one") + + featurized_data_export_path = featurized_data_export_paths[-1] + featurized_data_export_time = featurized_data_export_path.strip().split("/")[-1].split("=")[-1] + return featurized_data_export_time + + +fs, container = getDLZFSPath(dlz_credentials) + + +export_time = get_export_time(fs, container, export_path, featurized_dataset_id) print(f"Using featurized data export time of {export_time}") # COMMAND ---------- # MAGIC %md -# MAGIC At this point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. Spark needs the following properties to be able to authenticate using SAS: +# MAGIC At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. +# MAGIC ``` +# MAGIC Based on the provisioned account Landing Zone could be either configured to use azure or aws, +# MAGIC in case of azure following properties will be used to authenticate using SAS: # MAGIC - `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`.
# MAGIC - `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`. # MAGIC - `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the SAS token retrieved earlier. -# MAGIC -# MAGIC Let's put that in practice and create a Spark dataframe containing the entire featurized data: - -# COMMAND ---------- +# MAGIC +# MAGIC in case of aws following properties will be used to access data stored in s3: +# MAGIC - `fs.s3a.access.key` and `spark.hadoop.fs.s3a.access.key` should be the s3 access key +# MAGIC - `fs.s3a.secret.key` and `spark.hadoop.fs.s3a.secret.key` should be the s3 secret +# MAGIC - `fs.s3a.session.token` and `spark.hadoop.fs.s3a.session.token` should be set to s3 session token +# MAGIC - `fs.s3a.aws.credentials.provider` and `spark.hadoop.fs.s3a.aws.credentials.provider` should be set to `org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider` +# MAGIC - `fs.s3.impl` and `spark.hadoop.fs.s3.impl` should be set to `org.apache.hadoop.fs.s3a.S3AFileSystem` +# MAGIC +# MAGIC +# MAGIC The above properties are calculated based on the landing zone credentials, following util method will set these up: +# MAGIC ``` +# COMMAND ---------- + +def configureSparkSessionAndGetPath(credentials): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in credentials['dlzProvider']: + aws_key = credentials['credentials']['awsAccessKeyId'] + aws_secret = credentials['credentials']['awsSecretAccessKey'] + aws_token = credentials['credentials']['awsSessionToken'] + aws_buket = credentials['dlzPath']['bucketName'] + dlz_folder = credentials['dlzPath']['dlzFolder'] + spark.conf.set("fs.s3a.access.key", aws_key) + spark.conf.set("fs.s3a.secret.key", aws_secret) + spark.conf.set("fs.s3a.session.token", aws_token) + spark.conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") + spark.conf.set("fs.s3.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem") + spark.conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + spark.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") + spark.conf.set("spark.hadoop.fs.s3a.access.key", aws_key) + spark.conf.set("spark.hadoop.fs.s3a.secret.key", aws_secret) + spark.conf.set("spark.hadoop.fs.s3a.session.token", aws_token) + return f"s3a://{aws_buket}/{dlz_folder}/" + else: + dlz_storage_account = credentials['storageAccountName'] + dlz_sas_token = credentials['SASToken'] + dlz_container = credentials['containerName'] + spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS") + spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") + spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token) + return f"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/" -spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS") -spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") -spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token) +# init spark session for provisioned DLZ and get the base path (s3a://bucket_name/folder/ or abfss://container@account/) +cloud_base_path = configureSparkSessionAndGetPath(dlz_credentials) -protocol = "abfss" -input_path = f"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{export_path}/{featurized_dataset_id}/exportTime={export_time}/" +input_path = cloud_base_path + f"{export_path}/{featurized_dataset_id}/exportTime={export_time}/" -dlz_input_df = spark.read.parquet(input_path).na.fill(0) +#Let's put that in practice and create a Spark dataframe
containing the entire featurized data: +dlz_input_df = spark.read.parquet(input_path) dlz_input_df.printSchema() + # COMMAND ---------- # MAGIC %md diff --git a/notebooks/databricks/Week4Notebook.py b/notebooks/databricks/Week4Notebook.py index 235473a..e14b8d7 100644 --- a/notebooks/databricks/Week4Notebook.py +++ b/notebooks/databricks/Week4Notebook.py @@ -45,34 +45,97 @@ # COMMAND ---------- -from adlfs import AzureBlobFileSystem -from fsspec import AbstractFileSystem - -azure_blob_fs = AzureBlobFileSystem(account_name=dlz_storage_account, sas_token=dlz_sas_token) - -export_time = get_export_time(azure_blob_fs, dlz_container, export_path, featurized_dataset_id) +import fsspec + +def getDLZFSPath(credentials: dict): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in credentials['dlzProvider']: + aws_credentials = { + 'key' : credentials['credentials']['awsAccessKeyId'], + 'secret' : credentials['credentials']['awsSecretAccessKey'], + 'token' : credentials['credentials']['awsSessionToken'] + } + return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName'] + else: + abs_credentials = { + 'account_name' : credentials['storageAccountName'], + 'sas_token' : credentials['SASToken'] + } + return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName'] + + +def get_export_time(fs: AbstractFileSystem, container_name: str, base_path: str, dataset_id: str): + featurized_data_base_path = f"{container_name}/{base_path}/{dataset_id}" + featurized_data_export_paths = fs.ls(featurized_data_base_path) + + if len(featurized_data_export_paths) == 0: + raise Exception(f"Found no exports for featurized data from dataset ID {dataset_id} under path {featurized_data_base_path}") + elif len(featurized_data_export_paths) > 1: + print(f"Found {len(featurized_data_export_paths)} exports from dataset dataset ID {dataset_id} under path {featurized_data_base_path}, using most recent one") + + featurized_data_export_path = 
featurized_data_export_paths[-1] + featurized_data_export_time = featurized_data_export_path.strip().split("/")[-1].split("=")[-1] + return featurized_data_export_time + +fs, container = getDLZFSPath(dlz_credentials) + +export_time = get_export_time(fs, container, export_path, featurized_dataset_id) print(f"Using featurized data export time of {export_time}") # COMMAND ---------- # MAGIC %md -# MAGIC At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. Spark needs the following properties to be able to authenticate using SAS: -# MAGIC - `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`. -# MAGIC - `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`. -# MAGIC - `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the SAS token retrieved earlier. -# MAGIC -# MAGIC Let's put that in practice and create a Spark dataframe containing the entire featurized data: - -# COMMAND ---------- - -spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS") -spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") -spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token) - -protocol = "abfss" -input_path = f"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{export_path}/{featurized_dataset_id}/exportTime={export_time}/" - -dlz_input_df = spark.read.parquet(input_path).na.fill(0) +# MAGIC At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling.
Based on the provisioned account Landing Zone could be either configured to use azure or aws, in case of azure following properties will be used to authenticate using SAS: +# MAGIC ``` +# MAGIC fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net should be set to SAS. +# MAGIC fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net should be set to org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider. +# MAGIC fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net should be set to the SAS token retrieved earlier. +# MAGIC +# MAGIC in case of aws following properties will be used to access data stored in s3: +# MAGIC +# MAGIC fs.s3a.access.key and spark.hadoop.fs.s3a.access.key should be the s3 access key +# MAGIC fs.s3a.secret.key and spark.hadoop.fs.s3a.secret.key should be the s3 secret +# MAGIC fs.s3a.session.token and spark.hadoop.fs.s3a.session.token should be set to s3 session token +# MAGIC fs.s3a.aws.credentials.provider and spark.hadoop.fs.s3a.aws.credentials.provider should be set to org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider +# MAGIC fs.s3.impl and spark.hadoop.fs.s3.impl should be set to org.apache.hadoop.fs.s3a.S3AFileSystem +# MAGIC +# MAGIC The above properties are calculated based on the landing zone credentials, following util method will set these up: +# MAGIC ``` + +# COMMAND ---------- + +def configureSparkSessionAndGetPath(credentials): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in credentials['dlzProvider']: + aws_key = credentials['credentials']['awsAccessKeyId'] + aws_secret = credentials['credentials']['awsSecretAccessKey'] + aws_token = credentials['credentials']['awsSessionToken'] + aws_buket = credentials['dlzPath']['bucketName'] + dlz_folder = credentials['dlzPath']['dlzFolder'] + spark.conf.set("fs.s3a.access.key", aws_key) + spark.conf.set("fs.s3a.secret.key", aws_secret) + spark.conf.set("fs.s3a.session.token", aws_token) + spark.conf.set("fs.s3a.aws.credentials.provider", 
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") + spark.conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + spark.conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + spark.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") + spark.conf.set("spark.hadoop.fs.s3a.access.key", aws_key) + spark.conf.set("spark.hadoop.fs.s3a.secret.key", aws_secret) + spark.conf.set("spark.hadoop.fs.s3a.session.token", aws_token) + return f"s3a://{aws_buket}/{dlz_folder}/" + else: + dlz_storage_account = credentials['storageAccountName'] + dlz_sas_token = credentials['SASToken'] + dlz_container = credentials['containerName'] + spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS") + spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") + spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token) + return f"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/" + +# init spark session for provisioned DLZ and get the base path (s3a://bucket_name/folder/ or abfss://container@account/) +cloud_base_path = configureSparkSessionAndGetPath(dlz_credentials) + +input_path = cloud_base_path + f"{export_path}/{featurized_dataset_id}/exportTime={export_time}/" + +dlz_input_df = spark.read.parquet(input_path) dlz_input_df.printSchema() # COMMAND ---------- @@ -413,12 +476,15 @@ from aepp import flowservice flow_conn = flowservice.FlowService() - dlz_credentials = flow_conn.getLandingZoneCredential() -dlz_container = dlz_credentials["containerName"] -dlz_sas_token = dlz_credentials["SASToken"] -dlz_storage_account = dlz_credentials["storageAccountName"] -dlz_sas_uri = dlz_credentials["SASUri"] + +def getDLZPath(credentials: dict): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in
credentials['dlzProvider']: + return credentials['dlzPath']['bucketName'] + '/' + credentials['dlzPath']['dlzFolder'] + else: + return credentials['containerName'] + +dlz_container = getDLZPath(dlz_credentials) print(dlz_container) # COMMAND ---------- @@ -602,9 +668,7 @@ # COMMAND ---------- -spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS") -spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") -spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token) +dlz_path = configureSparkSessionAndGetPath(dlz_credentials) # COMMAND ---------- @@ -617,7 +681,7 @@ protocol = "abfss" scoring_export_time = datetime.utcnow().strftime('%Y%m%d%H%M%S') -output_path = f"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{import_path}/{ingestion_dataset_id}/exportTime={scoring_export_time}/" +output_path = f"{dlz_path}{import_path}/{ingestion_dataset_id}/exportTime={scoring_export_time}/" output_path # COMMAND ---------- diff --git a/notebooks/databricks/Week5Notebook.py b/notebooks/databricks/Week5Notebook.py index 19a537b..0bfbbc2 100644 --- a/notebooks/databricks/Week5Notebook.py +++ b/notebooks/databricks/Week5Notebook.py @@ -38,48 +38,84 @@ # MAGIC %md # MAGIC ## 1.1 Reading the scored data # MAGIC -# MAGIC To that end we need to read the output of the scoring data that we wrote to the Data Landing Zone previously. We use the regular container `dlz-user-container`, since this is where we wrote the data. # COMMAND ---------- - -from aepp import flowservice - -flow_conn = flowservice.FlowService() - -# Note that this overrides the general DLZ destination container defined in CommonInclude. 
-dlz_credentials = flow_conn.getLandingZoneCredential() -dlz_container = dlz_credentials["containerName"] -dlz_sas_token = dlz_credentials["SASToken"] -dlz_storage_account = dlz_credentials["storageAccountName"] -dlz_sas_uri = dlz_credentials["SASUri"] -print(f"Reading from container {dlz_container}") - -# COMMAND ---------- - # MAGIC %md -# MAGIC At that point we're ready to read the data. We're using Spark since it could be pretty large as we're not doing any sampling. Spark needs the following properties to be able to authenticate using SAS: +# MAGIC At that point we're ready to read this data. We're using Spark since it could be pretty large as we're not doing any sampling. Based on the provisioned account Landing Zone could be either configured to use azure or aws, in case of azure following properties will be used to authenticate using SAS: +# MAGIC ``` # MAGIC - `fs.azure.account.auth.type.$ACCOUNT.dfs.core.windows.net` should be set to `SAS`. # MAGIC - `fs.azure.sas.token.provider.type.$ACCOUNT.dfs.core.windows.net` should be set to `org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider`. -# MAGIC - `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the SAS token retrieved earlier. -# MAGIC +# MAGIC - `fs.azure.sas.fixed.token.$ACCOUNT.dfs.core.windows.net` should be set to the `SAS token` retrieved earlier. 
+# MAGIC +# MAGIC in case of aws following properties will be used to access data stored in s3: +# MAGIC +# MAGIC - `fs.s3a.access.key` and `spark.hadoop.fs.s3a.access.key` should be the `s3 access key` +# MAGIC - `fs.s3a.secret.key` and spark.hadoop.fs.s3a.secret.key should be the `s3 secret` +# MAGIC - `fs.s3a.session.token` and spark.hadoop.fs.s3a.session.token should be set to `s3 session token` +# MAGIC - `fs.s3a.aws.credentials.provider` and `spark.hadoop.fs.s3a.aws.credentials.provider` should be set to `org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider` +# MAGIC - `fs.s3.impl` and `spark.hadoop.fs.s3.impl` should be set to `org.apache.hadoop.fs.s3a.S3AFileSystem` +# MAGIC ``` # MAGIC Let's put that in practice and create a Spark dataframe containing the entire featurized data: # COMMAND ---------- -from adlfs import AzureBlobFileSystem +import fsspec +from aepp import flowservice from fsspec import AbstractFileSystem +flow_conn = flowservice.FlowService() +dlz_credentials = flow_conn.getLandingZoneCredential() + +def configureSparkSessionAndGetPath(credentials): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in credentials['dlzProvider']: + aws_key = credentials['credentials']['awsAccessKeyId'] + aws_secret = credentials['credentials']['awsSecretAccessKey'] + aws_token = credentials['credentials']['awsSessionToken'] + aws_buket = credentials['dlzPath']['bucketName'] + dlz_folder = credentials['dlzPath']['dlzFolder'] + spark.conf.set("fs.s3a.access.key", aws_key) + spark.conf.set("fs.s3a.secret.key", aws_secret) + spark.conf.set("fs.s3a.session.token", aws_token) + spark.conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") + spark.conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + spark.conf.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + spark.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", 
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") + spark.conf.set("spark.hadoop.fs.s3a.access.key", aws_key) + spark.conf.set("spark.hadoop.fs.s3a.secret.key", aws_secret) + spark.conf.set("spark.hadoop.fs.s3a.session.token", aws_token) + return f"s3a://{aws_buket}/{dlz_folder}/" + else: + dlz_storage_account = credentials['storageAccountName'] + dlz_sas_token = credentials['SASToken'] + dlz_container = credentials['containerName'] + spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS") + spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") + spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token) + return f"abfss://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/" + +def getDLZFSPath(credentials: dict): + if 'dlzProvider' in credentials.keys() and 'Amazon S3' in credentials['dlzProvider']: + aws_credentials = { + 'key' : credentials['credentials']['awsAccessKeyId'], + 'secret' : credentials['credentials']['awsSecretAccessKey'], + 'token' : credentials['credentials']['awsSessionToken'] + } + return fsspec.filesystem('s3', **aws_credentials), credentials['dlzPath']['bucketName'] + else: + abs_credentials = { + 'account_name' : credentials['storageAccountName'], + 'sas_token' : credentials['SASToken'] + } + return fsspec.filesystem('abfss', **abs_credentials), credentials['containerName'] + + def read_remote_scores(): - azure_blob_fs = AzureBlobFileSystem(account_name=dlz_storage_account, sas_token=dlz_sas_token) - export_time = get_export_time(azure_blob_fs, dlz_container, import_path, scoring_dataset_id) + fs, dlzfs_path = getDLZFSPath(dlz_credentials) + export_time = get_export_time(fs, dlzfs_path, import_path, scoring_dataset_id) print(f"Using featurized data export time of {export_time}") -
spark.conf.set(f"fs.azure.account.auth.type.{dlz_storage_account}.dfs.core.windows.net", "SAS") - spark.conf.set(f"fs.azure.sas.token.provider.type.{dlz_storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider") - spark.conf.set(f"fs.azure.sas.fixed.token.{dlz_storage_account}.dfs.core.windows.net", dlz_sas_token) - - protocol = "abfss" - input_path = f"{protocol}://{dlz_container}@{dlz_storage_account}.dfs.core.windows.net/{import_path}/{scoring_dataset_id}/exportTime={export_time}/" + dlz_path = configureSparkSessionAndGetPath(dlz_credentials) + input_path = f"{dlz_path}{import_path}/{scoring_dataset_id}/exportTime={export_time}/" remote_scoring_df = ( spark.read.format("csv")