-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpool.py
More file actions
100 lines (74 loc) · 2.47 KB
/
pool.py
File metadata and controls
100 lines (74 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from abc import ABC, abstractmethod
import asyncio
from enum import Enum
from typing import override
import inspect
from collections import defaultdict
from server.streaming.sensors import SensorData, DataType
class Topic(Enum):
SENSORDATA = "SENSORDATA"
class DenoiseMethod(ABC):
"""
Solutions for handling noisy sensor inputs
"""
@abstractmethod
def __call__(self, data) -> float: ...
class MovingAverage(DenoiseMethod):
def __init__(self, window_size):
self.window_size = window_size
self.window = [0] * window_size
self.current_index = 0
def add(self, data):
self.window[self.current_index] = data
self.current_index += 1
self.current_index %= self.window_size
def rolling_average(self) -> float:
"""
self explanatory
"""
return sum(self.window) / self.window_size
@override
def __call__(self, data) -> float:
self.add(data)
return self.rolling_average()
class Ema(DenoiseMethod):
def __init__(self, alpha=0.2) -> None:
self.alpha = alpha
self.previous = 0
def ema(self, data) -> float:
"""
Exponential moving average
Good for real-time smoothing
"""
current = self.alpha * data + (1 - self.alpha) * self.previous
self.previous = current
return current
@override
def __call__(self, data) -> float:
return self.ema(data)
class Datapool:
def __init__(self, loop: asyncio.AbstractEventLoop):
"""
Initializes the Datapool with an active asyncio event loop.
"""
self.loop = loop
self.subscribers = defaultdict(set)
def subscribe(self, topic: Topic, callback):
"""
Registers a subscriber to a specific topic.
The callback must be an async function.
"""
if not inspect.iscoroutinefunction(callback):
raise TypeError(
f"Subscriber callback for '{topic}' must be an async function."
)
self.subscribers[topic.value].add(callback)
def publish(self, topic: Topic, data: SensorData):
"""
Synchronously publishes data to a topic.
Schedules the async subscribers to execute on the provided event loop.
"""
if topic.value not in self.subscribers.keys():
return
for callback in self.subscribers[topic.value]:
asyncio.run_coroutine_threadsafe(callback(data), self.loop)