|
1 | 1 | { |
2 | 2 | "cells": [ |
3 | 3 | { |
4 | | - "metadata": {}, |
5 | 4 | "cell_type": "markdown", |
| 5 | + "id": "0", |
| 6 | + "metadata": {}, |
6 | 7 | "source": [ |
7 | 8 | "# Neptune Analytics Instance Management With S3 Table Projections\n", |
8 | 9 | "\n", |
|
13 | 14 | "2. Import the projection into Neptune Analytics.\n", |
14 | 15 | "3. Run Louvain algorithm on the provisioned instance to create communities.\n", |
15 | 16 | "4. Export the graph back into S3 Tables bucket." |
16 | | - ], |
17 | | - "id": "daa071d5474f0439" |
| 17 | + ] |
18 | 18 | }, |
19 | 19 | { |
20 | | - "metadata": {}, |
21 | 20 | "cell_type": "markdown", |
| 21 | + "id": "1", |
| 22 | + "metadata": {}, |
22 | 23 | "source": [ |
23 | 24 | "## Setup\n", |
24 | 25 | "\n", |
25 | 26 | "Import the necessary libraries and set up logging." |
26 | | - ], |
27 | | - "id": "8db98f850ec409ac" |
| 27 | + ] |
28 | 28 | }, |
29 | 29 | { |
30 | | - "metadata": {}, |
31 | 30 | "cell_type": "code", |
32 | | - "outputs": [], |
33 | 31 | "execution_count": null, |
| 32 | + "id": "2", |
| 33 | + "metadata": {}, |
| 34 | + "outputs": [], |
34 | 35 | "source": [ |
35 | 36 | "# Check the Python version:\n", |
36 | 37 | "import sys\n", |
|
49 | 50 | "dotenv.load_dotenv()\n", |
50 | 51 | "\n", |
51 | 52 | "from nx_neptune.session_manager import SessionManager" |
52 | | - ], |
53 | | - "id": "8e270bbf456a8256" |
| 53 | + ] |
54 | 54 | }, |
55 | 55 | { |
56 | | - "metadata": {}, |
57 | 56 | "cell_type": "code", |
58 | | - "outputs": [], |
59 | 57 | "execution_count": null, |
| 58 | + "id": "3", |
| 59 | + "metadata": {}, |
| 60 | + "outputs": [], |
60 | 61 | "source": [ |
61 | 62 | "# Configure logging to see detailed information about the instance creation process\n", |
62 | 63 | "logging.basicConfig(\n", |
|
72 | 73 | "]:\n", |
73 | 74 | " logging.getLogger(logger_name).setLevel(logging.INFO)\n", |
74 | 75 | "logger = logging.getLogger(__name__)" |
75 | | - ], |
76 | | - "id": "6d97092f7c0e10cd" |
| 76 | + ] |
77 | 77 | }, |
78 | 78 | { |
79 | | - "metadata": {}, |
80 | 79 | "cell_type": "markdown", |
| 80 | + "id": "4", |
| 81 | + "metadata": {}, |
81 | 82 | "source": [ |
82 | 83 | "## Configuration\n", |
83 | 84 | "\n", |
84 | 85 | "Check for environment variables necessary for the notebook." |
85 | | - ], |
86 | | - "id": "76d2a19a4f7b6d1" |
| 86 | + ] |
87 | 87 | }, |
88 | 88 | { |
89 | | - "metadata": {}, |
90 | 89 | "cell_type": "code", |
91 | | - "outputs": [], |
92 | 90 | "execution_count": null, |
| 91 | + "id": "5", |
| 92 | + "metadata": {}, |
| 93 | + "outputs": [], |
93 | 94 | "source": [ |
94 | 95 | "def check_env_vars(var_names):\n", |
95 | 96 | " values = {}\n", |
|
119 | 120 | "s3_tables_database = os.getenv('NETWORKX_S3_TABLES_DATABASE')\n", |
120 | 121 | "s3_tables_tablename = os.getenv('NETWORKX_S3_TABLES_TABLENAME')\n", |
121 | 122 | "session_name = \"nx-athena-test-full\"" |
122 | | - ], |
123 | | - "id": "9d582064efdee720" |
| 123 | + ] |
124 | 124 | }, |
125 | 125 | { |
126 | | - "metadata": {}, |
127 | 126 | "cell_type": "markdown", |
| 127 | + "id": "6", |
| 128 | + "metadata": {}, |
128 | 129 | "source": [ |
129 | 130 | "## Data Setup\n", |
130 | 131 | "\n", |
|
133 | 134 | "Data should be uploaded to an S3 bucket, and an athena table created for that bucket.\n", |
134 | 135 | "\n", |
135 | 136 | "The PaySim dataset includes a simulated mobile money dataset, that involves transactions between client actors and banks. We can use this dataset to detect fraudulent activities in the simulated data." |
136 | | - ], |
137 | | - "id": "3d8c75a19b287ff4" |
| 137 | + ] |
138 | 138 | }, |
139 | 139 | { |
140 | | - "metadata": {}, |
141 | 140 | "cell_type": "code", |
142 | | - "outputs": [], |
143 | 141 | "execution_count": null, |
| 142 | + "id": "7", |
| 143 | + "metadata": {}, |
| 144 | + "outputs": [], |
144 | 145 | "source": [ |
145 | 146 | "paysim_s3_bucket = 'nx-fraud-detection'\n", |
146 | 147 | "paysim_s3_bucket_path = 'data/'\n", |
|
170 | 171 | " paysim_s3_bucket,\n", |
171 | 172 | " f\"{paysim_s3_bucket_path}{file_path.name}\"\n", |
172 | 173 | " )" |
173 | | - ], |
174 | | - "id": "45e778be855ef9ca" |
| 174 | + ] |
175 | 175 | }, |
176 | 176 | { |
177 | | - "metadata": {}, |
178 | 177 | "cell_type": "code", |
179 | | - "outputs": [], |
180 | 178 | "execution_count": null, |
| 179 | + "id": "8", |
| 180 | + "metadata": {}, |
| 181 | + "outputs": [], |
181 | 182 | "source": [ |
182 | 183 | "def _execute_create_table(stmt, catalog, database, s3_logs_location):\n", |
183 | 184 | " athena_client = boto3.client('athena')\n", |
|
250 | 251 | "\"\"\"\n", |
251 | 252 | "\n", |
252 | 253 | "_execute_create_table(create_s3_table_stmt, s3_tables_catalog, s3_tables_database, f\"s3://{paysim_s3_bucket}\")" |
253 | | - ], |
254 | | - "id": "49de921896e76113" |
| 254 | + ] |
255 | 255 | }, |
256 | 256 | { |
257 | | - "metadata": {}, |
258 | 257 | "cell_type": "markdown", |
| 258 | + "id": "9", |
| 259 | + "metadata": {}, |
259 | 260 | "source": [ |
260 | 261 | "## Create a New/Get existing Neptune Analytics Instance\n", |
261 | 262 | "\n", |
262 | 263 | "Provision a new Neptune Analytics instance on demand, or retrieve an existing neptune-graph. Creating a new instance may take several minutes to complete." |
263 | | - ], |
264 | | - "id": "4a417c3dbf24e35" |
| 264 | + ] |
265 | 265 | }, |
266 | 266 | { |
267 | | - "metadata": {}, |
268 | 267 | "cell_type": "code", |
269 | | - "outputs": [], |
270 | 268 | "execution_count": null, |
| 269 | + "id": "10", |
| 270 | + "metadata": {}, |
| 271 | + "outputs": [], |
271 | 272 | "source": [ |
272 | 273 | "session = SessionManager.session(session_name)\n", |
273 | 274 | "graph_list = session.list_graphs()\n", |
274 | 275 | "print(\"The following graphs are available:\")\n", |
275 | 276 | "for g in graph_list:\n", |
276 | 277 | " print(g)" |
277 | | - ], |
278 | | - "id": "9c73c5cb345e6278" |
| 278 | + ] |
279 | 279 | }, |
280 | 280 | { |
281 | | - "metadata": {}, |
282 | 281 | "cell_type": "code", |
283 | | - "outputs": [], |
284 | 282 | "execution_count": null, |
| 283 | + "id": "11", |
| 284 | + "metadata": {}, |
| 285 | + "outputs": [], |
285 | 286 | "source": [ |
286 | 287 | "session = SessionManager.session(session_name)\n", |
287 | 288 | "graph = await session.get_or_create_graph(config={\"provisionedMemory\": 32})\n", |
288 | 289 | "print(f\"Retrieved graph: {graph}\")" |
289 | | - ], |
290 | | - "id": "375d3da5e264214a" |
| 290 | + ] |
291 | 291 | }, |
292 | 292 | { |
293 | | - "metadata": {}, |
294 | 293 | "cell_type": "markdown", |
| 294 | + "id": "12", |
| 295 | + "metadata": {}, |
295 | 296 | "source": [ |
296 | 297 | "## Import Data from S3\n", |
297 | 298 | "\n", |
298 | 299 | "Import data from S3 into the Neptune Analytics graph and wait for the operation to complete. <br>\n", |
299 | 300 | "IAM permissions required for import: <br>\n",
300 | 301 | " - s3:GetObject, kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey" |
301 | | - ], |
302 | | - "id": "f0d93a49706c24b1" |
| 302 | + ] |
303 | 303 | }, |
304 | 304 | { |
305 | | - "metadata": {}, |
306 | 305 | "cell_type": "code", |
307 | | - "outputs": [], |
308 | 306 | "execution_count": null, |
| 307 | + "id": "13", |
| 308 | + "metadata": {}, |
| 309 | + "outputs": [], |
309 | 310 | "source": [ |
310 | 311 | "SOURCE_AND_DESTINATION_BANK_CUSTOMERS = f\"\"\"\n", |
311 | 312 | "SELECT DISTINCT \"~id\", 'customer' AS \"~label\"\n", |
|
342 | 343 | " catalog=s3_tables_catalog,\n", |
343 | 344 | " database=s3_tables_database\n", |
344 | 345 | ")" |
345 | | - ], |
346 | | - "id": "f8579278ec4cb534" |
| 346 | + ] |
347 | 347 | }, |
348 | 348 | { |
349 | | - "metadata": {}, |
350 | 349 | "cell_type": "markdown", |
| 350 | + "id": "14", |
| 351 | + "metadata": {}, |
351 | 352 | "source": [ |
352 | 353 | "## Execute Louvain Algorithm\n", |
353 | 354 | "\n", |
|
356 | 357 | "We will run the Louvain Community Detection Algorithm and mutate the graph storing the results of the vertex community in the \"community\" property\n", |
357 | 358 | "\n", |
358 | 359 | "Note: This runs the `mutate` algorithm, that only returns a success/failure in the result." |
359 | | - ], |
360 | | - "id": "badd0dc0eecd5042" |
| 360 | + ] |
361 | 361 | }, |
362 | 362 | { |
363 | | - "metadata": {}, |
364 | 363 | "cell_type": "code", |
365 | | - "outputs": [], |
366 | 364 | "execution_count": null, |
| 365 | + "id": "15", |
| 366 | + "metadata": {}, |
| 367 | + "outputs": [], |
367 | 368 | "source": [ |
368 | 369 | "# sanity check: print out 10 vertices and edges from the Neptune Analytics graph\n", |
369 | 370 | "all_nodes = graph.execute_query(\"MATCH (n) RETURN n LIMIT 10\")\n", |
370 | 371 | "print(f\"all nodes: {all_nodes}\")\n", |
371 | 372 | "\n", |
372 | 373 | "all_edges = graph.execute_query(\"MATCH ()-[r]-() RETURN r LIMIT 10\")\n", |
373 | 374 | "print(f\"all edges: {all_edges}\")" |
374 | | - ], |
375 | | - "id": "3942d5d8d1236a27" |
| 375 | + ] |
376 | 376 | }, |
377 | 377 | { |
378 | | - "metadata": {}, |
379 | 378 | "cell_type": "code", |
380 | | - "outputs": [], |
381 | 379 | "execution_count": null, |
| 380 | + "id": "16", |
| 381 | + "metadata": {}, |
| 382 | + "outputs": [], |
382 | 383 | "source": [ |
383 | 384 | "# using Neptune Analytics, run the Louvain Community Detection Algorithm and mutate\n", |
384 | 385 | "# the graph storing the results of the vertex community in the \"community\" property\n", |
385 | 386 | "louvain_result = graph.execute_query('CALL neptune.algo.louvain.mutate({iterationTolerance:1e-07, writeProperty:\"community\"}) YIELD success AS success RETURN success')\n", |
386 | 387 | "print(f\"Louvain result: {louvain_result}\")" |
387 | | - ], |
388 | | - "id": "b8b4544be8fb3120" |
| 388 | + ] |
389 | 389 | }, |
390 | 390 | { |
391 | | - "metadata": {}, |
392 | 391 | "cell_type": "markdown", |
| 392 | + "id": "17", |
| 393 | + "metadata": {}, |
393 | 394 | "source": [ |
394 | 395 | "## Export the Neptune Analytics data and add it to S3 Tables as an Iceberg table\n", |
395 | 396 | "\n", |
396 | 397 | "Export the Neptune Analytics graph as a CSV export, and convert it to Iceberg format. Use Athena to add it to the S3 Tables bucket."
397 | | - ], |
398 | | - "id": "a4b96f01915deea1" |
| 398 | + ] |
399 | 399 | }, |
400 | 400 | { |
401 | | - "metadata": {}, |
402 | 401 | "cell_type": "code", |
403 | | - "outputs": [], |
404 | 402 | "execution_count": null, |
| 403 | + "id": "18", |
| 404 | + "metadata": {}, |
| 405 | + "outputs": [], |
405 | 406 | "source": [ |
406 | 407 | "# for the CSV table\n", |
407 | 408 | "csv_catalog = 'AwsDataCatalog'\n", |
|
425 | 426 | " iceberg_catalog,\n", |
426 | 427 | " iceberg_database\n", |
427 | 428 | ")" |
428 | | - ], |
429 | | - "id": "3b9bbe3559ac819c" |
| 429 | + ] |
430 | 430 | }, |
431 | 431 | { |
432 | | - "metadata": {}, |
433 | 432 | "cell_type": "code", |
434 | | - "outputs": [], |
435 | 433 | "execution_count": null, |
| 434 | + "id": "19", |
| 435 | + "metadata": {}, |
| 436 | + "outputs": [], |
436 | 437 | "source": [ |
437 | 438 | "# destroy the session graphs\n", |
438 | 439 | "session.destroy_all_graphs()" |
439 | | - ], |
440 | | - "id": "eec63f4abb4936bf" |
| 440 | + ] |
441 | 441 | }, |
442 | 442 | { |
443 | | - "metadata": {}, |
444 | 443 | "cell_type": "markdown", |
| 444 | + "id": "20", |
| 445 | + "metadata": {}, |
445 | 446 | "source": [ |
446 | 447 | "## Conclusion\n", |
447 | 448 | "\n", |
|
453 | 454 | "4. **Export**: We exported the updated data back into the data lake as an Iceberg table\n",
454 | 455 | "\n", |
455 | 456 | "The session manager (`SessionManager`) provides an easy mechanism to execute general datalake functionality." |
456 | | - ], |
457 | | - "id": "80fd1b3c3b7f68db" |
| 457 | + ] |
458 | 458 | }, |
459 | 459 | { |
460 | | - "metadata": {}, |
461 | 460 | "cell_type": "markdown", |
| 461 | + "id": "21", |
| 462 | + "metadata": {}, |
462 | 463 | "source": [ |
463 | 464 | "## Execute Louvain Communities Algorithm using NetworkX\n", |
464 | 465 | "\n", |
465 | 466 | "This library integrates well as a backend for the NetworkX library (see: https://networkx.org/documentation/latest/backends.html).\n", |
466 | 467 | "\n", |
467 | 468 | "Using the following examples shows how to configure the graph id for networkx, and run louvain_communities through the NetworkX API. This will mutate the graph in Neptune Analytics. You can get the full result from Neptune Analytics by removing the `write_property` - which will not run the `mutate` algorithm variant." |
468 | | - ], |
469 | | - "id": "452ef7757e62c1db" |
| 469 | + ] |
470 | 470 | }, |
471 | 471 | { |
472 | | - "metadata": {}, |
473 | 472 | "cell_type": "code", |
474 | | - "outputs": [], |
475 | 473 | "execution_count": null, |
| 474 | + "id": "22", |
| 475 | + "metadata": {}, |
| 476 | + "outputs": [], |
476 | 477 | "source": [ |
477 | 478 | "import networkx as nx\n", |
478 | 479 | "\n", |
|
485 | 486 | "# the graph itself\n", |
486 | 487 | "result = nx.community.louvain_communities(nx.Graph(), backend=\"neptune\", write_property=\"community\")\n", |
487 | 488 | "print(f\"louvain result: \\n{result}\")" |
488 | | - ], |
489 | | - "id": "e5330d3d7027c7a" |
| 489 | + ] |
490 | 490 | } |
491 | 491 | ], |
492 | 492 | "metadata": {}, |
|
0 commit comments