-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtestserver.py
More file actions
147 lines (126 loc) · 5.47 KB
/
testserver.py
File metadata and controls
147 lines (126 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from typing import cast
from urllib.parse import urljoin, urlparse
from opentelemetry.trace import get_tracer
from cbltest.api.database import Database
from cbltest.assertions import _assert_not_null
from cbltest.globals import CBLPyTestGlobal
from cbltest.requests import RequestFactory, TestServerRequestType
from cbltest.responses import GetRootResponse
from cbltest.v1.requests import (
PostLogRequestBody,
PostNewSessionRequestBody,
PostResetRequestBody,
)
from cbltest.version import VERSION
class TestServer:
"""
A class for interacting with a Couchbase Lite test server
"""
@property
def url(self) -> str:
"""Gets the URL of the test server being communicated with"""
return self.__url
def __init__(self, request_factory: RequestFactory, index: int, url: str):
assert request_factory.version == 1, (
"This version of the CBLTest API requires request API v1"
)
self.__index = index
self.__url = url
self.__request_factory = request_factory
self.__tracer = get_tracer(__name__, VERSION)
self.__info: GetRootResponse | None = None
async def get_info(self) -> GetRootResponse:
"""
Retrieves the information about the running test server.
This is cached after the first call.
"""
if self.__info is None:
with self.__tracer.start_as_current_span("get_info"):
request = self.__request_factory.create_request(
TestServerRequestType.ROOT
)
resp = await self.__request_factory.send_request(self.__index, request)
ret_val = cast(GetRootResponse, resp)
self.__info = ret_val
return self.__info
async def create_and_reset_db(
self,
db_names: list[str],
dataset: str | None = None,
collections: list[str] | None = None,
) -> list[Database]:
"""
Creates and returns a set of Databases based on the given dataset
:param db_names: A list of database names, each of which will become a database based on
the other two args
:param dataset: The name of the dataset to use for creating the databases (if not specified
an empty database will be created). Cannot be combined with collections.
:param collections: The name of the collections to add after creating the database. Cannot
be combined with dataset.
"""
assert collections is None or dataset is None, (
"dataset and collections cannot both be specified"
)
with self.__tracer.start_as_current_span("create_and_reset_db"):
payload = PostResetRequestBody(CBLPyTestGlobal.running_test_name)
if dataset is not None:
payload.add_dataset(dataset, db_names)
else:
payload.add_empty(db_names, collections)
request = self.__request_factory.create_request(
TestServerRequestType.RESET, payload
)
await self.__request_factory.send_request(self.__index, request)
ret_val: list[Database] = []
for db_name in db_names:
ret_val.append(Database(self.__request_factory, self.__index, db_name))
return ret_val
async def new_session(
self, id: str, dataset_version: str, url: str | None, tag: str | None
):
"""
Instructs this test server to log to the given LogSlurp instance
:param url: The URL of the LogSlurp server
:param id: The ID of the log to log to
:param tag: The tag to use for this test server
"""
with self.__tracer.start_as_current_span("new_session"):
payload = PostNewSessionRequestBody(id, dataset_version, url, tag)
request = self.__request_factory.create_request(
TestServerRequestType.NEW_SESSION, payload
)
await self.__request_factory.send_request(self.__index, request)
async def cleanup(self) -> None:
"""
Resets the test server
"""
with self.__tracer.start_as_current_span("create_and_reset_db"):
request = self.__request_factory.create_request(
TestServerRequestType.RESET, PostResetRequestBody()
)
await self.__request_factory.send_request(self.__index, request)
async def log(self, msg: str) -> None:
"""
Sends a message to be logged on the server side. Useful for debugging.
"""
# I'll exclude this from telemetry since it's not really related to any testing
payload = PostLogRequestBody(msg)
request = self.__request_factory.create_request(
TestServerRequestType.LOG, payload
)
await self.__request_factory.send_request(self.__index, request)
def replication_url(self, db_name: str, port: int):
"""
Returns the URL of the replication endpoint for this test server
"""
ws_scheme = "ws://" # For now not using secure
_assert_not_null(db_name, "db_name")
parsed = urlparse(self.url)
if parsed.hostname:
hostname = parsed.hostname
elif parsed.netloc:
hostname = parsed.netloc.split(":")[0]
else:
hostname = self.url.split("://")[-1].split(":")[0]
replication_url = f"{ws_scheme}{hostname}:{port}"
return urljoin(replication_url, db_name)