Commit f1f6084

Notebook - OpenSearch embedding enrichment (#102)
* Update util
* Add notebook
* Simplify embedding handling
* Revert notebook
* Refactor
* Update dep
* Update notebook
1 parent 22d0f08 commit f1f6084

3 files changed

Lines changed: 438 additions & 4 deletions

Lines changed: 383 additions & 0 deletions
@@ -0,0 +1,383 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"# Neptune Analytics Instance Management With S3 Table and OpenSearch Embedding Projections\n",
8+
"This notebook demonstrates an end-to-end workflow where vector embeddings stored in OpenSearch are joined with data lake tabular data (an Iceberg table in S3 Tables), then imported into Amazon Neptune Analytics to perform a TopK similarity search.\n",
9+
"\n",
10+
"### Prerequisite\n",
11+
"\n",
12+
"To enable querying OpenSearch embeddings from Amazon Athena, the Athena OpenSearch Connector must first be deployed in your AWS account.\n",
13+
"Deployment and setup instructions are available in the following resources:\n",
14+
"\n",
15+
"**Installation guide**\n",
16+
"\n",
17+
"https://aws.amazon.com/blogs/big-data/query-data-in-amazon-opensearch-service-using-sql-from-amazon-athena/\n",
18+
"\n",
19+
"**Official documentation**\n",
20+
"\n",
21+
"https://docs.aws.amazon.com/athena/latest/ug/connectors-opensearch.html\n",
22+
"\n",
23+
"### Current Limitations\n",
24+
"\n",
25+
"At the time of writing, the Athena OpenSearch Connector supports embedding vectors stored as multi-value float fields (for example, arrays of floating-point values).\n",
26+
"\n",
27+
"The connector does not yet support the native knn_vector field type in OpenSearch. Queries against indices containing knn_vector fields may fail during schema resolution.\n",
28+
"\n",
29+
"Support for the native knn_vector type is currently under development, see \n",
30+
"[here](https://github.com/awslabs/aws-athena-query-federation/issues/3315).\n",
31+
"\n",
32+
"### What this notebook covers:\n",
33+
"1. Download the [Kaggle fashion dataset](https://www.kaggle.com/datasets/paramaggarwal/fashion-product-images-dataset?resource=download), generate embeddings from item attributes using Amazon Bedrock, and write the embeddings to OpenSearch index.\n",
34+
"\n",
35+
"2. Upload the original Kaggle dataset to simulate a source dataset from an enterprise data lake\n",
36+
"3. Use Amazon Athena to create a projection that joins the S3 Tables dataset with OpenSearch index embeddings to enrich the data\n",
37+
"4. Import the enriched projection into Amazon Neptune Analytics\n",
38+
"5. Run `topK.byNode` to retrieve similar products and return similarity scores\n"
39+
]
40+
},
41+
{
42+
"cell_type": "markdown",
43+
"metadata": {},
44+
"source": [
45+
"## Setup\n",
46+
"\n",
47+
"Import the necessary libraries and set up logging."
48+
]
49+
},
50+
{
51+
"cell_type": "code",
52+
"execution_count": null,
53+
"metadata": {},
54+
"outputs": [],
55+
"source": [
56+
"import pandas as pd\n",
57+
"import dotenv\n",
58+
"\n",
59+
"from nx_neptune import empty_s3_bucket, instance_management, NeptuneGraph, set_config_graph_id\n",
60+
"from nx_neptune.instance_management import execute_athena_query, _clean_s3_path\n",
61+
"from nx_neptune.utils.utils import get_stdout_logger, validate_and_get_env, read_csv, \\\n",
62+
" push_to_s3, to_embedding_entries, generate_create_table_ddl, generate_projection_stmt, \\\n",
63+
" push_to_opensearch\n",
64+
"\n",
65+
"from nx_neptune.session_manager import SessionManager\n",
66+
"\n",
67+
"# Configure logging to see detailed information about the instance creation process\n",
68+
"logger = get_stdout_logger(__name__, [\n",
69+
" 'nx_neptune.instance_management',\n",
70+
" 'nx_neptune.utils.task_future',\n",
71+
" 'nx_neptune.session_manager',\n",
72+
" 'nx_neptune.interface',\n",
73+
" __name__\n",
74+
"])"
75+
]
76+
},
77+
{
78+
"cell_type": "markdown",
79+
"metadata": {},
80+
"source": [
81+
"## Configuration\n",
82+
"\n",
83+
"Check for environment variables necessary for the notebook."
84+
]
85+
},
86+
{
87+
"cell_type": "code",
88+
"execution_count": null,
89+
"metadata": {},
90+
"outputs": [],
91+
"source": [
92+
"# Check for optional environment variables\n",
93+
"dotenv.load_dotenv()\n",
94+
"env_vars = validate_and_get_env([\n",
95+
" 'NETWORKX_S3_DATA_LAKE_BUCKET_PATH',\n",
96+
" 'NETWORKX_S3_NA_IMPORT_BUCKET_PATH',\n",
97+
" 'NETWORKX_S3_LOG_BUCKET_PATH',\n",
98+
" 'NETWORKX_S3_TABLES_DATABASE',\n",
99+
" 'NETWORKX_S3_TABLES_TABLENAME',\n",
100+
" 'NETWORKX_GRAPH_ID',\n",
101+
" 'OPEN_SEARCH_CONNECTOR',\n",
102+
" 'OPEN_SEARCH_ENDPOINT',\n",
103+
" 'OPEN_SEARCH_INDEX',\n",
104+
"])\n",
105+
"\n",
106+
"(s3_location_data_lake, s3_location_na_import, s3_location_log, \n",
107+
" s3_tables_database, s3_tables_tablename, graph_id, \n",
108+
" opensearch_connector, opensearch_endpoint, opensearch_index) = env_vars.values()\n"
109+
]
110+
},
111+
{
112+
"cell_type": "markdown",
113+
"metadata": {},
114+
"source": [
115+
"## Test Data Setup\n",
116+
"\n",
117+
"The fashion product dataset used in this demo is sourced from Kaggle: [kaggle](https://www.kaggle.com/datasets/paramaggarwal/fashion-product-images-small).\n",
118+
"\n",
119+
"For this workflow, only the styles.csv file is required.\n",
120+
"\n",
121+
"In this section, we prepare two parallel data paths to simulate a realistic enterprise architecture:\n",
122+
"\n",
123+
"1. **Structured Data Lake Source**\n",
124+
"\n",
125+
" The original fashion dataset is uploaded to Amazon S3 to simulate a typical Iceberg-backed table in an enterprise data lake.\n",
126+
"\n",
127+
"2. **Embedding Generation Path**\n",
128+
"\n",
129+
" A subset of rows is extracted from the dataset to generate embedding vectors. These embeddings are then stored in OpenSearch cluster, simulating a vector enrichment workflow.\n",
130+
"\n",
131+
"This separation reflects a common production pattern where:\n",
132+
"* Structured business data resides in the data lake\n",
133+
"* Embeddings are generated asynchronously\n",
134+
"* Vector data is stored independently and later joined during query time\n"
135+
]
136+
},
137+
{
138+
"cell_type": "code",
139+
"execution_count": null,
140+
"metadata": {},
141+
"outputs": [],
142+
"source": [
143+
"# Download the styles.csv from Kaggle dataset (Only the style.csv).\n",
144+
"# https://www.kaggle.com/datasets/paramaggarwal/fashion-product-images-small\n",
145+
"data_path = \"../example/resources/styles.csv\"\n",
146+
"\n",
147+
"# Read data from data path\n",
148+
"_, rows = read_csv(data_path)\n",
149+
"\n",
150+
"# Inspect test data content.\n",
151+
"df = pd.DataFrame(rows)\n",
152+
"# To show the table on notebook.\n",
153+
"df"
154+
]
155+
},
156+
{
157+
"cell_type": "markdown",
158+
"metadata": {},
159+
"source": [
160+
"### Test Data Upload – Embeddings\n",
161+
"\n",
162+
"In this step, each selected row from the dataset is transformed to generate an embedding vector based on a subset of relevant product attributes.\n",
163+
"\n",
164+
"The embedding is generated using Amazon Bedrock, then converted into a format compatible with OpenSearch service.\n",
165+
"\n",
166+
"The resulting vector entries are subsequently uploaded to the OpenSearch service, where they can later be queried and joined with structured data during Athena projection.\n"
167+
]
168+
},
169+
{
170+
"cell_type": "code",
171+
"execution_count": null,
172+
"metadata": {},
173+
"outputs": [],
174+
"source": [
175+
"# Add the embedding\n",
176+
"columns_to_embed = [\"masterCategory\", \"subCategory\", \"articleType\",\n",
177+
" \"baseColour\", \"season\", \"year\", \"usage\", \"productDisplayName\"]\n",
178+
"embedding_limit = 200\n",
179+
"\n",
180+
"# Only produce embedding for first n items\n",
181+
"items = to_embedding_entries(rows[:embedding_limit], columns_to_embed)\n",
182+
"print(\"Embedding data preparation - Completed\")\n",
183+
"\n",
184+
"# Writing embedding OpenSearch\n",
185+
"push_to_opensearch(items, opensearch_index, recreate=True)\n",
186+
"print(\"Pusing data to OpenSearch - Completed\")"
187+
]
188+
},
189+
{
190+
"cell_type": "markdown",
191+
"metadata": {},
192+
"source": [
193+
"### Test Data Upload – Business Dataset (Data Lake Simulation)\n",
194+
"\n",
195+
"In this step, the original styles.csv file is uploaded to Amazon S3 to simulate structured business data stored in a data lake.\n",
196+
"\n",
197+
"An external table is then created in Amazon Athena using the defined schema, making the dataset queryable via SQL.\n",
198+
"\n",
199+
"After this completes, the business data is ready to be joined with embedding data for enrichment."
200+
]
201+
},
202+
{
203+
"cell_type": "code",
204+
"execution_count": null,
205+
"metadata": {},
206+
"outputs": [],
207+
"source": [
208+
"# Push to s3\n",
209+
"empty_s3_bucket(s3_location_data_lake)\n",
210+
"push_to_s3(data_path, _clean_s3_path(s3_location_data_lake), \"styles.csv\")\n",
211+
"\n",
212+
"# Create the table representation on Athena with selected set of attributes\n",
213+
"columns = [\n",
214+
" (\"id\", \"string\"),\n",
215+
" (\"gender\", \"string\"),\n",
216+
" (\"masterCategory\", \"string\"),\n",
217+
" (\"subCategory\", \"string\"),\n",
218+
" (\"articleType\", \"string\"),\n",
219+
" (\"baseColour\", \"string\"),\n",
220+
" (\"season\", \"string\"),\n",
221+
" (\"year\", \"int\"),\n",
222+
" (\"usage\", \"string\"),\n",
223+
" (\"productDisplayName\", \"string\")\n",
224+
"]\n",
225+
"stmt_s3_table = generate_create_table_ddl(s3_tables_tablename, s3_location_data_lake, columns)\n",
226+
"\n",
227+
"await execute_athena_query(stmt_s3_table, s3_location_log, database=s3_tables_database)\n",
228+
"\n",
229+
"print(\"DataLake preparation completed.\")"
230+
]
231+
},
232+
{
233+
"cell_type": "markdown",
234+
"metadata": {},
235+
"source": [
236+
"### Data Enrichment and Graph Import\n",
237+
"\n",
238+
"In this step, Amazon Athena is used to join the structured data lake table with embeddings stored in OpenSearch service.\n",
239+
"\n",
240+
"The enriched projection (including IDs, attributes, and embeddings) is written to S3 in a CSV format compatible with Amazon Neptune Analytics import requirements, cleaned, and then imported into Neptune Analytics.\n",
241+
"\n",
242+
"After completion, the graph contains nodes enriched with embedding vectors, ready for similarity search.\n",
243+
"\n",
244+
" **Note:** If the Athena federated connector is not configured properly (e.g., the Lambda \n",
245+
"function does not exist or the connector name is incorrect), you will receive a \n",
246+
"**GENERIC_USER_ERROR** with a **ResourceNotFoundException** indicating the Lambda function was not \n",
247+
"found. Ensure the connector Lambda function is deployed and the connector name in your query \n",
248+
"matches the registered function name."
249+
]
250+
},
251+
{
252+
"cell_type": "code",
253+
"execution_count": null,
254+
"metadata": {},
255+
"outputs": [],
256+
"source": [
257+
"# Clear import directory\n",
258+
"empty_s3_bucket(s3_location_na_import)\n",
259+
"\n",
260+
"# Generate SQL statement to join tabuler data with embedding entries\n",
261+
"open_search_table_ref=f'\"lambda:{opensearch_connector}\".\"default\".\"{opensearch_index}\"'\n",
262+
"stmt_projection = generate_projection_stmt(\n",
263+
" col_id=\"t.id\",\n",
264+
" col_label=\"t.masterCategory\",\n",
265+
" col_embedding=\"v.embedding\",\n",
266+
" columns=[\"t.gender\", \"t.subCategory\", \"t.articleType\",\n",
267+
" \"t.baseColour\", \"t.season\", \"t.year\",\n",
268+
" \"t.usage\", \"t.productDisplayName\"],\n",
269+
" base_table=\"test_embedding_table as t\",\n",
270+
" joins=[(f\"{open_search_table_ref} v\", \"t.id = v.id\")])\n",
271+
"\n",
272+
"await execute_athena_query(stmt_projection, s3_location_na_import, database=s3_tables_database)\n",
273+
"\n",
274+
"# Remove unnecessary .csv.metadata file generated by Athena. \n",
275+
"empty_s3_bucket(s3_location_na_import, file_extension=\".csv.metadata\")\n",
276+
"\n",
277+
"task_id = await instance_management.import_csv_from_s3(\n",
278+
" NeptuneGraph.from_config(set_config_graph_id(graph_id)),\n",
279+
" s3_location_na_import)"
280+
]
281+
},
282+
{
283+
"cell_type": "markdown",
284+
"metadata": {},
285+
"source": [
286+
"### Inspect Embeddings\n",
287+
"\n",
288+
"A simple query is used to inspect the imported embeddings by printing the first 5 floating-point values from each node’s embedding vector. \n",
289+
"\n",
290+
"This provides a quick sanity check to verify that the embedding data has been ingested and stored correctly before running similarity queries."
291+
]
292+
},
293+
{
294+
"cell_type": "code",
295+
"execution_count": null,
296+
"metadata": {},
297+
"outputs": [],
298+
"source": [
299+
"config = set_config_graph_id(graph_id)\n",
300+
"na_graph = NeptuneGraph.from_config(config)\n",
301+
"\n",
302+
"SHOW_EMBEDDING_QUERY = \"\"\"\n",
303+
" MATCH (n) \n",
304+
" CALL neptune.algo.vectors.get(n) \n",
305+
" YIELD embedding RETURN n, embedding[0..5] as embedding_first_five\n",
306+
" limit 3\n",
307+
"\"\"\"\n",
308+
"\n",
309+
"all_nodes = na_graph.execute_call(SHOW_EMBEDDING_QUERY)\n",
310+
"for n in all_nodes:\n",
311+
" print(n[\"n\"][\"~id\"] + \": \" + str(n[\"embedding_first_five\"]))"
312+
]
313+
},
314+
{
315+
"cell_type": "markdown",
316+
"metadata": {},
317+
"source": [
318+
"### Similarity Search\n",
319+
"\n",
320+
"You can now run `neptune.algo.vectors.topK.byNode` to perform similarity search using the imported embedding vectors.\n",
321+
"\n",
322+
"The following query first retrieves the embedding of the node with ID 30805, then executes the Top-K algorithm to identify other products that share similar semantic characteristics. Unlike conventional exact-match search, this approach can surface related items even when their attributes do not match exactly.\n",
323+
"\n",
324+
"This confirms that the embeddings have been successfully imported and integrated into Amazon Neptune Analytics, and they are fully usable for semantic similarity search."
325+
]
326+
},
327+
{
328+
"cell_type": "code",
329+
"execution_count": null,
330+
"metadata": {},
331+
"outputs": [],
332+
"source": [
333+
"TOPK_QUERY = \"\"\"\n",
334+
" MATCH (n) WHERE id(n) = '30805'\n",
335+
" CALL neptune.algo.vectors.topK.byNode(\n",
336+
" n, {topK: 5})\n",
337+
" YIELD node, score\n",
338+
" RETURN node, score\n",
339+
"\"\"\"\n",
340+
"\n",
341+
"all_nodes = na_graph.execute_call(TOPK_QUERY)\n",
342+
"rows = [{\"score\": n.get(\"score\"), **n[\"node\"][\"~properties\"]} for n in all_nodes]\n",
343+
"\n",
344+
"df = pd.DataFrame(rows)\n",
345+
"df"
346+
]
347+
},
348+
{
349+
"cell_type": "markdown",
350+
"metadata": {},
351+
"source": [
352+
"## Conclusion\n",
353+
"\n",
354+
"This notebook demonstrated the complete lifecycle of embedding vectors — from being stored and managed in OpenSearch service, to being joined with structured data in the data lake, and ultimately imported into Amazon Neptune Analytics for similarity search.\n",
355+
"\n",
356+
"By integrating embeddings sourced from OpenSearch service directly into the graph, this approach enables scalable and explainable similarity queries using native graph algorithms such as TopK. This is especially valuable for recommendation, product discovery, and data enrichment scenarios, where vector similarity must work alongside structured properties and graph relationships rather than operate as an isolated retrieval layer.\n",
357+
"\n",
358+
"In practice, this pattern provides a flexible foundation for building hybrid graph-and-vector analytics pipelines that integrate cleanly with existing data lake architectures."
359+
]
360+
}
361+
],
362+
"metadata": {
363+
"kernelspec": {
364+
"display_name": "Python 3 (ipykernel)",
365+
"language": "python",
366+
"name": "python3"
367+
},
368+
"language_info": {
369+
"codemirror_mode": {
370+
"name": "ipython",
371+
"version": 3
372+
},
373+
"file_extension": ".py",
374+
"mimetype": "text/x-python",
375+
"name": "python",
376+
"nbconvert_exporter": "python",
377+
"pygments_lexer": "ipython3",
378+
"version": "3.13.12"
379+
}
380+
},
381+
"nbformat": 4,
382+
"nbformat_minor": 4
383+
}
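The notebook delegates DDL and projection generation to `generate_create_table_ddl` and `generate_projection_stmt`. As a rough illustration of the idea, here is a minimal, hypothetical sketch of what such helpers could emit; the function bodies, the exact SQL shape, and the placeholder bucket/connector/index names are all assumptions, not `nx_neptune`'s actual implementation.

```python
# Hypothetical sketch of the notebook's DDL/projection helpers; the SQL shape
# and all names here are assumptions, not nx_neptune's actual output.

def build_create_table_ddl(table, s3_location, columns):
    """Render a CREATE EXTERNAL TABLE statement for a CSV dataset in S3."""
    cols = ",\n  ".join(f"{name} {ctype}" for name, ctype in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
        f"LOCATION '{s3_location}'\n"
        "TBLPROPERTIES ('skip.header.line.count'='1')"
    )

def build_projection_stmt(col_id, col_label, col_embedding, columns, base_table, joins):
    """Render the SELECT that joins the base table with the OpenSearch index."""
    select_cols = ", ".join([col_id, col_label, col_embedding, *columns])
    join_sql = " ".join(f"JOIN {table} ON {cond}" for table, cond in joins)
    return f"SELECT {select_cols} FROM {base_table} {join_sql}"

ddl = build_create_table_ddl(
    "test_embedding_table",
    "s3://example-data-lake/styles/",  # placeholder bucket path
    [("id", "string"), ("year", "int")],
)
sql = build_projection_stmt(
    col_id="t.id",
    col_label="t.masterCategory",
    col_embedding="v.embedding",
    columns=["t.gender"],
    base_table="test_embedding_table as t",
    joins=[('"lambda:my-connector"."default"."my-index" v', "t.id = v.id")],
)
print(ddl)
print(sql)
```

The key design point mirrored here is that the OpenSearch index is addressed through the federated connector's catalog name (`"lambda:<connector>"`), so Athena treats it as just another joinable table.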

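As background on the embedding-preparation step, the following self-contained sketch shows one way `to_embedding_entries` could work conceptually. `embed()` is a stand-in for the Amazon Bedrock model call and returns a dummy fixed-size vector so the example runs offline; the entry shape (`id` plus an `embedding` list) is an assumption for illustration.

```python
# Conceptual sketch of embedding-entry preparation; embed() is a placeholder
# for a real Amazon Bedrock embedding call, and the output shape is assumed.

def embed(text: str, dim: int = 4) -> list[float]:
    # Placeholder: a real implementation would invoke a Bedrock embedding model.
    return [float(len(text) % 7)] * dim

def to_embedding_entries(rows, columns_to_embed):
    entries = []
    for row in rows:
        # Concatenate the selected attributes into a single text to embed.
        text = " ".join(str(row.get(c, "")) for c in columns_to_embed)
        entries.append({"id": row["id"], "embedding": embed(text)})
    return entries

rows = [{"id": "30805", "articleType": "Shirts", "baseColour": "Blue"}]
entries = to_embedding_entries(rows, ["articleType", "baseColour"])
print(entries[0]["id"], len(entries[0]["embedding"]))
```

Each entry pairs the data-lake row ID with its vector, which is what later allows the Athena projection to join on `t.id = v.id`.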