Skip to content

Commit 230cd66

Browse files
Add management command to prune finished tasks (#60)
Fixes #16
1 parent b0fb242 commit 230cd66

File tree

4 files changed

+357
-0
lines changed

4 files changed

+357
-0
lines changed

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ Finally, you can run the `db_worker` command to run tasks as they're created. Ch
157157
./manage.py db_worker
158158
```
159159

160+
### Pruning old tasks
161+
162+
After a while, tasks may start to build up in your database. This can be managed using the `prune_db_task_results` management command, which deletes completed and failed tasks according to the given retention policy. Check the `--help` for the available options.
163+
160164
### Retrieving task result
161165

162166
When enqueueing a task, you get a `TaskResult`, however it may be useful to retrieve said result from somewhere else (another request, another task etc). This can be done with `get_result` (or `aget_result`):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import logging
2+
from argparse import ArgumentParser, ArgumentTypeError
3+
from datetime import timedelta
4+
from typing import Optional
5+
6+
from django.core.management.base import BaseCommand
7+
from django.db.models import Q
8+
from django.utils import timezone
9+
10+
from django_tasks import DEFAULT_QUEUE_NAME, DEFAULT_TASK_BACKEND_ALIAS, tasks
11+
from django_tasks.backends.database.backend import DatabaseBackend
12+
from django_tasks.backends.database.models import DBTaskResult
13+
from django_tasks.exceptions import InvalidTaskBackendError
14+
from django_tasks.task import ResultStatus
15+
16+
logger = logging.getLogger("django_tasks.backends.database.prune_db_task_results")
17+
18+
19+
def valid_backend_name(val: str) -> DatabaseBackend:
20+
try:
21+
backend = tasks[val]
22+
except InvalidTaskBackendError as e:
23+
raise ArgumentTypeError(e.args[0]) from e
24+
if not isinstance(backend, DatabaseBackend):
25+
raise ArgumentTypeError(f"Backend '{val}' is not a database backend")
26+
return backend
27+
28+
29+
def valid_positive_int(val: str) -> int:
30+
num = int(val)
31+
if num < 0:
32+
raise ArgumentTypeError("Must be greater than zero")
33+
return num
34+
35+
36+
class Command(BaseCommand):
37+
help = "Prune finished database task results"
38+
39+
def add_arguments(self, parser: ArgumentParser) -> None:
40+
parser.add_argument(
41+
"--backend",
42+
nargs="?",
43+
default=DEFAULT_TASK_BACKEND_ALIAS,
44+
type=valid_backend_name,
45+
dest="backend",
46+
help="The backend to operate on (default: %(default)r)",
47+
)
48+
parser.add_argument(
49+
"--queue-name",
50+
nargs="?",
51+
default=DEFAULT_QUEUE_NAME,
52+
type=str,
53+
help="The queues to process. Separate multiple with a comma. To process all queues, use '*' (default: %(default)r)",
54+
)
55+
parser.add_argument(
56+
"--min-age-days",
57+
nargs="?",
58+
default=14,
59+
type=valid_positive_int,
60+
help="The minimum age (in days) of a finished task result to be pruned (default: %(default)r)",
61+
)
62+
parser.add_argument(
63+
"--failed-min-age-days",
64+
nargs="?",
65+
default=None,
66+
type=valid_positive_int,
67+
help="The minimum age (in days) of a failed task result to be pruned (default: min-age-days)",
68+
)
69+
parser.add_argument(
70+
"--dry-run",
71+
action="store_true",
72+
help="Don't delete the task results, just show how many would be deleted",
73+
)
74+
75+
def configure_logging(self, verbosity: int) -> None:
76+
if verbosity == 0:
77+
logger.setLevel(logging.WARNING)
78+
elif verbosity == 1:
79+
logger.setLevel(logging.INFO)
80+
else:
81+
logger.setLevel(logging.DEBUG)
82+
83+
# If no handler is configured, the logs won't show,
84+
# regardless of the set level.
85+
if not logger.hasHandlers():
86+
logger.addHandler(logging.StreamHandler(self.stdout))
87+
88+
def handle(
89+
self,
90+
*,
91+
verbosity: int,
92+
backend: DatabaseBackend,
93+
min_age_days: int,
94+
failed_min_age_days: Optional[int],
95+
queue_name: str,
96+
dry_run: bool,
97+
**options: dict,
98+
) -> None:
99+
self.configure_logging(verbosity)
100+
101+
min_age = timezone.now() - timedelta(days=min_age_days)
102+
failed_min_age = (
103+
(timezone.now() - timedelta(days=failed_min_age_days))
104+
if failed_min_age_days
105+
else None
106+
)
107+
108+
results = DBTaskResult.objects.finished().filter(backend_name=backend.alias)
109+
110+
queue_names = queue_name.split(",")
111+
if "*" not in queue_names:
112+
results = results.filter(queue_name__in=queue_names)
113+
114+
if failed_min_age is None:
115+
results = results.filter(finished_at__lte=min_age)
116+
else:
117+
results = results.filter(
118+
Q(status=ResultStatus.COMPLETE, finished_at__lte=min_age)
119+
| Q(status=ResultStatus.FAILED, finished_at__lte=failed_min_age)
120+
)
121+
122+
if dry_run:
123+
logger.info("Would delete %d task result(s)", results.count())
124+
else:
125+
deleted, _ = results.delete()
126+
logger.info("Deleted %d task result(s)", deleted)

django_tasks/backends/database/models.py

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ def complete(self) -> "DBTaskResultQuerySet":
6060
def failed(self) -> "DBTaskResultQuerySet":
6161
return self.filter(status=ResultStatus.FAILED)
6262

63+
def running(self) -> "DBTaskResultQuerySet":
64+
return self.filter(status=ResultStatus.RUNNING)
65+
66+
def finished(self) -> "DBTaskResultQuerySet":
67+
return self.failed() | self.complete()
68+
6369
@retry()
6470
def get_locked(self) -> Optional["DBTaskResult"]:
6571
"""

tests/tests/test_database_backend.py

+221
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
from django_tasks.backends.database.management.commands.db_worker import (
2323
logger as db_worker_logger,
2424
)
25+
from django_tasks.backends.database.management.commands.prune_db_task_results import (
26+
logger as prune_db_tasks_logger,
27+
)
2528
from django_tasks.backends.database.models import DBTaskResult
2629
from django_tasks.backends.database.utils import (
2730
connection_requires_manual_exclusive_transaction,
@@ -921,3 +924,221 @@ def test_explicit_transaction(self) -> None:
921924
self.assertFalse(
922925
connection_requires_manual_exclusive_transaction(self.connection)
923926
)
927+
928+
929+
@override_settings(
930+
TASKS={
931+
"default": {
932+
"BACKEND": "django_tasks.backends.database.DatabaseBackend",
933+
"QUEUES": ["default", "queue-1"],
934+
},
935+
"dummy": {"BACKEND": "django_tasks.backends.dummy.DummyBackend"},
936+
}
937+
)
938+
class DatabaseBackendPruneTaskResultsTestCase(TransactionTestCase):
939+
prune_task_results = partial(call_command, "prune_db_task_results", verbosity=0)
940+
941+
def tearDown(self) -> None:
942+
# Reset the logger after every run, to ensure the correct `stdout` is used
943+
for handler in prune_db_tasks_logger.handlers:
944+
prune_db_tasks_logger.removeHandler(handler)
945+
946+
def test_prunes_tasks(self) -> None:
947+
result = test_tasks.noop_task.enqueue()
948+
949+
DBTaskResult.objects.all().update(
950+
status=ResultStatus.COMPLETE, finished_at=timezone.now()
951+
)
952+
953+
self.assertEqual(DBTaskResult.objects.finished().count(), 1)
954+
955+
stdout = StringIO()
956+
957+
with self.assertNumQueries(3):
958+
self.prune_task_results(min_age_days=0, stdout=stdout, verbosity=3)
959+
960+
self.assertEqual(DBTaskResult.objects.finished().count(), 0)
961+
962+
with self.assertRaises(ResultDoesNotExist):
963+
result.refresh()
964+
965+
self.assertEqual(stdout.getvalue().strip(), "Deleted 1 task result(s)")
966+
967+
def test_doesnt_prune_new_tasks(self) -> None:
968+
result = test_tasks.noop_task.enqueue()
969+
970+
self.assertEqual(DBTaskResult.objects.ready().count(), 1)
971+
972+
stdout = StringIO()
973+
with self.assertNumQueries(3):
974+
self.prune_task_results(min_age_days=0, stdout=stdout, verbosity=3)
975+
976+
self.assertEqual(DBTaskResult.objects.ready().count(), 1)
977+
978+
result.refresh()
979+
980+
self.assertEqual(stdout.getvalue().strip(), "Deleted 0 task result(s)")
981+
982+
def test_doesnt_prune_running_tasks(self) -> None:
983+
result = test_tasks.noop_task.enqueue()
984+
985+
DBTaskResult.objects.all().update(status=ResultStatus.RUNNING)
986+
987+
self.assertEqual(DBTaskResult.objects.running().count(), 1)
988+
989+
with self.assertNumQueries(3):
990+
self.prune_task_results(min_age_days=0)
991+
992+
self.assertEqual(DBTaskResult.objects.running().count(), 1)
993+
994+
result.refresh()
995+
996+
def test_only_prunes_specified_queue(self) -> None:
997+
result = test_tasks.noop_task.enqueue()
998+
queue_1_result = test_tasks.noop_task.using(queue_name="queue-1").enqueue()
999+
1000+
DBTaskResult.objects.all().update(
1001+
status=ResultStatus.COMPLETE, finished_at=timezone.now()
1002+
)
1003+
1004+
self.assertEqual(DBTaskResult.objects.complete().count(), 2)
1005+
1006+
with self.assertNumQueries(3):
1007+
self.prune_task_results(queue_name="queue-1", min_age_days=0)
1008+
1009+
self.assertEqual(DBTaskResult.objects.complete().count(), 1)
1010+
1011+
result.refresh()
1012+
1013+
with self.assertRaises(ResultDoesNotExist):
1014+
queue_1_result.refresh()
1015+
1016+
def test_prune_all_queues(self) -> None:
1017+
test_tasks.noop_task.enqueue()
1018+
test_tasks.noop_task.using(queue_name="queue-1").enqueue()
1019+
1020+
DBTaskResult.objects.all().update(
1021+
status=ResultStatus.COMPLETE, finished_at=timezone.now()
1022+
)
1023+
1024+
self.assertEqual(DBTaskResult.objects.complete().count(), 2)
1025+
1026+
with self.assertNumQueries(3):
1027+
self.prune_task_results(queue_name="*", min_age_days=0)
1028+
1029+
self.assertEqual(DBTaskResult.objects.complete().count(), 0)
1030+
1031+
def test_min_age(self) -> None:
1032+
one_day_result = test_tasks.noop_task.enqueue()
1033+
1034+
DBTaskResult.objects.ready().update(
1035+
status=ResultStatus.COMPLETE, finished_at=timezone.now() - timedelta(days=1)
1036+
)
1037+
1038+
three_day_result = test_tasks.noop_task.enqueue()
1039+
DBTaskResult.objects.ready().update(
1040+
status=ResultStatus.COMPLETE, finished_at=timezone.now() - timedelta(days=3)
1041+
)
1042+
1043+
self.assertEqual(DBTaskResult.objects.complete().count(), 2)
1044+
1045+
with self.assertNumQueries(3):
1046+
self.prune_task_results()
1047+
1048+
self.assertEqual(DBTaskResult.objects.complete().count(), 2)
1049+
1050+
with self.assertNumQueries(3):
1051+
self.prune_task_results(min_age_days=3)
1052+
1053+
self.assertEqual(DBTaskResult.objects.complete().count(), 1)
1054+
1055+
one_day_result.refresh()
1056+
1057+
with self.assertRaises(ResultDoesNotExist):
1058+
three_day_result.refresh()
1059+
1060+
with self.assertNumQueries(3):
1061+
self.prune_task_results(min_age_days=1)
1062+
1063+
self.assertEqual(DBTaskResult.objects.complete().count(), 0)
1064+
1065+
def test_failed_min_age(self) -> None:
1066+
completed_result = test_tasks.noop_task.enqueue()
1067+
1068+
DBTaskResult.objects.ready().update(
1069+
status=ResultStatus.COMPLETE, finished_at=timezone.now() - timedelta(days=3)
1070+
)
1071+
1072+
failed_result = test_tasks.noop_task.enqueue()
1073+
DBTaskResult.objects.ready().update(
1074+
status=ResultStatus.FAILED, finished_at=timezone.now() - timedelta(days=3)
1075+
)
1076+
1077+
self.assertEqual(DBTaskResult.objects.finished().count(), 2)
1078+
1079+
with self.assertNumQueries(3):
1080+
self.prune_task_results()
1081+
1082+
self.assertEqual(DBTaskResult.objects.finished().count(), 2)
1083+
1084+
with self.assertNumQueries(3):
1085+
self.prune_task_results(min_age_days=3, failed_min_age_days=5)
1086+
1087+
self.assertEqual(DBTaskResult.objects.finished().count(), 1)
1088+
1089+
failed_result.refresh()
1090+
1091+
with self.assertRaises(ResultDoesNotExist):
1092+
completed_result.refresh()
1093+
1094+
with self.assertNumQueries(3):
1095+
self.prune_task_results(min_age_days=3, failed_min_age_days=1)
1096+
1097+
with self.assertRaises(ResultDoesNotExist):
1098+
failed_result.refresh()
1099+
1100+
def test_dry_run(self) -> None:
1101+
test_tasks.noop_task.enqueue()
1102+
1103+
DBTaskResult.objects.all().update(
1104+
status=ResultStatus.COMPLETE, finished_at=timezone.now()
1105+
)
1106+
1107+
self.assertEqual(DBTaskResult.objects.count(), 1)
1108+
1109+
stdout = StringIO()
1110+
with self.assertNumQueries(1):
1111+
self.prune_task_results(
1112+
min_age_days=0, dry_run=True, stdout=stdout, verbosity=3
1113+
)
1114+
1115+
self.assertEqual(DBTaskResult.objects.count(), 1)
1116+
1117+
self.assertEqual(stdout.getvalue().strip(), "Would delete 1 task result(s)")
1118+
1119+
def test_unknown_backend(self) -> None:
1120+
output = StringIO()
1121+
with redirect_stderr(output):
1122+
with self.assertRaises(SystemExit):
1123+
execute_from_command_line(
1124+
["django-admin", "prune_db_task_results", "--backend", "unknown"]
1125+
)
1126+
self.assertIn("The connection 'unknown' doesn't exist.", output.getvalue())
1127+
1128+
def test_incorrect_backend(self) -> None:
1129+
output = StringIO()
1130+
with redirect_stderr(output):
1131+
with self.assertRaises(SystemExit):
1132+
execute_from_command_line(
1133+
["django-admin", "prune_db_task_results", "--backend", "dummy"]
1134+
)
1135+
self.assertIn("Backend 'dummy' is not a database backend", output.getvalue())
1136+
1137+
def test_negative_age(self) -> None:
1138+
output = StringIO()
1139+
with redirect_stderr(output):
1140+
with self.assertRaises(SystemExit):
1141+
execute_from_command_line(
1142+
["django-admin", "prune_db_task_results", "--min-age-days", "-1"]
1143+
)
1144+
self.assertIn("Must be greater than zero", output.getvalue())

0 commit comments

Comments
 (0)