Skip to content

Commit d952af5

Browse files
committed
Add passive_scan.py example
1 parent 5b74fab commit d952af5

File tree

1 file changed

+125
-0
lines changed

1 file changed

+125
-0
lines changed

examples/passive_scan.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
"""
2+
Scanner using passive scanning mode
3+
--------------
4+
5+
Example similar to detection_callback.py, but using passive scanning
6+
7+
Updated on 2022-11-24 by bojanpotocnik <[email protected]>
8+
9+
"""
10+
import argparse
11+
import asyncio
12+
import contextlib
13+
import logging
14+
from typing import Optional, List, Dict, Any
15+
16+
import bleak
17+
from bleak import BLEDevice, AdvertisementData
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
def _get_os_specific_scanning_params(
23+
uuids: Optional[List[str]],
24+
rssi: Optional[int] = None,
25+
macos_use_bdaddr: bool = False,
26+
) -> Dict[str, Any]:
27+
def get_bluez_dbus_scanning_params() -> Dict[str, Any]:
28+
from bleak.assigned_numbers import AdvertisementDataType
29+
from bleak.backends.bluezdbus.advertisement_monitor import OrPattern
30+
from bleak.backends.bluezdbus.scanner import (
31+
BlueZScannerArgs,
32+
BlueZDiscoveryFilters,
33+
)
34+
35+
filters = BlueZDiscoveryFilters(
36+
# UUIDs= Added below, because it cannot be None
37+
# RSSI= Added below, because it cannot be None
38+
Transport="le",
39+
DuplicateData=True,
40+
)
41+
if uuids:
42+
filters["UUIDs"] = uuids
43+
if rssi:
44+
filters["RSSI"] = rssi
45+
46+
# or_patterns ar required for BlueZ passive scanning
47+
or_patterns = [
48+
# General Discoverable (peripherals)
49+
OrPattern(0, AdvertisementDataType.FLAGS, b"\x02"),
50+
# BR/EDR Not Supported (BLE peripherals)
51+
OrPattern(0, AdvertisementDataType.FLAGS, b"\x04"),
52+
# General Discoverable, BR/EDR Not Supported (BLE peripherals)
53+
OrPattern(0, AdvertisementDataType.FLAGS, b"\x06"),
54+
# General Discoverable, LE and BR/EDR Capable (Controller), LE and BR/EDR Capable (Host) (computers, phones)
55+
OrPattern(0, AdvertisementDataType.FLAGS, b"\x1A"),
56+
]
57+
58+
return {"bluez": BlueZScannerArgs(filters=filters, or_patterns=or_patterns)}
59+
60+
def get_core_bluetooth_scanning_params() -> Dict[str, Any]:
61+
from bleak.backends.corebluetooth.scanner import CBScannerArgs
62+
63+
return {"cb": CBScannerArgs(use_bdaddr=macos_use_bdaddr)}
64+
65+
return {
66+
"BleakScannerBlueZDBus": get_bluez_dbus_scanning_params,
67+
"BleakScannerCoreBluetooth": get_core_bluetooth_scanning_params,
68+
# "BleakScannerP4Android": get_p4android_scanning_params,
69+
# "BleakScannerWinRT": get_winrt_scanning_params,
70+
}.get(bleak.get_platform_scanner_backend_type().__name__, lambda: {})()
71+
72+
73+
@contextlib.asynccontextmanager
74+
async def scanner(**kwargs):
75+
try:
76+
async with bleak.BleakScanner(**kwargs) as bleak_scanner:
77+
yield bleak_scanner
78+
79+
except bleak.exc.BleakNoPassiveScanError as e:
80+
logger.error(f"Passive scanning not possible, using active scanning ({e})")
81+
82+
del kwargs["scanning_mode"]
83+
async with bleak.BleakScanner(**kwargs) as bleak_scanner:
84+
yield bleak_scanner
85+
86+
87+
async def main(args: argparse.Namespace):
88+
def scan_callback(device: BLEDevice, adv_data: AdvertisementData):
89+
logger.info("%s: %r", device.address, adv_data)
90+
91+
async with scanner(
92+
detection_callback=scan_callback,
93+
**_get_os_specific_scanning_params(
94+
uuids=args.services, macos_use_bdaddr=args.macos_use_bdaddr
95+
),
96+
scanning_mode="passive",
97+
):
98+
await asyncio.sleep(60)
99+
100+
101+
if __name__ == "__main__":
102+
logging.basicConfig(
103+
level=logging.INFO,
104+
format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
105+
)
106+
107+
parser = argparse.ArgumentParser()
108+
parser.add_argument(
109+
"--macos-use-bdaddr",
110+
action="store_true",
111+
help="when true use Bluetooth address instead of UUID on macOS",
112+
)
113+
parser.add_argument(
114+
"--services",
115+
metavar="<uuid>",
116+
nargs="*",
117+
help="UUIDs of one or more services to filter for",
118+
)
119+
120+
arguments = parser.parse_args()
121+
122+
try:
123+
asyncio.run(main(arguments))
124+
except KeyboardInterrupt:
125+
pass

0 commit comments

Comments
 (0)