This repository was archived by the owner on Feb 1, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathLockingCounter_test.py
More file actions
48 lines (43 loc) · 1.41 KB
/
LockingCounter_test.py
File metadata and controls
48 lines (43 loc) · 1.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import time
import unittest
from LockingCounter import LockingCounter
import threading
class LockingCounterTestCase(unittest.TestCase):
    """Exercise the blocking behaviour of ``LockingCounter.wait_until``."""

    # Upper bound (seconds) a waiter thread is given to finish once it is
    # expected to be released. ``join`` returns as soon as the thread ends,
    # so this is only paid in full when the test is about to fail.
    JOIN_TIMEOUT = 1.0

    def setUp(self) -> None:
        """
        Create a fresh ``LockingCounter`` (default constructor) for each test
        """
        self.counter = LockingCounter()

    def test_wait_for(self) -> None:
        """
        Tests a single wait_for cycle

        A waiter thread blocks in ``wait_until(5)`` while the main thread
        calls ``toggle()`` five times; the waiter must stay alive after each
        of the first four calls and finish after the fifth.

        Afterwards, waiters are started for targets 5 and 0 and must finish
        promptly.
        NOTE(review): this assumes ``wait_until`` returns immediately when the
        counter has already reached (or passed) the target -- confirm against
        the ``LockingCounter`` implementation.
        """
        target = 5
        waiter = self._start_waiter(target)

        # Every toggle short of the target must leave the waiter blocked.
        # (Each toggle() is expected to advance the counter by one step.)
        for _ in range(target - 1):
            self.counter.toggle()
            self.assertTrue(waiter.is_alive())

        # The final toggle should release the waiter. Join with a timeout
        # instead of sleeping a fixed interval so the check is not timing
        # dependent.
        self.counter.toggle()
        waiter.join(timeout=self.JOIN_TIMEOUT)
        self.assertFalse(waiter.is_alive())

        # Waiters created after the target was met must not block. The
        # threads are actually started here; an unstarted thread is never
        # alive, which would make the assertion vacuous.
        for already_met in (5, 0):
            with self.subTest(target=already_met):
                late_waiter = self._start_waiter(already_met)
                late_waiter.join(timeout=self.JOIN_TIMEOUT)
                self.assertFalse(late_waiter.is_alive())

    def _start_waiter(self, target: int) -> threading.Thread:
        """
        Start and return a daemon thread running ``wait_thread(target)``

        The thread is a daemon so that a waiter which never returns fails
        the assertion instead of keeping the interpreter alive at exit.

        Args:
            target (int): target passed through to ``wait_thread``

        Returns:
            threading.Thread: the already-started waiter thread
        """
        thread = threading.Thread(
            target=self.wait_thread, args=(target,), daemon=True
        )
        thread.start()
        return thread

    def wait_thread(self, target: int) -> None:
        """
        Wait until target is met, to be used with threading

        Args:
            target (int): target to wait for
        """
        self.counter.wait_until(target)
# Run the test suite only when this file is executed directly as a script,
# not when it is imported by a test runner.
if __name__ == "__main__":
    unittest.main()