-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtimer.py
More file actions
39 lines (31 loc) · 1.02 KB
/
timer.py
File metadata and controls
39 lines (31 loc) · 1.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import time
import asyncio
from typing import Callable
class Timer:
def __init__(self, timeout_secs: int, callback: Callable=None):
assert timeout_secs > 0
if not callback:
def callback():
raise TimeoutError()
self.timeout_secs = timeout_secs
self.start_time = None
def start(self) -> 'Timer':
self.start_time = time.time()
return self
def validate(self):
if not self.start_time:
print("Timer error: validate() called before calling start()")
return
now = time.time()
if now - self.start_time > self.timeout_secs:
self.callback()
class AsyncTimer(Timer):
def start(self):
loop = asyncio.get_event_loop()
loop.create_task(self.__start())
if not loop.is_running():
print("Timer error: no running asyncio loop found")
loop.run_forever()
async def __start(self):
await asyncio.sleep(self.timeout_secs)
self.callback()