
Commit cc3c936

Notebook - S3 table with embedding (#72)
* Notebook update
  Update desc
  Update doc
  Update notebook
  Wordings
  Update notebook
  Update steps order

  Signed-off-by: Andy Kwok <andy.kwok@improving.com>

  # Conflicts:
  #   notebooks/import_s3_table_demo.ipynb
  #   nx_neptune/instance_management.py
  #   nx_neptune/utils/utils.py

* Rebase
* Minimise diff
* Update diff
1 parent f1f6084 commit cc3c936

2 files changed

Lines changed: 389 additions & 0 deletions

notebooks/import_s3_table_demo.ipynb

Lines changed: 363 additions & 0 deletions
@@ -0,0 +1,363 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Neptune Analytics Instance Management With S3 Table Embedding Projections\n",
        "\n",
        "This notebook demonstrates how embedding data stored in a data lake can be imported into Amazon Neptune Analytics and queried with the TopK algorithm package.\n",
        "\n",
        "The goal is to ingest embedding vectors as graph data and enable similarity search, allowing the system to identify products with similar characteristics based on their embedding representations.\n",
        "\n",
        "This notebook covers:\n",
        "1. Download the Kaggle fashion dataset, enrich it with an embedding column generated using Amazon Bedrock, and store the result in Amazon S3.\n",
        "2. Create an Athena projection from the S3 Tables bucket.\n",
        "3. Import the projection into Neptune Analytics.\n",
        "4. Run `topK.byNode` to search for similar products and return similarity scores.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Setup\n",
        "\n",
        "Import the necessary libraries and set up logging."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import asyncio\n",
        "import os\n",
        "import pandas as pd\n",
        "import boto3\n",
        "import dotenv\n",
        "\n",
        "from nx_neptune import empty_s3_bucket, instance_management, NeptuneGraph, set_config_graph_id\n",
        "from nx_neptune.instance_management import _execute_athena_query, _clean_s3_path\n",
        "from nx_neptune.utils.utils import get_stdout_logger, validate_and_get_env, _get_bedrock_embedding, read_csv, write_csv, \\\n",
        "    push_to_s3\n",
        "\n",
        "dotenv.load_dotenv()\n",
        "\n",
        "from nx_neptune.session_manager import SessionManager\n",
        "\n",
        "# Configure logging to see detailed information about the instance creation process\n",
        "logger = get_stdout_logger(__name__, [\n",
        "    'nx_neptune.instance_management',\n",
        "    'nx_neptune.utils.task_future',\n",
        "    'nx_neptune.session_manager',\n",
        "    'nx_neptune.interface',\n",
        "    __name__\n",
        "])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Configuration\n",
        "\n",
        "Check for the environment variables the notebook requires."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Check for required environment variables\n",
        "\n",
        "env_vars = validate_and_get_env([\n",
        "    'NETWORKX_S3_DATA_LAKE_BUCKET_PATH',\n",
        "    'NETWORKX_S3_LOG_BUCKET_PATH',\n",
        "    'NETWORKX_S3_IMPORT_BUCKET_PATH',\n",
        "    'NETWORKX_S3_TABLES_DATABASE',\n",
        "    'NETWORKX_S3_TABLES_TABLENAME',\n",
        "    'NETWORKX_GRAPH_ID',  # graph id; required so the six-way unpack below succeeds\n",
        "])\n",
        "\n",
        "(s3_location_data_lake, s3_location_log, s3_location_import,\n",
        " s3_tables_database, s3_tables_tablename, graph_id) = env_vars.values()\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Data Setup\n",
        "\n",
        "Fashion product data is sourced from the [Kaggle fashion product images dataset](https://www.kaggle.com/datasets/paramaggarwal/fashion-product-images-small).\n",
        "\n",
        "For this demo, only the styles.csv file is required.\n",
        "\n",
        "In this section, the dataset is modified to append an additional embedding column generated using Amazon Bedrock. The enriched CSV file is then uploaded to Amazon S3 for downstream processing as part of the data lake projection workflow.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Download the fashion dataset from Kaggle (only styles.csv is needed):\n",
        "# https://www.kaggle.com/datasets/paramaggarwal/fashion-product-images-small\n",
        "data_path = \"../example/resources/styles.csv\"\n",
        "data_w_embedding_path = \"../example/resources/styles_embedding.csv\"\n",
        "\n",
        "athena_client = boto3.client('athena')\n",
        "\n",
        "# Read the first 10 rows from the data path\n",
        "headers, rows = read_csv(data_path, 10)\n",
        "\n",
        "# Preview the file content\n",
        "df = pd.DataFrame(rows)\n",
        "df\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Data Enrichment – Embeddings\n",
        "\n",
        "Next, an `append_embedding` function is applied to each row to generate an embedding vector from a subset of product attributes (e.g. `masterCategory`, `subCategory`, `articleType`, `baseColour`).\n",
        "\n",
        "The resulting embedding is appended as a new column in the output dataset, and will later be imported into Neptune Analytics for similarity search.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def append_embedding(headers, rows):\n",
        "    # Add the new column to the header row\n",
        "    fieldnames = headers + [\"embedding\"]\n",
        "\n",
        "    bedrock = boto3.client(\"bedrock-runtime\")\n",
        "\n",
        "    # Generate a vector embedding for each row\n",
        "    for row in rows:\n",
        "        embedding = _get_bedrock_embedding(bedrock,\n",
        "                                           row[\"masterCategory\"] +\n",
        "                                           row[\"subCategory\"] +\n",
        "                                           row[\"articleType\"] +\n",
        "                                           row[\"baseColour\"])[0]\n",
        "        row[\"embedding\"] = \";\".join(map(str, embedding))\n",
        "\n",
        "    return fieldnames, rows\n",
        "\n",
        "\n",
        "# Add the embedding column\n",
        "headers, rows = append_embedding(headers, rows)\n",
        "\n",
        "# Write the enriched rows to a new CSV\n",
        "write_csv(data_w_embedding_path, headers, rows)\n",
        "\n",
        "# Preview the enriched content\n",
        "df = pd.DataFrame(rows)\n",
        "df\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Upload Dataset and Register in Athena\n",
        "\n",
        "After the embedding column is added, the enriched dataset is uploaded to Amazon S3.\n",
        "\n",
        "An external table is then created in Amazon Athena over the uploaded CSV, exposing both the original attributes and the embedding array for SQL-based access."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Push the enriched CSV to S3\n",
        "empty_s3_bucket(s3_location_data_lake)\n",
        "push_to_s3(data_w_embedding_path, _clean_s3_path(s3_location_data_lake), \"styles_embedding.csv\")\n",
        "\n",
        "# Create the external table\n",
        "create_csv_table_stmt = f\"\"\"\n",
        "CREATE EXTERNAL TABLE IF NOT EXISTS {s3_tables_tablename} (\n",
        "    `id` int,\n",
        "    `gender` string,\n",
        "    `masterCategory` string,\n",
        "    `subCategory` string,\n",
        "    `articleType` string,\n",
        "    `baseColour` string,\n",
        "    `season` string,\n",
        "    `year` int,\n",
        "    `usage` string,\n",
        "    `productDisplayname` string,\n",
        "    `embedding` array<float>\n",
        ")\n",
        "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n",
        "WITH SERDEPROPERTIES ('field.delim' = ',', 'collection.delim' = ';')\n",
        "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'\n",
        "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\n",
        "LOCATION '{s3_location_data_lake}'\n",
        "TBLPROPERTIES ('classification' = 'csv', 'skip.header.line.count' = '1');\n",
        "\"\"\"\n",
        "\n",
        "_execute_athena_query(athena_client, create_csv_table_stmt, s3_location_log, database=s3_tables_database)\n",
        "\n",
        "print(\"Data lake preparation completed.\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Import Data into Neptune Analytics and Perform Similarity Search\n",
        "\n",
        "A projection query is executed in Athena to select the required columns, map them to Neptune-compatible headers, and flatten the embedding array into a vector format.\n",
        "\n",
        "The resulting CSV is compatible with Amazon Neptune Analytics import requirements and can be ingested directly to enable vector similarity search on the graph.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Clear the import directory\n",
        "empty_s3_bucket(s3_location_import)\n",
        "\n",
        "# Projection query\n",
        "projection_stmt = f\"\"\"\n",
        "    SELECT\n",
        "        \"id\" AS \"~id\",\n",
        "        \"masterCategory\" AS \"~label\",\n",
        "        \"baseColour\" AS \"baseColour\",\n",
        "        array_join(\n",
        "            transform(embedding, x -> cast(x AS varchar)), ';'\n",
        "        ) AS \"embedding:vector\"\n",
        "    FROM {s3_tables_tablename};\n",
        "\"\"\"\n",
        "\n",
        "_execute_athena_query(athena_client, projection_stmt, s3_location_import, database=s3_tables_database)\n",
        "\n",
        "# Remove the unnecessary .csv.metadata files generated by Athena\n",
        "empty_s3_bucket(s3_location_import, file_extension=\".csv.metadata\")\n",
        "\n",
        "task_id = await instance_management.import_csv_from_s3(\n",
        "    NeptuneGraph.from_config(set_config_graph_id(graph_id)),\n",
        "    s3_location_import,\n",
        "    reset_graph_ahead=True,\n",
        "    skip_snapshot=True,\n",
        ")\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Inspect Embedding\n",
        "\n",
        "A simple query is used to inspect the imported embeddings by printing the first 5 floating-point values from each node's embedding vector.\n",
        "\n",
        "This provides a quick sanity check to verify that the embedding data has been ingested and stored correctly before running similarity queries."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "INSPECT_QUERY = \"\"\"\n",
        "    MATCH (n)\n",
        "    CALL neptune.algo.vectors.get(n)\n",
        "    YIELD embedding\n",
        "    RETURN n, embedding[0..5] AS embedding_first_five\n",
        "    LIMIT 3\n",
        "\"\"\"\n",
        "\n",
        "config = set_config_graph_id(graph_id)\n",
        "na_graph = NeptuneGraph.from_config(config)\n",
        "all_nodes = na_graph.execute_call(INSPECT_QUERY)\n",
        "for n in all_nodes:\n",
        "    print(n[\"n\"][\"~id\"] + \": \" + str(n[\"embedding_first_five\"]))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Similarity Search\n",
        "\n",
        "You can now run `neptune.algo.vectors.topK.byNode` to perform similarity search using the imported embedding vectors.\n",
        "\n",
        "This query returns the top-K most similar nodes along with their similarity scores, confirming that the embeddings are correctly integrated and usable for semantic similarity search in Amazon Neptune Analytics."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "TOPK_QUERY = \"\"\"\n",
        "    MATCH (n) WHERE id(n) = '30805'\n",
        "    CALL neptune.algo.vectors.topK.byNode(n, {topK: 5})\n",
        "    YIELD node, score\n",
        "    RETURN node, score\n",
        "\"\"\"\n",
        "\n",
        "config = set_config_graph_id(graph_id)\n",
        "na_graph = NeptuneGraph.from_config(config)\n",
        "all_nodes = na_graph.execute_call(TOPK_QUERY)\n",
        "for n in all_nodes:\n",
        "    print(n[\"node\"][\"~id\"] + \", score: \" + str(n[\"score\"]))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Conclusion\n",
        "\n",
        "This notebook demonstrated the complete lifecycle of embedding vectors, from ingestion and transformation in the data lake to availability within Amazon Neptune Analytics for similarity search.\n",
        "\n",
        "By integrating embedding data directly into the graph, this approach enables scalable and explainable similarity queries using native graph algorithms such as TopK. This is particularly important for recommendation, product discovery, and enrichment workflows, where vector similarity needs to be combined with structured graph relationships and properties rather than treated as an isolated retrieval step.\n",
        "\n",
        "In practice, this pattern provides a flexible foundation for building hybrid graph-and-vector analytics pipelines that can evolve alongside existing data lake architectures."
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3 (ipykernel)",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.13.12"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 4
}
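Throughout the notebook, vectors travel as semicolon-joined strings: `append_embedding` writes them that way into the CSV, the Hive table splits them back out via `'collection.delim' = ';'`, and the Athena projection re-joins them for the `embedding:vector` import column. A minimal, AWS-free sketch of that encoding round trip (the sample values are illustrative, not from the dataset):

```python
# The notebook's transport encoding for embedding vectors: a list of floats
# becomes one ";"-joined CSV field, and a consumer splits it back out.
embedding = [0.0123, -0.5, 1.25e-3]

# Serialise, as append_embedding() does before write_csv()
serialised = ";".join(map(str, embedding))

# Parse, as the downstream side effectively does after array_join/transform
recovered = [float(x) for x in serialised.split(";")]

print(serialised)              # 0.0123;-0.5;0.00125
print(recovered == embedding)  # True
```

The round trip is lossless because Python's `str`/`float` pair preserves each value exactly; the separator only has to differ from the CSV field delimiter, which is why the table DDL sets `'collection.delim' = ';'` alongside `'field.delim' = ','`.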

nx_neptune/utils/utils.py

Lines changed: 26 additions & 0 deletions
@@ -178,6 +178,32 @@ def read_csv(path, limit=None):
    return header, rows


def write_csv(path, headers, rows):
    """Writes data to a CSV file.

    Parameters
    ----------
    path : str
        Path to the CSV file to write
    headers : list
        List of column names to use as CSV headers
    rows : list
        List of dictionaries, where each dictionary represents a row with
        column names as keys and cell values as values

    Examples
    --------
    >>> headers = ['col1', 'col2']
    >>> rows = [{'col1': 'val1', 'col2': 'val2'},
    ...         {'col1': 'val3', 'col2': 'val4'}]
    >>> write_csv('output.csv', headers, rows)
    """
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=headers)
        writer.writeheader()
        writer.writerows(rows)


def _get_bedrock_embedding(
    client, text, dimensions=256, model_id="amazon.titan-embed-text-v2:0"
):
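To see the new helper in action, here is a self-contained round trip: write with the same `csv.DictWriter` logic as `write_csv`, then read back with the stdlib `csv.DictReader` (used here instead of this module's `read_csv` so the sketch has no package dependency). The column names mirror the notebook's dataset; the values are made up:

```python
import csv
import os
import tempfile

def write_csv(path, headers, rows):
    # Same body as the helper added in this commit
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=headers)
        writer.writeheader()
        writer.writerows(rows)

headers = ["id", "baseColour", "embedding"]
rows = [{"id": "1", "baseColour": "Navy Blue", "embedding": "0.1;0.2;0.3"}]

path = os.path.join(tempfile.mkdtemp(), "styles_embedding.csv")
write_csv(path, headers, rows)

# Read back with the stdlib reader; the semicolon-joined embedding column
# survives intact because ";" is not the CSV field delimiter.
with open(path, newline="", encoding="utf-8") as f:
    back = list(csv.DictReader(f))

print(back[0]["embedding"])  # 0.1;0.2;0.3
```

Note that `newline=""` is required for `csv` writers on all platforms, and `DictWriter` raises `ValueError` if a row contains a key missing from `fieldnames`, which is why `append_embedding` extends the header list before the rows are written.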
