-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsdk-connection.py
More file actions
73 lines (60 loc) · 2.13 KB
/
sdk-connection.py
File metadata and controls
73 lines (60 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
Couchbase SDK connection singleton — Python
Copy and adapt for your application.
"""
import os
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions, ClusterTimeoutOptions
# --- Configuration ---
# Every setting can be overridden with an environment variable of the same
# name; the literals below are fallbacks intended for local development only.
CB_HOST = os.environ.get("CB_HOST", "localhost")  # or "cb.xxxxx.cloud.couchbase.com" for Capella
CB_USER = os.environ.get("CB_USER", "app-service-user")
# Placeholder credential: provide the real password through CB_PASSWORD
# instead of committing it to source control.
CB_PASSWORD = os.environ.get("CB_PASSWORD", "AppSecret123!")
CB_BUCKET = os.environ.get("CB_BUCKET", "myapp")
CB_SCOPE = os.environ.get("CB_SCOPE", "_default")
CB_COLLECTION = os.environ.get("CB_COLLECTION", "_default")
# Use couchbases:// for TLS (required for Capella, recommended for production)
CB_SCHEME = os.environ.get("CB_SCHEME", "couchbase")  # or "couchbases"

# Module-level singletons, populated lazily by get_collection(); both start
# out as None.
_cluster = None
_collection = None
def get_collection():
    """Return the singleton Collection, connecting to the cluster on first call.

    The first successful call builds a Cluster from the CB_* settings, waits
    for it to become ready, resolves CB_BUCKET / CB_SCOPE / CB_COLLECTION and
    caches both the cluster and the collection in module globals. Later calls
    return the cached collection without touching the network.

    Returns:
        The cached Collection object.

    Raises:
        Any exception raised by the SDK while connecting or resolving the
        collection. In that case the new cluster is closed and nothing is
        cached, so a subsequent call starts again from scratch.
    """
    global _cluster, _collection
    if _collection is not None:
        return _collection

    auth = PasswordAuthenticator(CB_USER, CB_PASSWORD)
    opts = ClusterOptions(
        auth,
        timeout_options=ClusterTimeoutOptions(
            connect_timeout=timedelta(seconds=10),
            kv_timeout=timedelta(seconds=2.5),
            query_timeout=timedelta(seconds=75),
            search_timeout=timedelta(seconds=75),
        ),
    )

    # Work on a local handle and publish to the globals only once everything
    # has succeeded; otherwise a failed attempt would leave _cluster pointing
    # at a cluster that never became ready, and every retry would create yet
    # another Cluster without releasing the previous one.
    cluster = Cluster(f"{CB_SCHEME}://{CB_HOST}", opts)
    try:
        cluster.wait_until_ready(timedelta(seconds=10))
        bucket = cluster.bucket(CB_BUCKET)
        collection = bucket.scope(CB_SCOPE).collection(CB_COLLECTION)
    except Exception:
        # Release the half-initialised connection, then let the caller see
        # the original failure.
        cluster.close()
        raise

    _cluster, _collection = cluster, collection
    return _collection
def get_cluster():
    """Return the shared Cluster handle (used for queries and transactions).

    Calls get_collection() first, purely for its side effect of initializing
    the module-level cluster; the collection it returns is discarded.
    """
    get_collection()
    cluster = _cluster
    return cluster
# --- Usage example ---
if __name__ == "__main__":
    col = get_collection()
    cluster = get_cluster()

    # KV upsert
    col.upsert("doc_1", {"type": "example", "value": 42})

    # KV get
    result = col.get("doc_1")
    print(result.content_as[dict])

    # SQL++ query
    # Build the keyspace from the configured names so the query reads from
    # the same collection the KV calls above wrote to, instead of a
    # hard-coded `myapp`._default._default. Each part is backtick-quoted so
    # names containing characters such as "-" stay valid identifiers.
    keyspace = f"`{CB_BUCKET}`.`{CB_SCOPE}`.`{CB_COLLECTION}`"
    rows = cluster.query(
        f"SELECT * FROM {keyspace} WHERE type = $type LIMIT 5",
        named_parameters={"type": "example"},
    )
    for row in rows:
        print(row)