Skip to content

Commit adea0d8

Browse files
committed
feat: delta-sync history based tests
1 parent 464758c commit adea0d8

3 files changed

Lines changed: 428 additions & 220 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import time
2+
from collections.abc import Callable
3+
from pathlib import Path
4+
from typing import TypeAlias
5+
6+
from cbltest import CBLPyTest, CouchbaseServer
7+
from cbltest.api.cbltestclass import CBLTestClass
8+
from cbltest.api.database import Database, GetDocumentResult
9+
from cbltest.api.database_types import DocumentEntry
10+
from cbltest.api.error import CblSyncGatewayBadResponseError
11+
from cbltest.api.replicator import Replicator
12+
from cbltest.api.replicator_types import (
13+
ReplicatorActivityLevel,
14+
ReplicatorBasicAuthenticator,
15+
ReplicatorCollectionEntry,
16+
ReplicatorConflictResolver,
17+
ReplicatorType,
18+
WaitForDocumentEventEntry,
19+
)
20+
from cbltest.api.syncgateway import RemoteDocument
21+
from cbltest.api.test_functions import compare_local_and_remote
22+
from cbltest.logging import cbl_info
23+
24+
25+
class DocSnapshot:
26+
def __init__(self, local: GetDocumentResult, remote: RemoteDocument):
27+
self.local = local
28+
self.remote = remote
29+
30+
31+
DocValidator: TypeAlias = Callable[[DocSnapshot, DocSnapshot], None]
32+
33+
34+
def tools_path() -> Path:
35+
# client/src/cbltest/api/upgrade_test_helpers.py -> parents[4] is repo root
36+
return Path(__file__).resolve().parents[4] / "tests" / ".tools"
37+
38+
39+
async def setup_upgrade_env(
40+
test_case: CBLTestClass, cblpytest: CBLPyTest, dataset_path: Path
41+
) -> Database:
42+
await test_case.skip_if_cbl_not(cblpytest.test_servers[0], ">= 4.0.0")
43+
44+
dataset_ver = cblpytest.test_servers[0].dataset_version
45+
test_case.skip_if_not(
46+
dataset_ver == "4.0", f"Requires dataset v4.0 (current: {dataset_ver})."
47+
)
48+
49+
test_case.mark_test_step("Delete Sync Gateway 'upgrade' database if exists")
50+
sg = cblpytest.sync_gateways[0]
51+
try:
52+
await sg.delete_database("upgrade")
53+
except CblSyncGatewayBadResponseError as e:
54+
if e.code != 403:
55+
raise
56+
57+
test_case.mark_test_step("Restore Couchbase Server Bucket using `upgrade` dataset")
58+
cbs: CouchbaseServer = cblpytest.couchbase_servers[0]
59+
cbs.drop_bucket("upgrade")
60+
cbs.restore_bucket("upgrade", tools_path(), dataset_path, "upgrade")
61+
62+
test_case.mark_test_step("Wait 2s to ensure SG picks up the restored database.")
63+
time.sleep(2)
64+
65+
test_case.mark_test_step("Reset local database, and load `upgrade` dataset.")
66+
dbs = await cblpytest.test_servers[0].create_and_reset_db(
67+
["db1"], dataset="upgrade"
68+
)
69+
return dbs[0]
70+
71+
72+
async def do_upgrade_replication_test(
73+
test_case: CBLTestClass,
74+
cblpytest: CBLPyTest,
75+
db: Database,
76+
doc_id: str,
77+
replicator_type: ReplicatorType,
78+
conflict_resolver: ReplicatorConflictResolver | None = None,
79+
doc_events: set[WaitForDocumentEventEntry] | None = None,
80+
compare_docs: bool | None = True,
81+
validator: DocValidator | None = None,
82+
) -> None:
83+
sg = cblpytest.sync_gateways[0]
84+
85+
pre_local_doc = await db.get_document(DocumentEntry("_default._default", doc_id))
86+
pre_remote_doc = await sg.get_document("upgrade", doc_id)
87+
88+
assert pre_local_doc is not None
89+
assert pre_remote_doc is not None
90+
cbl_info(f"Revision Info before Replication ({replicator_type}):")
91+
cbl_info(f"Local : RevID = {pre_local_doc.revid}, HLV = {pre_local_doc.cv}")
92+
cbl_info(f"Remote : RevID = {pre_remote_doc.revid}, HLV = {pre_remote_doc.cv}")
93+
94+
wait_for_doc_events = bool(doc_events)
95+
96+
conflict_resolver_name = (
97+
f"{conflict_resolver.name}" if conflict_resolver else "None"
98+
)
99+
100+
test_case.mark_test_step(f"""
101+
Start a replicator:
102+
* endpoint: '/upgrade'
103+
* collections : '_default._default'
104+
* type: {replicator_type}
105+
* document_ids: ['{doc_id}']
106+
* continuous: {wait_for_doc_events}
107+
* conflict_resolver: {conflict_resolver_name}
108+
""")
109+
replicator = Replicator(
110+
db,
111+
cblpytest.sync_gateways[0].replication_url("upgrade"),
112+
collections=[
113+
ReplicatorCollectionEntry(
114+
names=["_default._default"],
115+
document_ids=[doc_id],
116+
conflict_resolver=conflict_resolver,
117+
)
118+
],
119+
replicator_type=replicator_type,
120+
continuous=wait_for_doc_events,
121+
authenticator=ReplicatorBasicAuthenticator("user1", "pass"),
122+
pinned_server_cert=cblpytest.sync_gateways[0].tls_cert(),
123+
enable_document_listener=wait_for_doc_events,
124+
)
125+
126+
await replicator.start()
127+
128+
if doc_events:
129+
test_case.mark_test_step("Wait until receiving all document replication events")
130+
await replicator.wait_for_all_doc_events(
131+
events=doc_events,
132+
max_retries=100,
133+
)
134+
else:
135+
test_case.mark_test_step("Wait until the replicator is stopped.")
136+
status = await replicator.wait_for(ReplicatorActivityLevel.STOPPED)
137+
assert status.error is None, (
138+
f"Error waiting for replicator: ({status.error.domain} / {status.error.code}) {status.error.message}"
139+
)
140+
141+
if compare_docs:
142+
test_case.mark_test_step("Check that the doc is replicated correctly.")
143+
await compare_local_and_remote(
144+
db, sg, replicator_type, "upgrade", ["_default._default"], [doc_id]
145+
)
146+
147+
local_doc = await db.get_document(DocumentEntry("_default._default", doc_id))
148+
remote_doc = await sg.get_document("upgrade", doc_id)
149+
150+
assert local_doc is not None
151+
assert remote_doc is not None
152+
cbl_info(f"Revision Info after Replication ({replicator_type}):")
153+
cbl_info(f"Local : RevID = {local_doc.revid}, HLV = {local_doc.cv}")
154+
cbl_info(f"Remote : RevID = {remote_doc.revid}, HLV = {remote_doc.cv}")
155+
156+
if validator:
157+
test_case.mark_test_step("Validate revid and HLV of local and remote doc.")
158+
validator(
159+
DocSnapshot(pre_local_doc, pre_remote_doc),
160+
DocSnapshot(local_doc, remote_doc),
161+
)
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
from pathlib import Path
2+
3+
import pytest
4+
from cbltest import CBLPyTest
5+
from cbltest.api.cbltestclass import CBLTestClass
6+
from cbltest.api.error import CblSyncGatewayBadResponseError
7+
from cbltest.api.replicator_types import ReplicatorType
8+
from cbltest.api.syncgateway import DocumentUpdateEntry, PutDatabasePayload
9+
from cbltest.api.upgrade_test_helpers import (
10+
DocSnapshot,
11+
do_upgrade_replication_test,
12+
setup_upgrade_env,
13+
)
14+
15+
_DELTA_SYNC_UPGRADE_CONFIG: dict = {
16+
"bucket": "upgrade",
17+
"num_index_replicas": 0,
18+
"scopes": {
19+
"_default": {
20+
"collections": {
21+
"_default": {
22+
"sync": (
23+
"function(doc, oldDoc, meta) {"
24+
" if (doc._deleted) { channel(oldDoc.channels); }"
25+
" else { channel(doc.channels || 'upgrade'); }"
26+
"}"
27+
)
28+
}
29+
}
30+
}
31+
},
32+
"import_docs": True,
33+
"enable_shared_bucket_access": True,
34+
"delta_sync": {"enabled": True},
35+
}
36+
37+
38+
@pytest.mark.sgw
39+
@pytest.mark.min_test_servers(1)
40+
@pytest.mark.min_sync_gateways(1)
41+
@pytest.mark.min_couchbase_servers(1)
42+
class TestUpgradeDeltaSync(CBLTestClass):
43+
async def _prepare_sg_with_delta_sync(self, cblpytest: CBLPyTest) -> None:
44+
sg = cblpytest.sync_gateways[0]
45+
46+
self.mark_test_step(
47+
"Create SG 'upgrade' database with delta_sync enabled and import from bucket"
48+
)
49+
try:
50+
await sg.put_database(
51+
"upgrade", PutDatabasePayload(_DELTA_SYNC_UPGRADE_CONFIG)
52+
)
53+
except CblSyncGatewayBadResponseError as e:
54+
if e.code != 412:
55+
raise
56+
await sg.wait_for_db_up("upgrade")
57+
58+
self.mark_test_step("Create user1 for replication")
59+
collection_access = sg.create_collection_access_dict(
60+
{"_default._default": ["*"]}
61+
)
62+
await sg.add_user("upgrade", "user1", "pass", collection_access)
63+
64+
@pytest.mark.asyncio(loop_scope="session")
65+
@pytest.mark.xfail(
66+
strict=True,
67+
reason=(
68+
"SGW delta-sync history bug: when SGW sends a delta of a "
69+
"revtree+HLV rev to a client holding the revtree-only ancestor, "
70+
"the rev message's `history` field is empty. Fix pending."
71+
),
72+
)
73+
async def test_delta_sync_history_pull_post_upgrade_sgw_mutation(
74+
self, cblpytest: CBLPyTest, dataset_path: Path
75+
) -> None:
76+
doc_id = "nonconflict_3"
77+
db = await setup_upgrade_env(self, cblpytest, dataset_path)
78+
await self._prepare_sg_with_delta_sync(cblpytest)
79+
sg = cblpytest.sync_gateways[0]
80+
81+
self.mark_test_step(
82+
f"Mutate '{doc_id}' on 4.x SGW to produce a new revtree leaf + fresh HLV"
83+
)
84+
current = await sg.get_document("upgrade", doc_id)
85+
assert current is not None, f"Expected '{doc_id}' imported from bucket"
86+
assert current.revid is not None, (
87+
f"Expected '{doc_id}' to have a revid pre-mutation, got None"
88+
)
89+
new_body = {**current.body, "updated_by": "delta_sync_history_test"}
90+
await sg.update_documents(
91+
"upgrade",
92+
[DocumentUpdateEntry(doc_id, current.revid, body=new_body)],
93+
)
94+
95+
def validator(pre: DocSnapshot, post: DocSnapshot) -> None:
96+
assert pre.local.revid is not None and pre.local.cv is None, (
97+
f"Local precondition invalid: RevID={pre.local.revid}, "
98+
f"HLV={pre.local.cv} (expected revtree-only)"
99+
)
100+
assert pre.remote.revid is not None and pre.remote.cv is not None, (
101+
f"Remote precondition invalid: RevID={pre.remote.revid}, "
102+
f"HLV={pre.remote.cv} (expected revtree + HLV after 4.x mutation)"
103+
)
104+
assert not pre.remote.cv.endswith("@Revision+Tree+Encoding"), (
105+
f"Expected canonical HLV on SGW after 4.x write, got RTE-encoded: "
106+
f"{pre.remote.cv}"
107+
)
108+
109+
assert post.local.revid is None, (
110+
f"Expected post-pull local doc to be HLV-only, "
111+
f"got revid={post.local.revid}"
112+
)
113+
assert post.local.cv and post.local.cv == post.remote.cv, (
114+
f"Expected post-pull local HLV to match SGW HLV. "
115+
f"Local={post.local.cv}, Remote={post.remote.cv}"
116+
)
117+
118+
await do_upgrade_replication_test(
119+
self,
120+
cblpytest,
121+
db,
122+
doc_id=doc_id,
123+
replicator_type=ReplicatorType.PULL,
124+
compare_docs=False,
125+
validator=validator,
126+
)
127+
128+
@pytest.mark.asyncio(loop_scope="session")
129+
async def test_delta_sync_history_pull_pre_upgrade_sgw_two_revs(
130+
self, cblpytest: CBLPyTest, dataset_path: Path
131+
) -> None:
132+
doc_id = "nonconflict_2"
133+
db = await setup_upgrade_env(self, cblpytest, dataset_path)
134+
await self._prepare_sg_with_delta_sync(cblpytest)
135+
136+
def validator(pre: DocSnapshot, post: DocSnapshot) -> None:
137+
assert pre.local.revid is not None and pre.local.cv is None, (
138+
f"Local precondition invalid: RevID={pre.local.revid}, "
139+
f"HLV={pre.local.cv} (expected revtree-only)"
140+
)
141+
assert pre.remote.revid is not None and pre.remote.cv is None, (
142+
f"Remote precondition invalid: RevID={pre.remote.revid}, "
143+
f"HLV={pre.remote.cv} (expected revtree-only, no HLV)"
144+
)
145+
assert pre.local.revid < pre.remote.revid, (
146+
f"Pre-condition: expected local revid < remote revid, "
147+
f"got local={pre.local.revid}, remote={pre.remote.revid}"
148+
)
149+
150+
assert post.local.revid is None, (
151+
f"Expected post-pull local doc to be HLV-only, "
152+
f"got revid={post.local.revid}"
153+
)
154+
assert post.local.cv and post.local.cv.endswith(
155+
"@Revision+Tree+Encoding"
156+
), (
157+
f"Expected post-pull local HLV to be RTE-encoded from the "
158+
f"pulled revtree-only rev, got {post.local.cv}"
159+
)
160+
assert post.remote.cv is None, (
161+
f"Expected SGW HLV unchanged (none) after PULL, got {post.remote.cv}"
162+
)
163+
164+
await do_upgrade_replication_test(
165+
self,
166+
cblpytest,
167+
db,
168+
doc_id=doc_id,
169+
replicator_type=ReplicatorType.PULL,
170+
compare_docs=False,
171+
validator=validator,
172+
)

0 commit comments

Comments
 (0)