diff --git a/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb b/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb new file mode 100644 index 000000000000..9344371b619e --- /dev/null +++ b/examples/notebooks/beam-ml/anomaly_detection/anomaly_detection_zscore.ipynb @@ -0,0 +1,653 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "view-in-github", + "colab_type": "text" + }, + "source": [ + "\"Open" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2d79fe3a-952b-478f-ba78-44cafddc91d1", + "metadata": { + "cellView": "form", + "id": "2d79fe3a-952b-478f-ba78-44cafddc91d1" + }, + "outputs": [], + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License" + ] + }, + { + "cell_type": "markdown", + "source": [ + "# Use Apache Beam for Anomaly Detection on Streaming Data (Z-Score)\n", + "This notebook demonstrates how to perform anomaly detection on streaming data using the `AnomalyDetection` PTransform, a new feature introduced in Apache Beam 2.64.0.\n", + "\n", + "We will first generate a synthetic dataset that incorporates various types of concept drift (changes in the underlying data distribution). This data will then be published to a Pub/Sub topic to simulate a real-time input stream. Our Beam pipeline will read from this topic, apply the `AnomalyDetection` PTransform with the Z-Score algorithm, and publish the detected anomalies to a second Pub/Sub topic. Finally, we will visualize the labeled data points in an animated plot." + ], + "metadata": { + "id": "vOwDXurVLkxO" + }, + "id": "vOwDXurVLkxO" + }, + { + "cell_type": "markdown", + "source": [ + "## Preparation\n", + "To get started with this notebook, you'll need to install the Apache Beam Python SDK and its associated extras. Make sure your installation is version 2.64.0 or later." + ], + "metadata": { + "id": "UQx1jKZ2LHHR" + }, + "id": "UQx1jKZ2LHHR" + }, + { + "cell_type": "code", + "source": [ + "! pip install apache_beam[interactive,gcp]==2.64.0rc2 --quiet" + ], + "metadata": { + "collapsed": true, + "id": "SafqA6dALKvo" + }, + "id": "SafqA6dALKvo", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "To proceed, import the essential modules: matplotlib, numpy, pandas, Beam, and others as needed." + ], + "metadata": { + "id": "h5gJCJGpStSb" + }, + "id": "h5gJCJGpStSb" + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8fb71376-b0eb-474b-ab51-2161dfa60e2d", + "metadata": { + "id": "8fb71376-b0eb-474b-ab51-2161dfa60e2d" + }, + "outputs": [], + "source": [ + "# Imports Required for the notebook\n", + "import json\n", + "import os\n", + "import random\n", + "import threading\n", + "import time\n", + "import warnings\n", + "from typing import Any\n", + "from typing import Iterable\n", + "from typing import Tuple\n", + "\n", + "import matplotlib.animation\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import pandas as pd\n", + "from IPython.display import HTML, Javascript\n", + "from google.api_core import retry\n", + "from google.api_core.exceptions import AlreadyExists\n", + "from google.cloud import pubsub_v1\n", + "from google.cloud.exceptions import NotFound\n", + "\n", + "import apache_beam as beam\n", + "from apache_beam.io.gcp.pubsub import PubsubMessage\n", + "from apache_beam.ml.anomaly.base import AnomalyResult\n", + "from apache_beam.ml.anomaly.base import AnomalyPrediction\n", + "from apache_beam.ml.anomaly.detectors.zscore import ZScore\n", + "from apache_beam.ml.anomaly.transforms import AnomalyDetection\n", + "from apache_beam.ml.anomaly.univariate.mean import IncSlidingMeanTracker\n", + "from apache_beam.ml.anomaly.univariate.stdev import IncSlidingStdevTracker\n", + "from apache_beam.options.pipeline_options import PipelineOptions\n", + "\n", + "# Suppress logging warnings\n", + "os.environ[\"GRPC_VERBOSITY\"] = \"ERROR\"\n", + "os.environ[\"GLOG_minloglevel\"] = \"2\"\n", + "warnings.filterwarnings('ignore')" + ] + }, + { + "cell_type": "markdown", + "source": [ + " Next, replace `` with your Google Cloud project ID." + ], + "metadata": { + "id": "C2GhkkvSUXdu" + }, + "id": "C2GhkkvSUXdu" + }, + { + "cell_type": "code", + "execution_count": null, + "id": "29cda7f0-a24e-4e74-ba6e-166413ab532c", + "metadata": { + "id": "29cda7f0-a24e-4e74-ba6e-166413ab532c" + }, + "outputs": [], + "source": [ + "# GCP-related constant are listed below\n", + "\n", + "# GCP project id\n", + "PROJECT_ID = '' # @param {type:'string'}\n", + "\n", + "SUFFIX = str(random.randint(0, 10000))\n", + "\n", + "# Pubsub topic and subscription for retrieving input data\n", + "INPUT_TOPIC = 'anomaly-input-' + SUFFIX\n", + "INPUT_SUB = INPUT_TOPIC + '-sub'\n", + "\n", + "# Pubsub topic and subscription for collecting output result\n", + "OUTPUT_TOPIC = 'anomaly-output-' + SUFFIX\n", + "OUTPUT_SUB = OUTPUT_TOPIC + '-sub'" + ] + }, + { + "cell_type": "markdown", + "source": [ + "The last preparation step needs to authenticate your Google account and authorize your Colab notebook to access Google Cloud Platform (GCP) resources associated with the project set above." + ], + "metadata": { + "id": "coZF7R8zTBsF" + }, + "id": "coZF7R8zTBsF" + }, + { + "cell_type": "code", + "source": [ + "from google.colab import auth\n", + "auth.authenticate_user(project_id=PROJECT_ID)" + ], + "metadata": { + "id": "51jml7JvMpbD" + }, + "id": "51jml7JvMpbD", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "0064575d-5e60-4f8b-a970-9dc39db8d331", + "metadata": { + "id": "0064575d-5e60-4f8b-a970-9dc39db8d331" + }, + "source": [ + "## Generating Synthetic Data with Concept Drifting" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "37c1613e-e2ef-4f2c-8999-cce01563e180", + "metadata": { + "id": "37c1613e-e2ef-4f2c-8999-cce01563e180" + }, + "outputs": [], + "source": [ + "# The size of a segment in the synthetic data set. Each segment represents\n", + "# a collection of data points generated from either a fixed distribution\n", + "# or a drift from one distribution to another.\n", + "# The idea is inspired by https://github.com/yixiaoma666/SCAR.\n", + "seg_size = 200\n", + "\n", + "# The ratio of global outliers introduced in the synthetic data set.\n", + "outlier_ratio = 0.01\n", + "\n", + "# Random seed\n", + "seed = 1234\n", + "\n", + "np.random.seed(seed)\n", + "\n", + "# starting from a fixed distribution\n", + "data_seg1 = np.random.normal(loc=0, scale=1, size=seg_size)\n", + "\n", + "# a sudden change between data_seg1 and data_seg2\n", + "data_seg2 = np.random.normal(loc=3, scale=3, size=seg_size)\n", + "\n", + "# a gradual change in data_seg3\n", + "data_seg3 = []\n", + "for i in range(seg_size):\n", + " prob = 1 - 1.0 * i / seg_size\n", + " c = np.random.choice(2, 1, p = [prob, 1 - prob])\n", + " if c == 0:\n", + " data_seg3.append(np.random.normal(loc=3, scale=3, size=1))\n", + " else:\n", + " data_seg3.append(np.random.normal(loc=0, scale=1, size=1))\n", + "data_seg3 = np.array(data_seg3).ravel()\n", + "\n", + "# an incremental change in data_seg4\n", + "data_seg4 = []\n", + "for i in range(seg_size):\n", + " loc = 0 + 3.0 * i / seg_size\n", + " scale = 1 + 2.0 * i / seg_size\n", + " data_seg4.append(np.random.normal(loc=loc, scale=scale, size=1))\n", + "data_seg4 = np.array(data_seg4).ravel()\n", + "\n", + "# back to a fixed distribution\n", + "data_seg5 = np.random.normal(loc=3, scale=3, size=seg_size)\n", + "\n", + "data = np.concatenate((data_seg1, data_seg2, data_seg3, data_seg4, data_seg5))\n", + "\n", + "# adding outliers\n", + "outlier_idx = np.random.choice(len(data), size=int(outlier_ratio * len(data)), replace = False)\n", + "\n", + "for idx in outlier_idx:\n", + " data[idx] = np.random.normal(loc=15, scale=1, size=1).item()\n", + "\n", + "df = pd.Series(data, name='f1')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699", + "metadata": { + "id": "8e6f4f59-c6e5-4991-84d9-14eab18eb699" + }, + "outputs": [], + "source": [ + "plt.figure(figsize=(12, 4))\n", + "plt.xlim(0, 1000)\n", + "plt.ylim(-10, 20)\n", + "plt.scatter(x=range(len(df)), y=df, s=2)" + ] + }, + { + "cell_type": "markdown", + "id": "32e7cdf4-a795-47d1-b5f1-9ae5e924a427", + "metadata": { + "id": "32e7cdf4-a795-47d1-b5f1-9ae5e924a427" + }, + "source": [ + "## Setting Up Input/Output Pubsubs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11017009-f97e-4805-9cbb-6a9d4ddb68d3", + "metadata": { + "id": "11017009-f97e-4805-9cbb-6a9d4ddb68d3" + }, + "outputs": [], + "source": [ + "def create_topic_if_not_exists(project_id:str, topic_name:str, enable_message_ordering=False):\n", + " if enable_message_ordering:\n", + " # see https://cloud.google.com/pubsub/docs/ordering#python for details.\n", + " publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)\n", + " # Sending messages to the same region ensures they are received in order\n", + " client_options = {\"api_endpoint\": \"us-east1-pubsub.googleapis.com:443\"}\n", + " publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options, client_options=client_options)\n", + " else:\n", + " publisher = pubsub_v1.PublisherClient()\n", + "\n", + " topic_path = publisher.topic_path(project_id, topic_name)\n", + " try:\n", + " topic = publisher.create_topic(request={\"name\": topic_path})\n", + " print(f\"Created topic: {topic.name}\")\n", + " except AlreadyExists:\n", + " print(f\"Topic {topic_path} already exists.\")\n", + "\n", + " return publisher\n", + "\n", + "def create_subscription_if_not_exists(project_id:str, subscription_name:str, topic_name: str, enable_message_ordering=False):\n", + " topic_path = pubsub_v1.PublisherClient.topic_path(project_id, topic_name)\n", + " subscriber = pubsub_v1.SubscriberClient()\n", + " subscription_path = subscriber.subscription_path(project_id, subscription_name)\n", + " try:\n", + " subscription = subscriber.create_subscription(\n", + " request={\"name\": subscription_path, \"topic\": topic_path, \"enable_message_ordering\": enable_message_ordering}\n", + " )\n", + " print(f\"Created subscription: {subscription.name}\")\n", + " except AlreadyExists:\n", + " print(f\"Subscription {subscription_path} already exists.\")\n", + "\n", + " return subscriber" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "66784c36-9f9e-410e-850b-3b8da29ff5ce", + "metadata": { + "id": "66784c36-9f9e-410e-850b-3b8da29ff5ce" + }, + "outputs": [], + "source": [ + "# for input data\n", + "input_publisher = create_topic_if_not_exists(PROJECT_ID, INPUT_TOPIC, True)\n", + "create_subscription_if_not_exists(PROJECT_ID, INPUT_SUB, INPUT_TOPIC, True)\n", + "\n", + "# for output data\n", + "create_topic_if_not_exists(PROJECT_ID, OUTPUT_TOPIC)\n", + "output_subscriber = create_subscription_if_not_exists(PROJECT_ID, OUTPUT_SUB, OUTPUT_TOPIC)" + ] + }, + { + "cell_type": "markdown", + "id": "dc4afa04-fb39-40cd-a8d7-f9d1c461648a", + "metadata": { + "id": "dc4afa04-fb39-40cd-a8d7-f9d1c461648a" + }, + "source": [ + "## Publishing Input to Pub/Sub" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9", + "metadata": { + "id": "907f2469-1687-4ef3-bafd-9c4ec963b7e9" + }, + "outputs": [], + "source": [ + "def publish_data(publisher, project_id: str, topic: str, data: Iterable[Any], delay=0.01, enable_message_ordering=False) -> None:\n", + " topic_path = publisher.topic_path(project_id, topic)\n", + " for i in range(len(data)):\n", + " kv = {\"f1\": data.iloc[i]}\n", + " kv[\"id\"] = i # add event id\n", + " msg = json.dumps(kv).encode('utf-8')\n", + " if enable_message_ordering:\n", + " # set ordering key to a fixed string so messages with the same ordering key will be published in order\n", + " publisher.publish(topic_path, data=msg, ordering_key=\"my-order-key\")\n", + " else:\n", + " publisher.publish(topic_path, data=msg)\n", + " time.sleep(delay)\n", + "\n", + "publisher_thread = threading.Thread(\n", + " target=publish_data,\n", + " args=(input_publisher, PROJECT_ID, INPUT_TOPIC, df, 0.001, True),\n", + ")\n", + "publisher_thread.start()\n", + "print(f\"Started to publish data to {INPUT_TOPIC}\")" + ] + }, + { + "cell_type": "markdown", + "source": [ + "## Launching the Beam Pipeline" + ], + "metadata": { + "id": "9RjcaxzDN5Tv" + }, + "id": "9RjcaxzDN5Tv" + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0", + "metadata": { + "scrolled": true, + "id": "6e8b069d-9d94-4938-a87e-bd5f9f7620c0" + }, + "outputs": [], + "source": [ + "def message_to_beam_row(msg: bytes) -> beam.Row:\n", + " try:\n", + " r = beam.Row(**json.loads(msg.decode('utf-8')))\n", + " except Exception as e:\n", + " print(\"Wrong msg: %s\" % msg)\n", + " print(e)\n", + " return r\n", + "\n", + "def keyed_result_to_message(t: Tuple[Any, AnomalyResult]) -> bytes:\n", + " idx = t[1].example.id\n", + " value = t[1].example.f1\n", + " label = next(iter(t[1].predictions)).label\n", + " return json.dumps({\"id\":idx, \"value\": value, \"label\": label}).encode('utf-8')\n", + "\n", + "def run_beam_pipeline():\n", + " input_sub = pubsub_v1.SubscriberClient.subscription_path(PROJECT_ID, INPUT_SUB)\n", + " output_topic = pubsub_v1.PublisherClient.topic_path(PROJECT_ID, OUTPUT_TOPIC)\n", + "\n", + " options = PipelineOptions([\"--streaming\"])\n", + " with beam.Pipeline(options=options) as p:\n", + " _ = (p | beam.io.ReadFromPubSub(subscription=input_sub)\n", + " | \"Convert Pubsub Messages to Rows\" >> beam.Map(message_to_beam_row).with_output_types(beam.Row)\n", + " | \"Add key\" >> beam.Map(lambda x: (0, x)).with_output_types(Tuple[int, beam.Row])\n", + " | AnomalyDetection(ZScore(features=[\"f1\"], sub_stat_tracker=IncSlidingMeanTracker(100),\n", + " stdev_tracker=IncSlidingStdevTracker(100)))\n", + " | \"Convert output to Pubsub Messages\" >> beam.Map(keyed_result_to_message)\n", + " | beam.io.WriteToPubSub(topic=output_topic, with_attributes=False)\n", + " )\n", + "\n", + "pipeline_thread = threading.Thread(\n", + " target=run_beam_pipeline,\n", + ")\n", + "\n", + "pipeline_thread.start()\n", + "print(f\"Started to run beam pipeline for anomaly detection\")" + ] + }, + { + "cell_type": "markdown", + "id": "1b785e34-a035-4148-9b58-f364ce0aed08", + "metadata": { + "id": "1b785e34-a035-4148-9b58-f364ce0aed08" + }, + "source": [ + "## Collecting Results and Plotting" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4ca8af3-d74c-4d95-aeba-c34cb791525f", + "metadata": { + "id": "e4ca8af3-d74c-4d95-aeba-c34cb791525f" + }, + "outputs": [], + "source": [ + "x = []\n", + "y = []\n", + "c = []" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b6bf369f-2b20-4834-b457-e9b1f0a596ca", + "metadata": { + "id": "b6bf369f-2b20-4834-b457-e9b1f0a596ca" + }, + "outputs": [], + "source": [ + "def collect_result(subscriber):\n", + " subscription_path = pubsub_v1.SubscriberClient.subscription_path(PROJECT_ID, OUTPUT_SUB)\n", + "\n", + " NUM_MESSAGES = 100\n", + " while True:\n", + " response = subscriber.pull(\n", + " request={\"subscription\": subscription_path, \"max_messages\": NUM_MESSAGES},\n", + " retry=retry.Retry(deadline=300),\n", + " )\n", + "\n", + " ack_ids = []\n", + " for received_message in response.received_messages:\n", + " ack_ids.append(received_message.ack_id)\n", + " msg = json.loads(received_message.message.data.decode('utf-8'))\n", + " x.append(msg['id'])\n", + " y.append(msg['value'])\n", + " c.append('red' if msg['label'] == 1 else 'green')\n", + "\n", + " if len(ack_ids) > 0:\n", + " # Acknowledges the received messages so they will not be sent again.\n", + " subscriber.acknowledge(\n", + " request={\"subscription\": subscription_path, \"ack_ids\": ack_ids}\n", + " )\n", + "\n", + "result_thread = threading.Thread(\n", + " target=collect_result,\n", + " args=(output_subscriber,),\n", + ")\n", + "\n", + "result_thread.start()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a3433ea1-70ae-408d-84b2-27118a3fd898", + "metadata": { + "id": "a3433ea1-70ae-408d-84b2-27118a3fd898" + }, + "outputs": [], + "source": [ + "print(len(x))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "31f24bfc-b91d-4e67-b804-732dc65e7525", + "metadata": { + "id": "31f24bfc-b91d-4e67-b804-732dc65e7525" + }, + "outputs": [], + "source": [ + "# Refresh this cell as more results will be retrieved from the output pubsub\n", + "\n", + "matplotlib.rcParams['animation.embed_limit'] = 300\n", + "\n", + "data = np.array(list(zip(x,y)))\n", + "\n", + "fig, ax = plt.subplots()\n", + "fig.set_size_inches(12, 4)\n", + "\n", + "ax.axis([0,1000,-10,20])\n", + "l = ax.scatter([],[])\n", + "l.set_sizes([3])\n", + "\n", + "def animate(i):\n", + " i = i * 10\n", + " l.set_offsets(data[:i+1])\n", + " l.set_color(c)\n", + "\n", + "plt.close() # to avoid extra frame after animation\n", + "\n", + "ani = matplotlib.animation.FuncAnimation(fig, animate, frames=int(len(x)/10), interval=50, repeat=False)\n", + "display(HTML(ani.to_jshtml()))\n", + "time.sleep(1)\n", + "\n", + "# auto start the animation\n", + "Javascript('document.querySelector(\".anim-buttons > button:nth-child(6)\").click()')" + ] + }, + { + "cell_type": "markdown", + "id": "304dac43-2f3b-4a7b-861a-39666bc258c9", + "metadata": { + "id": "304dac43-2f3b-4a7b-861a-39666bc258c9" + }, + "source": [ + "## Cleaning Up Resources" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "350b8b1a-3010-4ecd-924f-010308bb5eb2", + "metadata": { + "id": "350b8b1a-3010-4ecd-924f-010308bb5eb2" + }, + "outputs": [], + "source": [ + "# deleting input and output subscriptions\n", + "subscriber = pubsub_v1.SubscriberClient()\n", + "with subscriber:\n", + " try:\n", + " subscription_path = subscriber.subscription_path(PROJECT_ID, INPUT_SUB)\n", + " subscriber.delete_subscription(request={\"subscription\": subscription_path})\n", + " print(f\"Input subscription deleted: {subscription_path}.\")\n", + " except NotFound:\n", + " pass\n", + "\n", + " try:\n", + " subscription_path = subscriber.subscription_path(PROJECT_ID, OUTPUT_SUB)\n", + " subscriber.delete_subscription(request={\"subscription\": subscription_path})\n", + " print(f\"Output subscription deleted: {subscription_path}.\")\n", + " except NotFound:\n", + " pass" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10dc95cf-94ab-4a51-882b-88559340d4d2", + "metadata": { + "id": "10dc95cf-94ab-4a51-882b-88559340d4d2" + }, + "outputs": [], + "source": [ + "# deleting input and output topics\n", + "publisher = pubsub_v1.PublisherClient()\n", + "with publisher:\n", + " try:\n", + " topic_path = publisher.topic_path(PROJECT_ID, INPUT_TOPIC)\n", + " publisher.delete_topic(request={\"topic\": topic_path})\n", + " print(f\"Input topic deleted: {topic_path}\")\n", + " except NotFound:\n", + " pass\n", + "\n", + " try:\n", + " topic_path = publisher.topic_path(PROJECT_ID, OUTPUT_TOPIC)\n", + " publisher.delete_topic(request={\"topic\": topic_path})\n", + " print(f\"Output topic deleted: {topic_path}\")\n", + " except NotFound:\n", + " pass" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.8" + }, + "colab": { + "provenance": [], + "include_colab_link": true + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file