
Commit 7b882f1

Merge pull request #1 from huntflow/initial-review
Initial review
2 parents 6fcad88 + d31383f commit 7b882f1

13 files changed: +240 −242 lines changed

tests/test_worker.py (+91, −113)
@@ -6,6 +6,7 @@
 
 from yatq.enums import RetryPolicy, TaskState
 from yatq.queue import Queue
+from yatq.worker.factory.simple import SimpleJobFactory
 from yatq.worker.job.simple import SimpleJob
 from yatq.worker.runner import build_worker
 from yatq.worker.worker_settings import WorkerSettings
@@ -19,15 +20,13 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
             event.set()
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -56,14 +55,12 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
             event.set()
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -89,15 +86,13 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
             event.set()
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -122,15 +117,14 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
             pass
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name], max_jobs=1)
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        max_jobs=1,
+        queue_namespace=task_queue.namespace,
+    )
 
     scheduled_task_1 = await task_queue.add_task({"name": "job", "kwargs": {}})
     scheduled_task_2 = await task_queue.add_task({"name": "job", "kwargs": {}})
@@ -157,15 +151,13 @@ async def redis_connection(cls):
 
 @pytest.mark.asyncio
 async def test_worker_job_creation_failed(redis_connection, task_queue: Queue):
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -194,15 +186,13 @@ async def process(self, required_arg) -> None:
         async def run(self, **kwargs) -> Any:
             pass
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -226,15 +216,13 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
             raise ValueError
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -261,15 +249,13 @@ async def run(self, **kwargs) -> Any:
         async def post_process(self, **kwargs) -> None:
             raise ValueError()
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -293,15 +279,13 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
            raise ValueError
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -323,22 +307,20 @@ async def redis_connection(cls):
 
 @pytest.mark.asyncio
 async def test_worker_task_gravekeeper(freezer, redis_connection, task_queue: Queue):
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {}}
-
     scheduled_task = await task_queue.add_task(
         {"name": "job", "kwargs": {}}, task_timeout=0
     )
     await task_queue.get_task()
 
     freezer.tick(10)
 
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+    )
     await worker._call_gravekeeper()
 
     task = await task_queue.check_task(scheduled_task.id)
@@ -353,19 +335,17 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
             raise ValueError
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        @staticmethod
-        async def on_task_process_exception(job, exc_info):
-            exception_handler()
-
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
+    async def on_task_process_exception(job, exc_info):
+        exception_handler()
 
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+        on_task_process_exception=on_task_process_exception,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
@@ -393,19 +373,17 @@ class Job(SimpleJob):
         async def run(self, **kwargs) -> Any:
             raise ValueError
 
-    class Settings(WorkerSettings):
-        @classmethod
-        async def redis_connection(cls):
-            return redis_connection
-
-        @staticmethod
-        async def on_task_process_exception(exc_info):
-            raise Exception("FAIL")
+    async def on_task_process_exception(exc_info):
+        raise Exception("FAIL")
 
-        queue_namespace = task_queue.namespace
-        factory_kwargs = {"handlers": {"job": Job}}
-
-    worker = build_worker(redis_connection, Settings, [task_queue.name])
+    worker = build_worker(
+        redis_connection,
+        SimpleJobFactory,
+        {"handlers": {"job": Job}},
+        [task_queue.name],
+        queue_namespace=task_queue.namespace,
+        on_task_process_exception=on_task_process_exception,
+    )
 
     run_coro = worker.run()
     run_task = asyncio.create_task(run_coro)
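
Taken together, these hunks replace each test's throwaway `Settings(WorkerSettings)` class with a direct `build_worker` call. A minimal sketch of the new call shape, assembled only from what the updated tests pass (here `redis_connection`, `task_queue`, and `Job` stand in for the fixtures and the `SimpleJob` handler defined inside each test):

```python
import asyncio

from yatq.worker.factory.simple import SimpleJobFactory
from yatq.worker.runner import build_worker

# Positional arguments, in the order the updated tests use them:
#   1. the Redis connection,
#   2. the job factory class (SimpleJobFactory),
#   3. the factory kwargs mapping handler names to SimpleJob subclasses,
#   4. the list of queue names to consume.
# Keyword arguments seen in the tests: max_jobs, queue_namespace,
# and on_task_process_exception.
worker = build_worker(
    redis_connection,                # test fixture
    SimpleJobFactory,
    {"handlers": {"job": Job}},      # Job is the SimpleJob subclass under test
    [task_queue.name],               # test fixture
    max_jobs=1,
    queue_namespace=task_queue.namespace,
)

run_task = asyncio.create_task(worker.run())
```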

yatq/defaults.py (+12)
@@ -2,3 +2,15 @@
 DEFAULT_QUEUE_NAMESPACE = "tasks"
 DEFAULT_TIMEOUT = 5 * 60
 DEFAULT_TASK_EXPIRATION = 60 * 60 * 12
+DEFAULT_LOGGING_CONFIG = {
+    "version": 1,
+    "disable_existing_loggers": False,
+    "handlers": {
+        "console": {
+            "class": "logging.StreamHandler",
+            "level": "INFO",
+        },
+    },
+    "root": {"level": "INFO", "handlers": ["console"]},
+}
+DEFAULT_MAX_JOBS = 8
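
The added `DEFAULT_LOGGING_CONFIG` follows the standard `logging.config.dictConfig` schema (version 1, a console `StreamHandler`, root logger at INFO). How yatq itself wires it in is not part of this diff, but a minimal sketch of applying it in a worker process would look like this:

```python
import logging
import logging.config

from yatq.defaults import DEFAULT_LOGGING_CONFIG

# Route root-logger records at INFO and above to the console handler
# defined by the default config added in this commit.
logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
logging.getLogger("yatq.example").info("logging configured from yatq defaults")
```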

yatq/dto.py (+2, −2)
@@ -39,9 +39,9 @@ class Task:
     Note on keys and ids:
     Task ID represents unique task invocation, while task key is designed to be
     unique for task's *purpose*. For instance, multiple tasks are scheduled to reindex
-    data of same database model. Thus, they key should be equal - for example, "reindex-1".
+    data of same database model. Thus, that keys should be equal - for example, "reindex-1".
     If one task in schedule while another is already PENDING or PROCESSING, queue will reject it
-    and instead return ID of existing task. On the other hand, if task execution does not overlap,
+    and instead return ID of the existing task. On the other hand, if task execution does not overlap,
     each unique invocation data is stored, identified by task ID.
 
     Task timeout is used to calculate task's deadline after its transition to PROCESSING state.
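
The docstring above describes deduplication by key: while a task with a given key is PENDING or PROCESSING, scheduling another task with the same key is rejected and the existing task's ID is returned. A rough sketch of that behaviour; note that `task_key` is a hypothetical keyword used only to illustrate the semantics, since this diff does not show how a key is actually supplied to `Queue.add_task`:

```python
# Hypothetical illustration of the key/ID semantics described in the docstring.
# `task_key=` is an assumed parameter name, not necessarily yatq's real API.
first = await task_queue.add_task(
    {"name": "reindex", "kwargs": {"model": "User"}}, task_key="reindex-1"
)
second = await task_queue.add_task(
    {"name": "reindex", "kwargs": {"model": "User"}}, task_key="reindex-1"
)
# While `first` is still PENDING or PROCESSING, the queue returns the
# existing task's ID rather than creating a second invocation.
assert second.id == first.id
```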

yatq/enums.py (+1, −1)
@@ -43,7 +43,7 @@ class QueueAction(str, Enum):
     Types of events that can happen to tasks in queue. Used in queue events logging.
 
     ADDED - new task added to queue
-    BROKE - task data somehow lost consistency. More info in message
+    BROKEN - task data somehow lost consistency. More info in message
     BURIED - task was buried after reaching execution timeout
     BURY_ERROR - task burial failed. More info in message
     COMPLETED - task was completed successfully
