-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmqtt_publish_gate.py
More file actions
85 lines (63 loc) · 2.25 KB
/
mqtt_publish_gate.py
File metadata and controls
85 lines (63 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import logging
import logging.config
from typing import Optional
import redis
from redis_db_indexes import (
redis_db_idx_last_notification_to_badge,
redis_db_idx_last_source_uuid_badge,
)
# Module-level logger shared by every function in this file; it is looked up
# under the explicit logger name "console" rather than the module name.
logger = logging.getLogger("console")
def should_publish_to_mqtt(
    redis_client: redis.Redis,
    badge_id: str,
    source_event_uuid: Optional[str] = None,
    force_mqtt_publish: bool = False,
) -> bool:
    """Determine whether a message should be published to MQTT for a badge.

    The checks run in order and the first one that passes wins:

    1. ``force_mqtt_publish`` is truthy.
    2. ``source_event_uuid`` is given and ``_is_badge_initiated_event``
       reports a match for this badge.
    3. ``_is_first_publish_or_cooldown_elapsed`` allows the publish.

    Parameters
    ----------
    redis_client: redis.Redis
        Client handed to the helper checks.
    badge_id: str
        Identifier of the badge; used by the helpers as the Redis key.
    source_event_uuid: Optional[str]
        UUID of the event that triggered this message. When ``None`` or
        empty, the badge-initiated check is skipped.
    force_mqtt_publish: bool
        When true, short-circuits every other check (Redis is not touched).

    Returns
    -------
    bool
        True if the message should be published, False otherwise.
    """
    if force_mqtt_publish:
        return True

    # Only consult Redis for the badge-initiated check when there is a
    # UUID to compare against.
    if source_event_uuid and _is_badge_initiated_event(redis_client, badge_id, source_event_uuid):
        return True

    return _is_first_publish_or_cooldown_elapsed(redis_client, badge_id)
def _is_badge_initiated_event(redis_client: redis.Redis, badge_id: str, source_event_uuid: str) -> bool:
    """Check if the event was initiated by the badge itself.

    The value stored under ``badge_id`` in the Redis database selected by
    ``redis_db_idx_last_source_uuid_badge`` is compared with
    ``source_event_uuid``.

    Parameters
    ----------
    redis_client: redis.Redis
        Client used for the lookup. ``select`` is called on it, so the
        database it points at is changed as a side effect.
    badge_id: str
        Redis key to read.
    source_event_uuid: str
        UUID of the event being evaluated.

    Returns
    -------
    bool
        True when the stored value matches ``source_event_uuid``,
        otherwise False.
    """
    redis_client.select(redis_db_idx_last_source_uuid_badge)
    last_known_badge_source_event = redis_client.get(badge_id)
    logger.info(f"Last known badge source event: {last_known_badge_source_event} | {badge_id} | {source_event_uuid}")

    is_match = last_known_badge_source_event == source_event_uuid

    # redis-py hands back ``bytes`` unless the client was created with
    # ``decode_responses=True``, and ``bytes`` never compares equal to
    # ``str``. Fall back to a decoded comparison so the check works with
    # either client configuration.
    if (
        not is_match
        and isinstance(last_known_badge_source_event, (bytes, bytearray))
        and isinstance(source_event_uuid, str)
    ):
        decoded_event = last_known_badge_source_event.decode("utf-8", errors="replace")
        is_match = decoded_event == source_event_uuid

    if is_match:
        logger.info("Source event was initiated by badge. Publishing.")
        return True
    return False
def _is_first_publish_or_cooldown_elapsed(redis_client: redis.Redis, badge_id: str) -> bool:
    """Report whether a publish is allowed for this badge right now.

    A publish is allowed when no value is stored under ``badge_id`` in the
    Redis database selected by ``redis_db_idx_last_notification_to_badge``.
    In that case a marker is written under ``badge_id`` with a 60-second
    expiry before returning.

    Parameters
    ----------
    redis_client: redis.Redis
        Client used for the lookup and the write. ``select`` is called on
        it, so the database it points at is changed as a side effect.
    badge_id: str
        Redis key holding the marker for this badge.

    Returns
    -------
    bool
        True if no marker was present (and one has now been written),
        False if a marker already exists.
    """
    cooldown_seconds = 60

    redis_client.select(redis_db_idx_last_notification_to_badge)
    last_publish_time = redis_client.get(badge_id)
    logger.info(f"last_publish_time: {last_publish_time} | {badge_id}")

    # A marker is still present: deny this publish.
    if last_publish_time is not None:
        return False

    # NOTE(review): the GET above and this SETEX are two separate commands,
    # so concurrent callers could both pass the check - confirm whether
    # that matters for the deployment.
    redis_client.setex(badge_id, cooldown_seconds, 1)
    return True