Skip to content

Commit a5ba444

Browse files
Support for doc loading using spec
Change-Id: I8527547daa7c8dda64c666bb1d2e9539eaf7f134 Reviewed-on: http://review.couchbase.org/c/TAF/+/154738 Reviewed-by: Ritesh Agarwal <ritesh.agarwal@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 4d171f9 commit a5ba444

2 files changed

Lines changed: 90 additions & 20 deletions

File tree

pytests/magma/magma_base.py

Lines changed: 90 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import random
44
import subprocess
55
import time
6+
import copy
67

78
from BucketLib.BucketOperations import BucketHelper
89
from BucketLib.bucket import Bucket
@@ -37,7 +38,8 @@ def setUp(self):
3738
self.retry_exceptions = [SDKException.TimeoutException,
3839
SDKException.AmbiguousTimeoutException,
3940
SDKException.RequestCanceledException,
40-
SDKException.UnambiguousTimeoutException]
41+
SDKException.UnambiguousTimeoutException,
42+
SDKException.ServerOutOfMemoryException]
4143
self.ignore_exceptions = []
4244

4345
# Sets autocompaction at bucket level
@@ -184,6 +186,7 @@ def setUp(self):
184186
self.gen_delete = None
185187
self.gen_read = None
186188
self.gen_update = None
189+
self.gen_expiry = None
187190
self.create_perc = self.input.param("update_perc", 100)
188191
self.update_perc = self.input.param("update_perc", 0)
189192
self.delete_perc = self.input.param("delete_perc", 0)
@@ -221,6 +224,9 @@ def setUp(self):
221224
self.update_itr = self.input.param("update_itr", 2)
222225
self.next_half = self.input.param("next_half", False)
223226
self.deep_copy = self.input.param("deep_copy", False)
227+
self.suppress_error_table = True
228+
self.skip_read_on_error = False
229+
self.track_failures = True
224230

225231
# self.thread_count is used to define number of thread use
226232
# to read same number of documents parallelly
@@ -239,6 +245,7 @@ def setUp(self):
239245
compression_settings=self.sdk_compression)
240246

241247
# Initial Data Load
248+
self.loader_dict = None
242249
self.init_loading = self.input.param("init_loading", True)
243250
if self.init_loading:
244251
if self.active_resident_threshold < 100:
@@ -248,32 +255,96 @@ def setUp(self):
248255
self.initial_load()
249256
self.log.info("==========Finished magma base setup========")
250257

258+
def _loader_dict(self):
259+
loader_dict = dict()
260+
common_params = {"retry_exceptions": self.retry_exceptions,
261+
"suppress_error_table": self.suppress_error_table,
262+
"durability_level": self.durability_level,
263+
"skip_read_success_results": False,
264+
"target_items": 5000,
265+
"skip_read_on_error": self.skip_read_on_error,
266+
"track_failures": self.track_failures,
267+
"ignore_exceptions": self.ignore_exceptions,
268+
"sdk_timeout_unit": "seconds",
269+
"sdk_timeout": 60,
270+
"doc_ttl": 0,
271+
"doc_gen_type": "default"}
272+
for bucket in self.bucket_util.buckets:
273+
loader_dict.update({bucket: dict()})
274+
loader_dict[bucket].update({"scopes": dict()})
275+
for scope in bucket.scopes.keys():
276+
loader_dict[bucket]["scopes"].update({scope: dict()})
277+
loader_dict[bucket]["scopes"][scope].update({"collections":dict()})
278+
for collection in bucket.scopes[scope].collections.keys():
279+
loader_dict[bucket]["scopes"][scope]["collections"].update({collection:dict()})
280+
if self.gen_update is not None:
281+
op_type = "update"
282+
common_params.update({"doc_gen": self.gen_update})
283+
loader_dict[bucket]["scopes"][scope]["collections"][collection][op_type] = copy.deepcopy(common_params)
284+
if self.gen_create is not None:
285+
op_type = "create"
286+
common_params.update({"doc_gen": self.gen_create})
287+
loader_dict[bucket]["scopes"][scope]["collections"][collection][op_type] = copy.deepcopy(common_params)
288+
if self.gen_delete is not None:
289+
op_type = "delete"
290+
common_params.update({"doc_gen": self.gen_delete})
291+
loader_dict[bucket]["scopes"][scope]["collections"][collection][op_type] = copy.deepcopy(common_params)
292+
if self.gen_expiry is not None and self.maxttl:
293+
op_type = "update"
294+
common_params.update({"doc_gen": self.gen_expiry,
295+
"doc_ttl": self.maxttl})
296+
loader_dict[bucket]["scopes"][scope]["collections"][collection][op_type] = copy.deepcopy(common_params)
297+
common_params.update({"doc_ttl": 0})
298+
if self.gen_read is not None:
299+
op_type = "read"
300+
common_params.update({"doc_gen": self.gen_read,
301+
"skip_read_success_results": True,
302+
"track_failures": False,
303+
"suppress_error_table": True})
304+
loader_dict[bucket]["scopes"][scope]["collections"][collection][op_type] = common_params
305+
self.loader_dict = loader_dict
306+
307+
def doc_loader(self, loader_spec):
308+
task = self.task.async_load_gen_docs_from_spec(
309+
self.cluster, self.task_manager, loader_spec,
310+
self.sdk_client_pool,
311+
batch_size=self.batch_size,
312+
process_concurrency=self.process_concurrency,
313+
print_ops_rate=True,
314+
start_task=True,
315+
track_failures=self.track_failures)
316+
317+
return task
318+
319+
def data_load(self):
320+
self._loader_dict()
321+
return self.doc_loader(self.loader_dict)
322+
323+
def wait_for_doc_load_completion(self, task, wait_for_stats=True):
324+
self.task_manager.get_task_result(task)
325+
self.bucket_util.validate_doc_loading_results(task)
326+
self.assertTrue(task.result,
327+
"Doc ops failed for task: {}".format(task.thread_name))
328+
329+
if wait_for_stats:
330+
try:
331+
self.bucket_util._wait_for_stats_all_buckets(timeout=1800)
332+
except Exception as e:
333+
raise e
334+
251335
def initial_load(self):
252336
self.create_start = 0
253337
self.create_end = self.init_items_per_collection
254338
if self.rev_write:
255339
self.create_start = -int(self.init_items_per_collection - 1)
256340
self.create_end = 1
257341

342+
self.generate_docs(doc_ops="create")
343+
258344
self.log.debug("initial_items_in_each_collection {}".format(self.init_items_per_collection))
259-
tasks_info = dict()
260-
for collection in self.collections:
261-
self.generate_docs(doc_ops="create", target_vbucket=None)
262-
tem_tasks_info = self.loadgen_docs(
263-
self.retry_exceptions,
264-
self.ignore_exceptions,
265-
scope=CbServer.default_scope,
266-
collection=collection,
267-
_sync=False,
268-
doc_ops="create")
269-
tasks_info.update(tem_tasks_info.items())
270-
for task in tasks_info:
271-
self.task_manager.get_task_result(task)
272-
self.bucket_util.verify_doc_op_task_exceptions(
273-
tasks_info, self.cluster)
274-
self.bucket_util.log_doc_ops_task_failures(tasks_info)
275-
self.bucket_util._wait_for_stats_all_buckets(check_ep_items_remaining=True,
276-
timeout=3600)
345+
task = self.data_load()
346+
self.wait_for_doc_load_completion(task, True)
347+
277348
if self.standard_buckets == 1 or self.standard_buckets == self.magma_buckets:
278349
for bucket in self.bucket_util.get_all_buckets():
279350
disk_usage = self.get_disk_usage(

pytests/magma/magma_rollback.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1987,7 +1987,6 @@ def test_rebalance_during_rollback(self):
19871987
for shell in shell_conn:
19881988
shell.disconnect()
19891989

1990-
19911990
class MagmaSpaceAmplification(MagmaBaseTest):
19921991

19931992
def test_space_amplification(self):

0 commit comments

Comments
 (0)