Skip to content

Commit

Permalink
scanner: detect OF devices in Nearby state
Browse files Browse the repository at this point in the history
  • Loading branch information
malmeloo committed Jul 15, 2024
1 parent 90eaa68 commit 88bd519
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 49 deletions.
43 changes: 33 additions & 10 deletions examples/device_scanner.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,50 @@
import asyncio
import logging

from findmy.scanner import OfflineFindingScanner
from findmy.scanner import (
NearbyOfflineFindingDevice,
OfflineFindingScanner,
SeparatedOfflineFindingDevice,
)

logging.basicConfig(level=logging.INFO)


def _print_nearby(device: NearbyOfflineFindingDevice) -> None:
print(f"NEARBY Device - {device.mac_address}")
print(f" Status byte: {device.status:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print()


def _print_separated(device: SeparatedOfflineFindingDevice) -> None:
print(f"SEPARATED Device - {device.mac_address}")
print(f" Public key: {device.adv_key_b64}")
print(f" Lookup key: {device.hashed_adv_key_b64}")
print(f" Status byte: {device.status:x}")
print(f" Hint byte: {device.hint:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print()


async def scan() -> None:
scanner = await OfflineFindingScanner.create()

print("Scanning for FindMy-devices...")
print()

async for device in scanner.scan_for(10, extend_timeout=True):
print(f"Device - {device.mac_address}")
print(f" Public key: {device.adv_key_b64}")
print(f" Lookup key: {device.hashed_adv_key_b64}")
print(f" Status byte: {device.status:x}")
print(f" Hint byte: {device.hint:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print()
if isinstance(device, NearbyOfflineFindingDevice):
_print_nearby(device)
elif isinstance(device, SeparatedOfflineFindingDevice):
_print_separated(device)
else:
print(f"Unknown device: {device}")
print()


if __name__ == "__main__":
Expand Down
12 changes: 10 additions & 2 deletions findmy/scanner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
"""Utilities related to physically discoverable FindMy-devices."""
from .scanner import OfflineFindingScanner
from .scanner import (
NearbyOfflineFindingDevice,
OfflineFindingScanner,
SeparatedOfflineFindingDevice,
)

__all__ = ("OfflineFindingScanner",)
__all__ = (
"OfflineFindingScanner",
"NearbyOfflineFindingDevice",
"SeparatedOfflineFindingDevice",
)
195 changes: 158 additions & 37 deletions findmy/scanner/scanner.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Airtag scanner."""

from __future__ import annotations

import asyncio
import logging
import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, AsyncGenerator

from bleak import BleakScanner
Expand All @@ -18,27 +20,26 @@
logging.getLogger(__name__)


class OfflineFindingDevice(HasPublicKey):
class OfflineFindingDevice(ABC):
"""Device discoverable through Apple's bluetooth-based Offline Finding protocol."""

OF_HEADER_SIZE = 2
OF_TYPE = 0x12
OF_DATA_LEN = 25

def __init__( # noqa: PLR0913
@classmethod
@property
@abstractmethod
def payload_len(cls) -> int:
"""Length of OfflineFinding data payload in bytes."""
raise NotImplementedError

def __init__(
self,
mac_bytes: bytes,
status: int,
public_key: bytes,
hint: int,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Initialize an `OfflineFindingDevice`."""
"""Instantiate an OfflineFindingDevice."""
self._mac_bytes: bytes = mac_bytes
self._status: int = status
self._public_key: bytes = public_key
self._hint: int = hint

self._additional_data: dict[Any, Any] = additional_data or {}

@property
Expand All @@ -47,6 +48,139 @@ def mac_address(self) -> str:
mac = self._mac_bytes.hex().upper()
return ":".join(mac[i : i + 2] for i in range(0, len(mac), 2))

@property
def additional_data(self) -> dict[Any, Any]:
"""Any additional data. No guarantees about the contents of this dictionary."""
return self._additional_data

@classmethod
@abstractmethod
def from_payload(
cls,
mac_address: str,
payload: bytes,
additional_data: dict[Any, Any] | None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
raise NotImplementedError

@classmethod
def from_ble_payload(
cls,
mac_address: str,
ble_payload: bytes,
additional_data: dict[Any, Any] | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
if len(ble_payload) < cls.OF_HEADER_SIZE:
logging.error("Not enough bytes to decode: %s", len(ble_payload))
return None
if ble_payload[0] != cls.OF_TYPE:
logging.debug("Unsupported OF type: %s", ble_payload[0])
return None

device_type = next(
(dev for dev in cls.__subclasses__() if dev.payload_len == ble_payload[1]),
None,
)
if device_type is None:
logging.error("Invalid OF payload length: %s", ble_payload[1])
return None

return device_type.from_payload(
mac_address,
ble_payload[cls.OF_HEADER_SIZE :],
additional_data,
)

@override
def __eq__(self, other: object) -> bool:
if isinstance(other, OfflineFindingDevice):
return self.mac_address == other.mac_address

return NotImplemented

@override
def __hash__(self) -> int:
return int.from_bytes(self._mac_bytes)


class NearbyOfflineFindingDevice(OfflineFindingDevice):
"""Offline-Finding device in nearby state."""

@classmethod
@property
@override
def payload_len(cls) -> int:
"""Length of OfflineFinding data payload in bytes."""
return 0x02 # 2

def __init__(
self,
mac_bytes: bytes,
status_byte: int,
extra_byte: int,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Instantiate a NearbyOfflineFindingDevice."""
super().__init__(mac_bytes, additional_data)

self._status_byte: int = status_byte
self._extra_byte: int = extra_byte

@property
def status(self) -> int:
"""Status value as reported by the device."""
return self._status_byte % 255

@classmethod
@override
def from_payload(
cls,
mac_address: str,
payload: bytes,
additional_data: dict[Any, Any] | None = None,
) -> NearbyOfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.payload_len:
logging.error(
"Invalid OF data length: %s instead of %s",
len(payload),
payload[1],
)

mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
status_byte = payload[0]
extra_byte = payload[1]

return NearbyOfflineFindingDevice(mac_bytes, status_byte, extra_byte, additional_data)


class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
"""Offline-Finding device in separated state."""

@classmethod
@property
@override
def payload_len(cls) -> int:
"""Length of OfflineFinding data in bytes."""
return 0x19 # 25

def __init__( # noqa: PLR0913
self,
mac_bytes: bytes,
status: int,
public_key: bytes,
hint: int,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Initialize a `SeparatedOfflineFindingDevice`."""
super().__init__(mac_bytes, additional_data)

self._status: int = status
self._public_key: bytes = public_key
self._hint: int = hint

@property
def status(self) -> int:
"""Status value as reported by the device."""
Expand All @@ -57,56 +191,43 @@ def hint(self) -> int:
"""Hint value as reported by the device."""
return self._hint % 255

@property
def additional_data(self) -> dict[Any, Any]:
"""Any additional data. No guarantees about the contents of this dictionary."""
return self._additional_data

@property
@override
def adv_key_bytes(self) -> bytes:
"""See `HasPublicKey.adv_key_bytes`."""
return self._public_key

@classmethod
@override
def from_payload(
cls,
mac_address: str,
payload: bytes,
additional_data: dict[Any, Any],
) -> OfflineFindingDevice | None:
"""Get an OfflineFindingDevice object from a BLE payload."""
if len(payload) < cls.OF_HEADER_SIZE:
logging.error("Not enough bytes to decode: %s", len(payload))
return None
if payload[0] != cls.OF_TYPE:
logging.debug("Unsupported OF type: %s", payload[0])
return None
if payload[1] != cls.OF_DATA_LEN:
logging.debug("Unknown OF data length: %s", payload[1])
return None
if len(payload) != cls.OF_HEADER_SIZE + cls.OF_DATA_LEN:
logging.debug(
additional_data: dict[Any, Any] | None = None,
) -> SeparatedOfflineFindingDevice | None:
"""Get a SeparatedOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.payload_len:
logging.error(
"Invalid OF data length: %s instead of %s",
len(payload) - cls.OF_HEADER_SIZE,
len(payload),
payload[1],
)
return None

mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))

status = payload[cls.OF_HEADER_SIZE + 0]
status = payload[0]

pubkey_end = payload[cls.OF_HEADER_SIZE + 1 : cls.OF_HEADER_SIZE + 23]
pubkey_end = payload[1:23]
pubkey_middle = mac_bytes[1:]
pubkey_start_ms = payload[cls.OF_HEADER_SIZE + 23] << 6
pubkey_start_ms = payload[23] << 6
pubkey_start_ls = mac_bytes[0] & 0b00111111
pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big")
pubkey = pubkey_start + pubkey_middle + pubkey_end

hint = payload[cls.OF_HEADER_SIZE + 24]
hint = payload[24]

return OfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data)
return SeparatedOfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data)

@override
def __repr__(self) -> str:
Expand Down Expand Up @@ -179,7 +300,7 @@ async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None:
# Likely Windows host, where details is a '_RawAdvData' object.
# See: https://github.com/malmeloo/FindMy.py/issues/24
additional_data = {}
return OfflineFindingDevice.from_payload(device.address, apple_data, additional_data)
return OfflineFindingDevice.from_ble_payload(device.address, apple_data, additional_data)

async def scan_for(
self,
Expand Down

0 comments on commit 88bd519

Please sign in to comment.