Skip to content

Commit b22b455

Browse files
dekkersjpbruinsslotunderdarknl
authored
Fix rescheduling (1.13) (#2141)
Co-authored-by: JP Bruins Slot <jpbruinsslot@gmail.com> Co-authored-by: Jan Klopper <janklopper+underdark@gmail.com>
1 parent eb8a96a commit b22b455

9 files changed

Lines changed: 73 additions & 40 deletions

File tree

mula/scheduler/models/queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@ class PrioritizedItem(BaseModel):
2020

2121
model_config = ConfigDict(from_attributes=True)
2222

23-
id: uuid.UUID = Field(default_factory=uuid.uuid4)
23+
id: Optional[uuid.UUID] = Field(default_factory=uuid.uuid4)
2424

2525
scheduler_id: Optional[str] = None
2626

2727
# A unique generated identifier for the object contained in data
2828
hash: Optional[str] = Field(None, max_length=32)
2929

30-
priority: Optional[int]
30+
priority: Optional[int] = 0
3131

32-
data: Dict
32+
data: Dict = Field(default_factory=dict)
3333

3434
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
3535

mula/scheduler/models/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class NormalizerTask(BaseModel):
109109

110110
type: ClassVar[str] = "normalizer"
111111

112-
id: uuid.UUID = Field(default_factory=lambda: uuid.uuid4())
112+
id: Optional[uuid.UUID] = Field(default_factory=uuid.uuid4)
113113
normalizer: Normalizer
114114
raw_data: RawData
115115

@@ -128,7 +128,7 @@ class BoefjeTask(BaseModel):
128128

129129
type: ClassVar[str] = "boefje"
130130

131-
id: uuid.UUID = Field(default_factory=lambda: uuid.uuid4())
131+
id: Optional[uuid.UUID] = Field(default_factory=uuid.uuid4)
132132
boefje: Boefje
133133
input_ooi: Optional[str]
134134
organization: str

mula/scheduler/server/server.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -442,15 +442,23 @@ def push_queue(self, queue_id: str, item: models.PrioritizedItem) -> Any:
442442
)
443443

444444
try:
445-
p_item = models.PrioritizedItem(**item.model_dump())
445+
# Load default values
446+
p_item = models.PrioritizedItem()
447+
448+
# Set default values
446449
if p_item.scheduler_id is None:
447450
p_item.scheduler_id = s.scheduler_id
448451

452+
p_item.priority = item.priority
453+
449454
if s.queue.item_type == models.BoefjeTask:
450-
p_item.data = models.BoefjeTask(**p_item.data).dict()
455+
p_item.data = models.BoefjeTask(**item.data).dict()
451456
elif s.queue.item_type == models.NormalizerTask:
452-
p_item.data = models.NormalizerTask(**p_item.data).dict()
457+
p_item.data = models.NormalizerTask(**item.data).dict()
458+
else:
459+
p_item.data = item.data
453460
except Exception as exc:
461+
self.logger.exception(exc)
454462
raise fastapi.HTTPException(
455463
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
456464
detail=str(exc),
@@ -475,7 +483,7 @@ def push_queue(self, queue_id: str, item: models.PrioritizedItem) -> Any:
475483
detail=str(exc_not_allowed),
476484
) from exc_not_allowed
477485

478-
return models.PrioritizedItem(**p_item.model_dump())
486+
return p_item
479487

480488
def run(self) -> None:
481489
uvicorn.run(

mula/tests/integration/test_api.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,19 @@ def test_push_queue(self):
132132

133133
item = create_p_item(self.organisation.id, 1)
134134

135+
# Remove id, scheduler should create id for us
136+
delattr(item, "id")
137+
135138
response = self.client.post(f"/queues/{self.scheduler.scheduler_id}/push", data=item.model_dump_json())
136139
self.assertEqual(response.status_code, 201)
137140
self.assertEqual(1, self.scheduler.queue.qsize())
141+
self.assertIsNotNone(response.json().get("id"))
138142

139143
def test_push_incorrect_item_type(self):
140144
response = self.client.post(
141145
f"/queues/{self.scheduler.scheduler_id}/push", json={"priority": 0, "item": "not a task"}
142146
)
143-
self.assertEqual(response.status_code, 422)
147+
self.assertEqual(response.status_code, 400)
144148

145149
def test_push_queue_full(self):
146150
# Set maxsize of the queue to 1
@@ -227,7 +231,7 @@ def test_push_replace_allowed(self):
227231
self.assertEqual(1, self.scheduler.queue.qsize())
228232

229233
# Check if the item on the queue is the replaced item
230-
self.assertEqual(response.json().get("id"), str(self.scheduler.queue.peek(0).id))
234+
self.assertEqual(response.json().get("data").get("id"), str(self.scheduler.queue.peek(0).data.get("id")))
231235

232236
def test_push_updates_not_allowed(self):
233237
# Set queue to no allow updates
@@ -282,7 +286,7 @@ def test_push_updates_allowed(self):
282286
self.assertEqual(1, self.scheduler.queue.qsize())
283287

284288
# Check if the item on the queue is the updated item
285-
self.assertEqual(response.json().get("id"), str(self.scheduler.queue.peek(0).id))
289+
self.assertEqual(response.json().get("data").get("id"), str(self.scheduler.queue.peek(0).data.get("id")))
286290
self.assertEqual(response.json().get("data").get("name"), "updated-name")
287291

288292
def test_push_priority_updates_not_allowed(self):
@@ -339,7 +343,7 @@ def test_update_priority_higher(self):
339343
self.assertEqual(1, self.scheduler.queue.qsize())
340344

341345
# Check if the item on the queue is the updated item
342-
self.assertEqual(response.json().get("id"), str(self.scheduler.queue.peek(0).id))
346+
self.assertEqual(response.json().get("data").get("id"), str(self.scheduler.queue.peek(0).data.get("id")))
343347

344348
def test_update_priority_lower(self):
345349
"""When updating the priority of the initial item on the priority queue
@@ -370,7 +374,7 @@ def test_update_priority_lower(self):
370374
self.assertEqual(1, self.scheduler.queue.qsize())
371375

372376
# Check if the item on the queue is the updated item
373-
self.assertEqual(response.json().get("id"), str(self.scheduler.queue.peek(0).id))
377+
self.assertEqual(response.json().get("data").get("id"), str(self.scheduler.queue.peek(0).data.get("id")))
374378

375379
def test_pop_queue(self):
376380
# Add one task to the queue
@@ -395,12 +399,14 @@ def test_pop_queue_filters(self):
395399
f"/queues/{self.scheduler.scheduler_id}/push",
396400
data=first_item.model_dump_json(),
397401
)
402+
first_item_id = response.json().get("id")
398403
self.assertEqual(response.status_code, 201)
399404
self.assertEqual(1, self.scheduler.queue.qsize())
400405

401406
# Add second item to the queue
402407
second_item = create_p_item(self.organisation.id, 2, data=functions.TestModel(id="456", name="test"))
403408
response = self.client.post(f"/queues/{self.scheduler.scheduler_id}/push", data=second_item.model_dump_json())
409+
second_item_id = response.json().get("id")
404410
self.assertEqual(response.status_code, 201)
405411
self.assertEqual(2, self.scheduler.queue.qsize())
406412

@@ -409,7 +415,7 @@ def test_pop_queue_filters(self):
409415
f"/queues/{self.scheduler.scheduler_id}/pop", json=[{"field": "name", "operator": "eq", "value": "test"}]
410416
)
411417
self.assertEqual(200, response.status_code)
412-
self.assertEqual(str(first_item.id), response.json().get("id"))
418+
self.assertEqual(first_item_id, response.json().get("id"))
413419
self.assertEqual(1, self.scheduler.queue.qsize())
414420

415421
# Should not return any items
@@ -425,7 +431,7 @@ def test_pop_queue_filters(self):
425431
f"/queues/{self.scheduler.scheduler_id}/pop", json=[{"field": "name", "operator": "eq", "value": "test"}]
426432
)
427433
self.assertEqual(200, response.status_code)
428-
self.assertEqual(str(second_item.id), response.json().get("id"))
434+
self.assertEqual(second_item_id, response.json().get("id"))
429435
self.assertEqual(0, self.scheduler.queue.qsize())
430436

431437
def test_pop_empty(self):
@@ -449,10 +455,11 @@ def setUp(self):
449455
),
450456
)
451457
response = self.client.post(f"/queues/{self.scheduler.scheduler_id}/push", data=first_item.model_dump_json())
458+
initial_item_id = response.json().get("id")
452459
self.assertEqual(response.status_code, 201)
453460
self.assertEqual(1, self.scheduler.queue.qsize())
454461

455-
self.first_item_api = self.client.get(f"/tasks/{first_item.id}").json()
462+
self.first_item_api = self.client.get(f"/tasks/{initial_item_id}").json()
456463

457464
# Add second item to the queue
458465
second_item = create_p_item(
@@ -461,10 +468,11 @@ def setUp(self):
461468
data=functions.TestModel(id="456", name="test"),
462469
)
463470
response = self.client.post(f"/queues/{self.scheduler.scheduler_id}/push", data=second_item.model_dump_json())
471+
second_item_id = response.json().get("id")
464472
self.assertEqual(response.status_code, 201)
465473
self.assertEqual(2, self.scheduler.queue.qsize())
466474

467-
self.second_item_api = self.client.get(f"/tasks/{second_item.id}").json()
475+
self.second_item_api = self.client.get(f"/tasks/{second_item_id}").json()
468476

469477
def test_create_task(self):
470478
item = create_p_item(self.organisation.id, 1)

mula/tests/mocks/queue.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
from scheduler import queues
2-
3-
from tests.utils import functions
1+
from scheduler import models, queues
2+
from scheduler.utils import dict_utils
43

54

65
class MockPriorityQueue(queues.PriorityQueue):
7-
def create_hash(self, item: functions.TestModel):
8-
return item.id.hex
6+
def create_hash(self, p_item: models.PrioritizedItem) -> str:
7+
return dict_utils.deep_get(p_item.model_dump(), ["data", "id"])

rocky/katalogus/views/mixins.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from logging import getLogger
22
from typing import List, Optional, Union
3-
from uuid import uuid4
43

54
from account.mixins import OrganizationView
65
from django.contrib import messages
@@ -82,12 +81,11 @@ class NormalizerMixin(OctopoesView):
8281

8382
def run_normalizer(self, normalizer: KATalogusNormalizer, raw_data: RawData) -> None:
8483
normalizer_task = NormalizerTask(
85-
id=uuid4(),
8684
normalizer=Normalizer.parse_obj(normalizer.dict()),
8785
raw_data=raw_data,
8886
)
8987

90-
task = QueuePrioritizedItem(id=normalizer_task.id, priority=1, data=normalizer_task)
88+
task = QueuePrioritizedItem(priority=1, data=normalizer_task)
9189

9290
schedule_task(self.request, self.organization.code, task)
9391

@@ -100,13 +98,12 @@ class BoefjeMixin(OctopoesView):
10098

10199
def run_boefje(self, katalogus_boefje: KATalogusBoefje, ooi: Optional[OOI]) -> None:
102100
boefje_task = BoefjeTask(
103-
id=uuid4().hex,
104101
boefje=Boefje.parse_obj(katalogus_boefje.dict()),
105102
input_ooi=ooi.reference if ooi else None,
106103
organization=self.organization.code,
107104
)
108105

109-
task = QueuePrioritizedItem(id=boefje_task.id, priority=1, data=boefje_task)
106+
task = QueuePrioritizedItem(priority=1, data=boefje_task)
110107
schedule_task(self.request, self.organization.code, task)
111108

112109
def run_boefje_for_oois(

rocky/rocky/locale/django.pot

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ msgid ""
88
msgstr ""
99
"Project-Id-Version: PACKAGE VERSION\n"
1010
"Report-Msgid-Bugs-To: \n"
11-
"POT-Creation-Date: 2023-12-11 09:21+0000\n"
11+
"POT-Creation-Date: 2023-12-14 15:21+0000\n"
1212
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
1313
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
1414
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -3337,6 +3337,10 @@ msgid ""
33373337
"refresh of the page may be needed to show the results."
33383338
msgstr ""
33393339

3340+
#: tools/view_helpers.py rocky/scheduler.py
3341+
msgid "Task not found."
3342+
msgstr ""
3343+
33403344
#: rocky/messaging.py
33413345
msgid ""
33423346
"You have trusted this member with a clearance level of L{}. This member "
@@ -3360,10 +3364,6 @@ msgstr ""
33603364
msgid "Task already queued."
33613365
msgstr ""
33623366

3363-
#: rocky/scheduler.py
3364-
msgid "Task not found."
3365-
msgstr ""
3366-
33673367
#: rocky/settings.py
33683368
msgid "Blue light"
33693369
msgstr ""

rocky/rocky/scheduler.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class NormalizerMeta(BaseModel):
6565
class NormalizerTask(BaseModel):
6666
"""NormalizerTask represent data needed for a Normalizer to run."""
6767

68-
id: uuid.UUID
68+
id: Optional[uuid.UUID]
6969
normalizer: Normalizer
7070
raw_data: RawData
7171
type: str = "normalizer"
@@ -74,7 +74,7 @@ class NormalizerTask(BaseModel):
7474
class BoefjeTask(BaseModel):
7575
"""BoefjeTask represent data needed for a Boefje to run."""
7676

77-
id: uuid.UUID
77+
id: Optional[uuid.UUID] = None
7878
boefje: Boefje
7979
input_ooi: Optional[str]
8080
organization: str
@@ -87,7 +87,7 @@ class QueuePrioritizedItem(BaseModel):
8787
representation.
8888
"""
8989

90-
id: uuid.UUID
90+
id: Optional[uuid.UUID]
9191
priority: int
9292
hash: Optional[str]
9393
data: Union[BoefjeTask, NormalizerTask]
@@ -105,7 +105,7 @@ class TaskStatus(Enum):
105105

106106

107107
class Task(BaseModel):
108-
id: uuid.UUID
108+
id: Optional[uuid.UUID] = None
109109
scheduler_id: str
110110
type: str
111111
p_item: QueuePrioritizedItem
@@ -227,7 +227,7 @@ def get_lazy_task_list(
227227
def get_task_details(self, organization_code: str, task_id: str) -> Optional[Task]:
228228
res = self.session.get(f"{self._base_uri}/tasks/{task_id}")
229229
res.raise_for_status()
230-
task_details = Task.model_validate_json(res.content)
230+
task_details = Task.parse_raw(res.content)
231231

232232
if task_details.type == "normalizer":
233233
organization = task_details.p_item.data.raw_data.boefje_meta.organization

rocky/tools/view_helpers.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,31 @@ def schedule_task(request: HttpRequest, organization_code: str, p_item: QueuePri
178178
)
179179

180180

181+
# FIXME: Tasks should be (re)created with supplied data, not by fetching prior
182+
# task info from the scheduler. Task data should be available from the context
183+
# from which the task is created.
181184
def reschedule_task(request: HttpRequest, organization_code: str, task_id: str) -> None:
182185
try:
183186
task = client.get_task_details(organization_code, task_id)
184187
except SchedulerError as error:
185188
messages.error(request, error.message)
186-
else:
187-
schedule_task(request, organization_code, task.p_item)
189+
return
190+
191+
if not task:
192+
messages.error(request, _("Task not found."))
193+
return
194+
195+
# Remove id from task data, this should be created by the scheduler
196+
new_task = task.p_item.data
197+
delattr(new_task, "id")
198+
199+
try:
200+
new_p_item = QueuePrioritizedItem(
201+
data=new_task,
202+
priority=1,
203+
)
204+
205+
schedule_task(request, organization_code, new_p_item)
206+
except SchedulerError as error:
207+
messages.error(request, error.message)
208+
return

0 commit comments

Comments
 (0)