Skip to content

Commit ff93cc3

Browse files
committed
Merge remote-tracking branch 'origin/mf/retry-tasks'
2 parents 4c2bbc8 + b0f0898 commit ff93cc3

File tree

5 files changed

+36
-23
lines changed

5 files changed

+36
-23
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ async def main():
5151
)
5252
ctx.run("/golem/entrypoints/render_entrypoint.py")
5353
ctx.download_file("/golem/output/out.png", f"output_{frame}.png")
54-
yield ctx.commit()
54+
yield ctx.commit(task)
5555
# TODO: Check if job is valid
56-
# and reject by: task.reject_task(msg = 'invalid file')
56+
# and reject by: task.reject_task(reason = 'invalid file')
5757
task.accept_task()
5858

5959
ctx.log("no more frame to render")

examples/blender/blender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async def worker(ctx: WorkContext, tasks):
3434
)
3535
ctx.run("/golem/entrypoints/run-blender.sh")
3636
ctx.download_file(f"/golem/output/out{frame:04d}.png", f"output_{frame}.png")
37-
yield ctx.commit()
37+
yield ctx.commit(task)
3838
# TODO: Check if job results are valid
3939
# and reject by: task.reject_task(msg = 'invalid file')
4040
task.accept_task()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "poetry.masonry.api"
44

55
[tool.poetry]
66
name = "yapapi"
7-
version = "0.1.5"
7+
version = "0.2.0"
88
description = "Hi-Level API for Golem The Next Milestone"
99
authors = ["Przemysław K. Rekucki <[email protected]>"]
1010
license = "LGPL-3.0-or-later"

yapapi/runner/__init__.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,11 @@ async def map(
167167
tasks_processed = {"c": 0, "s": 0}
168168

169169
def on_work_done(task, status):
170-
tasks_processed["c"] += 1
170+
if status == "accept":
171+
tasks_processed["c"] += 1
172+
else:
173+
loop = asyncio.get_event_loop()
174+
loop.create_task(work_queue.put(task))
171175

172176
# Creating allocation
173177
if not self._budget_allocation:
@@ -285,7 +289,7 @@ async def start_worker(agreement: rest.market.Agreement):
285289
provider_idn = details.view_prov(Identification)
286290
emit_progress("wkr", "created", wid, agreement=agreement.id, provider_idn=provider_idn)
287291

288-
async def task_emiter():
292+
async def task_emitter():
289293
while True:
290294
item = await work_queue.get()
291295
item._add_callback(on_work_done)
@@ -297,20 +301,25 @@ async def task_emiter():
297301
emit_progress("act", "create", act.id)
298302

299303
work_context = WorkContext(f"worker-{wid}", storage_manager)
300-
async for batch in worker(work_context, task_emiter()):
301-
await batch.prepare()
302-
cc = CommandContainer()
303-
batch.register(cc)
304-
remote = await act.send(cc.commands())
305-
print("new batch !!!", cc.commands(), remote)
306-
async for step in remote:
307-
message = step.message[:25] if step.message else None
308-
idx = step.idx
309-
emit_progress("wkr", "step", wid, message=message, idx=idx)
310-
emit_progress("wkr", "get-results", wid)
311-
await batch.post()
312-
emit_progress("wkr", "bach-done", wid)
313-
await accept_payment_for_agreement(agreement.id, partial=True)
304+
async for (task, batch) in worker(work_context, task_emitter()):
305+
try:
306+
await batch.prepare()
307+
print("prepared")
308+
cc = CommandContainer()
309+
batch.register(cc)
310+
remote = await act.send(cc.commands())
311+
print("new batch !!!", cc.commands(), remote)
312+
async for step in remote:
313+
message = step.message[:25] if step.message else None
314+
idx = step.idx
315+
emit_progress("wkr", "step", wid, message=message, idx=idx)
316+
emit_progress("wkr", "get-results", wid)
317+
await batch.post()
318+
emit_progress("wkr", "batch-done", wid)
319+
await accept_payment_for_agreement(agreement.id, partial=True)
320+
except Exception as exc:
321+
task.reject_task(reason=f"failure: {exc}")
322+
raise
314323

315324
await accept_payment_for_agreement(agreement.id)
316325
emit_progress("wkr", "done", wid, agreement=agreement.id)
@@ -478,9 +487,13 @@ def accept_task(self, result: Optional[TaskResult] = None):
478487
for cb in self._callbacks:
479488
cb(self, "accept")
480489

481-
def reject_task(self):
490+
def reject_task(self, reason: Optional[str] = None):
491+
if self._emit_event:
492+
self._emit_event("task", "reject", None, reason=reason)
482493
assert self._status == TaskStatus.RUNNING
483494
self._status = TaskStatus.REJECTED
495+
for cb in self._callbacks:
496+
cb(self, "reject")
484497

485498

486499
class Package(abc.ABC):

yapapi/runner/ctx.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def download_file(self, src_path: str, dst_path: str):
173173
def log(self, *args):
174174
print(f"W{self._id}: ", *args)
175175

176-
def commit(self) -> Work:
176+
def commit(self, task: "Task") -> Tuple["Task", Work]:
177177
steps = self._pending_steps
178178
self._pending_steps = []
179-
return _Steps(*steps)
179+
return task, _Steps(*steps)

0 commit comments

Comments
 (0)