|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 |
|
17 | 17 | __author__ = "Tobias Kraft" |
18 | | -__version__ = "1.96" |
| 18 | +__version__ = "1.97" |
19 | 19 |
|
20 | 20 | import time |
21 | 21 | from requests.sessions import Session |
|
33 | 33 | import argparse |
34 | 34 | import subprocess |
35 | 35 | from config_provider import ConfigFileConfigProvider, MqttHandler, ConfigProviderChain |
| 36 | +import json |
36 | 37 |
|
37 | 38 | session = Session() |
38 | 39 | logging.basicConfig( |
@@ -1172,6 +1173,88 @@ def GetPowermeterWatts(self): |
1172 | 1173 | return CastToInt(power) |
1173 | 1174 |
|
1174 | 1175 |
|
| 1176 | +def extract_json_value(data, path): |
| 1177 | + from jsonpath_ng import parse |
| 1178 | + jsonpath_expr = parse(path) |
| 1179 | + match = jsonpath_expr.find(data) |
| 1180 | + if match: |
| 1181 | + return int(float(match[0].value)) |
| 1182 | + else: |
| 1183 | + raise ValueError("No match found for the JSON path") |
| 1184 | + |
| 1185 | + |
| 1186 | +class MqttPowermeter(Powermeter): |
| 1187 | + def __init__( |
| 1188 | + self, |
| 1189 | + broker: str, |
| 1190 | + port: int, |
| 1191 | + topic_incoming: str, |
| 1192 | + json_path_incoming: str = None, |
| 1193 | + topic_outgoing: str = None, |
| 1194 | + json_path_outgoing: str = None, |
| 1195 | + username: str = None, |
| 1196 | + password: str = None, |
| 1197 | + ): |
| 1198 | + self.broker = broker |
| 1199 | + self.port = port |
| 1200 | + self.topic_incoming = topic_incoming |
| 1201 | + self.json_path_incoming = json_path_incoming |
| 1202 | + self.topic_outgoing = topic_outgoing |
| 1203 | + self.json_path_outgoing = json_path_outgoing |
| 1204 | + self.username = username |
| 1205 | + self.password = password |
| 1206 | + self.value_incoming = None |
| 1207 | + self.value_outgoing = None |
| 1208 | + |
| 1209 | + # Initialize MQTT client |
| 1210 | + import paho.mqtt.client as mqtt |
| 1211 | + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) |
| 1212 | + if self.username and self.password: |
| 1213 | + self.client.username_pw_set(self.username, self.password) |
| 1214 | + self.client.on_connect = self.on_connect |
| 1215 | + self.client.on_message = self.on_message |
| 1216 | + |
| 1217 | + # Connect to the broker |
| 1218 | + self.client.connect(self.broker, self.port) |
| 1219 | + self.client.loop_start() |
| 1220 | + |
| 1221 | + def on_connect(self, client, userdata, flags, reason_code, properties): |
| 1222 | + logger.info(f"Connected with result code {reason_code}") |
| 1223 | + # Subscribe to the topics |
| 1224 | + client.subscribe(self.topic_incoming) |
| 1225 | + logger.info(f"Subscribed to topic {self.topic_incoming}") |
| 1226 | + if self.topic_outgoing and self.topic_outgoing != self.topic_incoming: |
| 1227 | + client.subscribe(self.topic_outgoing) |
| 1228 | + logger.info(f"Subscribed to topic {self.topic_outgoing}") |
| 1229 | + |
| 1230 | + def on_message(self, client, userdata, msg): |
| 1231 | + payload = msg.payload.decode() |
| 1232 | + try: |
| 1233 | + data = json.loads(payload) |
| 1234 | + if msg.topic == self.topic_incoming: |
| 1235 | + self.value_incoming = extract_json_value(data, self.json_path_incoming) if self.json_path_incoming else int(float(payload)) |
| 1236 | + logger.info('MQTT: Incoming power: %s Watt', self.value_incoming) |
| 1237 | + elif msg.topic == self.topic_outgoing: |
| 1238 | + self.value_outgoing = extract_json_value(data, self.json_path_outgoing) if self.json_path_outgoing else int(float(payload)) |
| 1239 | + logger.info('MQTT: Outgoing power: %s Watt', self.value_outgoing) |
| 1240 | + except json.JSONDecodeError: |
| 1241 | + print("Failed to decode JSON") |
| 1242 | + |
| 1243 | + def GetPowermeterWatts(self): |
| 1244 | + if self.value_incoming is None: |
| 1245 | + self.wait_for_message("incoming") |
| 1246 | + if self.topic_outgoing and self.value_outgoing is None: |
| 1247 | + self.wait_for_message("outgoing") |
| 1248 | + |
| 1249 | + return self.value_incoming - (self.value_outgoing if self.value_outgoing is not None else 0) |
| 1250 | + |
| 1251 | + def wait_for_message(self, message_type, timeout=5): |
| 1252 | + start_time = time.time() |
| 1253 | + while (message_type == "incoming" and self.value_incoming is None) or (message_type == "outgoing" and self.value_outgoing is None): |
| 1254 | + if time.time() - start_time > timeout: |
| 1255 | + raise TimeoutError(f"Timeout waiting for MQTT {message_type} message") |
| 1256 | + time.sleep(1) |
| 1257 | + |
1175 | 1258 | def CreatePowermeter() -> Powermeter: |
1176 | 1259 | shelly_ip = config.get('SHELLY', 'SHELLY_IP') |
1177 | 1260 | shelly_user = config.get('SHELLY', 'SHELLY_USER') |
@@ -1244,6 +1327,17 @@ def CreatePowermeter() -> Powermeter: |
1244 | 1327 | return AmisReader( |
1245 | 1328 | config.get('AMIS_READER', 'AMIS_READER_IP') |
1246 | 1329 | ) |
| 1330 | + elif config.getboolean('SELECT_POWERMETER', 'USE_MQTT'): |
| 1331 | + return MqttPowermeter( |
| 1332 | + config.get('MQTT_POWERMETER', 'MQTT_BROKER', fallback=config.get("MQTT_CONFIG", "MQTT_BROKER", fallback=None)), |
| 1333 | + config.getint('MQTT_POWERMETER', 'MQTT_PORT', fallback=config.getint("MQTT_CONFIG", "MQTT_PORT", fallback=1883)), |
| 1334 | + config.get('MQTT_POWERMETER', 'MQTT_TOPIC_INCOMING'), |
| 1335 | + config.get('MQTT_POWERMETER', 'MQTT_JSON_PATH_INCOMING', fallback=None), |
| 1336 | + config.get('MQTT_POWERMETER', 'MQTT_TOPIC_OUTGOING', fallback=None), |
| 1337 | + config.get('MQTT_POWERMETER', 'MQTT_JSON_PATH_OUTGOING', fallback=None), |
| 1338 | + config.get('MQTT_POWERMETER', 'MQTT_USERNAME', fallback=config.get('MQTT_CONFIG', 'MQTT_USERNAME', fallback=None)), |
| 1339 | + config.get('MQTT_POWERMETER', 'MQTT_PASSWORD', fallback=config.get('MQTT_CONFIG', 'MQTT_PASSWORD', fallback=None)) |
| 1340 | + ) |
1247 | 1341 | else: |
1248 | 1342 | raise Exception("Error: no powermeter defined!") |
1249 | 1343 |
|
@@ -1325,7 +1419,18 @@ def CreateIntermediatePowermeter(dtu: DTU) -> Powermeter: |
1325 | 1419 | config.get('INTERMEDIATE_SCRIPT', 'SCRIPT_IP_INTERMEDIATE'), |
1326 | 1420 | config.get('INTERMEDIATE_SCRIPT', 'SCRIPT_USER_INTERMEDIATE'), |
1327 | 1421 | config.get('INTERMEDIATE_SCRIPT', 'SCRIPT_PASS_INTERMEDIATE') |
1328 | | - ) |
| 1422 | + ) |
| 1423 | + elif config.getboolean('SELECT_INTERMEDIATE_METER', 'USE_MQTT_INTERMEDIATE'): |
| 1424 | + return MqttPowermeter( |
| 1425 | + config.get('INTERMEDIATE_MQTT', 'MQTT_BROKER', fallback=config.get("MQTT_CONFIG", "MQTT_BROKER", fallback=None)), |
| 1426 | + config.getint('INTERMEDIATE_MQTT', 'MQTT_PORT', fallback=config.getint("MQTT_CONFIG", "MQTT_PORT", fallback=1883)), |
| 1427 | + config.get('INTERMEDIATE_MQTT', 'MQTT_TOPIC_INCOMING'), |
| 1428 | + config.get('INTERMEDIATE_MQTT', 'MQTT_JSON_PATH_INCOMING', fallback=None), |
| 1429 | + config.get('INTERMEDIATE_MQTT', 'MQTT_TOPIC_OUTGOING', fallback=None), |
| 1430 | + config.get('INTERMEDIATE_MQTT', 'MQTT_JSON_PATH_OUTGOING', fallback=None), |
| 1431 | + config.get('INTERMEDIATE_MQTT', 'MQTT_USERNAME', fallback=config.get("MQTT_CONFIG", "MQTT_USERNAME", fallback=None)), |
| 1432 | + config.get('INTERMEDIATE_MQTT', 'MQTT_PASSWORD', fallback=config.get("MQTT_CONFIG", "MQTT_PASSWORD", fallback=None)) |
| 1433 | + ) |
1329 | 1434 | elif config.getboolean('SELECT_INTERMEDIATE_METER', 'USE_AMIS_READER_INTERMEDIATE'): |
1330 | 1435 | return AmisReader( |
1331 | 1436 | config.get('INTERMEDIATE_AMIS_READER', 'AMIS_READER_IP_INTERMEDIATE') |
|
0 commit comments