Skip to content
4 changes: 2 additions & 2 deletions client/src/cbltest/api/syncgateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,8 @@ async def _delete_database(self, db_name: str, retry_count: int = 0) -> None:
current_span.add_event("SGW returned 500, retry")
await asyncio.sleep(2)
await self._delete_database(db_name, retry_count + 1)
elif e.code == 403:
pass
elif e.code == 403 or e.code == 404:
pass # Database doesn't exist anyway.
Comment thread
vipbhardwaj marked this conversation as resolved.
Comment thread
vipbhardwaj marked this conversation as resolved.
else:
raise

Expand Down
155 changes: 155 additions & 0 deletions client/src/cbltest/api/upgrade_test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import asyncio
from collections.abc import Callable
from pathlib import Path
from typing import TypeAlias

from cbltest import CBLPyTest, CouchbaseServer
from cbltest.api.cbltestclass import CBLTestClass
from cbltest.api.database import Database, GetDocumentResult
from cbltest.api.database_types import DocumentEntry
from cbltest.api.replicator import Replicator
from cbltest.api.replicator_types import (
ReplicatorActivityLevel,
ReplicatorBasicAuthenticator,
ReplicatorCollectionEntry,
ReplicatorConflictResolver,
ReplicatorType,
WaitForDocumentEventEntry,
)
from cbltest.api.syncgateway import RemoteDocument
from cbltest.api.test_functions import compare_local_and_remote
from cbltest.logging import cbl_info


class DocSnapshot:
def __init__(self, local: GetDocumentResult, remote: RemoteDocument):
self.local = local
self.remote = remote


DocValidator: TypeAlias = Callable[[DocSnapshot, DocSnapshot], None]


def tools_path() -> Path:
# client/src/cbltest/api/upgrade_test_helpers.py -> parents[4] is repo root
return Path(__file__).resolve().parents[4] / "tests" / ".tools"


async def setup_upgrade_env(
test_case: CBLTestClass, cblpytest: CBLPyTest, dataset_path: Path
) -> Database:
await test_case.skip_if_cbl_not(cblpytest.test_servers[0], ">= 4.0.0")

dataset_ver = cblpytest.test_servers[0].dataset_version
test_case.skip_if_not(
dataset_ver == "4.0", f"Requires dataset v4.0 (current: {dataset_ver})."
)

test_case.mark_test_step("Delete Sync Gateway 'upgrade' database if exists")
await cblpytest.sync_gateways[0].delete_database("upgrade")

test_case.mark_test_step("Restore Couchbase Server Bucket using `upgrade` dataset")
cbs: CouchbaseServer = cblpytest.couchbase_servers[0]
cbs.drop_bucket("upgrade")
cbs.restore_bucket("upgrade", tools_path(), dataset_path, "upgrade")

test_case.mark_test_step("Wait 2s to ensure SG picks up the restored database.")
await asyncio.sleep(2)

test_case.mark_test_step("Reset local database, and load `upgrade` dataset.")
dbs = await cblpytest.test_servers[0].create_and_reset_db(
["db1"], dataset="upgrade"
)
return dbs[0]


async def do_upgrade_replication_test(
test_case: CBLTestClass,
cblpytest: CBLPyTest,
db: Database,
doc_id: str,
replicator_type: ReplicatorType,
conflict_resolver: ReplicatorConflictResolver | None = None,
doc_events: set[WaitForDocumentEventEntry] | None = None,
compare_docs: bool | None = True,
validator: DocValidator | None = None,
) -> None:
sg = cblpytest.sync_gateways[0]

pre_local_doc = await db.get_document(DocumentEntry("_default._default", doc_id))
pre_remote_doc = await sg.get_document("upgrade", doc_id)

assert pre_local_doc is not None
assert pre_remote_doc is not None
cbl_info(f"Revision Info before Replication ({replicator_type}):")
cbl_info(f"Local : RevID = {pre_local_doc.revid}, HLV = {pre_local_doc.cv}")
cbl_info(f"Remote : RevID = {pre_remote_doc.revid}, HLV = {pre_remote_doc.cv}")

wait_for_doc_events = bool(doc_events)

conflict_resolver_name = (
f"{conflict_resolver.name}" if conflict_resolver else "None"
)

test_case.mark_test_step(f"""
Start a replicator:
* endpoint: '/upgrade'
* collections : '_default._default'
* type: {replicator_type}
* document_ids: ['{doc_id}']
* continuous: {wait_for_doc_events}
* conflict_resolver: {conflict_resolver_name}
""")
replicator = Replicator(
db,
cblpytest.sync_gateways[0].replication_url("upgrade"),
collections=[
ReplicatorCollectionEntry(
names=["_default._default"],
document_ids=[doc_id],
conflict_resolver=conflict_resolver,
)
],
replicator_type=replicator_type,
continuous=wait_for_doc_events,
authenticator=ReplicatorBasicAuthenticator("user1", "pass"),
pinned_server_cert=cblpytest.sync_gateways[0].tls_cert(),
enable_document_listener=wait_for_doc_events,
)

await replicator.start()

if doc_events:
test_case.mark_test_step("Wait until receiving all document replication events")
await replicator.wait_for_all_doc_events(
events=doc_events,
max_retries=100,
)
else:
test_case.mark_test_step("Wait until the replicator is stopped.")
status = await replicator.wait_for(ReplicatorActivityLevel.STOPPED)
assert status.error is None, (
f"Error waiting for replicator: ({status.error.domain} / {status.error.code}) {status.error.message}"
)

if compare_docs:
test_case.mark_test_step("Check that the doc is replicated correctly.")
await compare_local_and_remote(
db, sg, replicator_type, "upgrade", ["_default._default"], [doc_id]
)

local_doc = await db.get_document(DocumentEntry("_default._default", doc_id))
remote_doc = await sg.get_document("upgrade", doc_id)

assert local_doc is not None
assert remote_doc is not None
cbl_info(f"Revision Info after Replication ({replicator_type}):")
cbl_info(f"Local : RevID = {local_doc.revid}, HLV = {local_doc.cv}")
cbl_info(f"Remote : RevID = {remote_doc.revid}, HLV = {remote_doc.cv}")

if validator:
test_case.mark_test_step("Validate revid and HLV of local and remote doc.")
validator(
DocSnapshot(pre_local_doc, pre_remote_doc),
DocSnapshot(local_doc, remote_doc),
)
193 changes: 193 additions & 0 deletions tests/QE/test_replication_upgrade_delta_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
from pathlib import Path

import pytest
from cbltest import CBLPyTest
from cbltest.api.cbltestclass import CBLTestClass
from cbltest.api.error import CblSyncGatewayBadResponseError
from cbltest.api.replicator_types import ReplicatorType
from cbltest.api.syncgateway import DocumentUpdateEntry, PutDatabasePayload
from cbltest.api.upgrade_test_helpers import (
DocSnapshot,
do_upgrade_replication_test,
setup_upgrade_env,
)

_DELTA_SYNC_UPGRADE_CONFIG: dict = {
"bucket": "upgrade",
"num_index_replicas": 0,
"scopes": {
"_default": {
"collections": {
"_default": {
"sync": (
"function(doc, oldDoc, meta) {"
" if (doc._deleted) { channel(oldDoc.channels); }"
" else { channel(doc.channels || 'upgrade'); }"
"}"
)
}
}
}
},
"import_docs": True,
"enable_shared_bucket_access": True,
"delta_sync": {"enabled": True},
}


@pytest.mark.sgw
@pytest.mark.min_test_servers(1)
@pytest.mark.min_sync_gateways(1)
@pytest.mark.min_couchbase_servers(1)
class TestUpgradeDeltaSync(CBLTestClass):
async def _prepare_sg_with_delta_sync(self, cblpytest: CBLPyTest) -> None:
sg = cblpytest.sync_gateways[0]
payload = PutDatabasePayload(_DELTA_SYNC_UPGRADE_CONFIG)

self.mark_test_step(
"Create SG 'upgrade' database with delta_sync enabled and import from bucket"
)
try:
await sg.put_database("upgrade", payload)
except CblSyncGatewayBadResponseError as e:
if e.code != 412:
raise
# DB already exists. Try to force-recreate so our delta_sync
# config is applied. delete_database silently swallows 403
# internally (config-managed DBs), so the delete may be a no-op
# and the retry put may also 412. Tolerate that — the
# verify-config assertion below is the real backstop and will
# dump the active config if delta_sync is not enabled.
await sg.delete_database("upgrade")
try:
await sg.put_database("upgrade", payload)
except CblSyncGatewayBadResponseError as e2:
if e2.code != 412:
raise
await sg.wait_for_db_up("upgrade")

Comment thread
vipbhardwaj marked this conversation as resolved.
self.mark_test_step(
"Verify delta_sync is actually enabled on SGW 'upgrade' database"
)
config = await sg.get_database_config("upgrade")
delta_sync = config.get("delta_sync") or {}
assert delta_sync.get("enabled") is True, (
"Prerequisite failed: SGW 'upgrade' database does not have "
f"delta_sync.enabled=True. Active config: {config!r}"
)

self.mark_test_step("Create user1 for replication")
collection_access = sg.create_collection_access_dict(
{"_default._default": ["*"]}
)
await sg.add_user("upgrade", "user1", "pass", collection_access)

@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.xfail(
strict=True,
reason=(
"SGW delta-sync history bug: when SGW sends a delta of a "
"revtree+HLV rev to a client holding the revtree-only ancestor, "
"the rev message's `history` field is empty. Fix pending."
),
)
async def test_delta_sync_history_pull_post_upgrade_sgw_mutation(
self, cblpytest: CBLPyTest, dataset_path: Path
) -> None:
doc_id = "nonconflict_3"
db = await setup_upgrade_env(self, cblpytest, dataset_path)
await self._prepare_sg_with_delta_sync(cblpytest)
sg = cblpytest.sync_gateways[0]

self.mark_test_step(
f"Mutate '{doc_id}' on 4.x SGW to produce a new revtree leaf + fresh HLV"
)
current = await sg.get_document("upgrade", doc_id)
assert current is not None, f"Expected '{doc_id}' imported from bucket"
assert current.revid is not None, (
f"Expected '{doc_id}' to have a revid pre-mutation, got None"
)
new_body = {**current.body, "updated_by": "delta_sync_history_test"}
await sg.update_documents(
"upgrade",
[DocumentUpdateEntry(doc_id, current.revid, body=new_body)],
)

def validator(pre: DocSnapshot, post: DocSnapshot) -> None:
assert pre.local.revid is not None and pre.local.cv is None, (
f"Local precondition invalid: RevID={pre.local.revid}, "
f"HLV={pre.local.cv} (expected revtree-only)"
)
assert pre.remote.revid is not None and pre.remote.cv is not None, (
f"Remote precondition invalid: RevID={pre.remote.revid}, "
f"HLV={pre.remote.cv} (expected revtree + HLV after 4.x mutation)"
)
assert not pre.remote.cv.endswith("@Revision+Tree+Encoding"), (
f"Expected canonical HLV on SGW after 4.x write, got RTE-encoded: "
f"{pre.remote.cv}"
)

assert post.local.revid is None, (
f"Expected post-pull local doc to be HLV-only, "
f"got revid={post.local.revid}"
)
assert post.local.cv and post.local.cv == post.remote.cv, (
f"Expected post-pull local HLV to match SGW HLV. "
f"Local={post.local.cv}, Remote={post.remote.cv}"
)

await do_upgrade_replication_test(
self,
cblpytest,
db,
doc_id=doc_id,
replicator_type=ReplicatorType.PULL,
compare_docs=False,
validator=validator,
)

@pytest.mark.asyncio(loop_scope="session")
async def test_delta_sync_history_pull_pre_upgrade_sgw_two_revs(
self, cblpytest: CBLPyTest, dataset_path: Path
) -> None:
doc_id = "nonconflict_2"
db = await setup_upgrade_env(self, cblpytest, dataset_path)
await self._prepare_sg_with_delta_sync(cblpytest)

def validator(pre: DocSnapshot, post: DocSnapshot) -> None:
assert pre.local.revid is not None and pre.local.cv is None, (
f"Local precondition invalid: RevID={pre.local.revid}, "
f"HLV={pre.local.cv} (expected revtree-only)"
)
assert pre.remote.revid is not None and pre.remote.cv is None, (
f"Remote precondition invalid: RevID={pre.remote.revid}, "
f"HLV={pre.remote.cv} (expected revtree-only, no HLV)"
)
assert pre.local.revid < pre.remote.revid, (
f"Pre-condition: expected local revid < remote revid, "
f"got local={pre.local.revid}, remote={pre.remote.revid}"
)

assert post.local.revid is None, (
f"Expected post-pull local doc to be HLV-only, "
f"got revid={post.local.revid}"
)
assert post.local.cv and post.local.cv.endswith(
"@Revision+Tree+Encoding"
), (
f"Expected post-pull local HLV to be RTE-encoded from the "
f"pulled revtree-only rev, got {post.local.cv}"
)
Comment thread
vipbhardwaj marked this conversation as resolved.
assert post.remote.cv is None, (
f"Expected SGW HLV unchanged (none) after PULL, got {post.remote.cv}"
)

await do_upgrade_replication_test(
self,
cblpytest,
db,
doc_id=doc_id,
replicator_type=ReplicatorType.PULL,
compare_docs=False,
validator=validator,
)
Loading
Loading