Skip to content

Commit 3a94cad

Browse files
authored
Merge pull request #6 from agessaman/dev/stats-integration
Tested via BLE and TCP.
2 parents 83904e9 + 0132af9 commit 3a94cad

File tree

3 files changed

+157
-5
lines changed

3 files changed

+157
-5
lines changed

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ bash <(curl -fsSL https://raw.githubusercontent.com/agessaman/meshcore-packet-ca
3939
- **Connection Types**: Supports BLE, serial, and TCP connections to Companion radios
4040
- **Packet Analysis**: Parses packet headers, routes, payloads, and metadata
4141
- **RF Data**: Captures signal quality metrics (SNR, RSSI)
42+
- **Status Telemetry Stats**: MQTT status messages optionally contain battery/uptime/radio metrics
4243
- **Multi-Broker MQTT**: Supports up to 4 MQTT brokers simultaneously
4344
- **Auth Token Authentication**: JWT-based authentication using device private key
4445
- **TLS/WebSocket Support**: Secure connections with TLS/SSL and WebSocket transport
@@ -48,7 +49,7 @@ bash <(curl -fsSL https://raw.githubusercontent.com/agessaman/meshcore-packet-ca
4849
## Requirements
4950

5051
- Python 3.7+
51-
- `meshcore` package (official MeshCore Python library)
52+
- `meshcore` package (official MeshCore Python library) version 2.2.2 or later (required for stats support)
5253
- `paho-mqtt` package (for MQTT functionality)
5354

5455
**Note**: For Docker deployment, this application is best deployed on Linux systems due to Bluetooth Low Energy (BLE) and serial device access requirements. While Docker containers can run on macOS and Windows, BLE functionality may be limited or require additional configuration.
@@ -116,6 +117,12 @@ Configuration is handled via environment variables and `.env` files. The install
116117
- `PACKETCAPTURE_LOG_LEVEL`: Log level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`) - default: `INFO`
117118
- Command line arguments (`--debug`, `--verbose`) override this setting
118119

120+
#### Status Telemetry / Stats
121+
- `PACKETCAPTURE_STATS_IN_STATUS_ENABLED`: Toggle stat collection in status payloads (default: `true`)
122+
- `PACKETCAPTURE_STATS_REFRESH_INTERVAL`: Seconds between stat refreshes/status republishes (default: `300`, i.e. 5 minutes)
123+
124+
When enabled, status messages published to MQTT include a `stats` object with battery, uptime, queue depth, and radio runtime metrics refreshed at the configured cadence.
125+
119126
#### MQTT Settings
120127
The script supports up to 4 MQTT brokers (MQTT1, MQTT2, MQTT3, MQTT4). Each broker can be configured independently:
121128

packet_capture.py

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,16 @@ def __init__(self, output_file: Optional[str] = None, verbose: bool = False, deb
232232
self.mqtt_connected = False
233233
self.should_exit = False # Flag to exit when reconnection attempts fail
234234

235+
# Stats/status publishing
236+
self.stats_status_enabled = self.get_env_bool('STATS_IN_STATUS_ENABLED', True)
237+
self.stats_refresh_interval = self.get_env_int('STATS_REFRESH_INTERVAL', 300) # seconds
238+
self.latest_stats = None
239+
self.last_stats_fetch = 0
240+
self.stats_supported = False
241+
self.stats_capability_state = None
242+
self.stats_update_task = None
243+
self.stats_fetch_lock = asyncio.Lock()
244+
235245
# Service-level failure tracking for systemd restart
236246
self.service_failure_count = 0
237247
self.max_service_failures = self.get_env_int('MAX_SERVICE_FAILURES', 3)
@@ -1788,7 +1798,7 @@ def disconnect_mqtt(self):
17881798

17891799

17901800

1791-
async def publish_status(self, status, client=None, broker_num=None):
1801+
async def publish_status(self, status, client=None, broker_num=None, refresh_stats=True):
17921802
"""Publish status with additional information"""
17931803
firmware_info = await self.get_firmware_info()
17941804
status_msg = {
@@ -1801,13 +1811,137 @@ async def publish_status(self, status, client=None, broker_num=None):
18011811
"radio": self.radio_info or "unknown",
18021812
"client_version": self._load_client_version()
18031813
}
1814+
1815+
# Attach stats (online status only) if supported and enabled
1816+
if (
1817+
status.lower() == "online"
1818+
and self.stats_status_enabled
1819+
):
1820+
stats_payload = None
1821+
if refresh_stats:
1822+
# Always force refresh stats right before publishing to ensure fresh data
1823+
stats_payload = await self.refresh_stats(force=True)
1824+
if not stats_payload:
1825+
self.logger.debug("Stats refresh returned no data - stats will not be included in status message")
1826+
elif self.latest_stats:
1827+
stats_payload = dict(self.latest_stats)
1828+
1829+
if stats_payload:
1830+
status_msg["stats"] = stats_payload
1831+
elif self.debug:
1832+
self.logger.debug("No stats payload available - status message will not include stats")
1833+
18041834
if client:
18051835
self.safe_publish(None, json.dumps(status_msg), retain=True, client=client, broker_num=broker_num, topic_type="status")
18061836
else:
18071837
self.safe_publish(None, json.dumps(status_msg), retain=True, topic_type="status")
18081838
if self.debug:
18091839
self.logger.debug(f"Published status: {status}")
18101840

1841+
def stats_commands_available(self) -> bool:
1842+
"""Detect whether the connected meshcore build exposes stats commands."""
1843+
if not self.meshcore or not hasattr(self.meshcore, "commands"):
1844+
return False
1845+
1846+
commands = self.meshcore.commands
1847+
required = ["get_stats_core", "get_stats_radio"]
1848+
available = all(callable(getattr(commands, attr, None)) for attr in required)
1849+
state = "available" if available else "missing"
1850+
if state != self.stats_capability_state:
1851+
if available:
1852+
self.logger.info("MeshCore stats commands detected - status messages will include device stats")
1853+
else:
1854+
self.logger.info("MeshCore stats commands not available - skipping stats in status messages")
1855+
self.stats_capability_state = state
1856+
self.stats_supported = available
1857+
return available
1858+
1859+
async def refresh_stats(self, force: bool = False):
1860+
"""Fetch stats from the radio and cache them for status publishing."""
1861+
if not self.stats_status_enabled:
1862+
if self.debug:
1863+
self.logger.debug("Stats refresh skipped: stats_status_enabled is False")
1864+
return None
1865+
1866+
if not self.meshcore or not self.meshcore.is_connected:
1867+
if self.debug:
1868+
self.logger.debug("Stats refresh skipped: meshcore not connected")
1869+
return None
1870+
1871+
if self.stats_refresh_interval <= 0:
1872+
if self.debug:
1873+
self.logger.debug("Stats refresh skipped: stats_refresh_interval is 0 or negative")
1874+
return None
1875+
1876+
if not self.stats_commands_available():
1877+
if self.debug:
1878+
self.logger.debug("Stats refresh skipped: stats commands not available")
1879+
return None
1880+
1881+
now = time.time()
1882+
if (
1883+
not force
1884+
and self.latest_stats
1885+
and (now - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2)
1886+
):
1887+
return dict(self.latest_stats)
1888+
1889+
async with self.stats_fetch_lock:
1890+
# Another coroutine may have completed the refresh while we waited
1891+
if (
1892+
not force
1893+
and self.latest_stats
1894+
and (time.time() - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2)
1895+
):
1896+
return dict(self.latest_stats)
1897+
1898+
stats_payload = {}
1899+
try:
1900+
core_result = await self.meshcore.commands.get_stats_core()
1901+
if core_result.type == EventType.STATS_CORE and core_result.payload:
1902+
stats_payload.update(core_result.payload)
1903+
elif core_result.type == EventType.ERROR:
1904+
self.logger.debug(f"Core stats unavailable: {core_result.payload}")
1905+
except Exception as exc:
1906+
self.logger.debug(f"Error fetching core stats: {exc}")
1907+
1908+
try:
1909+
radio_result = await self.meshcore.commands.get_stats_radio()
1910+
if radio_result.type == EventType.STATS_RADIO and radio_result.payload:
1911+
stats_payload.update(radio_result.payload)
1912+
elif radio_result.type == EventType.ERROR:
1913+
self.logger.debug(f"Radio stats unavailable: {radio_result.payload}")
1914+
except Exception as exc:
1915+
self.logger.debug(f"Error fetching radio stats: {exc}")
1916+
1917+
if stats_payload:
1918+
self.latest_stats = stats_payload
1919+
self.last_stats_fetch = time.time()
1920+
if self.debug:
1921+
self.logger.debug(f"Updated stats cache: {self.latest_stats}")
1922+
elif self.debug:
1923+
self.logger.debug("Stats refresh completed but returned no data")
1924+
1925+
return dict(self.latest_stats) if self.latest_stats else None
1926+
1927+
async def stats_refresh_scheduler(self):
1928+
"""Periodically refresh stats and publish them via MQTT."""
1929+
if self.stats_refresh_interval <= 0 or not self.stats_status_enabled:
1930+
return
1931+
1932+
while not self.should_exit:
1933+
try:
1934+
# Only fetch stats when we're about to publish status
1935+
if self.enable_mqtt and self.mqtt_connected:
1936+
await self.publish_status("online", refresh_stats=True)
1937+
except asyncio.CancelledError:
1938+
break
1939+
except Exception as exc:
1940+
self.logger.debug(f"Stats refresh error: {exc}")
1941+
1942+
if await self.wait_with_shutdown(self.stats_refresh_interval):
1943+
break
1944+
18111945
def safe_publish(self, topic, payload, retain=False, client=None, broker_num=None, topic_type=None):
18121946
"""Publish to one or all MQTT brokers and return publish metrics."""
18131947
metrics = {"attempted": 0, "succeeded": 0}
@@ -2511,6 +2645,10 @@ async def start(self):
25112645
if self.jwt_renewal_interval > 0:
25122646
self.jwt_renewal_task = asyncio.create_task(self.jwt_renewal_scheduler())
25132647

2648+
# Start stats refresh scheduler
2649+
if self.stats_status_enabled and self.stats_refresh_interval > 0:
2650+
self.stats_update_task = asyncio.create_task(self.stats_refresh_scheduler())
2651+
25142652

25152653
try:
25162654
while not self.should_exit:
@@ -2545,6 +2683,8 @@ async def start(self):
25452683
self.advert_task.cancel()
25462684
if self.jwt_renewal_task:
25472685
self.jwt_renewal_task.cancel()
2686+
if self.stats_update_task:
2687+
self.stats_update_task.cancel()
25482688

25492689
# Cancel all tracked active tasks
25502690
for task in self.active_tasks.copy():
@@ -2565,6 +2705,11 @@ async def start(self):
25652705
await self.jwt_renewal_task
25662706
except asyncio.CancelledError:
25672707
pass
2708+
if self.stats_update_task:
2709+
try:
2710+
await self.stats_update_task
2711+
except asyncio.CancelledError:
2712+
pass
25682713

25692714
# Wait for all active tasks to complete
25702715
if self.active_tasks:
@@ -2580,7 +2725,7 @@ async def stop(self):
25802725
try:
25812726
# Publish offline status with timeout
25822727
if self.enable_mqtt and self.mqtt_connected:
2583-
await asyncio.wait_for(self.publish_status("offline"), timeout=5.0)
2728+
await asyncio.wait_for(self.publish_status("offline", refresh_stats=False), timeout=5.0)
25842729
except asyncio.TimeoutError:
25852730
self.logger.warning("Timeout publishing offline status")
25862731
except Exception as e:

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Core dependencies for MeshCore packet capture
22
paho-mqtt>=1.6.0
33

4-
# MeshCore package - now using PyPI version
5-
meshcore>=2.1.10
4+
# MeshCore package - requires 2.2.2+ for stats command support
5+
meshcore>=2.2.2
66

77
# MeshCore package dependencies (installed automatically with meshcore)
88
bleak>=0.21.0

0 commit comments

Comments
 (0)