-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmqtt_publish_payload.py
More file actions
149 lines (112 loc) · 3.49 KB
/
mqtt_publish_payload.py
File metadata and controls
149 lines (112 loc) · 3.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import json
import logging.config
from typing import Any, Dict
import redis
from datatypes import transform_cyberpartner_dict
logger = logging.getLogger("console")
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
SERVICES_TO_UPDATE_INVENTORY = ["svc-cp-inventory", "svc-game-store"]
def build_mqtt_payload(redis_client_cp_data: redis.Redis, redis_client_inventory: redis.Redis, badge_id: str, msg: Dict) -> dict | None:
"""Build the MQTT payload based on message type and source service.
Parameters
----------
redis_client_cp_data: redis.Redis
redis_client_inventory: redis.Redis
badge_id: str
msg: Dict
Returns
-------
dict | None
"""
from_service = msg.get("event_source", "")
event_type = msg.get("event_type", "")
event_subtype = msg.get("event_subtype", "")
# Handle store sync events
if event_type == "store.sync":
return _handle_store_sync(msg)
# Handle state request events
if event_subtype == "get.state":
return _handle_get_state(msg)
# Handle inventory update events
if from_service in SERVICES_TO_UPDATE_INVENTORY:
return _handle_inventory_update(redis_client_inventory, badge_id)
# Handle default case - cyberpartner state update
return _handle_cyberpartner_update(redis_client_cp_data, badge_id, msg)
def _handle_store_sync(msg: Dict) -> dict:
"""Process store sync event payload.
Parameters
----------
msg: Dict
Returns
-------
Dict
"""
_obj = msg.get("store")
if isinstance(_obj, str):
_obj = json.loads(_obj)
return _obj
def _handle_get_state(msg: Dict) -> dict | None:
"""Process get.state event payload.
Parameters
----------
msg: Dict
Returns
-------
dict | None
"""
cp_obj = msg.get("cp_obj")
if not cp_obj:
logger.error("Did not pass cyberpartner object.")
return None
return _transform_cyberpartner_object(cp_obj)
def _handle_inventory_update(redis_client: redis.Redis, badge_id: str) -> dict | None:
"""Process inventory update event payload.
Parameters
----------
badge_id: str
Returns
-------
dict | None
"""
inventory = redis_client.get(badge_id)
if not inventory:
logger.error(f"Could not find CP inventory with badge id: {badge_id}")
return None
if isinstance(inventory, str):
inventory = json.loads(inventory)
return inventory
def _handle_cyberpartner_update(redis_client: redis.Redis, badge_id: str, msg: Dict[str, Any]) -> dict | None:
"""Process cyberpartner update event payload.
Parameters
----------
badge_id: str
msg: Dict
Returns
-------
dict
"""
cp_obj = msg.get("cp_obj")
if not cp_obj:
cp_obj = redis_client.get(badge_id)
if not cp_obj:
logger.error(f"Could not find CP with badge id: {badge_id}")
return {}
return _transform_cyberpartner_object(cp_obj)
def _transform_cyberpartner_object(cp_obj: Dict[str, Any]) -> dict | None:
"""Transform cyberpartner object to MQTT payload format.
Parameters
----------
cp_obj: Union[Dict, str]
Returns
-------
dict | None
"""
if isinstance(cp_obj, str):
cp_obj = json.loads(cp_obj)
try:
cp = transform_cyberpartner_dict(cp_obj)
return cp.mqtt_payload()
except AttributeError as ae:
logger.error(f"Error transforming cyberpartner: {str(ae)}")
logger.error(f"CP obj: {cp_obj}")
return None