Description
Hey all,
I'm trying to measure the theoretical throughput of the GATT protocol as distance changes. For that I have two identical Raspberry Pi 3B+ boards running Debian 12 Bookworm on aarch64 with the latest BlueZ version, 5.83, which I built from source.
To do that, I have a bluetoothctl GATT server running on the first Pi, gr1, which I set up with the following script:
sudo bluetoothctl
advertise.manufacturer 0xffff 0x12 0x34
advertise.name gr1
advertise.tx-power on
gatt.register-service ffffffff-ffff-ffff-ffff-ffffffffffff
gatt.register-characteristic AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA read,write
gatt.register-application
advertise on
discoverable on
pairable on
(If somebody has an idea how I could automate it without the GATT server dying as the bluetoothctl command executes, please let me know!)
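One possible way to automate this, offered as an untested sketch: feed the commands to bluetoothctl over a pipe and keep that pipe open, so the process (and with it the registered GATT application) stays alive. This assumes bluetoothctl accepts commands on a piped stdin and that the registration lasts as long as the process does. The helper name `feed_and_hold` and the half-second delay are made up for illustration. bluetoothctl may also ask follow-up questions after some commands (for example for an initial characteristic value); any such answers would have to be added to the list by hand.

```python
import subprocess
import time

# Commands copied verbatim from the script above. Any answers to interactive
# follow-up prompts from bluetoothctl are NOT included and must be added.
COMMANDS = [
    "advertise.manufacturer 0xffff 0x12 0x34",
    "advertise.name gr1",
    "advertise.tx-power on",
    "gatt.register-service ffffffff-ffff-ffff-ffff-ffffffffffff",
    "gatt.register-characteristic AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA read,write",
    "gatt.register-application",
    "advertise on",
    "discoverable on",
    "pairable on",
]


def feed_and_hold(argv, commands, delay=0.5):
    """Start argv, send each command on its own line, and return the process.

    stdin is deliberately left open so the child does not see EOF and exit.
    """
    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, text=True)
    for line in commands:
        proc.stdin.write(line + "\n")
        proc.stdin.flush()
        time.sleep(delay)  # hypothetical pause so each command can settle
    return proc


def run_gatt_server():
    """Hypothetical entry point: run until interrupted with Ctrl+C."""
    proc = feed_and_hold(["bluetoothctl"], COMMANDS)
    try:
        proc.wait()
    except KeyboardInterrupt:
        proc.stdin.close()  # EOF lets bluetoothctl shut down
        proc.wait()
```

Calling `run_gatt_server()` (with sufficient privileges) would then stand in for typing the commands by hand. The delay between commands is a guess and may need tuning.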
To connect to the server, I have a second Pi, gr2, sitting directly on top of the first one. Once gr1 has been discovered and service discovery has completed, gr2 runs the following bleak Python script to read the attribute value as many times as possible within a minute:
#!/usr/bin/env python3
import asyncio
import logging
import argparse
import time
from bleak import (BleakScanner, BleakClient)
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from socket import gethostname
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class textcolor:
    GREEN = '\033[92m'
    RED = '\033[91m'
    CYAN = '\033[96m'
    YELLOW = '\033[93m'
    GOLD = '\033[33m'
    BOLD = '\033[1m'
    END = '\033[0m'


class ThroughputTestPeer:
    def __init__(self, device_name: str, scan_duration: float = 5.0):
        self.device_name = device_name
        self.loop = asyncio.get_event_loop()
        self.trigger = asyncio.Event()
        self.scan_duration = scan_duration

    def is_trusted_peer(self, name: str) -> bool:
        return name.startswith("gr")

    async def scan_for_peers(self):
        """Scan for trusted GATT servers. A trusted server is a server whose name begins with 'gr'."""
        devices: dict[str, tuple[BLEDevice, AdvertisementData]] = {}

        def detection_callback(device: BLEDevice, advertisement_data: AdvertisementData):
            device_name: str = advertisement_data.local_name or device.name or ""
            if self.is_trusted_peer(device_name):
                print(f"🤝 Found trusted peer: {device_name} ({device.address}) RSSI: {advertisement_data.rssi}")
                devices[device.address] = (device, advertisement_data)

        scanner = BleakScanner(detection_callback=detection_callback)
        await scanner.start()
        await asyncio.sleep(self.scan_duration)
        await scanner.stop()
        print(f"🤝 Found {len(devices.keys())} trusted GATT servers.")
        if len(devices.keys()) == 0:
            return
        print("Sleeping (open btmon now!!!)")
        time.sleep(5)
        print("Requesting services...")
        SERVICE_UUID = "ffffffff-ffff-ffff-ffff-ffffffffffff"
        for device_address in devices:
            device, _adv = devices[device_address]
            try:
                async with BleakClient(device, pair=True) as client:
                    for service in client.services:
                        if service.uuid == SERVICE_UUID:
                            for c in service.characteristics:
                                if "aaaa" in c.uuid.lower():
                                    count = 0
                                    print(f"{textcolor.CYAN}Reading characteristic for 60 seconds...{textcolor.END}")
                                    start = time.time()
                                    end = start + 60  # seconds
                                    while time.time() <= end:
                                        value = await client.read_gatt_char(c.uuid)
                                        value_parsed = int.from_bytes(value)
                                        print(f"📦 Read value: {value_parsed}")
                                        count += len(value)
                                        time.sleep(0.250)
                                    print(f"{textcolor.GREEN}✅ Done! Read {count} bytes in 60 seconds ({count/60} B/s){textcolor.END}")
            except asyncio.exceptions.TimeoutError:
                print(f'{textcolor.RED}TimeoutError:{textcolor.END} Device at address `{device_address}` timed out.')
            except Exception as error:
                print(f'Exception: An error occurred while connecting to device `{device_address}`:\n\t{error}')


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description='BLE GATT Test')
    peer_name = gethostname()
    parser.add_argument('-n', '--peer-name', default=peer_name,
                        help=f'Peer name (default: {peer_name})')
    return parser.parse_args()


async def main():
    args = parse_arguments()
    peer = ThroughputTestPeer(
        device_name=args.peer_name
    )
    await peer.scan_for_peers()


if __name__ == "__main__":
    asyncio.run(main())

The thing is, I get a disconnect directly after the second read. I put the sleep in the read loop so as not to overwhelm the Bluetooth stack, but it doesn't help. With a shorter sleep I can get more reads in before the disconnect, but it still disconnects.
Having another terminal window open on the second Pi with a connected bluetoothctl session completely solved the problem, so I suspect it has something to do with a connection timeout in the bleak stack, but I am not sure. Anyway, I am happy to attach btmon logs as needed.
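One thing that may be worth ruling out: the script pauses with `time.sleep()`, which blocks the asyncio event loop, so nothing else scheduled on that loop can run during the pause. Whether this is related to the disconnect is only a guess. The sketch below shows the same read loop with `await asyncio.sleep()` instead, and computes the rate from the measured elapsed time rather than a fixed 60 seconds. The function name `timed_read_loop` and its parameters are made up for illustration; `client` is assumed to be a connected `BleakClient` (or anything with a compatible `read_gatt_char` coroutine).

```python
import asyncio
import time


async def timed_read_loop(client, char_uuid: str, duration: float = 60.0, pause: float = 0.25):
    """Read char_uuid repeatedly for `duration` seconds.

    Returns (total_bytes_read, elapsed_seconds).
    """
    total = 0
    start = time.monotonic()  # monotonic clock is unaffected by wall-clock changes
    deadline = start + duration
    while time.monotonic() < deadline:
        value = await client.read_gatt_char(char_uuid)
        total += len(value)
        # Yield to the event loop instead of blocking it with time.sleep().
        await asyncio.sleep(pause)
    elapsed = time.monotonic() - start
    return total, elapsed
```

Inside the `async with BleakClient(...)` block this could be used as `total, elapsed = await timed_read_loop(client, c.uuid)`, with the throughput then being `total / elapsed` B/s.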