Skip to content

Commit 2c5ad66

Browse files
apputils - DBTaskManager (#57)
1 parent 18919a0 commit 2c5ad66

File tree

12 files changed

+632
-1
lines changed

12 files changed

+632
-1
lines changed

python/golem_task_api/apputils/__init__.py

Whitespace-only changes.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from pathlib import Path
2+
3+
import peewee
4+
from typing import Iterable, Type
5+
6+
7+
database = peewee.DatabaseProxy()
8+
9+
10+
def initialize_database(
11+
db: peewee.Database,
12+
db_path: Path,
13+
models: Iterable[Type[peewee.Model]],
14+
) -> None:
15+
""" Initialize and bind the corresponding database instance
16+
to the database proxy. Create tables for the given models """
17+
db_path.parent.mkdir(parents=True, exist_ok=True)
18+
db.init(str(db_path))
19+
20+
database.initialize(db)
21+
database.create_tables(models, safe=True)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import enum
2+
3+
from typing import Set, Type
4+
5+
import peewee
6+
7+
8+
class EnumField(peewee.CharField):
9+
10+
def __init__(
11+
self,
12+
enum_type: Type[enum.Enum],
13+
*args,
14+
**kwargs,
15+
) -> None:
16+
super().__init__(*args, **kwargs)
17+
18+
self._type: Type[enum.Enum] = enum_type
19+
self._values: Set[enum.Enum] = set([e.value for e in self._type])
20+
21+
def db_value(self, value):
22+
if isinstance(value, self._type):
23+
return value.value
24+
25+
value = self.coerce(value)
26+
if value not in self._values:
27+
raise TypeError(
28+
f"Expected {self._type.__name__} "
29+
f"type or one of {self._values}")
30+
return value
31+
32+
def python_value(self, value):
33+
return self._type(value)
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import enum
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Optional, Dict, List, Tuple
5+
6+
7+
class SubtaskStatus(enum.Enum):
8+
WAITING = None
9+
COMPUTING = 'computing'
10+
VERIFYING = 'verifying'
11+
SUCCESS = 'success'
12+
FAILURE = 'failure'
13+
ABORTED = 'aborted'
14+
15+
@classmethod
16+
def default(cls) -> 'SubtaskStatus':
17+
return cls.WAITING
18+
19+
def is_computable(self) -> bool:
20+
return self in (self.WAITING, self.FAILURE, self.ABORTED)
21+
22+
23+
class TaskManager(ABC):
24+
""" Responsible for managing the internal task state.
25+
26+
Based on a concept of a:
27+
28+
- part
29+
30+
An outcome of splitting a task into separate units of work. There
31+
usually exists a constant number of parts.
32+
33+
- subtask
34+
35+
A clone of a chosen task part that will be assigned to a
36+
computing node.
37+
38+
Each subtask is given a unique identifier in order to distinguish
39+
computation attempts of the same part, which may fail due to
40+
unexpected errors or simply time out.
41+
42+
A successful subtask computation concludes the computation of the
43+
corresponding part.
44+
45+
This class is responsible for correlating subtasks with task parts and
46+
managing their status.
47+
"""
48+
49+
@abstractmethod
50+
def create_task(
51+
self,
52+
part_count: int
53+
) -> None:
54+
""" Persist a "part_count" number of task parts and other necessary
55+
task information """
56+
raise NotImplementedError
57+
58+
@abstractmethod
59+
def abort_task(
60+
self,
61+
) -> None:
62+
""" Change the statuses of currently assigned subtasks to ABORTED.
63+
Subtasks must have a computable or COMPUTING status """
64+
raise NotImplementedError
65+
66+
@abstractmethod
67+
def get_part(
68+
self,
69+
part_num: int
70+
) -> Optional[object]:
71+
""" Return a task part object with the given part_num, if exists """
72+
raise NotImplementedError
73+
74+
@abstractmethod
75+
def get_part_num(
76+
self,
77+
subtask_id: str
78+
) -> Optional[int]:
79+
""" Return a task part object's number for the given subtask_id,
80+
if exists """
81+
raise NotImplementedError
82+
83+
@abstractmethod
84+
def get_subtask(
85+
self,
86+
subtask_id: str
87+
) -> Optional[object]:
88+
""" Return a subtask object with the given subtask_id, if exists """
89+
raise NotImplementedError
90+
91+
@abstractmethod
92+
def start_subtask(
93+
self,
94+
part_num: int,
95+
subtask_id: str
96+
) -> None:
97+
""" Persist a new subtask with the given subtask_id and a COMPUTING
98+
status. Assign that subtask to a task part object with the given
99+
part_num """
100+
raise NotImplementedError
101+
102+
@abstractmethod
103+
def update_subtask_status(
104+
self,
105+
subtask_id: str,
106+
status: SubtaskStatus,
107+
) -> None:
108+
""" Update the status of a subtask with the given subtask_id """
109+
raise NotImplementedError
110+
111+
@abstractmethod
112+
def get_subtasks_statuses(
113+
self,
114+
part_nums: List[int],
115+
) -> Dict[int, Tuple[SubtaskStatus, Optional[str]]]:
116+
""" Return (subtask status, subtask id) tuples mapped to part numbers
117+
with assigned subtasks or (WAITING, None) tuples otherwise.
118+
Task parts are chosen from the "part_nums" pool """
119+
raise NotImplementedError
120+
121+
@abstractmethod
122+
def get_next_computable_part_num(
123+
self,
124+
) -> Optional[int]:
125+
""" Return the next computable task part's number, if exists """
126+
raise NotImplementedError
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
from pathlib import Path
2+
from typing import Optional, List, Dict, Tuple, Type
3+
4+
import peewee
5+
from golem_task_api.apputils.database import (
6+
database,
7+
initialize_database,
8+
)
9+
from golem_task_api.apputils.database.fields import EnumField
10+
from golem_task_api.apputils.task import SubtaskStatus, TaskManager
11+
12+
13+
class Part(peewee.Model):
14+
15+
class Meta:
16+
database = database
17+
18+
num = peewee.IntegerField(
19+
primary_key=True,
20+
unique=True)
21+
subtask = peewee.DeferredForeignKey(
22+
'Subtask',
23+
null=True,
24+
index=True,
25+
backref='parts')
26+
27+
28+
class Subtask(peewee.Model):
29+
30+
class Meta:
31+
database = database
32+
33+
id = peewee.CharField(
34+
primary_key=True,
35+
unique=True)
36+
part = peewee.ForeignKeyField(
37+
Part,
38+
null=True,
39+
index=True,
40+
backref='subtasks')
41+
status = EnumField(
42+
SubtaskStatus,
43+
default=SubtaskStatus.default)
44+
45+
46+
class DBTaskManager(TaskManager):
47+
""" TaskManager subclass with a SQLite database backend for persistence """
48+
49+
def __init__(
50+
self,
51+
work_dir: Path,
52+
part_model: Type[Part] = Part,
53+
subtask_model: Type[Subtask] = Subtask,
54+
) -> None:
55+
56+
self._part = part_model
57+
self._subtask = subtask_model
58+
models = [self._part, self._subtask]
59+
60+
initialize_database(
61+
db=self._database,
62+
db_path=work_dir / 'task.db',
63+
models=models)
64+
65+
@property
66+
def _database(self) -> peewee.Database:
67+
""" Return an instance of a chosen `peewee.Database` subclass """
68+
return peewee.SqliteDatabase(
69+
database=None,
70+
thread_safe=True,
71+
pragmas=(
72+
('foreign_keys', True),
73+
('busy_timeout', 1000),
74+
('journal_mode', 'WAL'),
75+
),
76+
)
77+
78+
def create_task(
79+
self,
80+
part_count: int,
81+
) -> None:
82+
with database.atomic():
83+
for num in range(part_count):
84+
self._part.create(num=num, subtask=None)
85+
86+
def abort_task(
87+
self,
88+
) -> None:
89+
90+
self._subtask.update(
91+
status=SubtaskStatus.ABORTED,
92+
).where(
93+
self._subtask.status != SubtaskStatus.SUCCESS,
94+
self._subtask.id.in_(
95+
self._part.select(
96+
self._part.subtask.id
97+
)
98+
)
99+
).execute()
100+
101+
def get_part(
102+
self,
103+
part_num: int,
104+
) -> Optional[Part]:
105+
try:
106+
return self._part.get(num=part_num)
107+
except peewee.DoesNotExist:
108+
return None
109+
110+
def get_part_num(
111+
self,
112+
subtask_id: str,
113+
) -> Optional[int]:
114+
115+
result = self._part.select(
116+
self._part.num
117+
).join(
118+
self._subtask
119+
).where(
120+
self._subtask.id == subtask_id
121+
).execute()
122+
123+
return result[0].num if result else None
124+
125+
def get_subtask(
126+
self,
127+
subtask_id: str,
128+
) -> Optional[Subtask]:
129+
try:
130+
return self._subtask.get(id=subtask_id)
131+
except peewee.DoesNotExist:
132+
return None
133+
134+
def start_subtask(
135+
self,
136+
part_num: int,
137+
subtask_id: str,
138+
) -> None:
139+
140+
status = self.get_subtasks_statuses([part_num])
141+
if status and part_num in status:
142+
if not status[part_num][0].is_computable():
143+
raise RuntimeError(f"Subtask {part_num} already started")
144+
145+
with database.atomic():
146+
part = self._part.get(self._part.num == part_num)
147+
part.subtask = subtask_id
148+
part.save()
149+
150+
self._subtask.create(
151+
id=subtask_id,
152+
part=part,
153+
status=SubtaskStatus.COMPUTING)
154+
155+
def update_subtask_status(
156+
self,
157+
subtask_id: str,
158+
status: SubtaskStatus
159+
) -> None:
160+
161+
self._subtask.update(
162+
status=status,
163+
).where(
164+
self._subtask.id == subtask_id,
165+
).execute()
166+
167+
def get_subtasks_statuses(
168+
self,
169+
part_nums: List[int],
170+
) -> Dict[int, Tuple[SubtaskStatus, Optional[str]]]:
171+
172+
parts = self._part.select(
173+
self._part.num,
174+
self._subtask.status,
175+
self._subtask.id,
176+
).join(
177+
self._subtask,
178+
join_type=peewee.JOIN.LEFT_OUTER,
179+
).where(
180+
self._part.num.in_(part_nums),
181+
).execute()
182+
183+
return {
184+
p.num: (
185+
SubtaskStatus(p.subtask.status if p.subtask else None),
186+
p.subtask.id if p.subtask else None
187+
)
188+
for p in parts
189+
}
190+
191+
def get_next_computable_part_num(
192+
self,
193+
) -> Optional[int]:
194+
195+
results = self._part.select(
196+
self._part.num
197+
).where(
198+
~peewee.fn.EXISTS(
199+
self._subtask.select(
200+
self._subtask.id,
201+
).where(
202+
self._subtask.id == self._part.subtask,
203+
self._subtask.status.not_in((
204+
SubtaskStatus.ABORTED,
205+
SubtaskStatus.FAILURE
206+
))
207+
)
208+
)
209+
).order_by(
210+
self._part.num.asc()
211+
).limit(1).execute()
212+
213+
return results[0].num if results else None

0 commit comments

Comments
 (0)