-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathload_utils.py
More file actions
484 lines (379 loc) · 15.8 KB
/
load_utils.py
File metadata and controls
484 lines (379 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
"""
Shared Utilities for Neo4j Data Loading
=======================================
This module provides common utilities shared across all embedding and load test scripts:
- Configuration management with Databricks Secrets
- Neo4j connection handling with context managers
- Connection testing and validation
- Output formatting utilities
TUTORIAL: Connecting Databricks to Neo4j
----------------------------------------
**Step 1: Store Credentials in Databricks Secrets**
# Create a secret scope (one-time setup)
databricks secrets create-scope my-neo4j-secrets
# Add your Neo4j credentials
databricks secrets put-secret my-neo4j-secrets host
databricks secrets put-secret my-neo4j-secrets username
databricks secrets put-secret my-neo4j-secrets password
**Step 2: Load Configuration in Your Script**
from load_utils import load_config, test_neo4j_connection
config = load_config(dbutils, "my-neo4j-secrets")
if not test_neo4j_connection(config):
raise Exception("Cannot connect to Neo4j!")
**Step 3: Use the Configuration**
from load_utils import neo4j_driver
with neo4j_driver(config) as driver:
with driver.session(database=config.database) as session:
result = session.run("MATCH (n) RETURN count(n)")
print(result.single()[0])
Neo4j Aura Connection Tips:
---------------------------
- Protocol: Use "neo4j+s" for TLS-encrypted connections (required for Aura)
- Database: Use "neo4j" for single-database instances
- Paused Instances: Aura instances pause after inactivity; wait 1-2 minutes after resume
- Allowlisting: Ensure Databricks IP ranges are allowlisted in Aura
Usage:
from load_utils import (
Config,
get_secret_with_default,
load_config,
print_config,
neo4j_driver,
test_neo4j_connection,
print_section_header,
)
References:
- https://neo4j.com/docs/python-manual/current/
- https://docs.databricks.com/en/security/secrets/
"""
from __future__ import annotations

import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Generator, Optional

from neo4j import Driver, GraphDatabase
# =============================================================================
# CONFIGURATION
# =============================================================================
@dataclass
class Config:
    """Connection settings for Neo4j plus the embedding endpoint name.

    Tutorial: Configuration as a Dataclass
    ---------------------------------------
    Using a dataclass for configuration provides:
    - **Type hints**: IDE autocomplete and static type checking
    - **Documentation**: Self-documenting field names
    - **Generated methods**: ``__init__``, ``__eq__`` and ``__repr__`` for free

    Instances are mutable (the dataclass is not frozen). The password is
    excluded from the generated ``repr`` so that ``print(config)``, log
    statements, and tracebacks that render the object do not expose the
    credential; it remains available as ``config.password``.

    The `uri` property combines protocol and host into the connection string
    that is handed to the Neo4j driver.

    Attributes:
        host: Neo4j server hostname (e.g., "xxx.databases.neo4j.io")
        username: Neo4j username
        password: Neo4j password (hidden from ``repr``)
        database: Database name
        protocol: Connection protocol (e.g., "neo4j+s" or "neo4j")
        embedding_endpoint: Name of the embedding endpoint

    Example:
        >>> config = Config(
        ...     host="abc123.databases.neo4j.io",
        ...     username="neo4j",
        ...     password="secret",
        ...     database="neo4j",
        ...     protocol="neo4j+s",
        ...     embedding_endpoint="my-embedding-model",
        ... )
        >>> print(config.uri)
        neo4j+s://abc123.databases.neo4j.io
        >>> "secret" in repr(config)
        False
    """

    host: str
    username: str
    # repr=False keeps the credential out of the auto-generated __repr__.
    # field() without a default does not make this a defaulted field, so the
    # positional/keyword constructor signature is unchanged.
    password: str = field(repr=False)
    database: str
    protocol: str
    embedding_endpoint: str

    @property
    def uri(self) -> str:
        """Construct the Neo4j connection URI from protocol and host.

        The URI format is: {protocol}://{host}

        Returns:
            Complete Neo4j connection URI
        """
        return f"{self.protocol}://{self.host}"
def get_secret_with_default(
    dbutils,
    scope: str,
    key: str,
    default: str,
) -> str:
    """Look up a Databricks secret, returning ``default`` if the lookup fails.

    Tutorial: Safe Secret Access
    ----------------------------
    ``dbutils.secrets.get`` raises when a key cannot be read. This wrapper
    turns that failure into a fallback value, which is convenient for
    optional settings:

        # Required secret (will raise if missing)
        password = dbutils.secrets.get(scope="my-scope", key="password")

        # Optional secret with default (won't raise)
        database = get_secret_with_default(dbutils, "my-scope", "database", "neo4j")

    Note that *any* exception raised by the lookup results in the default
    being returned, not only a missing key.

    Args:
        dbutils: Databricks dbutils object (available in notebooks)
        scope: Databricks secret scope name
        key: Secret key within the scope
        default: Value to return if the secret cannot be retrieved

    Returns:
        Secret value or default

    Example:
        >>> # In a Databricks notebook:
        >>> database = get_secret_with_default(dbutils, "neo4j-secrets", "database", "neo4j")
    """
    try:
        value = dbutils.secrets.get(scope=scope, key=key)
    except Exception:
        # Best-effort lookup: every failure falls back to the caller's default.
        value = default
    return value
def load_config(
    dbutils,
    scope_name: str,
    default_database: str = "neo4j",
    default_protocol: str = "neo4j+s",
    default_embedding_endpoint: str = "airline_embedding_test",
) -> Config:
    """Build a Config from the secrets stored in a Databricks secret scope.

    Tutorial: Loading Neo4j Configuration
    -------------------------------------
    Connection details are read from Databricks Secrets rather than being
    hard-coded in the script.

    **Required Secrets** (read directly; a failed lookup propagates):
    - host: Neo4j hostname (e.g., "xxx.databases.neo4j.io")
    - username: Neo4j username
    - password: Neo4j password

    **Optional Secrets** (fall back to the supplied defaults):
    - database: Database name (default: "neo4j")
    - protocol: Connection protocol (default: "neo4j+s")

    The embedding endpoint is not read from secrets; the value of
    ``default_embedding_endpoint`` is used as-is.

    Args:
        dbutils: Databricks dbutils object
        scope_name: Name of the Databricks secret scope
        default_database: Database to use if the secret is unavailable
        default_protocol: Protocol to use if the secret is unavailable
        default_embedding_endpoint: Embedding endpoint stored on the Config

    Returns:
        Populated Config object

    Raises:
        Exception: Whatever ``dbutils.secrets.get`` raises when a required
            secret (host, username, password) cannot be read

    Example:
        >>> config = load_config(dbutils, "my-neo4j-secrets")
        >>> print(f"Connecting to {config.uri}")
    """
    # Required values: looked up in order, with no fallback.
    required = {
        secret_key: dbutils.secrets.get(scope=scope_name, key=secret_key)
        for secret_key in ("host", "username", "password")
    }

    # Optional values: resolved through the defaulting helper.
    database = get_secret_with_default(dbutils, scope_name, "database", default_database)
    protocol = get_secret_with_default(dbutils, scope_name, "protocol", default_protocol)

    return Config(
        **required,
        database=database,
        protocol=protocol,
        embedding_endpoint=default_embedding_endpoint,
    )
def print_config(
    config: Config,
    scope_name: str,
    embedding_dimensions: int = 384,
    batch_size: int = 5000,
) -> None:
    """Print a summary of the active configuration, omitting the password.

    Tutorial: Safe Logging of Configuration
    ---------------------------------------
    Printing the configuration helps when debugging, but credentials must
    never end up in notebook output or logs.

    Printed:
    - Secret scope name
    - Neo4j URI (protocol and host only)
    - Database name
    - Username
    - Embedding endpoint, embedding dimensions and batch size

    Not printed:
    - Password

    Args:
        config: Configuration object to display
        scope_name: Secret scope name for display
        embedding_dimensions: Embedding vector size for display
        batch_size: Batch size for display
    """
    summary_lines = [
        "\nConfiguration:",
        f" Secret Scope: {scope_name}",
        f" Neo4j URI: {config.uri}",
        f" Database: {config.database}",
        f" Username: {config.username}",
        f" Embedding Endpoint: {config.embedding_endpoint}",
        f" Embedding Dimensions: {embedding_dimensions}",
        f" Batch Size: ~{batch_size:,} rows",
    ]
    for line in summary_lines:
        print(line)
# =============================================================================
# NEO4J CONNECTION MANAGEMENT
# =============================================================================
@contextmanager
def neo4j_driver(config: Config) -> Generator[Driver, None, None]:
    """Yield a Neo4j driver and guarantee it is closed afterwards.

    Tutorial: Managing Neo4j Connections
    ------------------------------------
    A driver should be created once, shared for the duration of the work,
    and closed when finished. Wrapping it in a context manager makes the
    close unconditional:

        with neo4j_driver(config) as driver:
            with driver.session() as session:
                result = session.run("RETURN 1")
        # Driver is closed here, even if an error occurred

    **Driver options set here:**
    - max_connection_lifetime=200 (seconds before a connection is recycled)
    - keep_alive=True (prevent idle connection drops)

    Args:
        config: Configuration with Neo4j connection details

    Yields:
        Neo4j driver instance

    Example:
        >>> with neo4j_driver(config) as driver:
        ...     with driver.session(database="neo4j") as session:
        ...         result = session.run("MATCH (n) RETURN count(n) AS count")
        ...         print(result.single()["count"])
    """
    target_uri = config.uri
    credentials = (config.username, config.password)
    driver_options = {
        "max_connection_lifetime": 200,  # Seconds before connection is recycled
        "keep_alive": True,  # Prevent idle connection drops
    }

    driver = GraphDatabase.driver(target_uri, auth=credentials, **driver_options)
    try:
        yield driver
    finally:
        # Runs on normal exit and when the with-body raises.
        driver.close()
# =============================================================================
# CONNECTION TESTING
# =============================================================================
def test_neo4j_connection(config: Config) -> bool:
    """Run a pre-flight check against Neo4j and report the outcome.

    Tutorial: Pre-Flight Connection Checks
    --------------------------------------
    Verifying connectivity before a long load avoids discovering a bad
    credential or a paused instance halfway through the job.

    Three checks are run, each in its own session on ``config.database``:

    1. **Basic Connectivity**: runs ``RETURN 1 AS test`` and verifies the
       value that comes back.
    2. **Version Check**: calls ``dbms.components()`` and prints the reported
       name and first version string.
    3. **Write Permission**: creates and deletes a ``_ConnectionTest`` node in
       a single statement. The result is explicitly consumed so that a
       failure raised while the server executes the statement is reported
       by this check instead of being deferred to session close, where it
       may not surface.

    Any exception raised along the way is caught, printed together with
    troubleshooting tips, and turned into a ``False`` return value.

    **Common Failure Causes:**
    - Neo4j Aura instance is paused (resume and wait 1-2 minutes)
    - Wrong credentials in Databricks Secrets
    - Databricks IP not allowlisted in Neo4j Aura
    - Network connectivity issues

    Args:
        config: Neo4j configuration

    Returns:
        True if all tests pass, False otherwise

    Example:
        >>> if not test_neo4j_connection(config):
        ...     raise Exception("Cannot connect to Neo4j!")
    """
    print_section_header("TESTING NEO4J CONNECTION")
    print(f" URI: {config.uri}")
    print(f" Database: {config.database}")

    try:
        with neo4j_driver(config) as driver:
            # Test 1: Basic connectivity with simple query
            with driver.session(database=config.database) as session:
                result = session.run("RETURN 1 AS test")
                record = result.single()
                # single() yields None when no row came back; treat that as
                # an unexpected result rather than failing on the subscript.
                if record is None or record["test"] != 1:
                    print(" ERROR: Basic query returned unexpected result")
                    return False
                print(" [OK] Basic connectivity")

            # Test 2: Get Neo4j version info
            with driver.session(database=config.database) as session:
                result = session.run(
                    "CALL dbms.components() YIELD name, versions RETURN name, versions"
                )
                record = result.single()
                print(f" [OK] Connected to {record['name']} {record['versions'][0]}")

            # Test 3: Verify write permissions
            with driver.session(database=config.database) as session:
                # Create and immediately delete a test node. consume() forces
                # the statement to run to completion here, so a rejected
                # write raises before the success message is printed.
                session.run(
                    "CREATE (t:_ConnectionTest {ts: $ts}) "
                    "WITH t DELETE t",
                    ts=time.time(),
                ).consume()
                print(" [OK] Write operations permitted")

        print("\n Connection test PASSED")
        return True

    except Exception as e:
        # Boundary handler: this function reports failures, it does not raise.
        print(f"\n ERROR: {type(e).__name__}: {e}")
        print("\n Troubleshooting tips:")
        print(" - Check if Neo4j Aura instance is running (not paused)")
        print(" - Verify credentials in Databricks secrets")
        print(" - Ensure network access from Databricks to Neo4j")
        print(" - Wait 1-2 minutes if instance was just resumed")
        return False
# =============================================================================
# OUTPUT FORMATTING
# =============================================================================
def print_section_header(title: str, width: int = 70) -> None:
    """Print ``title`` framed by two separator lines of ``=`` characters.

    Tutorial: Clear Console Output
    ------------------------------
    Section headers make it easier to follow the progress of a long job in
    the logs. The output is a blank line, a separator, the title, and a
    second separator:

        ======================================================================
        SETTING UP NEO4J SCHEMA
        ======================================================================

    Args:
        title: Header text to display
        width: Width of the separator line (default 70)

    Example:
        >>> print_section_header("PROCESSING DATA", width=20)
        <BLANKLINE>
        ====================
        PROCESSING DATA
        ====================
    """
    rule = "=" * width
    # The leading empty string produces the blank line before the header.
    for line in ("", rule, title, rule):
        print(line)
def format_duration(seconds: float) -> str:
    """Render a duration in seconds as text, adding minutes from 60s upward.

    Args:
        seconds: Duration in seconds

    Returns:
        The duration with one decimal place and an "s" suffix. When the
        duration is at least 60 seconds, the equivalent number of minutes
        (one decimal place) is appended in parentheses.

    Example:
        >>> format_duration(90)
        '90.0s (1.5 minutes)'
        >>> format_duration(30)
        '30.0s'
    """
    show_minutes = seconds >= 60
    text = f"{seconds:.1f}s"
    if show_minutes:
        text = f"{text} ({seconds / 60:.1f} minutes)"
    return text
def format_rate(count: int, seconds: float) -> str:
    """Render throughput as rows per second.

    Args:
        count: Number of items processed
        seconds: Time taken in seconds

    Returns:
        Rate with thousands separators and one decimal place followed by
        " rows/s", or "N/A" when ``seconds`` is not greater than zero.

    Example:
        >>> format_rate(10000, 5)
        '2,000.0 rows/s'
        >>> format_rate(10, 0)
        'N/A'
    """
    # Guard first: no meaningful rate exists without a positive elapsed time.
    if not seconds > 0:
        return "N/A"
    return f"{count / seconds:,.1f} rows/s"