Skip to content

Commit 6eb4791

Browse files
committed
View to multiple queues
Signed-off-by: kiblik <5609770+kiblik@users.noreply.github.com>
1 parent daefd70 commit 6eb4791

File tree

9 files changed

+68
-44
lines changed

9 files changed

+68
-44
lines changed

dojo/celery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __call__(self, *args, **kwargs):
4545

4646

4747
@app.task(bind=True)
48-
def debug_task(self, priority=0):
48+
def debug_task(self, priority=1):
4949
logger.info(f"Request: {self.request!r}")
5050

5151

dojo/jira_link/helper.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,7 @@ def push_to_jira(obj, *args, **kwargs):
777777

778778
# we need thre separate celery tasks due to the decorators we're using to map to/from ids
779779
@dojo_async_task
780-
@app.task(priority=3)
780+
@app.task(priority=4)
781781
def push_finding_to_jira(finding_id, *args, **kwargs):
782782
finding = get_object_or_none(Finding, id=finding_id)
783783
if not finding:
@@ -790,7 +790,7 @@ def push_finding_to_jira(finding_id, *args, **kwargs):
790790

791791

792792
@dojo_async_task
793-
@app.task(priority=3)
793+
@app.task(priority=4)
794794
def push_finding_group_to_jira(finding_group_id, *args, **kwargs):
795795
finding_group = get_object_or_none(Finding_Group, id=finding_group_id)
796796
if not finding_group:
@@ -807,7 +807,7 @@ def push_finding_group_to_jira(finding_group_id, *args, **kwargs):
807807

808808

809809
@dojo_async_task
810-
@app.task(priority=3)
810+
@app.task(priority=4)
811811
def push_engagement_to_jira(engagement_id, *args, **kwargs):
812812
engagement = get_object_or_none(Engagement, id=engagement_id)
813813
if not engagement:
@@ -1397,7 +1397,7 @@ def jira_check_attachment(issue, source_file_name):
13971397

13981398

13991399
@dojo_async_task
1400-
@app.task(priority=3)
1400+
@app.task(priority=4)
14011401
def close_epic(engagement_id, push_to_jira, **kwargs):
14021402
engagement = get_object_or_none(Engagement, id=engagement_id)
14031403
if not engagement:
@@ -1446,7 +1446,7 @@ def close_epic(engagement_id, push_to_jira, **kwargs):
14461446

14471447

14481448
@dojo_async_task
1449-
@app.task(priority=3)
1449+
@app.task(priority=4)
14501450
def update_epic(engagement_id, **kwargs):
14511451
engagement = get_object_or_none(Engagement, id=engagement_id)
14521452
if not engagement:
@@ -1493,7 +1493,7 @@ def update_epic(engagement_id, **kwargs):
14931493

14941494

14951495
@dojo_async_task
1496-
@app.task(priority=3)
1496+
@app.task(priority=4)
14971497
def add_epic(engagement_id, **kwargs):
14981498
engagement = get_object_or_none(Engagement, id=engagement_id)
14991499
if not engagement:

dojo/notifications/helper.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ class SlackNotificationManger(NotificationManagerHelpers):
200200
"""Manger for slack notifications and their helpers."""
201201

202202
@dojo_async_task
203-
@app.task(priority=3)
203+
@app.task(priority=4)
204204
def send_slack_notification(
205205
self,
206206
event: str,
@@ -318,7 +318,7 @@ class MSTeamsNotificationManger(NotificationManagerHelpers):
318318
"""Manger for Microsoft Teams notifications and their helpers."""
319319

320320
@dojo_async_task
321-
@app.task(priority=3)
321+
@app.task(priority=4)
322322
def send_msteams_notification(
323323
self,
324324
event: str,
@@ -369,7 +369,7 @@ class EmailNotificationManger(NotificationManagerHelpers):
369369
"""Manger for email notifications and their helpers."""
370370

371371
@dojo_async_task
372-
@app.task(priority=3)
372+
@app.task(priority=4)
373373
def send_mail_notification(
374374
self,
375375
event: str,
@@ -421,7 +421,7 @@ class WebhookNotificationManger(NotificationManagerHelpers):
421421
ERROR_TEMPORARY = "temporary"
422422

423423
@dojo_async_task
424-
@app.task(priority=3)
424+
@app.task(priority=4)
425425
def send_webhooks_notification(
426426
self,
427427
event: str,
@@ -559,7 +559,7 @@ def _test_webhooks_notification(self, endpoint: Notification_Webhooks) -> None:
559559
# in "send_webhooks_notification", we are doing deeper analysis, why it failed
560560
# for now, "raise_for_status" should be enough
561561

562-
@app.task(ignore_result=True, priority=1)
562+
@app.task(ignore_result=True, priority=2)
563563
def _webhook_reactivation(self, endpoint_id: int, **_kwargs: dict):
564564
endpoint = Notification_Webhooks.objects.get(pk=endpoint_id)
565565
# User already changed status of endpoint
@@ -875,7 +875,7 @@ def _process_notifications(
875875
)
876876

877877

878-
@app.task(ignore_result=True, priority=1)
878+
@app.task(ignore_result=True, priority=2)
879879
def webhook_status_cleanup(*_args: list, **_kwargs: dict):
880880
# If some endpoint was affected by some outage (5xx, 429, Timeout) but it was clean during last 24 hours,
881881
# we consider this endpoint as healthy so need to reset it

dojo/risk_acceptance/helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def add_findings_to_risk_acceptance(user: Dojo_User, risk_acceptance: Risk_Accep
167167
post_jira_comments(risk_acceptance, findings, accepted_message_creator)
168168

169169

170-
@app.task(priority=3)
170+
@app.task(priority=4)
171171
def expiration_handler(*args, **kwargs):
172172
"""
173173
Creates a notification upon risk expiration and X days beforehand if configured.

dojo/settings/settings.dist.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import environ
1818
import pghistory
1919
from celery.schedules import crontab
20-
from kombu import Queue
2120
from netaddr import IPNetwork, IPSet
2221

2322
from dojo import __version__
@@ -1255,28 +1254,27 @@ def saml2_attrib_map_format(din):
12551254
else:
12561255
CELERY_BROKER_TRANSPORT_OPTIONS = {}
12571256

1257+
# There are 5 queues. Lower number = higher priority. Plus one legacy (for backward compatibility)
1258+
CELERY_QUEUE_TYPES = {
1259+
"celery": "legacy",
1260+
"celery:1": "live checks",
1261+
"celery:2": "keep system consistent (e.g. activation/deactivation webhooks)",
1262+
"celery:3": "regular tasks",
1263+
"celery:4": "notifications and jira",
1264+
"celery:5": "garbidge collectors",
1265+
}
1266+
12581267
if not CELERY_BROKER_URL.startswith("sqla+sqlite"): # Priority queues are not supported by sqlite based engine
12591268
if "queue_order_strategy" not in CELERY_BROKER_TRANSPORT_OPTIONS:
12601269
CELERY_BROKER_TRANSPORT_OPTIONS["queue_order_strategy"] = "priority"
12611270

12621271
if "priority_steps" not in CELERY_BROKER_TRANSPORT_OPTIONS:
1263-
# There are 4 queues. Lower number = higher priority
1264-
# 0 - live checks
1265-
# 1 - keep system consistent (e.g. activation/deavtivation webhooks)
1266-
# 2 - regular tasks
1267-
# 3 - notifications and jira
1268-
# 4 - garbidge collectors
1269-
CELERY_BROKER_TRANSPORT_OPTIONS["priority_steps"] = list(range(5))
1270-
CELERY_TASK_DEFAULT_PRIORITY = 2
1272+
CELERY_BROKER_TRANSPORT_OPTIONS["priority_steps"] = list(range(len(CELERY_QUEUE_TYPES)))
1273+
CELERY_TASK_DEFAULT_PRIORITY = 3
12711274

12721275
if "sep" not in CELERY_BROKER_TRANSPORT_OPTIONS:
12731276
CELERY_BROKER_TRANSPORT_OPTIONS["sep"] = ":"
12741277

1275-
# TODO: needs to be tested
1276-
CELERY_TASK_QUEUES = [
1277-
Queue("celery"), # This handles BOTH legacy AND is the base for priority queues
1278-
]
1279-
12801278

12811279
CELERY_IMPORTS = ("dojo.tools.tool_issue_updater", )
12821280

dojo/system_settings/views.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,11 @@ def get_celery_status(
113113

114114
q_len = get_celery_queue_length()
115115
if q_len is None:
116-
context["celery_q_len"] = " It is not possible to identify number of waiting tasks."
117-
elif q_len:
118-
context["celery_q_len"] = f"{q_len} tasks are waiting to be proccessed."
116+
context["celery_q_len"] = "It is not possible to identify number of waiting tasks."
117+
elif len(q_len):
118+
context["celery_q_len"] = f"{sum(q_len.values())} tasks are waiting to be proccessed."
119+
context["celery_stats"] = q_len
120+
context["celery_stats_desc"] = settings.CELERY_QUEUE_TYPES
119121
else:
120122
context["celery_q_len"] = "No task is waiting to be proccessed."
121123

dojo/tasks.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def log_generic_alert(source, title, description):
3030
icon="bullseye", source=source)
3131

3232

33-
@app.task(bind=True, priority=3)
33+
@app.task(bind=True, priority=4)
3434
def add_alerts(self, runinterval):
3535
now = timezone.now()
3636

@@ -76,7 +76,7 @@ def add_alerts(self, runinterval):
7676
calculate_grade(product.id)
7777

7878

79-
@app.task(bind=True, priority=4)
79+
@app.task(bind=True, priority=5)
8080
def cleanup_alerts(*args, **kwargs):
8181
try:
8282
max_alerts_per_user = settings.MAX_ALERTS_PER_USER
@@ -94,12 +94,12 @@ def cleanup_alerts(*args, **kwargs):
9494
logger.info("total number of alerts deleted: %s", total_deleted_count)
9595

9696

97-
@app.task(bind=True, priority=4)
97+
@app.task(bind=True, priority=5)
9898
def flush_auditlog(*args, **kwargs):
9999
run_flush_auditlog()
100100

101101

102-
@app.task(bind=True, priority=4)
102+
@app.task(bind=True, priority=5)
103103
def async_dupe_delete(*args, **kwargs):
104104
# Wrap with pghistory context for audit trail
105105
with pghistory.context(source="dupe_delete_task"):
@@ -173,7 +173,7 @@ def _async_dupe_delete_impl():
173173
calculate_grade(product.id)
174174

175175

176-
@app.task(ignore_result=False, priority=0)
176+
@app.task(ignore_result=False, priority=1, expires=10) # It is expected to respond in 5 seconds. If it is in a queue more than 10, it is not necessary to respond anymore.
177177
def celery_status():
178178
return True
179179

@@ -237,13 +237,13 @@ def evaluate_pro_proposition(*args, **kwargs):
237237
announcement.save()
238238

239239

240-
@app.task(priority=4)
240+
@app.task(priority=5)
241241
def clear_sessions(*args, **kwargs):
242242
call_command("clearsessions")
243243

244244

245245
@dojo_async_task
246-
@app.task(priority=4)
246+
@app.task(priority=5)
247247
def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
248248
"""
249249
Async task to update watson search indexes for a specific model type.

dojo/templates/dojo/system_settings.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,24 @@ <h4> Celery <span class="label label-danger">{{celery_status}}</span> </h4>
2828
</div>
2929
<div class="panel-body text-left">
3030
{{celery_q_len}}
31+
{% if celery_stats %}
32+
<table id="general" class="tablesorter-bootstrap table table-condensed table-striped">
33+
<thead>
34+
<tr>
35+
<th>Name of the Quene</th>
36+
<th>Number of Tasks</th>
37+
<th>Description of the Queue</th>
38+
</tr>
39+
</thead>
40+
{% for key, value in celery_stats.items %}
41+
<tr>
42+
<td>{{ key | first }}</td>
43+
<td>{{ value }}</td>
44+
<td>{{ key | last }}</td>
45+
</tr>
46+
{% endfor %}
47+
</table>
48+
{% endif %}
3149
</div>
3250
</div>
3351
</div>

dojo/utils.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,15 +1332,21 @@ def get_celery_worker_status():
13321332

13331333

13341334
def get_celery_queue_length():
1335+
queue_lens = {}
13351336
try:
1336-
with Connection(settings.CELERY_BROKER_URL) as conn, conn.SimpleQueue("celery") as queue:
1337-
return queue.qsize()
1338-
except ChannelError as e:
1339-
if "NOT_FOUND" in str(e):
1340-
return 0
1341-
return None
1337+
with Connection(settings.CELERY_BROKER_URL) as conn:
1338+
for queue_name in settings.CELERY_QUEUE_TYPES:
1339+
try:
1340+
with conn.SimpleQueue(queue_name) as queue:
1341+
if size := queue.qsize():
1342+
queue_lens[queue_name, settings.CELERY_QUEUE_TYPES[queue_name]] = size
1343+
except ChannelError as e:
1344+
if "NOT_FOUND" in str(e):
1345+
continue
1346+
raise
13421347
except:
13431348
return None
1349+
return queue_lens
13441350

13451351

13461352
# Used to display the counts and enabled tabs in the product view

0 commit comments

Comments
 (0)