Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supporting content for ingesting-data-with-big-query #367

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,391 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "LlrEjmtJNpuX"
},
"source": [
"# Ingesting data with BigQuery\n",
"\n",
"This notebook demonstrates how to consume data contained in BigQuery and index it into Elasticsearch. This notebook is based on the article [Ingesting data with BigQuery](https://www.elastic.co/search-labs/blog/ingesting-data-with-big-query)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GNaAN-GNO5qp"
},
"source": [
"## Installing dependencies and importing packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "qgclZayCk1Ct",
"outputId": "7da1e962-ead6-4016-b2e5-12a019885d86"
},
"outputs": [],
"source": [
"!pip install google-cloud-bigquery elasticsearch==8.16 google-auth"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "rAesontNXpLu"
},
"outputs": [],
"source": [
"from elasticsearch import Elasticsearch, exceptions\n",
"from google.cloud import bigquery\n",
"from google.colab import auth\n",
"from getpass import getpass\n",
"\n",
"import json"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NwOmnk99Pfh3"
},
"source": [
"## Declaring variables"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-4sV9fiXdBwj"
},
"source": [
"This code will create inputs where you can enter your credentials.\n",
"Here you can learn how to retrieve your Elasticsearch credentials: [Finding Your Cloud ID](https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "GVKJKfFpPWuj",
"outputId": "21c6f4e3-8cb5-4a2c-8efe-0e45c3c6b1c4"
},
"outputs": [],
"source": [
"ELASTIC_CLUSTER_ID = getpass(\"Elastic Cloud ID: \")\n",
"ELASTIC_API_KEY = getpass(\"Elastic Api Key: \")\n",
"\n",
"\n",
"# Google Cloud project name and BigQuery dataset name\n",
"PROJECT_ID = \"elasticsearch-bigquery\"\n",
"# dataset_id in format <your-project-name>.<your-dataset-name>\n",
"DATASET_ID = f\"{PROJECT_ID}.server_logs\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3O2HclcYHEsS"
},
"source": [
"## Instance a Elasticsearch client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "1LWiop8NYiQF"
},
"outputs": [],
"source": [
"auth.authenticate_user()\n",
"\n",
"# Elasticsearch client\n",
"es_client = Elasticsearch(\n",
" cloud_id=ELASTIC_CLUSTER_ID,\n",
" api_key=ELASTIC_API_KEY,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "9lvPHaXjPlfu"
},
"source": [
"## Creating mappings"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "tc88YzAYw31e"
},
"outputs": [],
"source": [
"try:\n",
" es_client.indices.create(\n",
" index=\"bigquery-logs\",\n",
" body={\n",
" \"mappings\": {\n",
" \"properties\": {\n",
" \"status_code_description\": {\"type\": \"match_only_text\"},\n",
" \"status_code\": {\"type\": \"keyword\"},\n",
" \"@timestamp\": {\"type\": \"date\"},\n",
" \"ip_address\": {\"type\": \"ip\"},\n",
" \"http_method\": {\"type\": \"keyword\"},\n",
" \"endpoint\": {\"type\": \"keyword\"},\n",
" \"response_time\": {\"type\": \"integer\"},\n",
" }\n",
" }\n",
" },\n",
" )\n",
"except exceptions.RequestError as e:\n",
" if e.error == \"resource_already_exists_exception\":\n",
" print(\"Index already exists.\")\n",
" else:\n",
" raise e"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "AOtwAfPXP38Z"
},
"source": [
"## Getting data from BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "DyKtsjXRXB2S",
"outputId": "7734ad7e-cb11-42cc-b97d-e9241cab6b17"
},
"outputs": [],
"source": [
"client = bigquery.Client(project=PROJECT_ID)\n",
"# Getting tables from dataset\n",
"tables = client.list_tables(DATASET_ID)\n",
"\n",
"data = {}\n",
"\n",
"for table in tables:\n",
" # Table id must be in format <dataset_name>.<table_name>\n",
" table_id = f\"{DATASET_ID}.{table.table_id}\"\n",
"\n",
" print(f\"Processing table: {table.table_id}\")\n",
"\n",
" # Query to retrieve BigQuery tables data\n",
" query = f\"\"\"\n",
" SELECT *\n",
" FROM `{table_id}`\n",
" \"\"\"\n",
"\n",
" query_job = client.query(query)\n",
"\n",
" results = query_job.result()\n",
"\n",
" print(f\"Results for table: {table.table_id}:\")\n",
"\n",
" data[table.table_id] = []\n",
"\n",
" for row in results:\n",
" # Saving data with key=table_id\n",
" data[table.table_id].append(dict(row))\n",
" print(row)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "UAznwqXStJ39",
"outputId": "508a3255-43f7-4828-87d5-997cd04ca427"
},
"outputs": [],
"source": [
"# variable with data\n",
"logs_data = data[\"logs\"]\n",
"\n",
"\n",
"print(logs_data)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2s4Tr6wBP773"
},
"source": [
"## Indexing to Elasticsearch"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "C4goyH6ZbDJK",
"outputId": "cfb4af41-1ef1-40da-a2da-6c0aa83633d3"
},
"outputs": [],
"source": [
"bulk_data = []\n",
"\n",
"for log_entry in logs_data:\n",
" # Convert timestamp to ISO 8601 string\n",
" timestamp_iso8601 = log_entry[\"_timestamp\"].isoformat()\n",
"\n",
" # Prepare action metadata\n",
" action_metadata = {\n",
" \"index\": {\n",
" \"_index\": \"bigquery-logs\",\n",
" \"_id\": f\"{log_entry['ip_address']}-{timestamp_iso8601}\",\n",
" }\n",
" }\n",
"\n",
" # Prepare document\n",
" document = {\n",
" \"ip_address\": log_entry[\"ip_address\"],\n",
" \"status_code\": log_entry[\"status_code\"],\n",
" \"@timestamp\": timestamp_iso8601,\n",
" \"http_method\": log_entry[\"http_method\"],\n",
" \"endpoint\": log_entry[\"endpoint\"],\n",
" \"response_time\": log_entry[\"response_time\"],\n",
" \"status_code_description\": log_entry[\"status_code_description\"],\n",
" }\n",
"\n",
" # Append to bulk data\n",
" bulk_data.append(action_metadata)\n",
" bulk_data.append(document)\n",
"\n",
"print(bulk_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "WPEwsJrFbDHQ",
"outputId": "ab5904f7-21c1-4596-fb06-55569cd9eb17"
},
"outputs": [],
"source": [
"try:\n",
" # Indexing data\n",
" response = es_client.bulk(body=bulk_data)\n",
"\n",
" if response[\"errors\"]:\n",
" print(\"Errors while indexing:\")\n",
" for item in response[\"items\"]:\n",
" if \"error\" in item[\"index\"]:\n",
" print(item[\"index\"][\"error\"])\n",
" else:\n",
" print(\"Documents indexed successfully.\")\n",
"except Exception as e:\n",
" print(f\"Bulk indexing failed: {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "uxix_o8LQDup"
},
"source": [
"# Searching data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "285_MwI8yknk",
"outputId": "67d43aca-b05d-43f7-c730-1fa3bc0c4662"
},
"outputs": [],
"source": [
"response = es_client.search(\n",
" index=\"bigquery-logs\",\n",
" body={\n",
" \"query\": {\"match\": {\"status_code_description\": \"error\"}},\n",
" \"sort\": [{\"@timestamp\": {\"order\": \"desc\"}}],\n",
" \"aggs\": {\"by_ip\": {\"terms\": {\"field\": \"ip_address\", \"size\": 10}}},\n",
" },\n",
")\n",
"\n",
"# Print results\n",
"formatted_json = json.dumps(response.body, indent=4)\n",
"print(formatted_json)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "S6WZMJayyzxh"
},
"source": [
"## Deleting\n",
"\n",
"Finally, we can delete the resources used to prevent them from consuming resources."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "9UcQwa41yy_x",
"outputId": "02a33d89-b57c-4273-ba93-5b6441a4f91e"
},
"outputs": [],
"source": [
"# Cleanup - Delete Index\n",
"es_client.indices.delete(index=\"bigquery-logs\", ignore=[400, 404])"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}