Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ An effective PC client and server is [mosquitto](https://mosquitto.org/).

# This repository

This contains two separate projects:
This contains three separate projects:
1. A "resilient" asynchronous non-blocking MQTT driver.
2. A means of using a cheap ESP8266 module to bring MQTT to MicroPython
platforms which lack a WiFi interface.
3. A basic network hardware controller (WLAN) which the mqtt client uses

## 1. The "resilient" driver

Expand Down
Empty file added mqtt_as/__init__.py
Empty file.
92 changes: 92 additions & 0 deletions mqtt_as/interfaces/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from uerrno import EINPROGRESS, ETIMEDOUT
import usocket
import uasyncio as asyncio


async def _g():
pass


_type_coro = type(_g())


# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, _type_coro):
res = asyncio.create_task(res)
return res
Comment on lines +13 to +20
Copy link

@spacemanspiff2007 spacemanspiff2007 Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I get while you would want functions I deliberately chose to allow only async functions.
If you have a longer task and and you launch it with create task and it's duration is e.g. 10 secs you have a state change the next callback will run in parallel to the already launched tasks and anything can happen.
If you only allow async functions then you can at least try to cancel them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but I think you somehow mixed callbacks and coroutines in this answer and now it doesn't really make much sense but I think I get your point.
Callbacks would be the better option for wifi state changes because they have to be executed quickly without any delays in it.
Coroutines can cause the problem of multiple coroutines running in parallel if the state changes quickly, which is quite possible. Cancelling those is not implemented in this simple interface and the user can't cancel them either because the tasks are not bound anywhere.

Personally I'd go with callbacks only and not allow coroutines but I have to think about backwards compatibility and the original mqtt_as has a wifi_coro, a coroutine. So I have to allow it in order to not break existing implementations.

We have the same problem with the connected_coro in mqtt_as, which isn't a callback either. The only reason neither coroutines cause trouble in mqtt_as is that there's a 5 seconds delay in the wlan connect to ensure wlan stability. Most coroutines should have finished after 5 seconds. But if we use a different interface or reduce the wlan reconnects, this will become a problem.
But again, backwards compatibility.. can't change that. Have to keep it that way. However, the wifi connected callback is now called directly after the connection is created and not after the broker was connected. This might result in more wifi callbacks as the wifi could get disconnected quickly if the connection is bad but I think it's an acceptable compromise and not many people will do much in the wifi connected callback because in mqtt_as this doesn't really mean anything because wifi will get disconnected again if the broker connection fails. (which is generally something I'd like to change in a more advanced interface because if your broker vanishes e.g. because you changed its ip adress or switched devices, your micropython device becomes inaccessible because you only have 5 seconds to establich a webrepl connection and interrupt the reconnect process..)

This is the reason why in my project, I subclassed the mqtt client and only have one connected coroutine that launches all my custom coroutines and also cancels all those tasks if the wifi gets disconnected again before it could finish (the coroutines just subscribe to the topics again, which would get paused if the client gets disconnected, so those would easily stack up).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but I think you somehow mixed callbacks and coroutines in this answer and now it doesn't really make much sense but I think I get your point.

Is a coro that is called on connect not a callback? I am not sure about the correct terminology.

Personally I'd go with callbacks only and not allow coroutines but I have to think about backwards compatibility and the original mqtt_as has a wifi_coro, a coroutine. So I have to allow it in order to not break existing implementations.

If the has a long running sync callback things still can block, that's why I think a coro is better because it's immediately clear they have to be non blocking. And if it's all coros they have to be canceled by the base class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callbacks in python (afaik) are always synchronous functions. Even the asyncio module in Python uses callbacks on certain Task events and those are all synchronous functions. Therefore my distinction between a callback=synchronous function and a coroutine=asynchronous function.

If the has a long running sync callback things still can block, that's why I think a coro is better because it's immediately clear they have to be non blocking. And if it's all coros they have to be canceled by the base class.

A long sync function will block no matter where it is implemented. If someone writes blocking code, he will do so no matter if the context is a callback or a coroutine.
If someone assumes he can write blocking code while using uasyncio, he doesn't undertand the environment he's programming in.
That said, callbacks are always supposed to be short whereas when using coroutines, people might write long code, which is not blocking but can take several seconds to finish.
Of course, not a problem if that task can be cancelled, but that would require the user to be aware of it and make their code handle cancellation correctly, which could be problematic for people new to python/asyncio and therefore it's best to stay away from that. Also it would make the base class more complex, which I'd like to avoid, not many will need that feature. I myself only log the connection state to the console in my project. A very short callback.

(As mentioned elsewhere my connected coro is a lot longer and more complex and actually cancels all tasks if the connection state changes. But that isn't part of the hardware interface but mqtt_as..)



class BaseInterface:
def __init__(self, socket=None):
# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection().
self.BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT]
self.socket = socket or usocket # support for custom socket implementations
self._subs = []
self._state = None

async def connect(self):
"""Serve connect request. Triggers callbacks if state changes"""
if await self._connect():
self._change_state(True)
return True
return False
Comment on lines +31 to +36

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a variable that prevents the method from being called when it's already running (e.g. self._is_connecting).
Connecting can take up huge amounts of time and it doesn't make sense to have those running in parallel.
With the variable it's possible to just spawn connect tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree actually. It is not a problem if only one project/library is controlling the interface but if multiple interfaces try it, then it becomes a problem. A 2nd interface calling connect should just wait for the result of the 1st call.
However, it might be better to create a more advanced interface blueprint in order to keep this one as simple as possible and as close to the source as possible. The goal of the current files is to just separate mqtt_as from the hardware interface without adding features or complexity.
I do agree that more features and complexity is needed for a safe approach handling multiple libraries. So once the current changes/approach is approved, I should add such a blueprint (by making the wlan interfaces better).


async def _connect(self):
"""Hardware specific connect method"""
# return True # if connection is successful, otherwise False
raise NotImplementedError()

async def disconnect(self):
"""Serve disconnect request. Triggers callbacks if state changes"""
if await self._disconnect():
self._change_state(False)
return True
return False

async def _disconnect(self):
"""Hardware specific disconnect method"""
# return True # if disconnect is successful, otherwise False
raise NotImplementedError()

async def reconnect(self):
"""Serve reconnect request"""
return await self._reconnect()

async def _reconnect(self):
"""Hardware specific reconnect method"""
if await self._disconnect():
return await self._connect()
return False

def isconnected(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my choice would be is_connected because I think it's much more readable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while I don't disagree, the micropython network module uses isconnected() and so does mqtt_as so I think it makes more sense to go with that.

""""Checks if the interface is connected. Triggers callbacks if state changes"""
st = self._isconnected()
self._change_state(st)
return st

def _isconnected(self):
"""Hardware specific isconnected method"""
raise NotImplementedError()

def _change_state(self, state):
"""
Private method executing all callbacks or creating asyncio tasks
on connection state changes
"""
st = self._state
if st != state:
self._state = state
if st is None and state is False:
# not triggering disconnect cbs when interface state was unknown
# (probably disconnected on startup)
return
for cb in self._subs:
launch(cb, (state,))

def subscribe(self, cb):
"""Subscribe to interface connection state changes"""
self._subs.append(cb)
11 changes: 11 additions & 0 deletions mqtt_as/interfaces/wlan/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from sys import platform

if platform == "esp8266":
from .esp8266 import WLAN
elif platform == "esp32":
from .esp32 import WLAN
elif platform == "pyboard":
from .pyboard import WLAN
else:
# just try esp32 implementation. Seems most mature.
from .esp32 import WLAN
41 changes: 41 additions & 0 deletions mqtt_as/interfaces/wlan/esp32.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from .. import BaseInterface
import network
import uasyncio as asyncio


class WLAN(BaseInterface):
def __init__(self, ssid, wifi_pw):
super().__init__()
self.DEBUG = False
# https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942
self.BUSY_ERRORS += [118, 119] # Add in weird ESP32 errors
self._ssid = ssid
self._wifi_pw = wifi_pw

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about self._pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about self._pw but kept it like that for now for better recognizability since it was named this way in mqtt_as. Once @peterhinch is happy with the changes, I'll shorten this variable. In this context it is very obvious what it does, so no need for longer names.

# wifi credentials required for ESP32 / Pyboard D. Optional ESP8266
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)

async def _connect(self):
s = self._sta_if
while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
await asyncio.sleep(1)

if not s.isconnected():
return False
# Ensure connection stays up for a few secs.
if self.DEBUG:
print('Checking WiFi integrity.')
for _ in range(5):
if not s.isconnected():
return False # in 1st 5 secs
await asyncio.sleep(1)
if self.DEBUG:
print('Got reliable connection')
return True

async def _disconnect(self):
self._sta_if.disconnect()
return True # not checking if really disconnected.

def _isconnected(self):
return self._sta_if.isconnected()
54 changes: 54 additions & 0 deletions mqtt_as/interfaces/wlan/esp8266.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from .. import BaseInterface
import network
import uasyncio as asyncio


class WLAN(BaseInterface):
def __init__(self, ssid=None, wifi_pw=None):
super().__init__()
self.DEBUG = False
self._ssid = ssid
self._wifi_pw = wifi_pw
# wifi credentials required for ESP32 / Pyboard D. Optional ESP8266
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)
import esp
esp.sleep_type(0) # Improve connection integrity at cost of power consumption.

async def _connect(self):
s = self._sta_if
if s.isconnected(): # 1st attempt, already connected.
return True
s.active(True)
s.connect() # ESP8266 remembers connection.
for _ in range(60):
if s.status() != network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
break
await asyncio.sleep(1)
if s.status() == network.STAT_CONNECTING: # might hang forever awaiting dhcp lease renewal or something else
s.disconnect()
await asyncio.sleep(1)
if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None:
s.connect(self._ssid, self._wifi_pw)
while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
await asyncio.sleep(1)

if not s.isconnected():
return False
# Ensure connection stays up for a few secs.
if self.DEBUG:
print('Checking WiFi integrity.')
for _ in range(5):
if not s.isconnected():
return False # in 1st 5 secs
await asyncio.sleep(1)
if self.DEBUG:
print('Got reliable connection')
return True

async def _disconnect(self):
self._sta_if.disconnect()
return True # not checking if really disconnected.

def _isconnected(self):
return self._sta_if.isconnected()
42 changes: 42 additions & 0 deletions mqtt_as/interfaces/wlan/pyboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from .. import BaseInterface
import network
import uasyncio as asyncio


class WLAN(BaseInterface):
def __init__(self, ssid, wifi_pw):
super().__init__()
self.DEBUG = False
self._ssid = ssid
self._wifi_pw = wifi_pw
# wifi credentials required for ESP32 / Pyboard D. Optional ESP8266
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)

async def _connect(self):
s = self._sta_if
s.active(True)
s.connect(self._ssid, self._wifi_pw)
# Pyboard doesn't yet have STAT_CONNECTING constant
while s.status() in (1, 2):
await asyncio.sleep(1)

if not s.isconnected():
return False
# Ensure connection stays up for a few secs.
if self.DEBUG:
print('Checking WiFi integrity.')
for _ in range(5):
if not s.isconnected():
return False # in 1st 5 secs
await asyncio.sleep(1)
if self.DEBUG:
print('Got reliable connection')
return True

async def _disconnect(self):
self._sta_if.disconnect()
return True # not checking if really disconnected.

def _isconnected(self):
return self._sta_if.isconnected()
Loading