Skip to content

Commit 5c7bdc1

Browse files
Merge pull request #3 from huntflow/time-metrics-collection
Added time metrics collection
2 parents 7b882f1 + a47cffd commit 5c7bdc1

File tree

5 files changed

+33
-4
lines changed

5 files changed

+33
-4
lines changed

tests/conftest.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ async def create_redis_connection(redis_uri: str):
1515
async def zadd_single(client: aioredis.Redis, set_name: str, key: str, value: Any):
1616
await client.zadd(set_name, {key: value})
1717

18-
1918
else:
2019

2120
async def create_redis_connection(redis_uri: str):
@@ -118,6 +117,13 @@ async def assert_metric_broken(self, value: int):
118117

119118
assert stored_value == value
120119

120+
async def assert_metric_time_wait(self, value: int):
121+
stored_value = int(
122+
await self.queue.client.get(self.queue.metrics_time_wait) or 0
123+
)
124+
125+
assert stored_value == value
126+
121127

122128
@pytest.fixture
123129
def queue_checker(task_queue) -> QueueChecker:

tests/test_task_queue.py

+11
Original file line numberDiff line numberDiff line change
@@ -516,3 +516,14 @@ async def test_task_retry_delay(task_queue, queue_checker, freezer):
516516
await queue_checker.assert_metric_added(1)
517517
await queue_checker.assert_metric_taken(2)
518518
await queue_checker.assert_metric_requeued(1)
519+
520+
521+
@pytest.mark.asyncio
522+
async def test_task_wait_time_metric(task_queue, queue_checker, freezer):
523+
await task_queue.add_task({"key": "value"})
524+
525+
await queue_checker.assert_metric_time_wait(0)
526+
freezer.tick(3.0)
527+
528+
await task_queue.get_task()
529+
await queue_checker.assert_metric_time_wait(3000)

yatq/lua/get_template_lua.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@
2323
end
2424
end
2525
26+
local function incr_metric_key_by (key, amount)
27+
local incr_success, value = redis.pcall("INCRBY", key, amount)
28+
29+
if not incr_success then
30+
redis.pcall("SET", key, amount)
31+
end
32+
end
33+
2634
2735
--[[ Drops invalid task keys from queue
2836
@@ -46,7 +54,7 @@
4654
end
4755
4856
49-
local available_tasks = redis.call("ZRANGEBYSCORE", pending_key, 0, time, "LIMIT", 0, 1)
57+
local available_tasks = redis.call("ZRANGEBYSCORE", pending_key, 0, time, "WITHSCORES", "LIMIT", 0, 1)
5058
local task_key = available_tasks[1]
5159
5260
if task_key == nil then
@@ -55,6 +63,8 @@
5563
})
5664
end
5765
66+
local task_score = tonumber(available_tasks[2])
67+
5868
redis.call("ZREM", pending_key, task_key)
5969
local task_id = redis.call("HGET", task_mapping_key, task_key)
6070
if not task_id then
@@ -106,6 +116,7 @@
106116
redis.call("PUBLISH", channel, message)
107117
108118
incr_metric_key("$metrics_taken_key")
119+
incr_metric_key_by("$metrics_time_wait", math.floor((time - task_score) * 1000))
109120
110121
return cjson.encode({
111122
success = true,

yatq/queue.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def __init__(
8282
self.metrics_resurrected_key = f"{self._key_prefix}:metrics:resurrected"
8383
self.metrics_buried_key = f"{self._key_prefix}:metrics:buried"
8484
self.metrics_broken_key = f"{self._key_prefix}:metrics:broken"
85+
self.metrics_time_wait = f"{self._key_prefix}:metrics:time_wait"
8586
self.environment = {
8687
"processing_key": self.processing_set_name,
8788
"pending_key": self.pending_set_name,
@@ -95,6 +96,7 @@ def __init__(
9596
"metrics_resurrected_key": self.metrics_resurrected_key,
9697
"metrics_buried_key": self.metrics_buried_key,
9798
"metrics_broken_key": self.metrics_broken_key,
99+
"metrics_time_wait": self.metrics_time_wait,
98100
"default_timeout": DEFAULT_TIMEOUT,
99101
"default_task_expiration": DEFAULT_TASK_EXPIRATION,
100102
}
@@ -227,7 +229,7 @@ async def auto_reschedule_task(
227229
if task.policy == RetryPolicy.LINEAR:
228230
delay = task.delay * task.retry_counter
229231
else:
230-
delay = task.delay ** task.retry_counter
232+
delay = task.delay**task.retry_counter
231233

232234
after_time = int(time.time()) + delay
233235
task.state = TaskState.REQUEUED

yatq/redis_compat.py

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ async def eval_sha(
1111
): # pragma: no cover
1212
return await client.evalsha(digest, 0, *args)
1313

14-
1514
else: # pragma: no cover
1615
from aioredis.errors import ReplyError
1716

0 commit comments

Comments
 (0)