Skip to content

Commit 3507c8a

Browse files
Add mongo ycsb driver support
Change-Id: I9df090e623d386b72363d46ae422204e99d930b8 Reviewed-on: https://review.couchbase.org/c/perfrunner/+/245470 Reviewed-by: Salim Salim <salim.salim@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 62d8ad6 commit 3507c8a

4 files changed

Lines changed: 255 additions & 2 deletions

File tree

perfrunner/helpers/local.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,146 @@ def run_ycsb(
930930
run_local_shell_command(cmd_args, quiet=True, cwd=cwd, stderr=stderr_file)
931931

932932

933+
def run_mongo_ycsb(
934+
hosts: List[str],
935+
database: str,
936+
action: str,
937+
workload: str,
938+
items: int,
939+
workers: int,
940+
target: int,
941+
port: int = 27017,
942+
instance: int = 0,
943+
username: Optional[str] = None,
944+
password: Optional[str] = None,
945+
replica_set_name: Optional[str] = None,
946+
ycsb_client: str = "mongodb",
947+
write_concern: str = "majority",
948+
batch_size: int = 1,
949+
requestdistribution: str = "zipfian",
950+
fieldlength: int = 1024,
951+
fieldcount: int = 10,
952+
upsert: bool = False,
953+
write_all_fields: bool = True,
954+
journal_ack: bool = True,
955+
read_preference: Optional[str] = None,
956+
read_concern: Optional[str] = None,
957+
max_pool_size: Optional[int] = None,
958+
ops: Optional[int] = None,
959+
execution_time: Optional[int] = None,
960+
timeseries: int = 0,
961+
phase_params: Optional[dict] = None,
962+
insert_test_params: Optional[dict] = None,
963+
soe_params: Optional[dict] = None,
964+
ycsb_jvm_args: Optional[str] = None,
965+
measurement_type: Optional[str] = None,
966+
histogram_buckets: Optional[int] = None,
967+
histogram_bucket_size: Optional[int] = None,
968+
verbose_histogram: bool = False,
969+
ycsb_status_output: bool = True,
970+
):
971+
seeds = ",".join(f"{h}:{port}" for h in hosts)
972+
973+
cred = ""
974+
if username and password:
975+
cred = (
976+
f"{urllib.parse.quote(username, safe='')}:"
977+
f"{urllib.parse.quote(password, safe='')}@"
978+
)
979+
980+
uri_params = [f"w={write_concern}"]
981+
if replica_set_name:
982+
uri_params.append(f"replicaSet={replica_set_name}")
983+
if not journal_ack:
984+
uri_params.append("journal=false")
985+
if read_preference:
986+
uri_params.append(f"readPreference={read_preference}")
987+
if read_concern:
988+
uri_params.append(f"readConcernLevel={read_concern}")
989+
if max_pool_size:
990+
uri_params.append(f"maxPoolSize={max_pool_size}")
991+
mongo_url = f"mongodb://{cred}{seeds}/{database}?" + "&".join(uri_params)
992+
993+
parameters = [
994+
f"target={target}",
995+
f"fieldlength={fieldlength}",
996+
f"fieldcount={fieldcount}",
997+
f"requestdistribution={requestdistribution}",
998+
f"writeallfields={str(write_all_fields).lower()}",
999+
f"mongodb.url={mongo_url}",
1000+
f"mongodb.database={database}",
1001+
f"mongodb.batchsize={batch_size}",
1002+
f"mongodb.writeConcern={write_concern}",
1003+
f"mongodb.upsert={str(upsert).lower()}",
1004+
f"exportfile=ycsb_{action}_{instance}_{database}.log",
1005+
]
1006+
1007+
if ops is not None:
1008+
parameters += [f"operationcount={ops}"]
1009+
if execution_time is not None:
1010+
parameters += [f"maxexecutiontime={execution_time}"]
1011+
if ycsb_jvm_args is not None:
1012+
parameters += [f"jvm-args='{ycsb_jvm_args}'"]
1013+
1014+
if timeseries:
1015+
parameters += ["measurementtype=timeseries"]
1016+
elif measurement_type:
1017+
parameters += [f"measurementtype={measurement_type}"]
1018+
if histogram_buckets:
1019+
parameters += [f"histogram.buckets={histogram_buckets}"]
1020+
if histogram_bucket_size:
1021+
parameters += [f"histogram.bucket.size={histogram_bucket_size}"]
1022+
if verbose_histogram:
1023+
parameters += ["measurement.histogram.verbose=true"]
1024+
1025+
if soe_params:
1026+
parameters += [
1027+
f"totalrecordcount={items}",
1028+
f"recordcount={soe_params['recorded_load_cache_size']}",
1029+
f"insertstart={soe_params['insertstart']}",
1030+
]
1031+
elif phase_params:
1032+
parameters += [
1033+
f"totalrecordcount={items}",
1034+
f"recordcount={phase_params['inserts_per_workerinstance']}",
1035+
f"insertstart={phase_params['insertstart']}",
1036+
]
1037+
elif insert_test_params:
1038+
parameters += [
1039+
f"recordcount={insert_test_params['recordcount']}",
1040+
f"insertstart={insert_test_params['insertstart']}",
1041+
]
1042+
else:
1043+
parameters += [f"recordcount={items}"]
1044+
1045+
cwd = "YCSB"
1046+
max_arg_strlen = get_max_arg_strlen()
1047+
if too_long_params := [arg for arg in parameters if len(arg) > max_arg_strlen]:
1048+
parameters = [arg for arg in parameters if arg not in too_long_params]
1049+
logger.info(f"Some parameters are too long to provide via CLI: {too_long_params}")
1050+
custom_workload = f"{workload}_custom_{uuid4().hex[:6]}"
1051+
logger.info(
1052+
f"Copying {workload} file to {custom_workload} and inserting long parameters there."
1053+
)
1054+
inject_params_into_ycsb_workload_file(
1055+
f"{cwd}/{workload}", f"{cwd}/{custom_workload}", too_long_params
1056+
)
1057+
workload = custom_workload
1058+
1059+
cmd_args = ["bin/ycsb", action, ycsb_client]
1060+
if ycsb_status_output:
1061+
cmd_args.append("-s")
1062+
cmd_args += ["-threads", workers, "-P", workload] + [
1063+
arg for param in parameters for arg in ["-p", param]
1064+
]
1065+
cmd_args = list(map(str, cmd_args))
1066+
1067+
logger.info(f"Running: {shlex.join(cmd_args)}")
1068+
run_local_shell_command("pyenv local 2.7.18", quiet=True, cwd=cwd)
1069+
with open(f"{cwd}/ycsb_{action}_{instance}_{database}_stderr.log", "w") as stderr_file:
1070+
run_local_shell_command(cmd_args, quiet=True, cwd=cwd, stderr=stderr_file)
1071+
1072+
9331073
def inject_params_into_ycsb_workload_file(
9341074
source_filename: str, target_filename: str, params: list[str]
9351075
):

perfrunner/helpers/worker.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@
6060
)
6161
from perfrunner.workloads.vectordb_bench import run_vectordb_bench_case
6262
from perfrunner.workloads.xdcr_conflict_sim import run_conflictsim
63-
from perfrunner.workloads.ycsb import ycsb_data_load, ycsb_workload
63+
from perfrunner.workloads.ycsb import (
64+
ycsb_data_load,
65+
ycsb_mongo_data_load,
66+
ycsb_mongo_workload,
67+
ycsb_workload,
68+
)
6469

6570
try:
6671
set_start_method("fork")
@@ -144,6 +149,16 @@ def ycsb_data_load_task(*args):
144149
ycsb_data_load(*args)
145150

146151

152+
@celery.task
153+
def ycsb_mongo_data_load_task(*args):
154+
ycsb_mongo_data_load(*args)
155+
156+
157+
@celery.task
158+
def ycsb_mongo_workload_task(*args):
159+
ycsb_mongo_workload(*args)
160+
161+
147162
@celery.task
148163
def ycsb_task(*args):
149164
ycsb_workload(*args)

perfrunner/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,6 +1808,9 @@ def __init__(self, options: dict):
18081808
self.ycsb_client = options.get('ycsb_client', self.YCSB_CLIENT)
18091809
self.ycsb_out_of_order = int(options.get('out_of_order', self.YCSB_OUT_OF_ORDER))
18101810
self.insertstart = int(options.get('insertstart', self.YCSB_INSERTSTART))
1811+
self.upsert = maybe_atoi(options.get("upsert", "false"))
1812+
self.mongo_batch_size = options.get("mongo_batch_size")
1813+
self.mongo_durability_level = options.get("mongo_durability_level")
18111814
self.ycsb_split_workload = int(options.get('ycsb_split_workload', self.YCSB_SPLIT_WORKLOAD))
18121815

18131816
self.range_scan_sampling = options.get('range_scan_sampling', self.RANGE_SCAN_SAMPLING)

perfrunner/workloads/ycsb.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from perfrunner.helpers.local import restart_memcached, run_ycsb
1+
from perfrunner.helpers.local import restart_memcached, run_mongo_ycsb, run_ycsb
22
from perfrunner.settings import PhaseSettings, TargetSettings
33

44

@@ -148,3 +148,98 @@ def ycsb_workload(workload_settings: PhaseSettings,
148148
histogram_bucket_size=workload_settings.histogram_bucket_size,
149149
verbose_histogram=workload_settings.verbose_histogram,
150150
)
151+
152+
def ycsb_mongo_data_load(workload_settings: PhaseSettings,
153+
target: TargetSettings,
154+
timer: int,
155+
instance: int):
156+
phase_params = None
157+
if workload_settings.phase:
158+
phase_params = {
159+
'insertstart': instance * workload_settings.inserts_per_workerinstance +
160+
workload_settings.insertstart,
161+
'inserts_per_workerinstance': workload_settings.inserts_per_workerinstance,
162+
}
163+
164+
host = target.node
165+
if target.cloud:
166+
host = target.cloud.get("cluster_svc", host)
167+
168+
run_mongo_ycsb(
169+
hosts=[host],
170+
database=target.bucket,
171+
username=target.username,
172+
password=target.password,
173+
action="load",
174+
ycsb_client=workload_settings.ycsb_client,
175+
workload=workload_settings.workload_path,
176+
items=int(workload_settings.items),
177+
workers=workload_settings.workers,
178+
target=int(workload_settings.target),
179+
instance=instance,
180+
fieldlength=workload_settings.field_length,
181+
fieldcount=workload_settings.field_count,
182+
upsert=bool(workload_settings.upsert),
183+
batch_size=workload_settings.mongo_batch_size or 1,
184+
write_concern=workload_settings.mongo_durability_level or "majority",
185+
requestdistribution=workload_settings.requestdistribution,
186+
ycsb_jvm_args=workload_settings.ycsb_jvm_args,
187+
timeseries=workload_settings.timeseries,
188+
phase_params=phase_params,
189+
)
190+
191+
192+
def ycsb_mongo_workload(workload_settings: PhaseSettings,
193+
target: TargetSettings,
194+
timer: int,
195+
instance: int):
196+
197+
if workload_settings.ycsb_split_workload:
198+
split_instance = workload_settings.workload_instances // 2
199+
200+
if instance < split_instance:
201+
workload_settings.workload_path = workload_settings.workload_path.split(",")[0]
202+
elif instance >= split_instance:
203+
workload_settings.workload_path = workload_settings.workload_path.split(",")[1]
204+
205+
insert_test_params = None
206+
if workload_settings.insert_test_flag:
207+
insert_test_params = {
208+
'insertstart': int(instance * workload_settings.inserts_per_workerinstance +
209+
workload_settings.items),
210+
'recordcount': int((instance+1) * workload_settings.inserts_per_workerinstance +
211+
workload_settings.items),
212+
}
213+
214+
host = target.node
215+
if target.cloud:
216+
host = target.cloud.get("cluster_svc", host)
217+
218+
run_mongo_ycsb(
219+
hosts=[host],
220+
database=target.bucket,
221+
username=target.username,
222+
password=target.password,
223+
action="run",
224+
ycsb_client=workload_settings.ycsb_client,
225+
workload=workload_settings.workload_path,
226+
items=int(workload_settings.items),
227+
workers=workload_settings.workers,
228+
target=int(workload_settings.target),
229+
ops=int(workload_settings.ops),
230+
instance=instance,
231+
execution_time=workload_settings.time,
232+
timeseries=workload_settings.timeseries,
233+
fieldlength=workload_settings.field_length,
234+
fieldcount=workload_settings.field_count,
235+
upsert=bool(workload_settings.upsert),
236+
batch_size=workload_settings.mongo_batch_size or 1,
237+
write_concern=workload_settings.mongo_durability_level or "majority",
238+
requestdistribution=workload_settings.requestdistribution,
239+
ycsb_jvm_args=workload_settings.ycsb_jvm_args,
240+
insert_test_params=insert_test_params,
241+
measurement_type=workload_settings.measurement_type,
242+
histogram_buckets=workload_settings.histogram_buckets,
243+
histogram_bucket_size=workload_settings.histogram_bucket_size,
244+
verbose_histogram=bool(workload_settings.verbose_histogram),
245+
)

0 commit comments

Comments
 (0)