-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcloud.py
More file actions
173 lines (147 loc) · 7.77 KB
/
cloud.py
File metadata and controls
173 lines (147 loc) · 7.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from json import dumps, load
from pathlib import Path
from typing import cast
from opentelemetry.trace import get_tracer
from cbltest.api.couchbaseserver import CouchbaseServer
from cbltest.api.error import CblSyncGatewayBadResponseError, CblTestError
from cbltest.api.syncgateway import PutDatabasePayload, SyncGateway
from cbltest.assertions import _assert_not_null
from cbltest.jsonhelper import _get_typed_required
from cbltest.utils import _try_n_times
from cbltest.version import VERSION
class CouchbaseCloud:
    """
    A class that performs operations that require coordination between both Sync Gateway and Couchbase Server
    """

    def __init__(self, sync_gateway: SyncGateway, server: CouchbaseServer | None):
        """
        :param sync_gateway: The Sync Gateway instance to operate on
        :param server: The Couchbase Server behind ``sync_gateway``.  May be
                       None only when Sync Gateway is using Rosmar.
        :raises CblTestError: If ``server`` is None and Sync Gateway is not using Rosmar
        """
        self.__sync_gateway = sync_gateway
        # Always bind the attribute (possibly to None) so the couchbase_server
        # property can test it directly rather than probing for a hand-mangled
        # attribute name with hasattr().
        self.__couchbase_server: CouchbaseServer | None = server
        if server is None and not sync_gateway.using_rosmar:
            raise CblTestError(
                "Couchbase Server must be provided if Sync Gateway is not using Rosmar"
            )

        self.__tracer = get_tracer(__name__, VERSION)

    @property
    def sync_gateway(self) -> SyncGateway:
        """The Sync Gateway instance this object was constructed with"""
        return self.__sync_gateway

    @property
    def couchbase_server(self) -> CouchbaseServer:
        """
        The Couchbase Server instance this object was constructed with

        :raises CblTestError: If no Couchbase Server was provided
        """
        if self.__couchbase_server is None:
            raise CblTestError(
                "Couchbase Server is not available for this Couchbase Cloud instance, configured using rosmar"
            )

        return self.__couchbase_server

    def _create_collections(self, db_payload: PutDatabasePayload) -> None:
        """
        Creates, on Couchbase Server, every scope/collection named in the payload.
        Does nothing when Sync Gateway is using Rosmar.

        :param db_payload: The database payload naming the bucket, scopes and collections
        """
        if self.sync_gateway.using_rosmar:
            return

        server = self.couchbase_server
        for scope in db_payload.scopes():
            server.create_collections(
                db_payload.bucket, scope, db_payload.collections(scope)
            )

    def _check_all_indexes_removed(self, bucket: str) -> None:
        """
        Performs a single check that the bucket has no indexes left.

        :param bucket: The name of the bucket to check
        :raises ValueError: If Couchbase Server reports one or more indexes
        :raises CblTestError: If no Couchbase Server was provided
        """
        count = self.couchbase_server.indexes_count(bucket)
        if count > 0:
            raise ValueError(f"{count} indexes remain in '{bucket}' bucket")

    def _wait_for_all_indexed_removed(self, bucket: str) -> None:
        """
        Runs :meth:`_check_all_indexes_removed` through ``_try_n_times``.

        :param bucket: The name of the bucket to wait on
        """
        _try_n_times(10, 2, True, self._check_all_indexes_removed, bucket)

    async def configure_dataset(
        self,
        dataset_path: Path,
        dataset_name: str,
        sg_config_options: list[str] | None = None,
    ) -> None:
        """
        Creates a database, ensuring that it is in an empty state when finished

        :param dataset_path: The path to the folder containing the configuration data
        :param dataset_name: The name of the dataset configuration to use
        :param sg_config_options: An optional list of options to apply to the base SG config
        :raises FileNotFoundError: If the config file or the data file is missing
        :raises ValueError: If the config file does not contain a JSON object
        :raises CblTestError: If an entry of sg_config_options is not a valid option

        .. note:: The expected format is a file named <database_name>-sg-config.json
                  containing a config and users key, for use with the PUT /<db> and
                  PUT /<db>/<user> endpoints and a file named <database_name>-sg.json
                  containing the actual data to populate.  Any config options that can
                  be passed to sg_config_options will be in a key called "config_options"
                  in <database_name>-sg-config.json
        """
        with self.__tracer.start_as_current_span(
            "configure_dataset", attributes={"cbl.dataset.name": dataset_name}
        ) as current_span:
            _assert_not_null(dataset_path, "dataset_path")
            _assert_not_null(dataset_name, "dataset_name")

            config_filepath = dataset_path / f"{dataset_name}-sg-config.json"
            data_filepath = dataset_path / f"{dataset_name}-sg.json"
            if not config_filepath.exists():
                raise FileNotFoundError(
                    f"Configuration file {dataset_name}-sg-config.json not found!"
                )

            if not data_filepath.exists():
                raise FileNotFoundError(f"Data file {dataset_name}-sg.json not found!")

            with open(config_filepath, encoding="utf-8") as fin:
                dataset_config = cast(dict, load(fin))
                # cast() only informs the type checker; the JSON root still
                # has to be verified at runtime.
                if not isinstance(dataset_config, dict):
                    raise ValueError(
                        f"Badly formatted {dataset_name}-sg-config.json (not an object)"
                    )

            users = _get_typed_required(dataset_config, "users", dict)

            if sg_config_options is not None:
                nested_config = _get_typed_required(dataset_config, "config", dict)
                valid_options = _get_typed_required(
                    dataset_config, "config_options", dict
                )
                for option in sg_config_options:
                    if option not in valid_options:
                        raise CblTestError(
                            f"{option} is not a valid option for {dataset_name} (valid options are {dumps([str(k) for k in valid_options])})"
                        )

                    # Each option is a dict of overrides merged into "config".
                    nested_config.update(
                        _get_typed_required(valid_options, option, dict)
                    )

            db_payload: PutDatabasePayload = PutDatabasePayload(dataset_config)
            try:
                # buckets and collections are implicitly created when using Rosmar
                if not self.sync_gateway.using_rosmar:
                    self.couchbase_server.create_bucket(db_payload.bucket)
                    self._create_collections(db_payload)

                await self.__sync_gateway.put_database(dataset_name, db_payload)
            except CblSyncGatewayBadResponseError as e:
                if e.code != 412:
                    raise

                # On HTTP 412, tear the database and bucket down and rebuild
                # them before retrying the PUT.
                current_span.add_event("Handle HTTP 412")
                await self.__sync_gateway.delete_database(dataset_name)
                await self.drop_bucket(db_payload.bucket)
                await self.__sync_gateway.wait_for_no_databases(db_payload.bucket)
                if not self.sync_gateway.using_rosmar:
                    self.couchbase_server.create_bucket(db_payload.bucket)
                    self._create_collections(db_payload)

                    # CBL-4977 :
                    # The bucket's indexes will be deleted asynchronously after the bucket is dropped.
                    # When recreating the sg database, sg may wrongly detect that the indexes already exist,
                    # but later when trying to use the indexes for querying, the index-not-available error occurs
                    # as the index has already been deleted by that time.
                    #
                    # Waiting until all indexes are removed helps prevent that problem. It's important
                    # to wait after the bucket and its collections are created, otherwise, QueryIndexManager
                    # will not be able to return the pending-to-be-removed indexes created for the collections.
                    #
                    # The wait queries Couchbase Server, so it stays inside the non-Rosmar branch.
                    self._wait_for_all_indexed_removed(db_payload.bucket)

                await self.__sync_gateway.put_database(dataset_name, db_payload)

            for user in users:
                user_dict = _get_typed_required(users, user, dict)
                await self.__sync_gateway.add_user(
                    dataset_name,
                    user,
                    user_dict["password"],
                    user_dict["collection_access"],
                )

            await self.__sync_gateway.load_dataset(dataset_name, data_filepath)

    async def drop_bucket(self, bucket_name: str) -> None:
        """
        Drop the bucket from the backing cluster.

        :param bucket_name: The name of the bucket to drop
        :raises CblSyncGatewayBadResponseError: If the Rosmar delete request fails
                                                with anything other than HTTP 404
        """
        if self.sync_gateway.using_rosmar:
            try:
                await self.sync_gateway._send_request(
                    "delete", f"/_rosmar/{bucket_name}"
                )
            except CblSyncGatewayBadResponseError as e:
                # A 404 is ignored: there was no such bucket to delete.
                if e.code != 404:
                    raise
        else:
            self.couchbase_server.drop_bucket(bucket_name)