|
1 | 1 | # This Source Code Form is subject to the terms of the Mozilla Public
|
2 | 2 | # License, v. 2.0. If a copy of the MPL was not distributed with this
|
3 | 3 | # file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
| 4 | +import os |
4 | 5 | import logging
|
5 | 6 |
|
6 |
| -import flask |
7 | 7 | from celery import Celery
|
8 | 8 | from celery.signals import (
|
9 | 9 | after_task_publish,
|
|
15 | 15 | )
|
16 | 16 | from datadog import statsd
|
17 | 17 |
|
18 |
| -from lando.api.legacy.systems import Subsystem |
19 |
| - |
20 | 18 | logger = logging.getLogger(__name__)
|
21 | 19 |
|
22 | 20 |
|
23 |
| -class FlaskCelery(Celery): |
24 |
| - """Celery which executes task in a flask app context.""" |
25 |
| - |
26 |
| - def __init__(self, *args, **kwargs): |
27 |
| - # Avoid passing the flask app to base Celery. |
28 |
| - flask_app = kwargs.pop("app", None) |
29 |
| - |
30 |
| - super().__init__(*args, **kwargs) |
31 |
| - |
32 |
| - # Important to run this after __init__ since task_cls |
33 |
| - # argument to base Celery can change what we're basing on. |
34 |
| - self._flask_override_task_class() |
35 |
| - |
36 |
| - if flask_app is not None: |
37 |
| - self.init_app(flask_app) |
38 |
| - |
39 |
| - @property |
40 |
| - def dispatch_disabled(self): |
41 |
| - """Will the Celery job system dispatch tasks to the workers?""" |
42 |
| - return bool(self.app.config.get("DISABLE_CELERY")) |
43 |
| - |
44 |
| - def init_app(self, app, config=None): |
45 |
| - """Initialize with a flask app.""" |
46 |
| - self.app = app |
47 |
| - |
48 |
| - config = config or {} |
49 |
| - self.conf.update(main=app.import_name, **config) |
50 |
| - |
51 |
| - if self.dispatch_disabled: |
52 |
| - logger.warning( |
53 |
| - "DISABLE_CELERY application configuration variable set, the Celery job " |
54 |
| - "system has been disabled! Any features that depend on the job system " |
55 |
| - "will not function." |
56 |
| - ) |
57 |
| - |
58 |
| - def _flask_override_task_class(self): |
59 |
| - """Change Task class to one which executes in a flask context.""" |
60 |
| - # Define a Task subclass that saves a reference to self in the Task object so |
61 |
| - # the task object can find self.app (the Flask application object) even if |
62 |
| - # self.app hasn't been set yet. |
63 |
| - # |
64 |
| - # We need to delay all of the task's calls to self.app using a custom Task class |
65 |
| - # because the reference to self.app may not be valid at the time the Celery |
66 |
| - # application object creates it set of Task objects. The programmer may |
67 |
| - # set self.app via the self.init_app() method at any time in the future. |
68 |
| - # |
69 |
| - # self.app is expected to be valid and usable by Task objects after the web |
70 |
| - # application is fully initialized and ready to serve requests. |
71 |
| - BaseTask = self.Task |
72 |
| - celery_self = self |
73 |
| - |
74 |
| - class FlaskTask(BaseTask): |
75 |
| - """A Celery Task subclass that has a reference to a Flask app.""" |
| 21 | +# Set the default Django settings module for the 'celery' program. |
| 22 | +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lando.settings') |
76 | 23 |
|
77 |
| - def __call__(self, *args, **kwargs): |
78 |
| - # Override immediate calling of tasks, such as mytask(). This call |
79 |
| - # method is used by the Celery worker process. |
80 |
| - if flask.has_app_context(): |
81 |
| - return super().__call__(*args, **kwargs) |
| 24 | +app = Celery('lando') |
82 | 25 |
|
83 |
| - with celery_self.app.app_context(): |
84 |
| - return super().__call__(*args, **kwargs) |
| 26 | +# Using a string here means the worker doesn't have to serialize |
| 27 | +# the configuration object to child processes. |
| 28 | +# - namespace='CELERY' means all celery-related configuration keys |
| 29 | +# should have a `CELERY_` prefix. |
| 30 | +app.config_from_object('django.conf:settings', namespace='CELERY') |
85 | 31 |
|
86 |
| - def apply_async(self, *args, **kwargs): |
87 |
| - # Override delayed calling of tasks, such as mytask.apply_async(). |
88 |
| - # This call method is used by the Celery app when it wants to |
89 |
| - # schedule a job for eventual execution on a worker. |
90 |
| - if celery_self.dispatch_disabled: |
91 |
| - return None |
92 |
| - else: |
93 |
| - return super().apply_async(*args, **kwargs) |
| 32 | +# Load task modules from all registered Django apps. |
| 33 | +app.autodiscover_tasks() |
94 | 34 |
|
95 |
| - self.Task = FlaskTask |
96 |
| - |
97 |
| - |
98 |
| -celery = FlaskCelery() |
| 35 | +celery = app |
99 | 36 |
|
100 | 37 |
|
101 | 38 | @after_task_publish.connect
|
@@ -129,29 +66,3 @@ def count_task_retried(**kwargs):
|
129 | 66 | def setup_celery_logging(**kwargs):
|
130 | 67 | # Prevent celery from overriding our logging configuration.
|
131 | 68 | pass
|
132 |
| - |
133 |
| - |
134 |
| -class CelerySubsystem(Subsystem): |
135 |
| - name = "celery" |
136 |
| - |
137 |
| - def init_app(self, app): |
138 |
| - super().init_app(app) |
139 |
| - |
140 |
| - # Import tasks to discover celery tasks. |
141 |
| - import landoapi.tasks # noqa |
142 |
| - |
143 |
| - celery.init_app( |
144 |
| - self.flask_app, |
145 |
| - config={"broker_url": settings.CELERY_BROKER_URL}, |
146 |
| - ) |
147 |
| - celery.log.setup() |
148 |
| - |
149 |
| - def ready(self): |
150 |
| - if settings.DISABLE_CELERY: |
151 |
| - return True |
152 |
| - |
153 |
| - # TODO: Check connection to CELERY_BROKER_URL |
154 |
| - return True |
155 |
| - |
156 |
| - |
157 |
| -celery_subsystem = CelerySubsystem() |
0 commit comments