Skip to content

Commit 4436e17

Browse files
authored
Merge pull request #78 from nsteinmetz/patch-1
Add function name to logs
2 parents: 0e677a6 + 667a532 · commit 4436e17

File tree

7 files changed

+351
-364
lines changed

7 files changed

+351
-364
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -79,13 +79,13 @@ Let's see what the output looks like:
7979

8080
```
8181
[INFO] 02:14:30: starting worker 3265311d for 2 functions
82-
[INFO] 02:14:35: task cf0c55387a214320bd23e8987283a562 → worker 3265311d
83-
[INFO] 02:14:38: task cf0c55387a214320bd23e8987283a562 ← 3
84-
[INFO] 02:14:40: task 1de3f192ee4a40d4884ebf303874681c → worker 3265311d
85-
[INFO] 02:14:41: task 1de3f192ee4a40d4884ebf303874681c ← 1
86-
[INFO] 02:15:00: task 2a4b864e5ecd4fc99979a92f5db3a6e0 → worker 3265311d
82+
[INFO] 02:14:35: task fetch □ cf0c55387a214320bd23e8987283a562 → worker 3265311d
83+
[INFO] 02:14:38: task fetch ■ cf0c55387a214320bd23e8987283a562 ← 3
84+
[INFO] 02:14:40: task fetch □ 1de3f192ee4a40d4884ebf303874681c → worker 3265311d
85+
[INFO] 02:14:41: task fetch ■ 1de3f192ee4a40d4884ebf303874681c ← 1
86+
[INFO] 02:15:00: task cronjob □ 2a4b864e5ecd4fc99979a92f5db3a6e0 → worker 3265311d
8787
Nobody respects the spammish repetition!
88-
[INFO] 02:15:00: task 2a4b864e5ecd4fc99979a92f5db3a6e0 ← None
88+
[INFO] 02:15:00: task cronjob ■ 2a4b864e5ecd4fc99979a92f5db3a6e0 ← None
8989
```
9090
```python
9191
TaskInfo(fn_name='sleeper', enqueue_time=1751508876961, tries=0, scheduled=datetime.datetime(2025, 7, 3, 2, 14, 39, 961000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())

docs/getting-started.rst

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ To start, you'll need to create a ``Worker`` object. At worker creation, you can
2626
You can also do any startup/shutdown work here
2727
"""
2828
async with AsyncClient() as http_client:
29-
yield Context(http_client)
29+
yield WorkerContext(http_client)
3030
3131
worker = Worker(redis_url="redis://localhost:6379", lifespan=lifespan)
3232
@@ -66,13 +66,12 @@ Let's see what the output looks like:
6666

6767
.. code-block::
6868
69-
[INFO] 13:18:07: starting worker ca5bd9eb for 2 functions
70-
[INFO] 13:18:10: task 1ab9543aae374bd89713ca00f5c566f9 → worker ca5bd9eb
71-
[INFO] 13:18:10: task 1ab9543aae374bd89713ca00f5c566f9 ← 15
72-
[INFO] 13:18:14: task cac277a9e3034704a36e67099c1d6f07 → worker ca5bd9eb
73-
[INFO] 13:18:15: task cac277a9e3034704a36e67099c1d6f07 ← 303557
69+
[INFO] 07:19:48: task fetch □ 45d7ff032e6d42239e9f479a2fc4b70e → worker 12195ce1
70+
[INFO] 07:19:48: task fetch ■ 45d7ff032e6d42239e9f479a2fc4b70e ← 15
71+
[INFO] 07:19:51: task fetch □ 65e687f9ba644a1fbe23096fa246dfe1 → worker 12195ce1
72+
[INFO] 07:19:52: task fetch ■ 65e687f9ba644a1fbe23096fa246dfe1 ← 303659
7473
7574
.. code-block:: python
7675
77-
TaskInfo(fn_name='fetch', enqueue_time=1751635090933, tries=0, scheduled=datetime.datetime(2025, 7, 4, 13, 18, 13, 933000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())
78-
TaskResult(fn_name='fetch', enqueue_time=1751635090933, success=True, result=303557, start_time=1751635094068, finish_time=1751635095130, tries=1, worker_id='ca5bd9eb')
76+
TaskInfo(fn_name='fetch', enqueue_time=1756365588232, tries=0, scheduled=datetime.datetime(2025, 8, 28, 7, 19, 51, 232000, tzinfo=datetime.timezone.utc), dependencies=set(), dependents=set())
77+
TaskResult(fn_name='fetch', enqueue_time=1756365588232, success=True, result=303659, start_time=1756365591327, finish_time=1756365592081, tries=1, worker_id='12195ce1')

docs/worker.rst

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ Next, create an async context manager to run at worker creation/teardown. Use th
3636
# here we run code if desired after worker start up
3737
# yield our dependencies as an instance of the class
3838
async with AsyncClient() as http_client:
39-
yield Context(http_client)
39+
yield WorkerContext(http_client)
4040
# here we run code if desired before worker shutdown
4141
4242
Now, tasks created for the worker will have access to the dependencies like so:

pyproject.toml

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -54,10 +54,10 @@ authors = [
5454
{ name = "Graeme Holliday", email = "graeme@tastyware.dev" }
5555
]
5656
dependencies = [
57-
"anyio>=4.8.0",
57+
"anyio>=4.10.0",
5858
"coredis>=5.0.1",
5959
"crontab>=1.0.5",
60-
"typer>=0.15.2",
60+
"typer>=0.16.1",
6161
"uvloop>=0.21.0; sys_platform != 'win32'",
6262
"watchfiles>=1.1.0",
6363
]
@@ -71,10 +71,10 @@ benchmark = [
7171
]
7272
web = [
7373
"async-lru>=2.0.5",
74-
"fastapi>=0.115.12",
74+
"fastapi>=0.116.1",
7575
"jinja2>=3.1.6",
7676
"python-multipart>=0.0.20",
77-
"uvicorn>=0.34.2",
77+
"uvicorn>=0.35.0",
7878
]
7979

8080
[project.scripts]
@@ -89,16 +89,16 @@ Changelog = "https://github.com/tastyware/streaq/releases"
8989

9090
[dependency-groups]
9191
dev = [
92-
"enum-tools[sphinx]>=0.12.0",
92+
"enum-tools[sphinx]>=0.13.0",
9393
"httpx>=0.28.1",
94-
"pyright>=1.1.392.post0",
95-
"pytest>=8.3.4",
96-
"pytest-cov>=6.0.0",
94+
"pyright>=1.1.404",
95+
"pytest>=8.4.1",
96+
"pytest-cov>=6.2.1",
9797
"pytest-xdist>=3.8.0",
98-
"ruff>=0.9.3",
98+
"ruff>=0.12.10",
9999
"sphinx>=8.1.3",
100-
"sphinx-immaterial>=0.13.5",
101-
"testcontainers[redis]>=4.9.1",
100+
"sphinx-immaterial>=0.13.6",
101+
"testcontainers[redis]>=4.12.0",
102102
]
103103

104104
[tool.pytest.ini_options]

streaq/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22

33
import coredis
44

5-
VERSION = "5.2.1"
5+
VERSION = "5.2.2"
66
__version__ = VERSION
77

88
logger = logging.getLogger(__name__)

streaq/worker.py

Lines changed: 32 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -405,11 +405,10 @@ def wrapped(fn: AsyncCron[R] | SyncCron[R]) -> RegisteredCron[C, R]:
405405
)
406406
if task.fn_name in self.registry:
407407
raise StreaqError(
408-
f"A task named {task.fn_name!r} has already been registered!"
408+
f"A task named {task.fn_name} has already been registered!"
409409
)
410410
self.cron_jobs[task.fn_name] = task
411411
self.registry[task.fn_name] = task
412-
logger.debug(f"cron job {task.fn_name} registered in worker {self.id}")
413412
return task
414413

415414
return wrapped # type: ignore
@@ -460,10 +459,9 @@ def wrapped(
460459
)
461460
if task.fn_name in self.registry:
462461
raise StreaqError(
463-
f"A task named {task.fn_name!r} has already been registered!"
462+
f"A task named {task.fn_name} has already been registered!"
464463
)
465464
self.registry[task.fn_name] = task
466-
logger.debug(f"task {task.fn_name} registered in worker {self.id}")
467465
return task
468466

469467
return wrapped # type: ignore
@@ -637,7 +635,9 @@ async def abort_tasks(self, tasks: set[str]) -> None:
637635
and not self._cancel_scopes[task_id].cancel_called
638636
):
639637
self._cancel_scopes[task_id].cancel()
640-
logger.debug(f"aborting task {task_id} in worker {self.id}")
638+
logger.debug(
639+
f"task ⊘ {task_id} marked for abortion in worker {self.id}"
640+
)
641641

642642
async def schedule_cron_jobs(self) -> None:
643643
"""
@@ -767,7 +767,7 @@ def key(mid: str) -> str:
767767
if len(output) > truncate_length:
768768
output = f"{output[:truncate_length]}…"
769769
if not silent:
770-
logger.info(f"task {task_id}{output}")
770+
logger.info(f"task {fn_name}{task_id}{output}")
771771
if triggers:
772772
args = self.serialize(to_tuple(return_value))
773773
pipe.set(key(REDIS_PREVIOUS), args, ex=timedelta(minutes=5))
@@ -841,26 +841,26 @@ def key(mid: str) -> str:
841841
await pipe.execute()
842842
raw, task_try, abort, active = await asyncio.gather(*commands)
843843
if not raw:
844-
logger.warning(f"task {task_id} expired")
844+
logger.warning(f"task {task_id} expired")
845845
return await self.finish_failed_task(
846846
msg, StreaqError("Task expired!"), task_try
847847
)
848848
if not active:
849-
logger.warning(f"task {task_id} reclaimed from worker {self.id}")
849+
logger.warning(f"task {task_id} reclaimed from worker {self.id}")
850850
self.counters["relinquished"] += 1
851851
return None
852852

853853
try:
854854
data = self.deserialize(raw)
855855
except StreaqError as e:
856-
logger.exception(f"Failed to deserialize task {task_id}!")
856+
logger.error(f"task {task_id} failed to deserialize")
857857
return await self.finish_failed_task(msg, e, task_try)
858858

859859
if (fn_name := data["f"]) not in self.registry:
860-
logger.error(f"Missing function {fn_name}, can't execute task {task_id}!")
860+
logger.error(f"task {fn_name}{task_id} aborted, missing function")
861861
return await self.finish_failed_task(
862862
msg,
863-
StreaqError("Nonexistent function!"),
863+
StreaqError(f"Missing function {fn_name}!"),
864864
task_try,
865865
enqueue_time=data["t"],
866866
fn_name=data["f"],
@@ -869,27 +869,27 @@ def key(mid: str) -> str:
869869

870870
if abort:
871871
if not task.silent:
872-
logger.info(f"task {task_id} aborted prior to run")
872+
logger.info(f"task {fn_name}{task_id} aborted prior to run")
873873
return await self.finish_failed_task(
874874
msg,
875875
asyncio.CancelledError("Task aborted prior to run!"),
876876
task_try,
877877
enqueue_time=data["t"],
878-
fn_name=data["f"],
878+
fn_name=fn_name,
879879
silent=task.silent,
880880
ttl=task.ttl,
881881
)
882882
if task.max_tries and task_try > task.max_tries:
883883
if not task.silent:
884884
logger.warning(
885-
f"task {task_id} failed × after {task.max_tries} retries"
885+
f"task {fn_name} × {task_id} failed after {task.max_tries} retries"
886886
)
887887
return await self.finish_failed_task(
888888
msg,
889-
StreaqError(f"Max retry attempts reached for task {task_id}!"),
889+
StreaqError("Max retry attempts reached for task!"),
890890
task_try,
891891
enqueue_time=data["t"],
892-
fn_name=data["f"],
892+
fn_name=fn_name,
893893
silent=task.silent,
894894
ttl=task.ttl,
895895
)
@@ -901,7 +901,7 @@ def key(mid: str) -> str:
901901
after = data.get("A")
902902
pipe = await self.redis.pipeline(transaction=True)
903903
if task.unique:
904-
lock_key = self.prefix + REDIS_UNIQUE + task.fn_name
904+
lock_key = self.prefix + REDIS_UNIQUE + fn_name
905905
locked = pipe.set(
906906
lock_key, task_id, get=True, condition=PureToken.NX, pxat=timeout
907907
)
@@ -917,7 +917,8 @@ def key(mid: str) -> str:
917917
if existing and existing != task_id:
918918
if not task.silent:
919919
logger.warning(
920-
f"unique task {task_id} clashed ↯ with running task {existing}"
920+
f"task {fn_name}{task_id} clashed with unique task "
921+
f"{existing}"
921922
)
922923
return await self.finish_failed_task(
923924
msg,
@@ -927,7 +928,7 @@ def key(mid: str) -> str:
927928
),
928929
task_try,
929930
enqueue_time=data["t"],
930-
fn_name=data["f"],
931+
fn_name=fn_name,
931932
silent=task.silent,
932933
ttl=task.ttl,
933934
)
@@ -959,7 +960,7 @@ async def _fn(*args: Any, **kwargs: Any) -> Any:
959960
return await task.fn(*args, **kwargs)
960961

961962
if not task.silent:
962-
logger.info(f"task {task_id} → worker {self.id}")
963+
logger.info(f"task {task.fn_name}{task_id} → worker {self.id}")
963964

964965
wrapped = _fn
965966
for middleware in reversed(self.middlewares):
@@ -978,7 +979,7 @@ async def _fn(*args: Any, **kwargs: Any) -> Any:
978979
success = False
979980
done = True
980981
if not task.silent:
981-
logger.info(f"task {task_id} aborted")
982+
logger.info(f"task {task.fn_name}{task_id} aborted")
982983
self.counters["aborted"] += 1
983984
self.counters["failed"] -= 1 # this will get incremented later
984985
except StreaqRetry as e:
@@ -988,22 +989,26 @@ async def _fn(*args: Any, **kwargs: Any) -> Any:
988989
schedule = datetime_ms(e.schedule)
989990
if not task.silent:
990991
logger.exception(f"Retrying task {task_id}!")
991-
logger.info(f"retrying ↻ task {task_id} at {schedule}")
992+
logger.info(
993+
f"task {task.fn_name}{task_id} retrying at {schedule}"
994+
)
992995
else:
993996
delay = to_ms(e.delay) if e.delay is not None else task_try**2 * 1000
994997
schedule = now_ms() + delay
995998
if not task.silent:
996999
logger.exception(f"Retrying task {task_id}!")
997-
logger.info(f"retrying ↻ task {task_id} in {delay}s")
1000+
logger.info(f"task {task.fn_name}{task_id} retrying in {delay}s")
9981001
except TimeoutError as e:
9991002
if not task.silent:
1000-
logger.error(f"task {task_id} timed out")
1003+
logger.error(f"task {task.fn_name}{task_id} timed out")
10011004
result = e
10021005
success = False
10031006
done = True
10041007
except asyncio.CancelledError:
10051008
if not task.silent:
1006-
logger.info(f"task {task_id} cancelled, will be retried ↻")
1009+
logger.info(
1010+
f"task {task.fn_name}{task_id} cancelled, will be retried"
1011+
)
10071012
success = False
10081013
done = False
10091014
raise # best practice from anyio docs
@@ -1012,8 +1017,8 @@ async def _fn(*args: Any, **kwargs: Any) -> Any:
10121017
success = False
10131018
done = True
10141019
if not task.silent:
1015-
logger.info(f"task {task_id} failed ×")
10161020
logger.exception(f"Task {task_id} failed!")
1021+
logger.info(f"task {task.fn_name} × {task_id} failed")
10171022
finally:
10181023
with CancelScope(shield=True):
10191024
finish_time = now_ms()
@@ -1057,7 +1062,7 @@ async def fail_task_dependents(self, dependents: list[str]) -> None:
10571062
self.counters["failed"] += len(dependents)
10581063
to_delete: list[KeyT] = []
10591064
for dep_id in dependents:
1060-
logger.info(f"task {dep_id} dependency failed ×")
1065+
logger.info(f"task dependent × {dep_id} failed")
10611066
to_delete.append(self.prefix + REDIS_TASK + dep_id)
10621067
pipe.set(self.results_key + dep_id, result, ex=300)
10631068
pipe.publish(self._channel_key + dep_id, result)

0 commit comments

Comments (0)