Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions client/src/cbltest/api/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,9 @@ async def configure_dataset(

current_span.add_event("Handle HTTP 412")
await self.__sync_gateway.delete_database(dataset_name)
if self.sync_gateway.using_rosmar:
raise CblTestError(
f"Database {dataset_name} already exists on Sync Gateway and cannot be deleted when "
"using rosmar until CBG-5213 is implemented. To work around, restart a Sync Gateway to "
"delete the rosmar buckets."
)
else:
self.__couchbase_server.drop_bucket(db_payload.bucket)
await self.drop_bucket(db_payload.bucket)
await self.__sync_gateway.wait_for_no_databases(db_payload.bucket)
if not self.sync_gateway.using_rosmar:
self.__couchbase_server.create_bucket(db_payload.bucket)
self._create_collections(db_payload)

Expand All @@ -163,3 +158,37 @@ async def configure_dataset(
)

await self.__sync_gateway.load_dataset(dataset_name, data_filepath)

async def drop_bucket(self, bucket_name: str, *, wait_for_deleted=False):
"""Drop the bucket from the backing cluster. This is an asynchronous operation unless wait_for_deleted is set to True."""
Comment thread
vipbhardwaj marked this conversation as resolved.
if self.sync_gateway.using_rosmar:
try:
await self.sync_gateway._send_request(
"delete", f"/_rosmar/{bucket_name}"
)
except CblSyncGatewayBadResponseError as e:
if e.code != 404:
raise
else:
self.couchbase_server.drop_bucket(bucket_name)
if wait_for_deleted:
await self.couchbase_server.wait_for_bucket_deleted(bucket_name)

if wait_for_deleted:
await self.__sync_gateway.wait_for_no_databases(bucket_name)

async def create_database(
self, db_name: str, db_payload: PutDatabasePayload
) -> None:
"""Create a Sync Gateway database with the given name and payload. Delete the bucket prior to creation of the database."""
await self.drop_bucket(db_payload.bucket, wait_for_deleted=True)
await self.__sync_gateway.wait_for_no_databases(db_payload.bucket)
Comment thread
vipbhardwaj marked this conversation as resolved.
await self.create_bucket(db_payload.bucket)
await self.__sync_gateway.put_database(db_name, db_payload)
await self.__sync_gateway.wait_for_db_up(db_name)

async def create_bucket(self, bucket_name: str) -> None:
"""Create a bucket with the given name. Only applicable if not using Rosmar."""
if self.sync_gateway.using_rosmar:
return None
self.couchbase_server.create_bucket(bucket_name)
96 changes: 67 additions & 29 deletions client/src/cbltest/api/syncgateway.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import asyncio
import contextlib
import re
import ssl
from abc import ABC, abstractmethod
from json import dumps, loads
from pathlib import Path
from typing import Any, cast
from typing import Any, AsyncIterator, cast
from urllib.parse import urljoin

import requests
import tenacity
from aiohttp import BasicAuth, ClientError, ClientSession, ClientTimeout, TCPConnector
from aiohttp.client_exceptions import ClientConnectorError
from opentelemetry.trace import get_tracer
Expand All @@ -17,7 +19,7 @@
from cbltest.assertions import _assert_not_null
from cbltest.httplog import get_next_writer
from cbltest.jsonhelper import _get_typed_required
from cbltest.logging import cbl_error, cbl_info, cbl_warning
from cbltest.logging import cbl_error, cbl_info, cbl_trace, cbl_warning
from cbltest.utils import assert_not_null
from cbltest.version import VERSION

Expand Down Expand Up @@ -529,7 +531,7 @@ def __init__(self, response: dict):
class _SyncGatewayBase:
"""
Base class for Sync Gateway clients containing common document and database operations.
This class should not be instantiated directly - use SyncGateway or SyncGatewayPublic instead.
This class should not be instantiated directly - use SyncGateway or SyncGatewayUserClient instead.
"""

def __init__(
Expand All @@ -556,21 +558,6 @@ def __init__(
port,
BasicAuth(username, password, "ascii"),
)
r = requests.get(
f"{scheme}{url}:{port}/_config",
auth=(username, password),
# disable hostname verification as we do in _create_session
verify=False,
timeout=10,
)
r.raise_for_status()
try:
self.using_rosmar = r.json()["bootstrap"]["server"].startswith("rosmar")
except AttributeError:
raise CblTestError(
"Unexpected response {r.json()} from Sync Gateway /_config endpoint, cannot determine if using Rosmar"
) from None
self.using_rosmar = False

@property
def hostname(self) -> str:
Expand All @@ -587,6 +574,11 @@ def secure(self) -> bool:
"""Gets whether the Sync Gateway instance uses TLS"""
return self.__secure

@property
def scheme(self) -> str:
"""Gets the URL scheme to use when connecting to the Sync Gateway instance (http or https)"""
return "https://" if self.secure else "http://"

def _create_session(
self, secure: bool, scheme: str, url: str, port: int, auth: BasicAuth | None
) -> ClientSession:
Expand Down Expand Up @@ -644,11 +636,19 @@ async def _send_request(

return ret_val

async def supports_version_vectors(self) -> bool:
"""Return true if Sync Gateway supports version vectors."""
resp_data = await self._send_request("get", "/_cluster_info")
for bucket in resp_data:
return bucket.get("enable_cross_cluster_versioning")
Comment on lines +642 to +643
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually true, what if the field isn't there, atleast wrapping this in a try/catch is better.

raise CblTestError(
"Could not determine if Sync Gateway supports version vectors, need at least one database"
)

async def get_version(self) -> CouchbaseVersion:
# Telemetry not really important for this call
scheme = "https://" if self.secure else "http://"
async with self._create_session(
self.secure, scheme, self.hostname, 4984, None
self.secure, self.scheme, self.hostname, 4984, None
) as s:
resp = await self._send_request("get", "/", session=s)
assert isinstance(resp, dict)
Expand All @@ -659,7 +659,7 @@ async def get_version(self) -> CouchbaseVersion:

def tls_cert(self) -> str | None:
if not self.secure:
cbl_warning(
cbl_trace(
"Sync Gateway instance not using TLS, returning empty tls_cert..."
)
return None
Expand Down Expand Up @@ -1345,9 +1345,8 @@ async def get_document_revision_public(
)
params = {"rev": revision}

scheme = "https://" if self.secure else "http://"
async with self._create_session(
self.secure, scheme, self.hostname, 4984, auth
self.secure, self.scheme, self.hostname, 4984, auth
) as session:
return await self._send_request("GET", path, params=params, session=session)

Expand Down Expand Up @@ -1615,6 +1614,21 @@ def __init__(
"""
super().__init__(url, username, password, port, secure)
self.__public_port = public_port
r = requests.get(
f"{self.scheme}{url}:{port}/_config",
auth=(username, password),
# disable hostname verification as we do in _create_session
verify=False,
timeout=10,
)
r.raise_for_status()
try:
self.using_rosmar = r.json()["bootstrap"]["server"].startswith("rosmar")
except AttributeError:
raise CblTestError(
"Unexpected response {r.json()} from Sync Gateway /_config endpoint, cannot determine if using Rosmar"
) from None
self.using_rosmar = False
Comment on lines +1626 to +1631
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this an formatted string genuinely.


def create_collection_access_dict(self, input: dict[str, list[str]]) -> dict:
"""
Expand Down Expand Up @@ -1846,9 +1860,8 @@ async def start(self, config_name: str = "bootstrap") -> None:
# Check if SGW is already running by probing the public endpoint (4984)
try:
# Use a short timeout to distinguish "not running" from "slow"
scheme = "https://" if self.secure else "http://"
async with self._create_session(
self.secure, scheme, self.hostname, 4984, None
self.secure, self.scheme, self.hostname, 4984, None
) as session:
async with session.get("/", timeout=ClientTimeout(total=5)) as resp:
if resp.status == 200:
Expand All @@ -1874,6 +1887,22 @@ async def start(self, config_name: str = "bootstrap") -> None:
# Wait a bit for SGW to fully initialize
await asyncio.sleep(5)

@tenacity.retry(
wait=tenacity.wait_fixed(0.1),
# Sync Gateway polling time is 10s, so wait 60s for polling time + any additional work
stop=tenacity.stop_after_delay(60),
reraise=True,
retry=tenacity.retry_if_exception_type(AssertionError),
)
async def wait_for_no_databases(self, bucket_name: str):
with self._tracer.start_as_current_span("get_all_dbs"):
resp = await self._send_request("get", "/_all_dbs?verbose=true")
assert isinstance(resp, list), resp
for db in resp:
assert db["bucket"] != bucket_name, (
f"Database {db=} is still backed by bucket {bucket_name}"
)

async def wait_for_db_gone_clusterwide(
self,
sync_gateways: list["SyncGateway"],
Expand Down Expand Up @@ -1931,23 +1960,28 @@ async def wait_for_db_up(
# Wait for the node to settle down after coming online
await asyncio.sleep(settle_online)

@contextlib.asynccontextmanager
@contextlib.asynccontextmanager
async def create_user_client(
self,
db_name: str,
username: str,
password: str,
channels: list[str],
) -> "SyncGatewayUserClient":
) -> AsyncIterator["SyncGatewayUserClient"]:
"""
Helper method to create a user with channel access and return a user-specific SG client.
Helper method to create a user with channel access and yield a user-specific SG client.

This is a convenience method for tests that need to verify user-level access control.
Use it with ``async with`` only. The yielded client is automatically closed when the
context exits and must not be used outside that block.

:param db_name: The database name
:param username: The username to create
:param password: The password for the user
:param channels: List of channels the user should have access to
:return: A SyncGatewayUserClient instance authenticated as the user (uses public port)
:return: An async context manager yielding a SyncGatewayUserClient authenticated as
the user (uses public port)
"""
# Clean up user if exists from previous run
await self.delete_user(db_name, username)
Expand All @@ -1959,13 +1993,17 @@ async def create_user_client(
)

# Return user-specific SG client for public API access
return SyncGatewayUserClient(
client = SyncGatewayUserClient(
self.hostname,
username,
password,
port=self.__public_port,
secure=self.secure,
)
try:
yield client
finally:
await client.close()
Comment thread
torcolvin marked this conversation as resolved.

async def start_isgr(self, db_name: str, payload: ISGRPayload) -> str:
"""
Expand Down
1 change: 1 addition & 0 deletions environment/local/run_sync_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def main(start, stop, server):
if stop:
bridge.stop("localhost")
else:
bridge.stop("localhost")
bridge.run("localhost")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,8 @@
"log_level": "debug",
"log_keys": ["*"]
}
},
"unsupported": {
"rosmar_bucket_management": true
}
}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies = [
"paramiko==3.5.0",
"psutil==7.0.0",
"pyyaml==6.0.2",
"tenacity==9.1.4",
"tqdm==4.67.1",
]

Expand Down
Loading