Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ jobs:
run: make .venv
- name: Run lint
run: make lint
- name: Run typecheck
run: make typecheck
- name: Run tests
run: make test
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ dependencies = [
"bluetooth-adapters ~= 0.16",
"cachetools ~= 5.3",
"dbus-fast ~= 2.15",
"pybricks ~= 3.5.0",
"pybricks ~= 3.6.0",
]

[dependency-groups]
debug = ["bluetooth-data-tools ~= 1.15"]
dev = [
"async-timer ~= 1.1.6",
"mypy ~= 1.15.0",
"mypy ~= 1.16.0",
"pytest ~= 8.3",
"pytest-asyncio ~= 0.26.0",
"pytest-asyncio ~= 1.0.0",
"python-dbusmock ~= 0.34.3",
"ruff ~= 0.11.8",
"types-cachetools ~= 5.3",
Expand Down
6 changes: 4 additions & 2 deletions src/pb_ble/bluezdbus/broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ async def stop_broadcast(self, adv: str | BroadcastAdvertisement):
pass
finally:
self.bus.unexport(path)
del self.advertisements[path]
if path in self.advertisements:
del self.advertisements[path]

async def stop(self):
"""
Expand Down Expand Up @@ -114,7 +115,8 @@ async def broadcast(self, adv: BroadcastAdvertisement):
def release_advertisement(path):
try:
self.bus.unexport(path)
del self.advertisements[path]
if path in self.advertisements:
del self.advertisements[path]
finally:
on_release(path)

Expand Down
2 changes: 1 addition & 1 deletion src/pb_ble/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""Type of a value that can be broadcast."""

PybricksBroadcastData: TypeAlias = (
PybricksBroadcastValue | tuple[PybricksBroadcastValue]
PybricksBroadcastValue | tuple[PybricksBroadcastValue, ...]
)
"""Type of the broadcast data."""

Expand Down
39 changes: 31 additions & 8 deletions src/pb_ble/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@

"""

import logging
from enum import IntEnum
from struct import pack, unpack, unpack_from
from typing import Literal, Tuple
from typing import Literal, Tuple, cast

from .constants import PybricksBroadcast, PybricksBroadcastValue
from .constants import PybricksBroadcast, PybricksBroadcastData, PybricksBroadcastValue

logger = logging.getLogger(__name__)


def decode_message(
Expand All @@ -30,24 +33,32 @@ def decode_message(
or a tuple.
"""

logger.debug(f"decoding[{len(data)}]: {data!r}")

# idx 0 is the channel
channel: int = unpack_from("<B", data)[0] # uint8
logger.debug(f"channel: {channel}")
# idx 1 is the message start
idx = 1
decoded_data = []
decoded_data: list[PybricksBroadcastValue] = []
single_object = False

while idx < len(data):
idx, val = _decode_next_value(idx, data)
if val is None:
logger.debug(f"data[{len(decoded_data)}]: SINGLE_OBJECT marker")
single_object = True
else:
logger.debug(f"data[{len(decoded_data)}] of {type(val)!s:<15}: {val!r}")
decoded_data.append(val)

if single_object:
return PybricksBroadcast(channel, decoded_data[0])
decoded_value: PybricksBroadcastValue = decoded_data[0]
return PybricksBroadcast(channel, decoded_value)
else:
return PybricksBroadcast(channel, tuple(decoded_data)) # type: ignore # https://github.com/python/mypy/issues/7509
return PybricksBroadcast(
channel, cast(PybricksBroadcastData, tuple(decoded_data))
)


def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes:
Expand All @@ -63,17 +74,22 @@ def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes:

# idx 0 is the channel
encoded_channel = pack("<B", channel)
logger.debug(f"channel: {channel} -> {encoded_channel!r}")

# idx 1 is the message start
encoded_data = bytearray(encoded_channel)

if len(values) == 1:
# set SINGLE_OBJECT marker
header = PybricksBleBroadcastDataType.SINGLE_OBJECT << 5
logger.debug(f"data[{len(encoded_data)}]: SINGLE_OBJECT marker")
encoded_data.append(header)

for val in values:
header, encoded_val = _encode_value(val)
logger.debug(
f"data[{len(encoded_data)}] of {type(val)!s}: {val!r} -> ({header!r}, {encoded_val!r})"
)
encoded_data.append(header)

if encoded_val is not None:
Expand All @@ -85,7 +101,9 @@ def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes:
f"Payload too large: {len(encoded_data)} bytes (maximum is {OBSERVED_DATA_MAX_SIZE} bytes)"
)

return bytes(encoded_data)
message = bytes(encoded_data)
logger.debug(f"encoded[{len(message)}]: {message!r}")
return message


def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]:
Expand All @@ -99,9 +117,14 @@ def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]:
:return: Tuple containing the vendor ID type (`BT` or `USB`), the vendor
ID, the product ID and the product revision.
"""
vid_type: int
vid: int
pid: int
rev: int

vid_type, vid, pid, rev = unpack("<BHHH", data)
vid_type = "BT" if vid_type else "USB"
return vid_type, vid, pid, rev

return "BT" if vid_type else "USB", vid, pid, rev


def pack_pnp_id(
Expand Down
Empty file added src/pb_ble/py.typed
Empty file.
25 changes: 20 additions & 5 deletions src/pb_ble/vhub.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys
from contextlib import AsyncExitStack
from typing import ClassVar, Optional, Sequence, Tuple, Union
from typing import ClassVar, Optional, Sequence, Tuple, Union, cast

from dbus_fast.aio import MessageBus, ProxyObject
from dbus_fast.constants import BusType
Expand All @@ -16,6 +16,8 @@
from .constants import (
PYBRICKS_MAX_CHANNEL,
PYBRICKS_MIN_CHANNEL,
PybricksBroadcastData,
PybricksBroadcastValue,
ScanningMode,
)

Expand Down Expand Up @@ -61,19 +63,32 @@ async def __aenter__(self):
raise
return self

async def broadcast(self, data: Union[bool, int, float, str, bytes]) -> None:
if data is None:
async def broadcast(self, *data: PybricksBroadcastValue | None) -> None: # type: ignore [override]
if len(data) == 0:
raise ValueError("Broadcast must be a value or tuple.")
if None in data:
await self._broadcaster.stop_broadcast(self._adv)
else:
if not self._broadcaster.is_broadcasting(self._adv):
await self._broadcaster.broadcast(self._adv)
self._adv.message = data
self._adv.message = cast(PybricksBroadcastData, tuple(data))

def observe(
self, channel: int
) -> Optional[Tuple[Union[bool, int, float, str, bytes], ...]]:
advertisement = self._observer.observe(channel)
return advertisement.data if advertisement is not None else None

if advertisement is not None:
if isinstance(advertisement.data, tuple):
return advertisement.data
else:
# TODO: Pybricks does expose single-value broadcasts
# in a single-object tuple. However, that doesn't match
# the type signature of the observe() method. To adhere
# to the type signature, we only return wrapped values.
return (advertisement.data,)
else:
return None

def signal_strength(self, channel: int) -> int:
advertisement = self._observer.observe(channel)
Expand Down
8 changes: 3 additions & 5 deletions tests/fixtures/bluez5_mock.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import sys
import unittest.mock

import dbus
import pytest
from dbus.proxies import ProxyObject
from dbusmock import SpawnedMock
from dbusmock.testcase import PrivateDBus

Expand All @@ -19,7 +19,7 @@ def pytest_runtest_setup(item) -> None:


@pytest.fixture
def bluez_mock(dbusmock_system: PrivateDBus) -> YieldFixture[dbus.proxies.ProxyObject]:
def bluez_mock(dbusmock_system: PrivateDBus) -> YieldFixture[ProxyObject]:
template = "bluez5"
parameters = {
"advertise": True,
Expand All @@ -34,9 +34,7 @@ def bluez_mock(dbusmock_system: PrivateDBus) -> YieldFixture[dbus.proxies.ProxyO


@pytest.fixture(autouse=True)
def adapter_mock(
bluez_mock: dbus.proxies.ProxyObject, adapter_name: str
) -> YieldFixture[str]:
def adapter_mock(bluez_mock: ProxyObject, adapter_name: str) -> YieldFixture[str]:
device_name = adapter_name

# Mock out the DBus adapter
Expand Down
24 changes: 19 additions & 5 deletions tests/test_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


class TestPybricksBleDecodeMessage:
# h.ble.broadcast(5)
def test_decode_message_single_object(self):
# channel 200
# single object marker
Expand All @@ -13,22 +14,35 @@ def test_decode_message_single_object(self):
channel, data = decode_message(message)

assert channel == 200
assert not isinstance(data, tuple)
assert isinstance(data, int), type(data)

assert data == 5

# TODO: Check behaviour against reference implementation
# h.ble.broadcast((5,))
def test_decode_message_single_object_tuple(self):
# channel 200
# int8: 5
message = b"\xc8\x61\x05"
channel, data = decode_message(message)

assert channel == 200
assert isinstance(data, tuple), type(data)
assert len(data) == 1

assert data == (5,)

# h.ble.broadcast((5,))
def test_decode_message_single_object_tuple_0(self):
# channel 0
# int8: 5
message = b"\x00a\x05"
channel, data = decode_message(message)

assert channel == 0
assert isinstance(data, tuple)
assert len(data) == 1

assert data[0] == 5
assert data == (5,)

def test_decode_message_int8_int16_int32(self):
# channel: 200
Expand Down Expand Up @@ -127,9 +141,9 @@ def test_encode_message_single_object(self):
data = encode_message(200, 5)
assert data == b"\xc8\x00\x61\x05"

@pytest.mark.skip("Check behaviour against reference implementation")
@pytest.mark.skip("Encoding single-object tuples is not supported")
def test_encode_message_single_object_tuple(self):
data = encode_message(200, (1))
data = encode_message(200, (1,)) # type: ignore[arg-type]
assert data == b"\xc8\x61\x01"

def test_encode_message_int8_int16_int32(self):
Expand Down
26 changes: 24 additions & 2 deletions tests/test_vhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,37 @@ class TestVirtualBLE:
async def test_create_vble(self, adapter):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
assert ble is not None
assert not ble._broadcaster.is_broadcasting()

async def test_observe(self):
async def test_observe_none(self):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
data = ble.observe(2)
assert data is None
assert not ble._broadcaster.is_broadcasting()

async def test_broadcast(self):
async def test_broadcast_single(self):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
await ble.broadcast(42)
assert ble._broadcaster.is_broadcasting()
assert ble._adv.message == 42

async def test_broadcast_multiple(self):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
await ble.broadcast(42, 24)
assert ble._broadcaster.is_broadcasting()
assert ble._adv.message == (42, 24)

async def test_broadcast_none(self):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
await ble.broadcast(None)
assert not ble._broadcaster.is_broadcasting()

async def test_broadcast_start_stop(self):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
await ble.broadcast(42)
assert ble._broadcaster.is_broadcasting()
await ble.broadcast(None)
assert not ble._broadcaster.is_broadcasting()

async def test_context(self):
async with await get_virtual_ble(
Expand Down