Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Added
Changed
-------
* Raise new ``BleakBluetoothNotAvailableError`` when Bluetooth is not supported, turned off or permission is denied.
* Use AcquireNotify rather than StartNotify for Linux backend on supported characteristics

Fixed
-----
Expand Down
94 changes: 72 additions & 22 deletions bleak/backends/bluezdbus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def __init__(
self._disconnect_monitor_event: Optional[asyncio.Event] = None
# map of characteristic D-Bus object path to notification callback
self._notification_callbacks: dict[str, NotifyCallback] = {}
self._notification_fd_stop_events: dict[str, asyncio.Event] = {}

# used to override mtu_size property
self._mtu_size: Optional[int] = None
Expand Down Expand Up @@ -205,6 +206,10 @@ def on_value_changed(char_path: str, value: bytes) -> None:
stack.callback(local_disconnect_monitor_event.set)

async def disconnect_device() -> None:
# Clean up open notification file descriptors
for stop_event in self._notification_fd_stop_events.items():
stop_event.set()

# Calling Disconnect cancels any pending connect request. Also,
# if connection was successful but _get_services() raises (e.g.
# because task was cancelled), then we still need to disconnect
Expand Down Expand Up @@ -904,6 +909,24 @@ async def write_gatt_descriptor(
"Write Descriptor %s | %s: %s", descriptor.handle, descriptor.obj[0], data
)

async def _read_notify_fd(self, fd, callback, close_event):
loop = asyncio.get_running_loop()
with os.fdopen(fd, "rb", closefd=True) as f:
while True:
try:
if close_event.is_set():
break

data = await loop.run_in_executor(None, f.read, 1024)
if not data:
continue
callback(bytes(data))
except Exception as e:
logger.error(
"Exception occured while using AcquireNotify fd: %s",
e,
)

@override
async def start_notify(
self,
Expand All @@ -914,20 +937,41 @@ async def start_notify(
"""
Activate notifications/indications on a characteristic.
"""
self._notification_callbacks[characteristic.obj[0]] = callback

assert self._bus is not None

reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=characteristic.obj[0],
interface=defs.GATT_CHARACTERISTIC_INTERFACE,
member="StartNotify",
if "NotifyAcquired" in characteristic.obj[1]:
reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=characteristic.obj[0],
interface=defs.GATT_CHARACTERISTIC_INTERFACE,
member="AcquireNotify",
body=[{}],
signature="a{sv}",
)
)
)
assert reply
assert_reply(reply)
assert reply
assert_reply(reply)

unix_fd = reply.unix_fds[0]
stop_event = asyncio.Event()
self._notification_fd_stop_events[characteristic.obj[0]] = stop_event
task = asyncio.create_task(
self._read_notify_fd(unix_fd, callback, stop_event)
)
_background_tasks.add(task)
else:
self._notification_callbacks[characteristic.obj[0]] = callback
reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=characteristic.obj[0],
interface=defs.GATT_CHARACTERISTIC_INTERFACE,
member="StartNotify",
)
)
assert reply
assert_reply(reply)

@override
async def stop_notify(self, characteristic: BleakGATTCharacteristic) -> None:
Expand All @@ -942,15 +986,21 @@ async def stop_notify(self, characteristic: BleakGATTCharacteristic) -> None:

assert self._bus is not None

reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=characteristic.obj[0],
interface=defs.GATT_CHARACTERISTIC_INTERFACE,
member="StopNotify",
if "NotifyAcquired" in characteristic.obj[1]:
stop_event = self._notification_fd_stop_events.pop(
characteristic.obj[0], None
)
)
assert reply
assert_reply(reply)

self._notification_callbacks.pop(characteristic.obj[0], None)
if stop_event:
stop_event.set()
else:
reply = await self._bus.call(
Message(
destination=defs.BLUEZ_SERVICE,
path=characteristic.obj[0],
interface=defs.GATT_CHARACTERISTIC_INTERFACE,
member="StopNotify",
)
)
assert reply
assert_reply(reply)
self._notification_callbacks.pop(characteristic.obj[0], None)
Loading