From 03e095b0b21d848b2c905b4a477371b5cf92d49e Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Thu, 2 Jan 2025 13:27:31 -0800 Subject: [PATCH] MQTT Last Will and Testament Register a message with the MQTT broker to publish if the client loses connection. Also publish online/offline status - all to a `status` topic. avoid global variable --- RX_FSK/src/conn-mqtt.cpp | 29 +++++++++++++++++++++-------- RX_FSK/src/conn-mqtt.h | 6 ++++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/RX_FSK/src/conn-mqtt.cpp b/RX_FSK/src/conn-mqtt.cpp index d4277dde..a1c50234 100644 --- a/RX_FSK/src/conn-mqtt.cpp +++ b/RX_FSK/src/conn-mqtt.cpp @@ -37,7 +37,6 @@ TimerHandle_t mqttReconnectTimer; extern t_wifi_state wifi_state; char time_str[32]; - /* Global initalization (on TTGO startup) */ void MQTT::init() { } @@ -70,10 +69,16 @@ void MQTT::netsetup() { if (strlen(sonde.config.mqtt.password) > 0) { mqttClient.setCredentials(sonde.config.mqtt.username, sonde.config.mqtt.password); } + + char lwt[128]; + snprintf(lwt, sizeof(lwt), "%sstatus", sonde.config.mqtt.prefix); + mqttClient.setWill(lwt, MQTT_QOS, MQTT_RETAIN_TRUE, "lost connection"); + MQTT::connectToMqtt(); } void MQTT::netshutdown() { + publishLwt("offline"); mqttClient.disconnect(false); // nice shutdown.... delay(200); mqttClient.disconnect(true); // force @@ -91,6 +96,7 @@ void MQTT::updateStation( PosInfo *pi ) { unsigned long now = millis(); if ( (lastMqttUptime == 0) || (now - lastMqttUptime >= sonde.config.mqtt.report_interval) ) { MQTT::connectToMqtt(); + publishLwt("online"); publishUptime(); publishPmuInfo(); publishGps(); @@ -106,6 +112,12 @@ int MQTT::mqttGate(uint flag){ return ((sonde.config.mqtt.active & flag) && mqttClient.connected()); } +void MQTT::publishLwt(const char *message) { + char lwt[128]; + snprintf(lwt, sizeof(lwt), "%sstatus", sonde.config.mqtt.prefix); + mqttClient.publish(lwt, MQTT_QOS, MQTT_RETAIN_TRUE, message); +} + int MQTT::connectToMqtt() { if(mqttClient.connected()) return 1; @@ -115,6 +127,7 @@ int MQTT::connectToMqtt() { return 0; LOG_D(TAG, "MQTT not connected, connecting...."); mqttClient.connect(); + publishLwt("online"); return 1; } @@ -160,7 +173,7 @@ void MQTT::publishUptime() LOG_D(TAG, "publishUptime: sending %s\n", payload); char topic[128]; snprintf(topic, 128, "%s%s", sonde.config.mqtt.prefix, "uptime"); - mqttClient.publish(topic, 1, 1, payload); + mqttClient.publish(topic, MQTT_QOS, MQTT_RETAIN_TRUE, payload); } void MQTT::publishPmuInfo() @@ -187,7 +200,7 @@ void MQTT::publishPmuInfo() char topic[128]; snprintf(topic, sizeof(topic), "%s%s", sonde.config.mqtt.prefix, "pmu"); - mqttClient.publish(topic, 1, 1, payload); + mqttClient.publish(topic, MQTT_QOS, MQTT_RETAIN_TRUE, payload); } @@ -209,7 +222,7 @@ void MQTT::publishGps() char topic[128]; snprintf(topic, sizeof(topic), "%s%s", sonde.config.mqtt.prefix, "gps"); - mqttClient.publish(topic, 1, 1, payload); + mqttClient.publish(topic, MQTT_QOS, MQTT_RETAIN_TRUE, payload); } @@ -225,7 +238,7 @@ void MQTT::publishPeak(double pf, int rssi) char topic[128]; snprintf(topic, sizeof(topic), "%s%s", sonde.config.mqtt.prefix, "spectrum"); - mqttClient.publish(topic, 1, /* retain */ false, payload); + mqttClient.publish(topic, MQTT_QOS_NONE, MQTT_RETAIN_FALSE, payload); } // What's the scanner looking at? @@ -243,7 +256,7 @@ void MQTT::publishQRG(int num, const char* type, char* launchsite, float mhz) char topic[128]; snprintf(topic, sizeof(topic), "%s%s", sonde.config.mqtt.prefix, "qrg"); - mqttClient.publish(topic, 1, /*retain*/ false, payload); + mqttClient.publish(topic, MQTT_QOS_NONE, MQTT_RETAIN_FALSE, payload); } @@ -257,7 +270,7 @@ void MQTT::publishDebug(char *debugmsg) snprintf(payload, 256, "{\"msg\": %s}", debugmsg); char topic[128]; snprintf(topic, sizeof(topic), "%s%s", sonde.config.mqtt.prefix, "debug"); - mqttClient.publish(topic, 1, /*retain*/ false, payload); + mqttClient.publish(topic, MQTT_QOS_NONE, MQTT_RETAIN_FALSE, payload); } void MQTT::publishPacket(SondeInfo *si) @@ -279,7 +292,7 @@ void MQTT::publishPacket(SondeInfo *si) char topic[128]; snprintf(topic, 128, "%s%s", sonde.config.mqtt.prefix, "packet"); LOG_D(TAG, "publishPacket: %s\n", payload); - mqttClient.publish(topic, 1, 1, payload); + mqttClient.publish(topic, MQTT_QOS, MQTT_RETAIN_TRUE, payload); } String MQTT::getStatus() { diff --git a/RX_FSK/src/conn-mqtt.h b/RX_FSK/src/conn-mqtt.h index cc44ebf5..982932b0 100644 --- a/RX_FSK/src/conn-mqtt.h +++ b/RX_FSK/src/conn-mqtt.h @@ -17,6 +17,11 @@ #define MQTT_SEND_DEBUG 0x80 #define MQTT_SEND_ANY (MQTT_SEND_UPTIME|MQTT_SEND_SONDE|MQTT_SEND_PMU|MQTT_SEND_GPS|MQTT_SEND_RFINFO|MQTT_SEND_DEBUG) +#define MQTT_QOS_NONE 0 +#define MQTT_QOS 1 +#define MQTT_RETAIN_TRUE true +#define MQTT_RETAIN_FALSE false + class MQTT : public Conn { public: @@ -62,6 +67,7 @@ class MQTT : public Conn void publishUptime(); void publishPmuInfo(); void publishGps(); + void publishLwt(const char *message); void timeFormat(); int mqttGate(uint flag); int connectToMqtt();