Skip to content

Commit e386108

Browse files
committed
Simplify celery app configuration
+ Remove celeryremote.py and celerylocal.py and define all celery config options in worker.py + Distinguish common, remote-only and local-only celery config options + Remove redundant kombu registry calls + Make celery on the orchestrator node (the one running env/bin/perfrunner) only get configured once via RemoteWorkerManager/LocalWorkerManager Change-Id: I1e5d6b65e3348152bcd4ccb9750cff80764a90ff Reviewed-on: https://review.couchbase.org/c/perfrunner/+/242603 Reviewed-by: Salim Salim <salim.salim@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent ea2df2d commit e386108

3 files changed

Lines changed: 56 additions & 106 deletions

File tree

perfrunner/celerylocal.py

Lines changed: 0 additions & 14 deletions
This file was deleted.

perfrunner/celeryremote.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

perfrunner/helpers/worker.py

Lines changed: 56 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from sqlalchemy import create_engine
1313

1414
from logger import logger
15-
from perfrunner import celerylocal, celeryremote
1615
from perfrunner.helpers import local
1716
from perfrunner.helpers.config_files import CAOWorkerFile
1817
from perfrunner.helpers.remote import RemoteHelper
@@ -67,6 +66,37 @@
6766
except Exception as ex:
6867
print(ex)
6968

69+
70+
COMMON_CELERY_CONFIG = dict(
71+
task_serializer="pickle",
72+
result_serializer="pickle",
73+
accept_content=["pickle", "json", "application/json", "application/data", "application/text"],
74+
task_protocol=2,
75+
)
76+
77+
LOCAL_BROKER_DB = "perfrunner.db"
78+
LOCAL_RESULTS_DB = "results.db"
79+
80+
LOCAL_CELERY_CONFIG = dict(
81+
**COMMON_CELERY_CONFIG,
82+
broker_url=f"sqla+sqlite:///{LOCAL_BROKER_DB}",
83+
result_backend="database",
84+
database_url=f"sqlite:///{LOCAL_RESULTS_DB}",
85+
)
86+
87+
REMOTE_CELERY_CONFIG = dict(
88+
**COMMON_CELERY_CONFIG,
89+
broker_pool_limit=None,
90+
worker_hijack_root_logger=False,
91+
result_backend="rpc://",
92+
result_persistent=False,
93+
result_exchange="perf_results",
94+
broker_connection_retry=True,
95+
)
96+
97+
ON_PREM_BROKER_URL = "amqp://couchbase:couchbase@172.23.96.202:5672/broker"
98+
99+
70100
celery = Celery('workers')
71101

72102
try:
@@ -77,59 +107,20 @@
77107
except Exception as ex:
78108
print(ex)
79109

80-
if 'env/bin/perfrunner' in sys.argv:
81-
if '--remote' in sys.argv:
82-
# -C flag is a hack to distinguish local and remote workers!
83-
celery.config_from_object(celeryremote)
84-
else:
85-
celery.config_from_object(celerylocal)
86-
elif 'env/bin/nosetests' in sys.argv:
87-
pass
88-
else:
89-
worker_type = os.getenv('WORKER_TYPE')
90-
broker_url = os.getenv('BROKER_URL')
91-
if worker_type == 'local':
110+
if "env/bin/perfrunner" not in sys.argv and "env/bin/nostests" not in sys.argv:
111+
# configure workers that are started using `env/bin/celery worker`
112+
worker_type = os.getenv("WORKER_TYPE")
113+
if worker_type == "local":
114+
celery.conf.update(LOCAL_CELERY_CONFIG)
115+
elif worker_type == "remote":
92116
celery.conf.update(
93-
broker_url='sqla+sqlite:///perfrunner.db',
94-
result_backend='database',
95-
database_url='sqlite:///results.db',
96-
task_serializer='pickle',
97-
result_serializer='pickle',
98-
accept_content={'pickle',
99-
'json',
100-
'application/json',
101-
'application/data',
102-
'application/text'},
103-
task_protocol=2)
104-
elif worker_type == 'remote':
105-
celery.conf.update(
106-
broker_url=broker_url,
107-
broker_pool_limit=None,
108-
worker_hijack_root_logger=False,
109-
result_backend="rpc://",
110-
result_persistent=False,
111-
result_exchange="perf_results",
112-
accept_content=['pickle',
113-
'json',
114-
'application/json',
115-
'application/data',
116-
'application/text'],
117-
result_serializer='pickle',
118-
task_serializer='pickle',
119-
task_protocol=2,
117+
REMOTE_CELERY_CONFIG,
118+
broker_url=os.getenv("BROKER_URL"),
120119
broker_connection_timeout=30,
121-
broker_connection_retry=True,
122-
broker_connection_max_retries=10)
120+
broker_connection_max_retries=10,
121+
)
123122
else:
124-
raise Exception('invalid worker type: {}'.format(worker_type))
125-
126-
try:
127-
registry.enable('json')
128-
registry.enable('application/json')
129-
registry.enable('application/data')
130-
registry.enable('application/text')
131-
except Exception as ex:
132-
print(ex)
123+
raise Exception(f"Invalid worker type: {worker_type}")
133124

134125

135126
@celery.task
@@ -365,10 +356,11 @@ def __init__(self, cluster_spec: ClusterSpec, test_config: TestConfig,
365356
verbose: bool):
366357
self.cluster_spec = cluster_spec
367358
self.test_config = test_config
368-
self.broker_url = 'amqp://couchbase:couchbase@172.23.96.202:5672/broker'
369359
self.remote = RemoteHelper(
370360
cluster_spec, verbose, external_client=self.cluster_spec.external_client
371361
)
362+
363+
self.broker_url = ON_PREM_BROKER_URL
372364
if self.cluster_spec.cloud_infrastructure:
373365
if (
374366
self.cluster_spec.kubernetes_infrastructure
@@ -380,27 +372,17 @@ def __init__(self, cluster_spec: ClusterSpec, test_config: TestConfig,
380372
worker_config.update_worker_spec()
381373
self.worker_path = worker_config.dest_file
382374
else:
383-
self.broker_url = "amqp://couchbase:couchbase@{}:5672/broker".format(
384-
self.cluster_spec.utilities[0]
375+
self.broker_url = (
376+
f"amqp://couchbase:couchbase@{self.cluster_spec.utilities[0]}:5672/broker"
385377
)
378+
386379
celery.conf.update(
380+
REMOTE_CELERY_CONFIG,
387381
broker_url=self.broker_url,
388-
broker_pool_limit=None,
389-
worker_hijack_root_logger=False,
390-
result_backend="rpc://",
391-
result_persistent=False,
392-
result_exchange="perf_results",
393-
accept_content=['pickle',
394-
'json',
395-
'application/json',
396-
'application/data',
397-
'application/text'],
398-
result_serializer='pickle',
399-
task_serializer='pickle',
400-
task_protocol=2,
401382
broker_connection_timeout=1500,
402-
broker_connection_retry=True,
403-
broker_connection_max_retries=100)
383+
broker_connection_max_retries=100,
384+
)
385+
404386
self.workers = cycle(self.cluster_spec.workers)
405387
self.terminate()
406388
self.start()
@@ -617,12 +599,15 @@ def run_sg_bp_tasks(self,
617599

618600
class LocalWorkerManager(RemoteWorkerManager):
619601

620-
BROKER_DB = 'perfrunner.db'
621-
RESULTS_DB = 'results.db'
602+
BROKER_DB = LOCAL_BROKER_DB
603+
RESULTS_DB = LOCAL_RESULTS_DB
622604

623605
def __init__(self, cluster_spec: ClusterSpec, test_config: TestConfig, verbose: bool):
624606
self.cluster_spec = cluster_spec
625607
self.test_config = test_config
608+
609+
celery.conf.update(LOCAL_CELERY_CONFIG)
610+
626611
self.workers = cycle(['localhost'])
627612
self.terminate()
628613
self.tune_sqlite()
@@ -723,4 +708,3 @@ def run_sg_tasks(self,
723708
)
724709
self.fg_async_results.append(async_result)
725710
time.sleep(15)
726-

0 commit comments

Comments
 (0)