|
| 1 | +from pathlib import Path |
| 2 | + |
| 3 | +import pytest |
| 4 | +from cbltest import CBLPyTest |
| 5 | +from cbltest.api.cbltestclass import CBLTestClass |
| 6 | +from cbltest.api.cloud import CouchbaseCloud |
| 7 | +from cbltest.api.database_types import EncryptedValue |
| 8 | +from cbltest.api.replicator import Replicator |
| 9 | +from cbltest.api.replicator_types import ( |
| 10 | + ReplicatorActivityLevel, |
| 11 | + ReplicatorBasicAuthenticator, |
| 12 | + ReplicatorCollectionEntry, |
| 13 | + ReplicatorType, |
| 14 | +) |
| 15 | +from cbltest.api.syncgateway import DocumentUpdateEntry |
| 16 | +from cbltest.api.test_functions import compare_local_and_remote |
| 17 | +from cbltest.responses import ServerVariant |
| 18 | + |
| 19 | + |
| 20 | +@pytest.mark.min_test_servers(1) |
| 21 | +@pytest.mark.min_sync_gateways(1) |
| 22 | +@pytest.mark.min_couchbase_servers(1) |
| 23 | +class TestReplicatorEncryptionHook(CBLTestClass): |
| 24 | + @pytest.mark.asyncio(loop_scope="session") |
| 25 | + async def test_replication_complex_doc_encryption( |
| 26 | + self, cblpytest: CBLPyTest, dataset_path: Path |
| 27 | + ): |
| 28 | + """ |
| 29 | + @summary: Testing dict and array encrypted values are present in 10-15th level of |
| 30 | + complex doc and replicated should detected values without any errors. |
| 31 | + 1. Have SG and CBL up and running |
| 32 | + 2. Create a complex document with encryption property (Array, and Dict) |
| 33 | + 3. Start the replicator and make sure documents are replicated on SG. |
| 34 | + 4. Verify encrypted fields and Verify data is encrypted. |
| 35 | + 5. Verify encrypted value at 15th level are detected by replicator and shown correctly on sg. |
| 36 | + """ |
| 37 | + await self.skip_if_not_platform(cblpytest.test_servers[0], ServerVariant.C) |
| 38 | + |
| 39 | + self.mark_test_step("Reset SG and load `posts` dataset") |
| 40 | + cloud = CouchbaseCloud( |
| 41 | + cblpytest.sync_gateways[0], cblpytest.couchbase_servers[0] |
| 42 | + ) |
| 43 | + await cloud.configure_dataset(dataset_path, "posts") |
| 44 | + |
| 45 | + self.mark_test_step("Reset local database, and load `posts` dataset.") |
| 46 | + dbs = await cblpytest.test_servers[0].create_and_reset_db( |
| 47 | + ["db1"], dataset="posts" |
| 48 | + ) |
| 49 | + db = dbs[0] |
| 50 | + |
| 51 | + self.mark_test_step("Replicate to CBL from SGW") |
| 52 | + replicator = Replicator( |
| 53 | + db, |
| 54 | + cblpytest.sync_gateways[0].replication_url("posts"), |
| 55 | + collections=[ReplicatorCollectionEntry(["_default.posts"])], |
| 56 | + authenticator=ReplicatorBasicAuthenticator("user1", "pass"), |
| 57 | + pinned_server_cert=cblpytest.sync_gateways[0].tls_cert(), |
| 58 | + ) |
| 59 | + await replicator.start() |
| 60 | + |
| 61 | + self.mark_test_step("Wait until the replicator stops.") |
| 62 | + status = await replicator.wait_for(ReplicatorActivityLevel.STOPPED) |
| 63 | + assert status.error is None, ( |
| 64 | + f"Error waiting for replicator: ({status.error.domain} / {status.error.code}) {status.error.message}" |
| 65 | + ) |
| 66 | + |
| 67 | + self.mark_test_step("Check that all docs are replicated correctly.") |
| 68 | + lite_all_docs = await db.get_all_documents("_default.posts") |
| 69 | + assert len(lite_all_docs["_default.posts"]) == 5, ( |
| 70 | + f"Incorrect number of initial documents replicated (expected 5; got {len(lite_all_docs['_default.posts'])}" |
| 71 | + ) |
| 72 | + await compare_local_and_remote( |
| 73 | + db, |
| 74 | + cblpytest.sync_gateways[0], |
| 75 | + ReplicatorType.PUSH, |
| 76 | + "posts", |
| 77 | + ["_default.posts"], |
| 78 | + ) |
| 79 | + |
| 80 | + self.mark_test_step( |
| 81 | + "Create document in CBL with encrypted value at the 15th level of nesting" |
| 82 | + ) |
| 83 | + async with db.batch_updater() as b: |
| 84 | + b.upsert_document( |
| 85 | + "_default.posts", |
| 86 | + "post_1000", |
| 87 | + [ |
| 88 | + { |
| 89 | + "channels": ["group1"], |
| 90 | + "nest_1": { |
| 91 | + "nest_2": { |
| 92 | + "nest_3": { |
| 93 | + "nest_4": { |
| 94 | + "nest_5": { |
| 95 | + "nest_6": { |
| 96 | + "nest_7": { |
| 97 | + "nest_8": { |
| 98 | + "nest_9": { |
| 99 | + "nest_10": { |
| 100 | + "nest_11": { |
| 101 | + "nest_12": { |
| 102 | + "nest_13": { |
| 103 | + "nest_14": EncryptedValue( |
| 104 | + "secret_password" |
| 105 | + ) |
| 106 | + } |
| 107 | + } |
| 108 | + } |
| 109 | + } |
| 110 | + } |
| 111 | + } |
| 112 | + } |
| 113 | + } |
| 114 | + } |
| 115 | + } |
| 116 | + } |
| 117 | + } |
| 118 | + }, |
| 119 | + } |
| 120 | + ], |
| 121 | + ) |
| 122 | + |
| 123 | + self.mark_test_step("Replicate to SGW from CBL") |
| 124 | + await replicator.start() |
| 125 | + |
| 126 | + self.mark_test_step("Wait until the replicator stops.") |
| 127 | + status = await replicator.wait_for(ReplicatorActivityLevel.STOPPED) |
| 128 | + assert status.error is None, ( |
| 129 | + f"Error waiting for replicator: ({status.error.domain} / {status.error.code}) {status.error.message}" |
| 130 | + ) |
| 131 | + lite_all_docs = await db.get_all_documents("_default.posts") |
| 132 | + assert len(lite_all_docs["_default.posts"]) == 6, ( |
| 133 | + f"Incorrect number of initial documents replicated (expected 6; got {len(lite_all_docs['_default.posts'])}" |
| 134 | + ) |
| 135 | + |
| 136 | + self.mark_test_step("Check that the document is in SGW") |
| 137 | + doc = await cblpytest.sync_gateways[0].get_document( |
| 138 | + "posts", "post_1000", collection="posts" |
| 139 | + ) |
| 140 | + self.mark_test_step(f"Document from SGW: {doc.id}") |
| 141 | + |
| 142 | + await cblpytest.test_servers[0].cleanup() |
| 143 | + self.mark_test_step("...COMPLETED...") |
| 144 | + |
| 145 | + @pytest.mark.asyncio(loop_scope="session") |
| 146 | + async def test_delta_sync_with_encryption( |
| 147 | + self, cblpytest: CBLPyTest, dataset_path: Path |
| 148 | + ): |
| 149 | + """ |
| 150 | + @summary: Verify Delta sync do not work when encryption callback hook is present |
| 151 | + Verify Doc is not editable in the SG |
| 152 | + 1. Have delta sync enabled |
| 153 | + 2. Create docs with encrypted field in CBL |
| 154 | + 3. Do push/pull replication to SGW |
| 155 | + 4. Update docs in CBL & SG |
| 156 | + 5. Replicate docs using pull replication. |
| 157 | + 6. Verify Bandwidth is saved for other documents |
| 158 | + """ |
| 159 | + await self.skip_if_not_platform(cblpytest.test_servers[0], ServerVariant.C) |
| 160 | + |
| 161 | + self.mark_test_step( |
| 162 | + "Reset SG and load `travel` dataset with delta sync enabled" |
| 163 | + ) |
| 164 | + cloud = CouchbaseCloud( |
| 165 | + cblpytest.sync_gateways[0], cblpytest.couchbase_servers[0] |
| 166 | + ) |
| 167 | + await cloud.configure_dataset(dataset_path, "travel", ["delta_sync"]) |
| 168 | + |
| 169 | + self.mark_test_step("Reset local database, and load `travel` dataset.") |
| 170 | + dbs = await cblpytest.test_servers[0].create_and_reset_db( |
| 171 | + ["db1"], dataset="travel" |
| 172 | + ) |
| 173 | + db = dbs[0] |
| 174 | + |
| 175 | + self.mark_test_step("Start a replicator") |
| 176 | + replicator = Replicator( |
| 177 | + db, |
| 178 | + cblpytest.sync_gateways[0].replication_url("travel"), |
| 179 | + collections=[ReplicatorCollectionEntry(["travel.hotels"])], |
| 180 | + authenticator=ReplicatorBasicAuthenticator("user1", "pass"), |
| 181 | + pinned_server_cert=cblpytest.sync_gateways[0].tls_cert(), |
| 182 | + ) |
| 183 | + await replicator.start() |
| 184 | + |
| 185 | + self.mark_test_step("Wait until the replicator stops.") |
| 186 | + status = await replicator.wait_for(ReplicatorActivityLevel.STOPPED) |
| 187 | + assert status.error is None, ( |
| 188 | + f"Error waiting for replicator: ({status.error.domain} / {status.error.code}) {status.error.message}" |
| 189 | + ) |
| 190 | + |
| 191 | + self.mark_test_step("Check that all docs are replicated correctly.") |
| 192 | + lite_all_docs = await db.get_all_documents("travel.hotels") |
| 193 | + assert len(lite_all_docs["travel.hotels"]) == 700, ( |
| 194 | + f"Incorrect number of initial documents replicated (expected 700; got {len(lite_all_docs['travel.hotels'])}" |
| 195 | + ) |
| 196 | + await compare_local_and_remote( |
| 197 | + db, |
| 198 | + cblpytest.sync_gateways[0], |
| 199 | + ReplicatorType.PUSH_AND_PULL, |
| 200 | + "travel", |
| 201 | + ["travel.hotels"], |
| 202 | + ) |
| 203 | + |
| 204 | + self.mark_test_step("Create a document with encrypted field in CBL") |
| 205 | + async with db.batch_updater() as b: |
| 206 | + b.upsert_document( |
| 207 | + "travel.hotels", |
| 208 | + "hotel_1", |
| 209 | + [{"name": "CBL", "encrypted_field": EncryptedValue("secret_password")}], |
| 210 | + ) |
| 211 | + |
| 212 | + self.mark_test_step("Replicate to SGW from CBL") |
| 213 | + await replicator.start() |
| 214 | + |
| 215 | + self.mark_test_step("Wait until the replicator stops.") |
| 216 | + status = await replicator.wait_for(ReplicatorActivityLevel.STOPPED) |
| 217 | + assert status.error is None, ( |
| 218 | + f"Error waiting for replicator: ({status.error.domain} / {status.error.code}) {status.error.message}" |
| 219 | + ) |
| 220 | + |
| 221 | + self.mark_test_step("Verify the new document is present in SGW") |
| 222 | + lite_all_docs = await db.get_all_documents("travel.hotels") |
| 223 | + assert len(lite_all_docs["travel.hotels"]) == 701, ( |
| 224 | + f"Incorrect number of new documents replicated (expected 701; got {len(lite_all_docs['travel.hotels'])}" |
| 225 | + ) |
| 226 | + doc = await cblpytest.sync_gateways[0].get_document( |
| 227 | + "travel", "hotel_1", "travel", "hotels" |
| 228 | + ) |
| 229 | + assert doc is not None, "Document should exist in SGW" |
| 230 | + assert doc.body.get("encrypted_field") is not None, ( |
| 231 | + "Encrypted value should be present" |
| 232 | + ) |
| 233 | + |
| 234 | + self.mark_test_step("Update docs in CBL & SGW") |
| 235 | + async with db.batch_updater() as b: |
| 236 | + b.upsert_document("travel.hotels", "hotel_2", {"name": "SGW"}) |
| 237 | + await cblpytest.sync_gateways[0].update_documents( |
| 238 | + "travel", |
| 239 | + [DocumentUpdateEntry("hotel_2", None, {"name": "CBL"})], |
| 240 | + "travel", |
| 241 | + "hotels", |
| 242 | + ) |
| 243 | + |
| 244 | + self.mark_test_step("Replicate docs using pull replication") |
| 245 | + replicator = Replicator( |
| 246 | + db, |
| 247 | + cblpytest.sync_gateways[0].replication_url("travel"), |
| 248 | + collections=[ReplicatorCollectionEntry(["travel.hotels"])], |
| 249 | + replicator_type=ReplicatorType.PULL, |
| 250 | + authenticator=ReplicatorBasicAuthenticator("user1", "pass"), |
| 251 | + pinned_server_cert=cblpytest.sync_gateways[0].tls_cert(), |
| 252 | + ) |
| 253 | + await replicator.start() |
| 254 | + status = await replicator.wait_for(ReplicatorActivityLevel.STOPPED) |
| 255 | + assert status.error is None, ( |
| 256 | + f"Error waiting for replicator: ({status.error.domain} / {status.error.code}) {status.error.message}" |
| 257 | + ) |
| 258 | + |
| 259 | + self.mark_test_step("Verify bandwidth is saved for other documents") |
| 260 | + lite_all_docs = await db.get_all_documents("travel.hotels") |
| 261 | + total_docs = len(lite_all_docs["travel.hotels"]) |
| 262 | + repl_status = await replicator.get_status() |
| 263 | + assert repl_status.progress.completed, "Expected replication to be completed" |
| 264 | + processed_docs = len(replicator.document_updates) |
| 265 | + assert processed_docs == 1, ( |
| 266 | + f"Expected only 1 document to be processed due to delta sync, but got {processed_docs}" |
| 267 | + ) |
| 268 | + assert processed_docs < total_docs, ( |
| 269 | + f"All documents ({total_docs}) were processed instead of just the mutated ones ({processed_docs})" |
| 270 | + ) |
| 271 | + |
| 272 | + await cblpytest.test_servers[0].clean_up() |
| 273 | + self.mark_test_step("...COMPLETED...") |
0 commit comments