-
Notifications
You must be signed in to change notification settings - Fork 334
Description
Summary
I am experiencing intermittent and reproducible BLE connection failures on device with Linux when using Bleak with multiple BLE33 peripherals (IMU sensors). The behavior depends on:
- the number of peripherals,
- the sampling/notification rate,
- whether multiple processes access the same adapter simultaneously.
Key symptoms:
-
On a Raspberry Pi 4 (Raspbian):
A single script connecting and sampling at a frequency of 10 Hz to 2 IMUs works fine.
Launching a second script connecting to 2 additional IMUs causes all connections in the second script to fail immediately withle-connection-abort-by-local.
In general, the other two IMUs struggle to connect even if I make several attempts. Rarely, only one of these IMUs manages to connect, but only after several attempts.Launching a single script connecting and sampling always at a frequency of 10 Hz to 4 IMUs works fine.
-
On a desktop laptop (Linux Mint):
One script connecting to 6 IMUs disconnects randomly depending on frequency of sampling:- 10 Hz → 2 IMUs disconnect after a few seconds
- 2 Hz → 1 IMU disconnects at random
- 3 IMUs @ 10 Hz → stable
I’m opening this issue to understand whether these behaviors are expected, whether Bleak has known limitations with many concurrent devices or multi-process scenarios, and whether Bleak could expose recommended practices or safer defaults.
Environment PC Desktop (system info)
- Bleak version: 1.0.1
- Python version: 3.10.12
- BlueZ version:
5.64 - Kernel:
#88~22.04.1-Ubuntu - Adapter info:
hci0: Type: Primary Bus: USB
BD Address: 00:45:E2:BD:1D:78 ACL MTU: 1021:6 SCO MTU: 255:12
UP RUNNING
RX bytes:2502 acl:0 sco:0 events:234 errors:0
TX bytes:36824 acl:0 sco:0 commands:234 errors:0
Features: 0xff 0xff 0xff 0xfe 0xdb 0xfd 0x7b 0x87
Packet type: DM1 DM3 DM5 DH1 DH3 DH5 HV1 HV2 HV3
Link policy: RSWITCH HOLD SNIFF PARK
Link mode: PERIPHERAL ACCEPT
Name: 'host1'
Class: 0x7c010c
Service Classes: Rendering, Capturing, Object Transfer, Audio, Telephony
Device Class: Computer, Laptop
HCI Version: 5.1 (0xa) Revision: 0x19b7
LMP Version: 5.1 (0xa) Subversion: 0x6d7d
Manufacturer: Realtek Semiconductor Corporation (93)- Tested on:
- Raspberry Pi 3 (Raspbian)
- Desktop PC (Linux Mint)
Reproduction steps (high level)
Scenario A — Raspberry Pi (multi-process failure)
- Start Script A using Bleak, connecting to IMU1 + IMU2, then starting sampling at 10 Hz.
- Start Script B (same script duplicated) connecting to IMU3 + IMU4.
- Expected: both scripts connect successfully.
- Observed:
- Script B fails to connect to any IMU.
- BlueZ reports
le-connection-abort-by-local. bluetoothctlbriefly showsConnected: yesfollowed by immediate disconnection.
Scenario B — Desktop Linux Mint (single process, multiple IMUs)
- Script connects to 6 IMUs using Bleak.
- Subscribes to notifications at different sampling rates.
- Results:
- 10 Hz per IMU → after a few seconds, two IMUs disconnect randomly.
- 2 Hz per IMU → one IMU disconnects at random.
- 3 IMUs @ 10 Hz → stable.
This appears to correlate with the total aggregate notification rate and/or connection scheduling load in BlueZ but i'm not sure.
Observed errors
From bluetoothctl:
Attempting to connect to 0E:FC:50:D2:37:3B
[CHG] Device 0E:FC:50:D2:37:3B Connected: yes
Failed to connect: org.bluez.Error.Failed le-connection-abort-by-local
[CHG] Device 0E:FC:50:D2:37:3B Connected: no
Device 0E:FC:50:D2:37:3B not availableFrom journalctl -u bluetooth:
src/battery.c:btd_battery_update() error updating battery: percentage is not valid
No matching connection for deviceDiagnostics performed
- Confirmed devices are advertising normally.
- Verified manual connect/pair; BlueZ still aborts the connection.
- Restarted Bluetooth:
sudo systemctl restart bluetooth. - Removed BlueZ cache:
/var/lib/bluetooth/<adapter>/*. - Confirmed adapter UP with
hciconfig. - Tested on two systems (Pi + desktop) with similar results.
- Varied notification rate:
- 6 IMUs @ 10 Hz → random disconnections (two IMU)
- 6 IMUs @ 2 Hz → random disconnection (one IMU)
- Multi-process competition consistently causes immediate connection aborts on the Raspberry Pi and on PC Desktop.
These results suggest an interaction between Bleak’s DBus usage and BlueZ’s connection scheduling / controller limits.
My thoughts:
Errors seem to occur when:
-
Multiple processes attempt to access the same BlueZ adapter simultaneously (two scripts on Pi).
-
High aggregate notification throughput + many simultaneous connections cause random disconnections.
-
Bleak opens DBus connections and performs Device1.Connect() / GATT operations. It may be important to know whether Bleak performs adapter-level operations that are risky when multiple processes are present, or whether I should use a particular concurrency model.
I am unsure whether these issues are:
-
purely BlueZ/controller/hardware limitations (predictable), or
-
caused in part by the way Bleak handles DBus connections, timeouts, or per-client handling.
Questions
-
Are there known limits or recommended patterns for using Bleak with many concurrent BLE connections?
-
Does Bleak support or recommend a single-process “connection manager” approach?
Multiple processes seem to trigger BlueZ aborting connections. -
Are there recommended Bleak APIs or settings for stabilizing many-notification setups?
-
Is it expected that BlueZ aborts connections when multiple processes simultaneously use the adapter?
-
Does Bleak have a way to serialise or stagger connections in order to reduce the load on the controller?
I’d appreciate guidance whether this is expected (BlueZ/controller limit) or if I'm doing something wrong.
I'll paste the code of the script I used for testing here as well.
import asyncio
from bleak import BleakClient, BleakScanner
import paho.mqtt.publish as publish
import logging
import time
import struct
import sys
from datetime import datetime
# Replace with your device name or address
MQTT_BROKER = "*****************"
MQTT_PORT = ****
MQTT_TOPIC = "aaac/campaign/imu"
## BLE RELATED SETTINGS
# UUID of the characteristic sending notifications (from Arduino sketch)
ACK_UUID = "555A0002-0010-467A-9538-01F0652C74E8"
#
RAWSENSOR_UUID = "555A0002-0030-467A-9538-01F0652C74E8"
QUAT_UUID = "555A0002-0031-467A-9538-01F0652C74E8"
TIMESTAMP_CHAR_UUID = "555A0002-0034-467A-9538-01F0652C74E8"
#
CAMPAIGN_UUID = "555A0002-0035-467A-9538-01F0652C74E8"
ACTIVITY_UUID = "555A0002-0040-467A-9538-01F0652C74E8" # start and stop
BATTERY_SRV_UID = "0000180F-0000-1000-8000-00805F9B34FB" # "180F"
BATTERY_LEVEL_UID = "00002A19-0000-1000-8000-00805F9B34FB" # "2A19"
START_CAMPAIGN = 0x0
STOP_CAMPAIGN = 0xF
ACK_OK = 0x00
CAMPAIGN_RAW = 0x0
CAMPAIGN_QUAT = 0xF
MAX_WAIT = 3
WAIT_ON_DISCONNECT = 15
## SENSOR SPECIFIC SETTINGS
NUM_SENSORS = 3
NUM_READINGS_PER_SENSOR = 3
NUM_READINGS = NUM_SENSORS * NUM_READINGS_PER_SENSOR
SENSOR_PRECISION = 2 # bytes
COUNTER_PRECISION = 2 # bytes 0 - 65535
TIMESTAMP_SIZE = 4
SENSOR_PACKET_TEMPLATE = ["ax", "ay", "az", "gx", "gy", "gz", "mx", "my", "mz", "nth", "ts"]
SENSOR_PACKET_LENGTH = SENSOR_PRECISION * NUM_READINGS + COUNTER_PRECISION + TIMESTAMP_SIZE
SENSOR_PACKET_DICT = {"ax": 0x0000, "ay": 0x0000, "az": 0x0000,
"gx": 0x0000, "gy": 0x0000, "gz": 0x0000,
"mx": 0x0000, "my": 0x0000, "mz": 0x0000,
"nth": 0x0000, "ts": 0x00000000}
BATTERY_LEVEL_DICT = {"battery": 0x00, "ts": 0x00000000}
LOOP_DELAY = 30 # every 30 secs
# expected input parameters
CAMPAIGN_TEMPLATE = {"name": "", "init_counter": 0, "sampling_frequency": 10, "raw": True}
## GLOBAL VARIABLES
### collected data from BLE readings
sensor_reading_payload = {}
sensor_battery_payload = {}
### IMUs
available_imus = set()
connected_imus = {}
connected_clients = {}
ready_imus = set()
missing_imus = {}
### flag to start BLE data collection
startALL = True
## closing everything
stop_event = asyncio.Event()
def match_device(device, adv_data):
#with lock:
return adv_data.local_name.startswith('BLE') and device.address not in available_imus
def match_known_name(device, adv_data):
return device.name in BSN_IMUS and device.address not in available_imus
# Notification callback
def notification_handler(client):
def notification_payload_handler(sender, data):
packlen = len(data)
imu = connected_imus[client.address][0]
rs_payload = sensor_reading_payload[imu] #SENSOR_PACKET_DICT.copy()
if(packlen < SENSOR_PACKET_LENGTH): # xiao nRF52840 - messages are split to have max 24 bytes
devtype = "xiao"
# initial part of the transmission
if (data[19] * 256 + data[18]) != rs_payload["nth"]: # rs_payload["nth"]
for i in range (0, NUM_READINGS):
rs_payload[SENSOR_PACKET_TEMPLATE[i]] = (data[i*2+1] * 256 + data[i*2])
rs_payload["nth"] = (data[NUM_READINGS*2+1] * 256 + data[NUM_READINGS*2])
# second part of the message, automatically created, overlapping the remaining 8 bytes that
# did not fit in the first half of the message
else: # timestamp, only 8 bytes
sensorts = 0
for i in range(0, TIMESTAMP_SIZE):
sensorts = sensorts + data[i] * (256**i)
rs_payload["ts"] = sensorts
# sensor_reading_payload[imu] = rs_payload
csv_data_d = ",".join(f"{k}=0x{v:x}" for k, v in rs_payload.items())
csv_data_d += "," + "imuid=" + imu
csv_data_d += "," + "cname=" + CAMPAIGN_TEMPLATE["name"]
else: # nano 33 BLE - 28 byte messages
devtype = "nano33BLE"
for i in range (0, NUM_READINGS):
rs_payload[SENSOR_PACKET_TEMPLATE[i]] = (data[i*2+1] * 256 + data[i*2]) # Little-Endian
rs_payload["nth"] = (data[NUM_READINGS*2+1] * 256 + data[NUM_READINGS*2])
# timestamp
sensorts = 0
j = 0
for i in range(NUM_READINGS*2+2, NUM_READINGS*2+2+TIMESTAMP_SIZE):
sensorts = sensorts + data[i] * (256**j)
j += 1
rs_payload["ts"] = sensorts
# sensor_reading_payload[imu] = rs_payload
csv_data_d = ",".join(f"{k}=0x{v:04x}" for k, v in rs_payload.items())
csv_data_d += "," + "imuid=" + imu
csv_data_d += "," + "cname=" + CAMPAIGN_TEMPLATE["name"]
try:
#publish.single(topic=MQTT_TOPIC, payload=csv_data_d, hostname=MQTT_BROKER, port=MQTT_PORT)
print(f"{imu}: {csv_data_d[:40]}")
except Exception as e:
print(f"notification_payload_handler {devtype}: {e} | {data}")
return notification_payload_handler
#battery ... do I need a separate one?
def notification_handler_battery(client):
def notification_battery_handler(sender, data):
try:
imus = connected_imus[client.address]
imu = imus[0]
batval = (data[1] * 256 + data[0])
# global variable updated at every reading
batteryLevel_hex = hex(batval)
csv_data = "battery=" + batteryLevel_hex
sensor_battery_payload[imu] = batval
csv_data += "," + "imuid=" + imu
csv_data += "," + "cname=" + CAMPAIGN_TEMPLATE["name"]
publish.single(topic=MQTT_TOPIC, payload=csv_data, hostname=MQTT_BROKER, port=MQTT_PORT)
print(f"{imu}: {csv_data}")
except Exception as e:
imu = connected_imus[client.address][0]
print(f"notification_battery_handler ({imu}): {e} | {data}")
return notification_battery_handler
# Handling the disconnection
def disconnected_callback(client):
imu_addr = client.address
if imu_addr in connected_imus:
imu_name = connected_imus.get(imu_addr)
missing_imus[imu_addr] = [imu_addr]
print(f"{imu_name} unexpectedly disconnected")
asyncio.create_task(handle_unexpected_disconnection(client))
## Already connected
async def send_timestamp(client):
timestamp = int(time.time())
value = struct.pack(">i", timestamp)
print("timestamp", TIMESTAMP_CHAR_UUID, value)
await client.write_gatt_char(TIMESTAMP_CHAR_UUID, value)
print(f"Sent timestamp: {timestamp}")
## Already connected
async def send_campaign_parameters(client):
co = CAMPAIGN_TEMPLATE["init_counter"]
fr = CAMPAIGN_TEMPLATE["sampling_frequency"]
if CAMPAIGN_TEMPLATE["raw"]:
ra = CAMPAIGN_RAW
else:
ra = CAMPAIGN_QUAT
data = bytearray(co.to_bytes(2, byteorder='little') + bytes([fr, ra]))
print("configuration", CAMPAIGN_UUID, data)
await client.write_gatt_char(CAMPAIGN_UUID, data, len(data))
print(f"Sent campaign parameters: {CAMPAIGN_TEMPLATE}")
async def send_start_new(client):
data = bytearray(co.to_bytes(1, byteorder='little') + bytes([START_CAMPAIGN]))
print("start ", ACTIVITY_UUID, data)
await client.write_gatt_char(ACTIVITY_UUID, data, len(data))
print(f"Sent START")
## Already connected
async def read_ack(client):
response = await client.read_gatt_char(ACK_UUID)
if (response[0] == ACK_OK):
print("IMU is ready to start")
state = True
else:
state = False
return state
async def send_start(client):
data = bytearray([START_CAMPAIGN])
print("start", ACTIVITY_UUID, data)
await client.write_gatt_char(ACTIVITY_UUID, data)
print(f"Sent START")
# maybe connected, maybe not
async def send_stop(client, imu):
data = bytearray([STOP_CAMPAIGN])
if client:
await client.write_gatt_char(ACTIVITY_UUID, data)
print(f"Sent STOP")
else:
async with BleakClient(imu) as clientlocal:
await clientlocal.write_gatt_char(ACTIVITY_UUID, data)
print(f"Sent STOP")
async def setup_all_connections():
print("Scanning for IMUs ", BSN_IMUS)
## waits for all imus to show up and be available
num_imus = len(BSN_IMUS)
num_imus_ok = 0
tot_waiting = 0
while num_imus > num_imus_ok:
devices = await BleakScanner.discover(timeout=5.0)
for dev in devices:
if dev.name and dev.name in BSN_IMUS and not dev.name in connected_imus:
print(f"Found {dev.name}:", end=" ")
available_imus.add(dev.address)
## CB 4 disconnect
client = BleakClient(dev.address, disconnected_callback=disconnected_callback)
try:
print("trying to connect")
await client.connect()
if client.is_connected:
print(f"Connected to {dev.name}")
connected_clients[dev.address] = client
num_imus_ok += 1
connected_imus[dev.address] = [dev.name]
else:
print(f"Impossible to connect to {dev.name}")
except Exception as e:
print(f"connect_all: Failed to connect to {dev.name}: {e}")
if tot_waiting < MAX_WAIT and num_imus_ok < num_imus:
tot_waiting += 1
#wait a moment
print(f"Waiting 2 sec before retrying for the {num_imus - num_imus_ok} IMU(s)")
await asyncio.sleep(2)
elif num_imus_ok < num_imus:
print(f"Missing {num_imus - num_imus_ok} IMUs. Retry later. Exiting")
return False
print("All expected IMUs are alive")
return True
async def setup_all_campaigns():
num_imus_ok = 0
for imu_addr, client in connected_clients.items():
try:
# moved later
# await client.start_notify(RAWSENSOR_UUID, notification_handler(client))
# print("Subscribed to ", RAWSENSOR_UUID, " for IMU ", connected_imus[imu_addr])
await send_timestamp(client)
print("Timestamp")
await send_campaign_parameters(client)
print("Configuration")
response = await client.read_gatt_char(ACK_UUID)
if response:
print("ACK received")
ready_imus.add(imu_addr)
num_imus_ok += 1
except:
print(f"Failed connecting to {connected_imus[imu_addr]}")
result = (num_imus_ok == len(BSN_IMUS))
return result
async def send_STOP_to_all_imus():
## send stop to all, as in parallel as possible, it is anyhow sequential
startALL = False
print("Stopping all IMUs")
tasks = [asyncio.create_task(send_stop_to_imu(client, imu_addr)) for imu_addr, client in connected_clients.items()]
await asyncio.gather(*tasks)
async def send_stop_to_imu(client, imu_addr):
data = bytearray([STOP_CAMPAIGN])
try:
await client.write_gatt_char(ACTIVITY_UUID, data)
print(f"Sent STOP to {connected_imus[imu_addr]} ({imu_addr})")
except Exception as e:
print(f"Failed to send STOP to {connected_imus[imu_addr]} ({imu_addr}): {e}")
async def send_START_to_all_imus():
## send start to all, as in parallel as possible, it is anyhow sequential
tasks = [asyncio.create_task(send_start_to_imu(client, imu_addr)) for imu_addr, client in connected_clients.items()]
await asyncio.gather(*tasks)
async def send_start_to_imu(client, imu_addr):
data = bytearray([START_CAMPAIGN])
try:
await client.write_gatt_char(ACTIVITY_UUID, data)
print(f"Sent START to {connected_imus[imu_addr]} ({imu_addr})")
except Exception as e:
print(f"Failed to send START to {connected_imus[imu_addr]} ({imu_addr}): {e}")
startALL = False
async def collect_data_from_all_imus():
try:
# print("[collect_data_from_all_imus]")
tasks = [asyncio.create_task(collect_data_from_imu(client, imu_addr)) for imu_addr, client in connected_clients.items()]
await asyncio.gather(*tasks)
except:
print("Problem during data collection")
# not stopping, hoping to reconnect without restarting
# await stop_imus()
async def collect_data_from_imu(client, imu_addr):
try:
#prepare payload
# print("[collect_data_from_imu]", imu_addr)
imu = connected_imus[imu_addr][0]
# make a copy of the payload dictionary to have all fields set for each imu
# it is there prepared
sensor_reading_payload[imu] = SENSOR_PACKET_DICT.copy()
await client.start_notify(RAWSENSOR_UUID, notification_handler(client))
print(f"Subscribed to [{connected_imus[imu_addr]}] ", RAWSENSOR_UUID)
try: ## if the battery is not avaiable, still collecting the rest
# battery
await client.start_notify(BATTERY_LEVEL_UID, notification_handler_battery(client))
print(f"included battery level] ", BATTERY_LEVEL_UID)
except:
print(f"collect_data_from_imu({imu_addr}) not sending battery data, only sensor")
# Keep listening
while True:
await asyncio.sleep(LOOP_DELAY)
except Exception as e:
print(f"collect_data_from_imu({imu_addr}) Error: {e}")
async def collect_data_from_one_imu(client, imu_addr):
try:
tasks = [asyncio.create_task(collect_data_from_imu(client, imu_addr))]
await asyncio.gather(*tasks)
except Exception as e:
print(f"collect_data_from_one_imu: Problem during data collection: {e}")
async def disconnect_imus():
for imu_addr, client in connected_clients.items():
imu_name = connected_imus.get(imu_addr)
del connected_imus[imu_addr]
await client.disconnect()
print(f"Successfully disconnected from {imu_name}")
async def stop_imus():
for imu_addr, client in connected_clients.items():
imu_name = connected_imus.get(imu_addr)
await client.stop_notify(RAWSENSOR_UUID)
ready_imus.remove(imu_addr)
print("Successfully stopped {imu_name}")
## Try to reconnect
async def handle_unexpected_disconnection(client):
imu_addr = client.address
if imu_addr in connected_imus:
imu_name = connected_imus.get(imu_addr)[0]
print(f"Trying to reconnect to {imu_name} within {WAIT_ON_DISCONNECT} seconds")
start_time = time.time()
reconnected = False
while time.time() - start_time < WAIT_ON_DISCONNECT:
client_rec = BleakClient(imu_addr, disconnected_callback=disconnected_callback)
try:
print(f"Trying to reconnect to {imu_name}")
await client_rec.connect()
if client_rec.is_connected:
print(f"Connected to {imu_name}")
del missing_imus[imu_addr]
connected_clients[imu_addr] = client_rec
await collect_data_from_one_imu(client_rec, imu_addr)
#for the next disconnection
reconnected = True
else:
print(f"{imu_name} did not reconnect ...")
except Exception as e:
print(f"Failed to reconnect to {imu_name} | {e}")
await asyncio.sleep(1)
if not reconnected:
available_imus.remove(imu_addr)
del connected_clients[imu_addr]
stop_event.set()
# await ending_campaign()
async def ending_campaign():
print("Cleaning up: stopping and disconnecting.")
await send_STOP_to_all_imus()
await disconnect_imus()
print("Finished")
# stop_event.set()
async def main_process(blnStartCampaign):
result = await setup_all_connections()
if result:
if blnStartCampaign:
result = await setup_all_campaigns()
if result:
await send_START_to_all_imus()
if startALL:
# start everything
print("ALL IMUs started")
collect_task = asyncio.create_task(collect_data_from_all_imus())
await stop_event.wait()
print("Ending campaign")
collect_task.cancel() # ferma la raccolta
await ending_campaign()
else: # not all IMUs are available
await disconnect_imus()
if __name__ == '__main__':
# Run the asyncio event loop
try:
# IMUX-IMUY-IMUZ
BSN_IMUS = sys.argv[1].split("-")
CAMPAIGN_TEMPLATE["name"] = sys.argv[2]
CAMPAIGN_TEMPLATE["init_counter"] = int(sys.argv[3])
CAMPAIGN_TEMPLATE["sampling_frequency"] = int(sys.argv[4])
CAMPAIGN_TEMPLATE["raw"] = bool(sys.argv[5])
blnStartCampaign = not (len(sys.argv) > 6)
if blnStartCampaign:
print(CAMPAIGN_TEMPLATE)
asyncio.run(main_process(blnStartCampaign))
except KeyboardInterrupt:
print("Stopped by user.")
stop_event.set()
# stop_imus()