Merged
363 changes: 363 additions & 0 deletions notebooks/import_s3_table_embedding_demo.ipynb
@@ -0,0 +1,363 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Neptune Analytics Instance Management With S3 Table Embedding Projections\n",
"\n",
"\n",
"This notebook demonstrates how embedding data stored in a data lake can be imported into Amazon Neptune Analytics and queried with the TopK algorithm package.\n",
"\n",
"The goal is to ingest embedding vectors as graph data and enable similarity search, allowing the system to identify products with similar characteristics based on their embedding representations.\n",
"\n",
"The content of this notebook includes:\n",
"1. Download the Kaggle fashion dataset, enrich it with an embedding column generated using Amazon Bedrock, and store the result in Amazon S3.\n",
"2. Create an Athena projection from the S3 Tables bucket.\n",
"3. Import the projection into Neptune Analytics.\n",
"4. Run topK.byNode to search for similar products and return similarity scores.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup\n",
"\n",
"Import the necessary libraries and set up logging."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import os\n",
"import pandas as pd\n",
"import boto3\n",
"import dotenv\n",
"\n",
"from nx_neptune import empty_s3_bucket, instance_management, NeptuneGraph, set_config_graph_id\n",
"from nx_neptune.instance_management import _execute_athena_query, _clean_s3_path\n",
"from nx_neptune.utils.utils import get_stdout_logger, validate_and_get_env, _get_bedrock_embedding, read_csv, write_csv, \\\n",
"    push_to_s3\n",
"\n",
"dotenv.load_dotenv()\n",
"\n",
"from nx_neptune.session_manager import SessionManager\n",
"\n",
"# Configure logging to see detailed information about the instance creation process\n",
"logger = get_stdout_logger(__name__, [\n",
" 'nx_neptune.instance_management',\n",
" 'nx_neptune.utils.task_future',\n",
" 'nx_neptune.session_manager',\n",
" 'nx_neptune.interface',\n",
" __name__\n",
"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Configuration\n",
"\n",
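"The notebook is configured through environment variables, e.g. via a `.env` file (all values below are placeholders, not real resources):\n",
"\n",
"```\n",
"NETWORKX_S3_DATA_LAKE_BUCKET_PATH=s3://my-bucket/data-lake/\n",
"NETWORKX_S3_LOG_BUCKET_PATH=s3://my-bucket/athena-logs/\n",
"NETWORKX_S3_IMPORT_BUCKET_PATH=s3://my-bucket/neptune-import/\n",
"NETWORKX_S3_TABLES_DATABASE=fashion_db\n",
"NETWORKX_S3_TABLES_TABLENAME=fashion_styles\n",
"```\n",
"\n",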
"Check for environment variables necessary for the notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check for required environment variables\n",
"\n",
"env_vars = validate_and_get_env([\n",
" 'NETWORKX_S3_DATA_LAKE_BUCKET_PATH',\n",
" 'NETWORKX_S3_LOG_BUCKET_PATH',\n",
" 'NETWORKX_S3_IMPORT_BUCKET_PATH',\n",
" 'NETWORKX_S3_TABLES_DATABASE',\n",
" 'NETWORKX_S3_TABLES_TABLENAME',\n",
" 'NETWORKX_GRAPH_ID',  # six values are unpacked below; this variable name is assumed\n",
"])\n",
"\n",
"(s3_location_data_lake, s3_location_log, s3_location_import,\n",
" s3_tables_database, s3_tables_tablename, graph_id) = env_vars.values()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data Setup\n",
"\n",
"Fashion product data is sourced from Kaggle, using the dataset available at [kaggle](https://www.kaggle.com/datasets/paramaggarwal/fashion-product-images-small).\n",
"\n",
"For this demo, only the styles.csv file is required.\n",
"\n",
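"The relevant columns of `styles.csv` look like the following (the sample row is illustrative, not taken from the dataset):\n",
"\n",
"```\n",
"id,gender,masterCategory,subCategory,articleType,baseColour,season,year,usage,productDisplayName\n",
"15970,Men,Apparel,Topwear,Shirts,Navy Blue,Fall,2011,Casual,Turtle Check Men Navy Blue Shirt\n",
"```\n",
"\n",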
"In this section, the dataset is modified to append an additional embedding column generated using Amazon Bedrock. The enriched CSV file is then uploaded to Amazon S3 for downstream processing as part of the data lake projection workflow.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Download the fashion dataset from Kaggle (only styles.csv is required).\n",
"# https://www.kaggle.com/datasets/paramaggarwal/fashion-product-images-small\n",
"data_path = \"../example/resources/styles.csv\"\n",
"data_w_embedding_path = \"../example/resources/styles_embedding.csv\"\n",
"\n",
"athena_client = boto3.client('athena')\n",
"\n",
"# Read the first 10 rows of the dataset\n",
"headers, rows = read_csv(data_path, 10)\n",
"\n",
"# Print out the data file content\n",
"df = pd.DataFrame(rows)\n",
"df\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data Enrichment – Embeddings\n",
"\n",
"Next, an append_embedding function is applied to each row to generate an embedding vector from a subset of product attributes (e.g. `masterCategory`, `subCategory`, `articleType`, `baseColour`).\n",
"\n",
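"Each vector is serialized into a single semicolon-delimited string so it fits in one CSV column (values illustrative):\n",
"\n",
"```\n",
"0.0123;-0.0456;0.0789;...\n",
"```\n",
"\n",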
"The resulting embedding is appended as a new column in the output dataset, and will later be imported into Neptune Analytics for similarity search.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def append_embedding(headers, rows):\n",
" # Inject header\n",
" fieldnames = headers + [\"embedding\"]\n",
" # Inject embedding\n",
" bedrock = boto3.client(\"bedrock-runtime\")\n",
"\n",
" # Generate vector embeddings; attributes are space-separated so tokens don't merge.\n",
" for row in rows:\n",
" embedding = _get_bedrock_embedding(bedrock,\n",
" row[\"masterCategory\"] + \" \" +\n",
" row[\"subCategory\"] + \" \" +\n",
" row[\"articleType\"] + \" \" +\n",
" row[\"baseColour\"])[0]\n",
" row[\"embedding\"] = \";\".join(map(str, embedding))\n",
"\n",
" return fieldnames, rows\n",
"\n",
"\n",
"# Add the embedding\n",
"headers, rows = append_embedding(headers, rows)\n",
"\n",
"# Write to new csv\n",
"write_csv(data_w_embedding_path, headers, rows)\n",
"\n",
"# Print out the data file content\n",
"df = pd.DataFrame(rows)\n",
"df\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Upload Dataset and Register in Athena\n",
"\n",
"After the embedding column is added, the enriched dataset is uploaded to Amazon S3.\n",
"\n",
"An external table is then created in Amazon Athena over the uploaded CSV, exposing both the original attributes and the embedding array for SQL-based access."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Push to s3\n",
"empty_s3_bucket(s3_location_data_lake)\n",
"push_to_s3(data_w_embedding_path, _clean_s3_path(s3_location_data_lake), \"styles_embedding.csv\")\n",
"\n",
"# Create an external table over the uploaded CSV\n",
"create_csv_table_stmt = f\"\"\"\n",
"CREATE EXTERNAL TABLE IF NOT EXISTS {s3_tables_tablename} (\n",
" `id` int,\n",
" `gender` string,\n",
" `masterCategory` string,\n",
" `subCategory` string,\n",
" `articleType` string,\n",
" `baseColour` string,\n",
" `season` string,\n",
" `year` int,\n",
" `usage` string,\n",
" `productDisplayname` string,\n",
" `embedding` array<float>\n",
")\n",
"ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n",
"WITH SERDEPROPERTIES ('field.delim' = ',', 'collection.delim' = ';')\n",
"STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'\n",
"OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\n",
"LOCATION '{s3_location_data_lake}'\n",
"TBLPROPERTIES ('classification' = 'csv', 'skip.header.line.count'='1');\n",
"\"\"\"\n",
"\n",
"_execute_athena_query(athena_client, create_csv_table_stmt, s3_location_log, database=s3_tables_database)\n",
"\n",
"print(\"DataLake preparation completed.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Import Data into Neptune Analytics and Perform Similarity Search\n",
"\n",
"A projection query is executed in Athena to select the required columns, map them to Neptune-compatible headers, and flatten the embedding array into a vector format.\n",
"\n",
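"The projected file uses Neptune's import header conventions, for example (values illustrative):\n",
"\n",
"```\n",
"~id,~label,baseColour,embedding:vector\n",
"30805,Apparel,Blue,0.0123;-0.0456;0.0789\n",
"```\n",
"\n",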
"The resulting CSV is compatible with Amazon Neptune Analytics import requirements and can be ingested directly to enable vector similarity search on the graph.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Clear import directory\n",
"empty_s3_bucket(s3_location_import)\n",
"\n",
"# Projection query producing a Neptune-compatible CSV\n",
"projection_stmt = f\"\"\"\n",
" SELECT\n",
" \"id\" AS \"~id\",\n",
" \"masterCategory\" AS \"~label\",\n",
" \"baseColour\" AS \"baseColour\",\n",
" array_join(\n",
" transform(embedding, x -> cast(x AS varchar)), ';'\n",
" ) AS \"embedding:vector\"\n",
" FROM {s3_tables_tablename};\n",
"\"\"\"\n",
"\n",
"_execute_athena_query(athena_client, projection_stmt, s3_location_import, database=s3_tables_database)\n",
"\n",
"# Remove the unnecessary .csv.metadata files generated by Athena.\n",
"empty_s3_bucket(s3_location_import, file_extension=\".csv.metadata\")\n",
"\n",
"task_id = await instance_management.import_csv_from_s3(\n",
" NeptuneGraph.from_config(set_config_graph_id(graph_id)),\n",
" s3_location_import,\n",
" reset_graph_ahead=True,\n",
" skip_snapshot=True,\n",
" )\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Inspect Embedding\n",
"\n",
"A simple query is used to inspect the imported embeddings by printing the first 5 floating-point values from each node’s embedding vector. \n",
"\n",
"This provides a quick sanity check to verify that the embedding data has been ingested and stored correctly before running similarity queries."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"INSPECT_QUERY = \"\"\"\n",
" MATCH (n)\n",
" CALL neptune.algo.vectors.get(n)\n",
" YIELD embedding RETURN n, embedding[0..5] AS embedding_first_five\n",
" LIMIT 3\n",
"\"\"\"\n",
"\n",
"config = set_config_graph_id(graph_id)\n",
"na_graph = NeptuneGraph.from_config(config)\n",
"all_nodes = na_graph.execute_call(INSPECT_QUERY)\n",
"for n in all_nodes:\n",
" print(n[\"n\"][\"~id\"] + \": \" + str(n[\"embedding_first_five\"]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Similarity Search\n",
"\n",
"You can now run `neptune.algo.vectors.topK.byNode` to perform similarity search using the imported embedding vectors.\n",
"\n",
"This query returns the top-K most similar nodes along with their similarity scores, confirming that the embeddings are correctly integrated and usable for semantic similarity search in Amazon Neptune Analytics."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"TOPK_QUERY = \"\"\"\n",
" MATCH (n) WHERE id(n) = '30805'\n",
" CALL neptune.algo.vectors.topK.byNode(\n",
" n, {topK: 5})\n",
" YIELD node, score\n",
" RETURN node, score\n",
"\"\"\"\n",
"\n",
"config = set_config_graph_id(graph_id)\n",
"na_graph = NeptuneGraph.from_config(config)\n",
"all_nodes = na_graph.execute_call(TOPK_QUERY)\n",
"for n in all_nodes:\n",
" print(n[\"node\"][\"~id\"] + \", score:\" + str(n[\"score\"]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conclusion\n",
"\n",
"This notebook demonstrated the complete lifecycle of embedding vectors: from ingestion and transformation in the data lake to availability within Amazon Neptune Analytics for similarity search.\n",
"\n",
"By integrating embedding data directly into the graph, this approach enables scalable and explainable similarity queries using native graph algorithms such as TopK. This is particularly important for recommendation, product discovery, and enrichment workflows, where vector similarity needs to be combined with structured graph relationships and properties, rather than treated as an isolated retrieval step.\n",
"\n",
"In practice, this pattern provides a flexible foundation for building hybrid graph-and-vector analytics pipelines that can evolve alongside existing data lake architectures."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
26 changes: 26 additions & 0 deletions nx_neptune/utils/utils.py
@@ -178,6 +178,32 @@ def read_csv(path, limit=None):
return header, rows


def write_csv(path, headers, rows):
"""Writes data to a CSV file.

Parameters
----------
path : str
Path to the CSV file to write
headers : list
List of column names to use as CSV headers
rows : list
List of dictionaries, where each dictionary represents a row with
column names as keys and cell values as values

Examples
--------
>>> headers = ['col1', 'col2']
>>> rows = [{'col1': 'val1', 'col2': 'val2'},
... {'col1': 'val3', 'col2': 'val4'}]
>>> write_csv('output.csv', headers, rows)
"""
with open(path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=headers)
writer.writeheader()
writer.writerows(rows)


def _get_bedrock_embedding(
client, text, dimensions=256, model_id="amazon.titan-embed-text-v2:0"
):