Skip to content

Commit bb1100c

Browse files
authored
[python] Directory utilities (#51)
Reusable subclasses of pathlib.Path representing requestor and provider directory structures. Bump package to v0.20.0.
1 parent 416437b commit bb1100c

File tree

6 files changed

+139
-74
lines changed

6 files changed

+139
-74
lines changed

python/golem_task_api/dirutils.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from pathlib import Path
2+
3+
from golem_task_api import constants
4+
5+
# Concrete path type appropriate for the platform (WindowsPath or PosixPath)
6+
FlavouredPath = type(Path())
7+
8+
9+
class RequestorTaskDir(FlavouredPath):
10+
11+
@property
12+
def task_inputs_dir(self) -> Path:
13+
return Path(self / constants.TASK_INPUTS_DIR)
14+
15+
@property
16+
def task_outputs_dir(self) -> Path:
17+
return Path(self / constants.TASK_OUTPUTS_DIR)
18+
19+
@property
20+
def subtask_inputs_dir(self) -> Path:
21+
return Path(self / constants.SUBTASK_INPUTS_DIR)
22+
23+
@property
24+
def subtask_outputs_top_dir(self) -> Path:
25+
return Path(self / constants.SUBTASK_OUTPUTS_DIR)
26+
27+
def subtask_outputs_dir(self, subtask_id: str) -> Path:
28+
return self.subtask_outputs_top_dir / subtask_id
29+
30+
def prepare(self) -> None:
31+
self.mkdir()
32+
self.task_inputs_dir.mkdir()
33+
self.task_outputs_dir.mkdir()
34+
self.subtask_inputs_dir.mkdir()
35+
self.subtask_outputs_top_dir.mkdir()
36+
37+
38+
class RequestorDir(FlavouredPath):
39+
40+
def task_dir(self, task_id: str) -> RequestorTaskDir:
41+
return RequestorTaskDir(self / task_id)
42+
43+
def subtask_outputs_dir(self, task_id: str, subtask_id: str) -> Path:
44+
return self.task_dir(task_id).subtask_outputs_dir(subtask_id)
45+
46+
47+
class ProviderTaskDir(FlavouredPath):
48+
49+
@property
50+
def subtask_inputs_dir(self) -> Path:
51+
return Path(self / constants.SUBTASK_INPUTS_DIR)
52+
53+
def subtask_dir(self, subtask_id: str) -> Path:
54+
return Path(self / subtask_id)
55+
56+
def prepare(self) -> None:
57+
self.mkdir()
58+
self.subtask_inputs_dir.mkdir()
59+
60+
61+
class ProviderDir(FlavouredPath):
62+
63+
def task_dir(self, task_id: str) -> ProviderTaskDir:
64+
return ProviderTaskDir(self / task_id)
65+
66+
def subtask_dir(self, task_id: str, subtask_id: str) -> Path:
67+
return self.task_dir(task_id).subtask_dir(subtask_id)
68+

python/golem_task_api/handlers.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pathlib import Path
55
from typing import List, Tuple, Optional
66

7+
from golem_task_api.dirutils import ProviderTaskDir, RequestorTaskDir
78
from golem_task_api.enums import VerifyResult
89
from golem_task_api.structs import Subtask, Task
910

@@ -38,7 +39,7 @@ class RequestorAppHandler:
3839
@abc.abstractmethod
3940
async def create_task(
4041
self,
41-
task_work_dir: Path,
42+
task_work_dir: RequestorTaskDir,
4243
max_subtasks_count: int,
4344
task_params: dict,
4445
) -> Task:
@@ -47,23 +48,23 @@ async def create_task(
4748
@abc.abstractmethod
4849
async def next_subtask(
4950
self,
50-
task_work_dir: Path,
51+
task_work_dir: RequestorTaskDir,
5152
opaque_node_id: str,
5253
) -> Optional[Subtask]:
5354
pass
5455

5556
@abc.abstractmethod
5657
async def verify(
5758
self,
58-
task_work_dir: Path,
59+
task_work_dir: RequestorTaskDir,
5960
subtask_id: str,
6061
) -> Tuple[VerifyResult, Optional[str]]:
6162
pass
6263

6364
@abc.abstractmethod
6465
async def discard_subtasks(
6566
self,
66-
task_work_dir: Path,
67+
task_work_dir: RequestorTaskDir,
6768
subtask_ids: List[str],
6869
) -> List[str]:
6970
pass
@@ -81,7 +82,7 @@ class ProviderAppHandler:
8182
@abc.abstractmethod
8283
async def compute(
8384
self,
84-
task_work_dir: Path,
85+
task_work_dir: ProviderTaskDir,
8586
subtask_id: str,
8687
subtask_params: dict,
8788
) -> Path:

python/golem_task_api/server.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from grpclib import server
99
from grpclib.health.service import Health
1010

11+
from golem_task_api.dirutils import ProviderTaskDir, RequestorDir
1112
from golem_task_api.proto.golem_task_api_grpc import (
1213
ProviderAppBase,
1314
RequestorAppBase,
@@ -63,15 +64,14 @@ def __init__(
6364
handler: RequestorAppHandler,
6465
lifecycle: AppLifecycleHandler,
6566
) -> None:
66-
self._work_dir = work_dir
67+
self._work_dir = RequestorDir(work_dir)
6768
self._handler = handler
6869
self._lifecycle = lifecycle
6970

7071
@forward_exceptions()
7172
async def CreateTask(self, stream):
7273
request: CreateTaskRequest = await stream.recv_message()
73-
task_id = request.task_id
74-
task_work_dir = self._work_dir / task_id
74+
task_work_dir = self._work_dir.task_dir(request.task_id)
7575
max_subtasks_count = request.max_subtasks_count
7676
task_params = json.loads(request.task_params_json)
7777
task = await self._handler.create_task(
@@ -87,9 +87,8 @@ async def CreateTask(self, stream):
8787
@forward_exceptions()
8888
async def NextSubtask(self, stream):
8989
request: NextSubtaskRequest = await stream.recv_message()
90-
task_id = request.task_id
90+
task_work_dir = self._work_dir.task_dir(request.task_id)
9191
opaque_node_id = request.opaque_node_id
92-
task_work_dir = self._work_dir / task_id
9392
reply = NextSubtaskReply()
9493
subtask = await self._handler.next_subtask(
9594
task_work_dir, opaque_node_id)
@@ -104,9 +103,8 @@ async def NextSubtask(self, stream):
104103
@forward_exceptions()
105104
async def Verify(self, stream):
106105
request: VerifyRequest = await stream.recv_message()
107-
task_id = request.task_id
106+
task_work_dir = self._work_dir.task_dir(request.task_id)
108107
subtask_id = request.subtask_id
109-
task_work_dir = self._work_dir / task_id
110108
result, reason = await self._handler.verify(task_work_dir, subtask_id)
111109
reply = VerifyReply()
112110
reply.result = result.value
@@ -117,9 +115,8 @@ async def Verify(self, stream):
117115
@forward_exceptions()
118116
async def DiscardSubtasks(self, stream):
119117
request: DiscardSubtasksRequest = await stream.recv_message()
120-
task_id = request.task_id
118+
task_work_dir = self._work_dir.task_dir(request.task_id)
121119
subtask_ids = request.subtask_ids
122-
task_work_dir = self._work_dir / task_id
123120
discarded_subtask_ids = \
124121
await self._handler.discard_subtasks(task_work_dir, subtask_ids)
125122
reply = DiscardSubtasksReply()
@@ -137,7 +134,7 @@ async def RunBenchmark(self, stream):
137134
@forward_exceptions()
138135
async def HasPendingSubtasks(self, stream):
139136
request: HasPendingSubtasksRequest = await stream.recv_message()
140-
task_work_dir = self._work_dir / request.task_id
137+
task_work_dir = self._work_dir.task_dir(request.task_id)
141138
has_pending_subtasks = \
142139
await self._handler.has_pending_subtasks(task_work_dir)
143140
reply = HasPendingSubtasksReply()
@@ -159,7 +156,7 @@ def __init__(
159156
handler: ProviderAppHandler,
160157
lifecycle: AppLifecycleHandler,
161158
) -> None:
162-
self._work_dir = work_dir
159+
self._work_dir = ProviderTaskDir(work_dir)
163160
self._handler = handler
164161
self._lifecycle = lifecycle
165162

0 commit comments

Comments
 (0)