Skip to content

Commit 156b41d

Browse files
Updated on_task_process_exception interface; replaced "function" with "name" in SimpleJob processing
1 parent 9477a13 commit 156b41d

File tree

5 files changed

+23
-20
lines changed

5 files changed

+23
-20
lines changed

tests/test_simple_job_factory.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ async def run(self, **kwargs):
2424
task_a = Task(
2525
"test",
2626
timeout=0,
27-
encoded_data=json.dumps({"kwargs": {"test": 1}, "function": "job_a"}),
27+
encoded_data=json.dumps({"kwargs": {"test": 1}, "name": "job_a"}),
2828
)
2929
task_b = Task(
3030
"test",
3131
timeout=0,
32-
encoded_data=json.dumps({"kwargs": {"test": 1}, "function": "job_b"}),
32+
encoded_data=json.dumps({"kwargs": {"test": 1}, "name": "job_b"}),
3333
)
3434

3535
factory = SimpleJobFactory(handlers=handlers)
@@ -45,7 +45,7 @@ def test_missing_key_job_creation():
4545
task = Task(
4646
"test",
4747
timeout=0,
48-
encoded_data=json.dumps({"kwargs": {"test": 1}, "function": "job"}),
48+
encoded_data=json.dumps({"kwargs": {"test": 1}, "name": "job"}),
4949
)
5050
factory = SimpleJobFactory(handlers={})
5151

tests/test_worker.py

+14-14
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def redis_connection(cls):
3333
run_task = asyncio.create_task(run_coro)
3434
await asyncio.wait_for(worker.started.wait(), timeout=1)
3535

36-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
36+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
3737

3838
await asyncio.wait_for(event.wait(), timeout=1)
3939
await worker.stop()
@@ -69,7 +69,7 @@ async def redis_connection(cls):
6969
run_task = asyncio.create_task(run_coro)
7070
await asyncio.wait_for(worker.started.wait(), timeout=1)
7171

72-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
72+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
7373

7474
await asyncio.wait_for(event.wait(), timeout=1)
7575
await worker.stop()
@@ -103,7 +103,7 @@ async def redis_connection(cls):
103103
run_task = asyncio.create_task(run_coro)
104104
await asyncio.wait_for(worker.started.wait(), timeout=1)
105105

106-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
106+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
107107

108108
await asyncio.wait_for(event.wait(), timeout=1)
109109
await asyncio.wait_for(worker.completed_task.wait(), timeout=1)
@@ -132,8 +132,8 @@ async def redis_connection(cls):
132132

133133
worker = build_worker(redis_connection, Settings, [task_queue.name], max_jobs=1)
134134

135-
scheduled_task_1 = await task_queue.add_task({"function": "job", "kwargs": {}})
136-
scheduled_task_2 = await task_queue.add_task({"function": "job", "kwargs": {}})
135+
scheduled_task_1 = await task_queue.add_task({"name": "job", "kwargs": {}})
136+
scheduled_task_2 = await task_queue.add_task({"name": "job", "kwargs": {}})
137137

138138
run_coro = worker.run()
139139
run_task = asyncio.create_task(run_coro)
@@ -171,7 +171,7 @@ async def redis_connection(cls):
171171
run_task = asyncio.create_task(run_coro)
172172
await asyncio.wait_for(worker.started.wait(), timeout=1)
173173

174-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
174+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
175175
await asyncio.wait_for(worker.got_task.wait(), timeout=1)
176176

177177
await worker.stop()
@@ -208,7 +208,7 @@ async def redis_connection(cls):
208208
run_task = asyncio.create_task(run_coro)
209209
await asyncio.wait_for(worker.started.wait(), timeout=1)
210210

211-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
211+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
212212
await asyncio.wait_for(worker.got_task.wait(), timeout=1)
213213

214214
await worker.stop()
@@ -240,7 +240,7 @@ async def redis_connection(cls):
240240
run_task = asyncio.create_task(run_coro)
241241
await asyncio.wait_for(worker.started.wait(), timeout=1)
242242

243-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
243+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
244244
await asyncio.wait_for(worker.got_task.wait(), timeout=1)
245245

246246
await worker.stop()
@@ -275,7 +275,7 @@ async def redis_connection(cls):
275275
run_task = asyncio.create_task(run_coro)
276276
await asyncio.wait_for(worker.started.wait(), timeout=1)
277277

278-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
278+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
279279
await asyncio.wait_for(worker.got_task.wait(), timeout=1)
280280

281281
await worker.stop()
@@ -308,7 +308,7 @@ async def redis_connection(cls):
308308
await asyncio.wait_for(worker.started.wait(), timeout=1)
309309

310310
scheduled_task = await task_queue.add_task(
311-
{"function": "job", "kwargs": {}}, retry_policy=RetryPolicy.LINEAR
311+
{"name": "job", "kwargs": {}}, retry_policy=RetryPolicy.LINEAR
312312
)
313313
await asyncio.wait_for(worker.got_task.wait(), timeout=1)
314314

@@ -332,7 +332,7 @@ async def redis_connection(cls):
332332
factory_kwargs = {"handlers": {}}
333333

334334
scheduled_task = await task_queue.add_task(
335-
{"function": "job", "kwargs": {}}, task_timeout=0
335+
{"name": "job", "kwargs": {}}, task_timeout=0
336336
)
337337
await task_queue.get_task()
338338

@@ -359,7 +359,7 @@ async def redis_connection(cls):
359359
return redis_connection
360360

361361
@staticmethod
362-
async def on_task_process_exception(exc_info):
362+
async def on_task_process_exception(job, exc_info):
363363
exception_handler()
364364

365365
queue_namespace = task_queue.namespace
@@ -371,7 +371,7 @@ async def on_task_process_exception(exc_info):
371371
run_task = asyncio.create_task(run_coro)
372372
await asyncio.wait_for(worker.started.wait(), timeout=1)
373373

374-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
374+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
375375
await asyncio.wait_for(worker.got_task.wait(), timeout=1)
376376

377377
await worker.stop()
@@ -411,7 +411,7 @@ async def on_task_process_exception(exc_info):
411411
run_task = asyncio.create_task(run_coro)
412412
await asyncio.wait_for(worker.started.wait(), timeout=1)
413413

414-
scheduled_task = await task_queue.add_task({"function": "job", "kwargs": {}})
414+
scheduled_task = await task_queue.add_task({"name": "job", "kwargs": {}})
415415
await asyncio.wait_for(worker.got_task.wait(), timeout=1)
416416

417417
await worker.stop()

yatq/worker/factory/simple.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def create_job(self, task: "Task") -> T_SimpleJobClass:
2222
if not task_data:
2323
raise ValueError("Task data is not set")
2424

25-
task_function = task_data["function"]
25+
task_function = task_data["name"]
2626
handler_class = self.handlers[task_function]
2727

2828
return handler_class(task)

yatq/worker/runner.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from yatq.exceptions import TaskRescheduleException
1313
from yatq.queue import Queue
1414
from yatq.worker.factory.base import BaseJobFactory
15+
from yatq.worker.job.base import BaseJob
1516
from yatq.worker.worker_settings import T_ExcInfo, WorkerSettings
1617

1718
LOGGER = logging.getLogger("yatq.worker")
@@ -24,7 +25,7 @@ def __init__(
2425
self,
2526
queue_list: List[Queue],
2627
task_factory: BaseJobFactory,
27-
on_task_process_exception: Callable[[T_ExcInfo], Awaitable],
28+
on_task_process_exception: Callable[[BaseJob, T_ExcInfo], Awaitable],
2829
poll_interval: float = 2.0,
2930
max_jobs: int = 8,
3031
gravekeeper_interval: float = 30.0,
@@ -142,7 +143,7 @@ async def _handle_task(self, wrapper, queue) -> None:
142143

143144
exc_info = cast(T_ExcInfo, sys.exc_info())
144145
try:
145-
await self._on_task_process_exception(exc_info)
146+
await self._on_task_process_exception(task_job, exc_info)
146147
except Exception:
147148
LOGGER.exception("Exception in exception handler")
148149

yatq/worker/worker_settings.py

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import aioredis
55

66
from yatq.worker.factory.simple import SimpleJobFactory
7+
from yatq.worker.job.simple import SimpleJob
78

89
T_ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
910

@@ -29,6 +30,7 @@ async def redis_client() -> aioredis.Redis: # pragma: no cover
2930

3031
@staticmethod
3132
async def on_task_process_exception(
33+
job: SimpleJob,
3234
exc_info: T_ExcInfo,
3335
) -> None: # pragma: no cover
3436
...

0 commit comments

Comments
 (0)