From b7dc3cc3df921347b11c29df50ce39807c011369 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 13:37:16 +0100 Subject: [PATCH 1/9] feat(q10): add decoded command helper and status trait --- roborock/devices/rpc/b01_q10_channel.py | 64 ++++++++++++++++++++ roborock/devices/traits/b01/q10/status.py | 73 +++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 roborock/devices/traits/b01/q10/status.py diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index a482e109..a11e13dd 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -2,17 +2,23 @@ from __future__ import annotations +import asyncio import logging +from collections.abc import Iterable +from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException from roborock.protocols.b01_q10_protocol import ( ParamsType, + decode_rpc_response, encode_mqtt_payload, ) +from roborock.roborock_message import RoborockMessage _LOGGER = logging.getLogger(__name__) +_TIMEOUT = 10.0 async def send_command( @@ -34,3 +40,61 @@ async def send_command( ex, ) raise + + +async def send_decoded_command( + mqtt_channel: MqttChannel, + command: B01_Q10_DP, + params: ParamsType, + expected_dps: Iterable[B01_Q10_DP] | None = None, +) -> dict[B01_Q10_DP, Any]: + """Send a command and await the first decoded response. + + Q10 responses are not correlated with a message id, so we filter on + expected datapoints when provided. + """ + roborock_message = encode_mqtt_payload(command, params) + future: asyncio.Future[dict[B01_Q10_DP, Any]] = asyncio.get_running_loop().create_future() + + expected_set = set(expected_dps) if expected_dps is not None else None + + def find_response(response_message: RoborockMessage) -> None: + try: + decoded_dps = decode_rpc_response(response_message) + except RoborockException as ex: + _LOGGER.debug( + "Failed to decode B01 Q10 RPC response (expecting %s): %s: %s", + command, + response_message, + ex, + ) + return + if expected_set and not any(dps in decoded_dps for dps in expected_set): + return + if not future.done(): + future.set_result(decoded_dps) + + unsub = await mqtt_channel.subscribe(find_response) + + _LOGGER.debug("Sending MQTT message: %s", roborock_message) + try: + await mqtt_channel.publish(roborock_message) + return await asyncio.wait_for(future, timeout=_TIMEOUT) + except TimeoutError as ex: + raise RoborockException(f"B01 Q10 command timed out after {_TIMEOUT}s ({command})") from ex + except RoborockException as ex: + _LOGGER.warning( + "Error sending B01 Q10 decoded command (%s): %s", + command, + ex, + ) + raise + except Exception as ex: + _LOGGER.exception( + "Error sending B01 Q10 decoded command (%s): %s", + command, + ex, + ) + raise + finally: + unsub() diff --git a/roborock/devices/traits/b01/q10/status.py b/roborock/devices/traits/b01/q10/status.py new file mode 100644 index 00000000..e3c42917 --- /dev/null +++ b/roborock/devices/traits/b01/q10/status.py @@ -0,0 +1,73 @@ +"""Status trait for Q10 B01 devices.""" + +from __future__ import annotations + +from typing import Any, cast + +from roborock.data.b01_q10.b01_q10_code_mappings import ( + B01_Q10_DP, + YXDeviceCleanTask, + YXDeviceState, + YXDeviceWorkMode, + YXFanLevel, +) +from roborock.devices.rpc.b01_q10_channel import send_decoded_command +from roborock.devices.transport.mqtt_channel import MqttChannel + + +class StatusTrait: + """Trait for requesting and holding Q10 status values.""" + + def __init__(self, channel: MqttChannel) -> None: + self._channel = channel + self._data: dict[B01_Q10_DP, Any] = {} + + @property + def data(self) -> dict[B01_Q10_DP, Any]: + """Return the latest raw status data.""" + return self._data + + async def refresh(self) -> dict[B01_Q10_DP, Any]: + """Refresh status values from the device.""" + decoded = await send_decoded_command( + self._channel, + command=B01_Q10_DP.REQUEST_DPS, + params={}, + expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, + ) + self._data = decoded + return decoded + + @property + def state_code(self) -> int | None: + return self._data.get(B01_Q10_DP.STATUS) + + @property + def state(self) -> YXDeviceState | None: + code = self.state_code + return cast(YXDeviceState | None, YXDeviceState.from_code_optional(code)) if code is not None else None + + @property + def battery(self) -> int | None: + return self._data.get(B01_Q10_DP.BATTERY) + + @property + def fan_level(self) -> YXFanLevel | None: + value = self._data.get(B01_Q10_DP.FAN_LEVEL) + return cast(YXFanLevel | None, YXFanLevel.from_code_optional(value)) if value is not None else None + + @property + def clean_mode(self) -> YXDeviceWorkMode | None: + value = self._data.get(B01_Q10_DP.CLEAN_MODE) + return cast(YXDeviceWorkMode | None, YXDeviceWorkMode.from_code_optional(value)) if value is not None else None + + @property + def clean_task(self) -> YXDeviceCleanTask | None: + value = self._data.get(B01_Q10_DP.CLEAN_TASK_TYPE) + return ( + cast(YXDeviceCleanTask | None, YXDeviceCleanTask.from_code_optional(value)) if value is not None else None + ) + + @property + def cleaning_progress(self) -> int | None: + return self._data.get(B01_Q10_DP.CLEANING_PROGRESS) From 77dde9b528718b08562d7666ce6efcccaf8c7d56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 13:46:50 +0100 Subject: [PATCH 2/9] test(q10): add tests for StatusTrait --- tests/devices/traits/b01/q10/test_status.py | 140 ++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 tests/devices/traits/b01/q10/test_status.py diff --git a/tests/devices/traits/b01/q10/test_status.py b/tests/devices/traits/b01/q10/test_status.py new file mode 100644 index 00000000..96bc70ee --- /dev/null +++ b/tests/devices/traits/b01/q10/test_status.py @@ -0,0 +1,140 @@ +"""Tests for Q10 StatusTrait.""" + +import json +from typing import Any + +import pytest + +from roborock.data.b01_q10.b01_q10_code_mappings import ( + B01_Q10_DP, + YXDeviceState, + YXDeviceWorkMode, + YXFanLevel, +) +from roborock.devices.traits.b01.q10.status import StatusTrait +from roborock.roborock_message import RoborockMessage +from tests.fixtures.channel_fixtures import FakeChannel + + +@pytest.fixture(name="fake_channel") +def fake_channel_fixture() -> FakeChannel: + return FakeChannel() + + +@pytest.fixture(name="status_trait") +def status_trait_fixture(fake_channel: FakeChannel) -> StatusTrait: + return StatusTrait(fake_channel) # type: ignore[arg-type] + + +def build_q10_response(dps: dict[str, Any]) -> RoborockMessage: + """Build a Q10 MQTT response message.""" + payload = {"dps": dps} + return RoborockMessage( + protocol=11, # MQTT_PROTO + payload=json.dumps(payload).encode(), + seq=0, + version=b"B01", + ) + + +async def test_status_trait_battery(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test getting battery status.""" + # Queue a response with battery data + fake_channel.response_queue.append(build_q10_response({"122": 85})) + + result = await status_trait.refresh() + + assert status_trait.battery == 85 + assert B01_Q10_DP.BATTERY in result + + +async def test_status_trait_state(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test getting device state.""" + # CLEANING_STATE = 5 + fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 100})) + + result = await status_trait.refresh() + + assert status_trait.state == YXDeviceState.CLEANING_STATE + assert B01_Q10_DP.STATUS in result + + +async def test_status_trait_fan_level(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test getting fan level.""" + # FAN_LEVEL NORMAL = 2 + fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 100, "123": 2})) + + result = await status_trait.refresh() + + assert status_trait.fan_level == YXFanLevel.NORMAL + assert B01_Q10_DP.FAN_LEVEL in result + + +async def test_status_trait_clean_mode(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test getting cleaning mode.""" + # CLEAN_MODE BOTH_WORK = 1 + fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 100, "137": 1})) + + result = await status_trait.refresh() + + assert status_trait.clean_mode == YXDeviceWorkMode.BOTH_WORK + assert B01_Q10_DP.CLEAN_MODE in result + + +async def test_status_trait_cleaning_progress(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test getting cleaning progress.""" + fake_channel.response_queue.append( + build_q10_response({"121": 5, "122": 100, "141": 25}) + ) + + result = await status_trait.refresh() + + assert status_trait.cleaning_progress == 25 + assert B01_Q10_DP.CLEANING_PROGRESS in result + + +async def test_status_trait_empty_data(status_trait: StatusTrait) -> None: + """Test status trait with no data queued.""" + # Test that properties return None when data is empty + assert status_trait.battery is None + assert status_trait.state is None + assert status_trait.fan_level is None + assert status_trait.clean_mode is None + assert status_trait.cleaning_progress is None + + +async def test_status_trait_data_property(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test that data property returns the raw data.""" + test_data = {"121": 5, "122": 100, "123": 2} + fake_channel.response_queue.append(build_q10_response(test_data)) + + await status_trait.refresh() + + # Convert string keys to B01_Q10_DP keys + assert B01_Q10_DP.STATUS in status_trait.data + assert B01_Q10_DP.BATTERY in status_trait.data + assert B01_Q10_DP.FAN_LEVEL in status_trait.data + + +async def test_status_trait_unknown_state(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test handling of unknown state code.""" + # Use a code that doesn't map to any state + fake_channel.response_queue.append(build_q10_response({"121": 999, "122": 100})) + + await status_trait.refresh() + + # Should return UNKNOWN or None + assert status_trait.state == YXDeviceState.UNKNOWN or status_trait.state is None + + +async def test_status_trait_multiple_refreshes(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: + """Test that multiple refreshes update the status.""" + # First refresh + fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 80})) + await status_trait.refresh() + assert status_trait.battery == 80 + + # Second refresh with different battery + fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 60})) + await status_trait.refresh() + assert status_trait.battery == 60 From 9a989c52a453173c55dcd88470fe7184ea34955a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 13:55:03 +0100 Subject: [PATCH 3/9] test(q10): add tests for send_decoded_command channel function --- roborock/devices/traits/b01/q10/status.py | 12 +- tests/devices/rpc/test_b01_q10_channel.py | 173 ++++++++++++++++++++ tests/devices/traits/b01/q10/test_status.py | 10 +- 3 files changed, 182 insertions(+), 13 deletions(-) create mode 100644 tests/devices/rpc/test_b01_q10_channel.py diff --git a/roborock/devices/traits/b01/q10/status.py b/roborock/devices/traits/b01/q10/status.py index e3c42917..98cea498 100644 --- a/roborock/devices/traits/b01/q10/status.py +++ b/roborock/devices/traits/b01/q10/status.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Any, cast +from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import ( B01_Q10_DP, @@ -45,7 +45,7 @@ def state_code(self) -> int | None: @property def state(self) -> YXDeviceState | None: code = self.state_code - return cast(YXDeviceState | None, YXDeviceState.from_code_optional(code)) if code is not None else None + return YXDeviceState.from_code_optional(code) if code is not None else None @property def battery(self) -> int | None: @@ -54,19 +54,17 @@ def battery(self) -> int | None: @property def fan_level(self) -> YXFanLevel | None: value = self._data.get(B01_Q10_DP.FAN_LEVEL) - return cast(YXFanLevel | None, YXFanLevel.from_code_optional(value)) if value is not None else None + return YXFanLevel.from_code_optional(value) if value is not None else None @property def clean_mode(self) -> YXDeviceWorkMode | None: value = self._data.get(B01_Q10_DP.CLEAN_MODE) - return cast(YXDeviceWorkMode | None, YXDeviceWorkMode.from_code_optional(value)) if value is not None else None + return YXDeviceWorkMode.from_code_optional(value) if value is not None else None @property def clean_task(self) -> YXDeviceCleanTask | None: value = self._data.get(B01_Q10_DP.CLEAN_TASK_TYPE) - return ( - cast(YXDeviceCleanTask | None, YXDeviceCleanTask.from_code_optional(value)) if value is not None else None - ) + return YXDeviceCleanTask.from_code_optional(value) if value is not None else None @property def cleaning_progress(self) -> int | None: diff --git a/tests/devices/rpc/test_b01_q10_channel.py b/tests/devices/rpc/test_b01_q10_channel.py new file mode 100644 index 00000000..21b16dae --- /dev/null +++ b/tests/devices/rpc/test_b01_q10_channel.py @@ -0,0 +1,173 @@ +"""Tests for B01 Q10 channel functions.""" + +import json +from typing import Any, cast + +import pytest + +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.devices.rpc.b01_q10_channel import send_command, send_decoded_command +from roborock.exceptions import RoborockException +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from tests.fixtures.channel_fixtures import FakeChannel + + +@pytest.fixture(name="fake_channel") +def fake_channel_fixture() -> FakeChannel: + return FakeChannel() + + +def build_q10_dps_response(dps: dict[str, Any]) -> RoborockMessage: + """Build a Q10 MQTT response message with DPS data.""" + payload = {"dps": dps} + return RoborockMessage( + protocol=cast(RoborockMessageProtocol, 11), # MQTT protocol for B01 Q10 + payload=json.dumps(payload).encode(), + seq=0, + version=b"B01", + ) + + +async def test_send_command(fake_channel: FakeChannel) -> None: + """Test sending a command without waiting for response.""" + await send_command(fake_channel, B01_Q10_DP.START_CLEAN, {"cmd": 1}) # type: ignore[arg-type] + + assert len(fake_channel.published_messages) == 1 + message = fake_channel.published_messages[0] + assert message.payload is not None + payload_data = json.loads(message.payload.decode()) + assert payload_data == {"dps": {"201": {"cmd": 1}}} + + +async def test_send_decoded_command_basic(fake_channel: FakeChannel) -> None: + """Test sending a command and receiving a decoded response.""" + # Queue a response + fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) + + result = await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUEST_DPS, + {}, + expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, + ) + + assert B01_Q10_DP.STATUS in result + assert B01_Q10_DP.BATTERY in result + assert result[B01_Q10_DP.STATUS] == 5 + assert result[B01_Q10_DP.BATTERY] == 100 + + +async def test_send_decoded_command_without_expected_dps(fake_channel: FakeChannel) -> None: + """Test send_decoded_command accepts any response when expected_dps is None.""" + # Queue a response with any DPS + fake_channel.response_queue.append(build_q10_dps_response({"123": 2})) + + result = await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUEST_DPS, + {}, + expected_dps=None, + ) + + # Should accept any response + assert B01_Q10_DP.FAN_LEVEL in result + assert result[B01_Q10_DP.FAN_LEVEL] == 2 + + +async def test_send_decoded_command_filters_by_expected_dps(fake_channel: FakeChannel) -> None: + """Test that send_decoded_command filters by expected DPS.""" + # Queue response with expected DPS + fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) + + result = await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUEST_DPS, + {}, + expected_dps={B01_Q10_DP.STATUS}, + ) + + # Should accept response with expected DPS + assert B01_Q10_DP.STATUS in result + assert result[B01_Q10_DP.STATUS] == 5 + + +async def test_send_decoded_command_timeout(fake_channel: FakeChannel) -> None: + """Test that send_decoded_command times out when no matching response.""" + # Don't queue any response + + with pytest.raises(RoborockException, match="B01 Q10 command timed out"): + await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUEST_DPS, + {}, + expected_dps={B01_Q10_DP.STATUS}, + ) + + +async def test_send_decoded_command_ignores_decode_errors(fake_channel: FakeChannel) -> None: + """Test that send_decoded_command ignores non-decodable messages.""" + # Queue a valid response (invalid responses are ignored by not matching expected_dps) + fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) + + result = await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUEST_DPS, + {}, + expected_dps={B01_Q10_DP.STATUS}, + ) + + # Should successfully decode and return valid response + assert B01_Q10_DP.STATUS in result + + +async def test_send_decoded_command_partial_match(fake_channel: FakeChannel) -> None: + """Test that send_decoded_command accepts response with at least one expected DPS.""" + # Queue response with only one of multiple expected DPS + fake_channel.response_queue.append(build_q10_dps_response({"121": 5})) + + result = await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUEST_DPS, + {}, + expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, + ) + + # Should accept response with at least one expected DPS + assert B01_Q10_DP.STATUS in result + assert result[B01_Q10_DP.STATUS] == 5 + + +async def test_send_decoded_command_published_message(fake_channel: FakeChannel) -> None: + """Test that send_decoded_command publishes the correct message.""" + fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) + + await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.REQUEST_DPS, + {}, + expected_dps={B01_Q10_DP.STATUS}, + ) + + # Check published message + assert len(fake_channel.published_messages) == 1 + message = fake_channel.published_messages[0] + assert message.payload is not None + payload_data = json.loads(message.payload.decode()) + assert payload_data == {"dps": {"102": {}}} + + +async def test_send_decoded_command_with_params(fake_channel: FakeChannel) -> None: + """Test send_decoded_command with command parameters.""" + fake_channel.response_queue.append(build_q10_dps_response({"121": 3, "122": 100})) + + await send_decoded_command( + fake_channel, # type: ignore[arg-type] + B01_Q10_DP.START_CLEAN, + {"cmd": 1}, + expected_dps={B01_Q10_DP.STATUS}, + ) + + message = fake_channel.published_messages[0] + assert message.payload is not None + payload_data = json.loads(message.payload.decode()) + assert payload_data == {"dps": {"201": {"cmd": 1}}} diff --git a/tests/devices/traits/b01/q10/test_status.py b/tests/devices/traits/b01/q10/test_status.py index 96bc70ee..db1f4bec 100644 --- a/tests/devices/traits/b01/q10/test_status.py +++ b/tests/devices/traits/b01/q10/test_status.py @@ -1,7 +1,7 @@ """Tests for Q10 StatusTrait.""" import json -from typing import Any +from typing import Any, cast import pytest @@ -12,7 +12,7 @@ YXFanLevel, ) from roborock.devices.traits.b01.q10.status import StatusTrait -from roborock.roborock_message import RoborockMessage +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol from tests.fixtures.channel_fixtures import FakeChannel @@ -30,7 +30,7 @@ def build_q10_response(dps: dict[str, Any]) -> RoborockMessage: """Build a Q10 MQTT response message.""" payload = {"dps": dps} return RoborockMessage( - protocol=11, # MQTT_PROTO + protocol=cast(RoborockMessageProtocol, 11), # MQTT_PROTO payload=json.dumps(payload).encode(), seq=0, version=b"B01", @@ -83,9 +83,7 @@ async def test_status_trait_clean_mode(status_trait: StatusTrait, fake_channel: async def test_status_trait_cleaning_progress(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: """Test getting cleaning progress.""" - fake_channel.response_queue.append( - build_q10_response({"121": 5, "122": 100, "141": 25}) - ) + fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 100, "141": 25})) result = await status_trait.refresh() From f801837b63cf29e8a8ea20306cc44ba694035545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 15:11:17 +0100 Subject: [PATCH 4/9] feat(q10): expose StatusTrait in Q10PropertiesApi Adds the StatusTrait to Q10PropertiesApi to enable querying device status. The trait was implemented but not exposed in the API class. This enables Home Assistant integration to call api.status.refresh() to retrieve Q10 device data (battery, status, fan level, etc.) --- roborock/devices/traits/b01/q10/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index ac897259..9ff7d1de 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -4,10 +4,12 @@ from roborock.devices.transport.mqtt_channel import MqttChannel from .command import CommandTrait +from .status import StatusTrait from .vacuum import VacuumTrait __all__ = [ "Q10PropertiesApi", + "StatusTrait", ] @@ -17,12 +19,16 @@ class Q10PropertiesApi(Trait): command: CommandTrait """Trait for sending commands to Q10 devices.""" + status: StatusTrait + """Trait for querying Q10 device status.""" + vacuum: VacuumTrait """Trait for sending vacuum related commands to Q10 devices.""" def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self.command = CommandTrait(channel) + self.status = StatusTrait(channel) self.vacuum = VacuumTrait(self.command) From a10a4140de4dc6c5d13ccc5ef8aee07c9a672904 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 15:30:20 +0100 Subject: [PATCH 5/9] feat(q10): add tests for StatusTrait and vacuum trait in Q10PropertiesApi --- tests/devices/traits/b01/q10/test_vacuum.py | 42 ++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/tests/devices/traits/b01/q10/test_vacuum.py b/tests/devices/traits/b01/q10/test_vacuum.py index c8bdb3a4..a8313d48 100644 --- a/tests/devices/traits/b01/q10/test_vacuum.py +++ b/tests/devices/traits/b01/q10/test_vacuum.py @@ -1,12 +1,14 @@ import json from collections.abc import Awaitable, Callable -from typing import Any +from typing import Any, cast import pytest from roborock.data.b01_q10.b01_q10_code_mappings import YXCleanType, YXFanLevel from roborock.devices.traits.b01.q10 import Q10PropertiesApi +from roborock.devices.traits.b01.q10.status import StatusTrait from roborock.devices.traits.b01.q10.vacuum import VacuumTrait +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol from tests.fixtures.channel_fixtures import FakeChannel @@ -52,3 +54,41 @@ async def test_vacuum_commands( assert message.payload payload_data = json.loads(message.payload.decode()) assert payload_data == {"dps": expected_payload} + +def test_q10_api_has_status_trait(q10_api: Q10PropertiesApi) -> None: + """Test that Q10PropertiesApi exposes StatusTrait.""" + assert hasattr(q10_api, "status") + assert isinstance(q10_api.status, StatusTrait) + + +def test_q10_api_has_vacuum_trait(q10_api: Q10PropertiesApi) -> None: + """Test that Q10PropertiesApi exposes VacuumTrait.""" + assert hasattr(q10_api, "vacuum") + assert isinstance(q10_api.vacuum, VacuumTrait) + + +async def test_q10_api_status_refresh(q10_api: Q10PropertiesApi, fake_channel: FakeChannel) -> None: + """Test that status trait can be refreshed via Q10PropertiesApi.""" + + def build_q10_response(dps: dict[str, Any]) -> RoborockMessage: + """Build a Q10 MQTT response message.""" + payload = {"dps": dps} + return RoborockMessage( + protocol=cast(RoborockMessageProtocol, 11), + payload=json.dumps(payload).encode(), + seq=0, + version=b"B01", + ) + + # Queue a response with status and battery + fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 100})) + + result = await q10_api.status.refresh() + + # Verify that refresh returned data + assert result is not None + assert len(result) > 0 + + # Verify that properties are accessible + assert q10_api.status.battery == 100 + assert q10_api.status.state is not None \ No newline at end of file From b6ffdd851a0b6be9bd7c98d998e09fcd6f48b62b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 15:32:37 +0100 Subject: [PATCH 6/9] test(q10): add newline and ensure status state is not None in Q10PropertiesApi tests --- tests/devices/traits/b01/q10/test_vacuum.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/devices/traits/b01/q10/test_vacuum.py b/tests/devices/traits/b01/q10/test_vacuum.py index a8313d48..d8a8453d 100644 --- a/tests/devices/traits/b01/q10/test_vacuum.py +++ b/tests/devices/traits/b01/q10/test_vacuum.py @@ -55,6 +55,7 @@ async def test_vacuum_commands( payload_data = json.loads(message.payload.decode()) assert payload_data == {"dps": expected_payload} + def test_q10_api_has_status_trait(q10_api: Q10PropertiesApi) -> None: """Test that Q10PropertiesApi exposes StatusTrait.""" assert hasattr(q10_api, "status") @@ -91,4 +92,4 @@ def build_q10_response(dps: dict[str, Any]) -> RoborockMessage: # Verify that properties are accessible assert q10_api.status.battery == 100 - assert q10_api.status.state is not None \ No newline at end of file + assert q10_api.status.state is not None From 07a1cecde2cf0f86fded369d2135741f6c195cc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 15:37:44 +0100 Subject: [PATCH 7/9] feat(q10): request all necessary DPs in StatusTrait.refresh() Add all data points needed by Home Assistant sensors: - Brush/filter life indicators - Cleaning statistics (time, area, count, progress) - Fault status --- roborock/devices/traits/b01/q10/status.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/roborock/devices/traits/b01/q10/status.py b/roborock/devices/traits/b01/q10/status.py index 98cea498..46c472bb 100644 --- a/roborock/devices/traits/b01/q10/status.py +++ b/roborock/devices/traits/b01/q10/status.py @@ -33,7 +33,20 @@ async def refresh(self) -> dict[B01_Q10_DP, Any]: self._channel, command=B01_Q10_DP.REQUEST_DPS, params={}, - expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, + expected_dps={ + B01_Q10_DP.STATUS, + B01_Q10_DP.BATTERY, + B01_Q10_DP.MAIN_BRUSH_LIFE, + B01_Q10_DP.SIDE_BRUSH_LIFE, + B01_Q10_DP.FILTER_LIFE, + B01_Q10_DP.CLEAN_TIME, + B01_Q10_DP.TOTAL_CLEAN_TIME, + B01_Q10_DP.TOTAL_CLEAN_COUNT, + B01_Q10_DP.CLEAN_AREA, + B01_Q10_DP.TOTAL_CLEAN_AREA, + B01_Q10_DP.CLEAN_PROGRESS, + B01_Q10_DP.FAULT, + }, ) self._data = decoded return decoded From 7b0e410605d77e9c7b3a694c4f92ba496c22692b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 22:00:01 +0100 Subject: [PATCH 8/9] feat(q10): enhance Q10 device traits with status management and data conversion utilities --- roborock/data/b01_q10/b01_q10_containers.py | 51 +++++- roborock/data/containers.py | 19 +- roborock/devices/device.py | 14 +- roborock/devices/rpc/b01_q10_channel.py | 88 +++------- roborock/devices/traits/b01/q10/__init__.py | 38 +++- roborock/devices/traits/b01/q10/common.py | 40 +++++ roborock/devices/traits/b01/q10/status.py | 88 ++-------- roborock/devices/transport/mqtt_channel.py | 14 +- tests/devices/rpc/test_b01_q10_channel.py | 150 +--------------- tests/devices/traits/b01/q10/__init__.py | 1 + tests/devices/traits/b01/q10/test_status.py | 184 +++++++++----------- tests/devices/traits/b01/q10/test_vacuum.py | 28 +-- 12 files changed, 284 insertions(+), 431 deletions(-) create mode 100644 roborock/devices/traits/b01/q10/common.py create mode 100644 tests/devices/traits/b01/q10/__init__.py diff --git a/roborock/data/b01_q10/b01_q10_containers.py b/roborock/data/b01_q10/b01_q10_containers.py index 0e805593..0e8ea3d1 100644 --- a/roborock/data/b01_q10/b01_q10_containers.py +++ b/roborock/data/b01_q10/b01_q10_containers.py @@ -1,6 +1,26 @@ -from ..containers import RoborockBase +"""Data container classes for Q10 B01 devices. + +Many of these classes use the `field(metadata={"dps": ...})` convention to map +fields to device Data Points (DPS). This metadata is utilized by the +`update_from_dps` helper in `roborock.devices.traits.b01.q10.common` to +automatically update objects from raw device responses. +""" +from dataclasses import dataclass, field +from ..containers import RoborockBase +from .b01_q10_code_mappings import ( + B01_Q10_DP, + YXBackType, + YXDeviceCleanTask, + YXDeviceState, + YXDeviceWorkMode, + YXFanLevel, + YXWaterLevel, +) + + +@dataclass class dpCleanRecord(RoborockBase): op: str result: int @@ -8,24 +28,28 @@ class dpCleanRecord(RoborockBase): data: list +@dataclass class dpMultiMap(RoborockBase): op: str result: int data: list +@dataclass class dpGetCarpet(RoborockBase): op: str result: int data: str +@dataclass class dpSelfIdentifyingCarpet(RoborockBase): op: str result: int data: str +@dataclass class dpNetInfo(RoborockBase): wifiName: str ipAdress: str @@ -33,6 +57,7 @@ class dpNetInfo(RoborockBase): signal: int +@dataclass class dpNotDisturbExpand(RoborockBase): disturb_dust_enable: int disturb_light: int @@ -40,14 +65,38 @@ class dpNotDisturbExpand(RoborockBase): disturb_voice: int +@dataclass class dpCurrentCleanRoomIds(RoborockBase): room_id_list: list +@dataclass class dpVoiceVersion(RoborockBase): version: int +@dataclass class dpTimeZone(RoborockBase): timeZoneCity: str timeZoneSec: int + + +@dataclass +class Q10Status(RoborockBase): + """Status for Q10 devices. + + Fields are mapped to DPS values using metadata. Objects of this class can be + automatically updated using the `update_from_dps` helper. + """ + + clean_time: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_TIME}) + clean_area: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_AREA}) + battery: int | None = field(default=None, metadata={"dps": B01_Q10_DP.BATTERY}) + status: YXDeviceState | None = field(default=None, metadata={"dps": B01_Q10_DP.STATUS}) + fan_level: YXFanLevel | None = field(default=None, metadata={"dps": B01_Q10_DP.FAN_LEVEL}) + water_level: YXWaterLevel | None = field(default=None, metadata={"dps": B01_Q10_DP.WATER_LEVEL}) + clean_count: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_COUNT}) + clean_mode: YXDeviceWorkMode | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_MODE}) + clean_task_type: YXDeviceCleanTask | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_TASK_TYPE}) + back_type: YXBackType | None = field(default=None, metadata={"dps": B01_Q10_DP.BACK_TYPE}) + cleaning_progress: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEANING_PROGRESS}) diff --git a/roborock/data/containers.py b/roborock/data/containers.py index 57d5e6b2..1c1a3f8f 100644 --- a/roborock/data/containers.py +++ b/roborock/data/containers.py @@ -91,10 +91,10 @@ def from_dict(cls, data: dict[str, Any]): if not isinstance(data, dict): return None field_types = {field.name: field.type for field in dataclasses.fields(cls)} - result: dict[str, Any] = {} + normalized_data: dict[str, Any] = {} for orig_key, value in data.items(): key = _decamelize(orig_key) - if (field_type := field_types.get(key)) is None: + if field_types.get(key) is None: if (log_key := f"{cls.__name__}.{key}") not in RoborockBase._missing_logged: _LOGGER.debug( "Key '%s' (decamelized: '%s') not found in %s fields, skipping", @@ -104,6 +104,19 @@ def from_dict(cls, data: dict[str, Any]): ) RoborockBase._missing_logged.add(log_key) continue + normalized_data[key] = value + + result = RoborockBase.convert_dict(field_types, normalized_data) + return cls(**result) + + @staticmethod + def convert_dict(types_map: dict[Any, type], data: dict[Any, Any]) -> dict[Any, Any]: + """Convert a dictionary of values based on a schema map of types.""" + result: dict[Any, Any] = {} + for key, value in data.items(): + if key not in types_map: + continue + field_type = types_map[key] if value == "None" or value is None: result[key] = None continue @@ -124,7 +137,7 @@ def from_dict(cls, data: dict[str, Any]): _LOGGER.exception(f"Failed to convert {key} with value {value} to type {field_type}") continue - return cls(**result) + return result def as_dict(self) -> dict: return asdict( diff --git a/roborock/devices/device.py b/roborock/devices/device.py index ca1fbf14..29f1fd28 100644 --- a/roborock/devices/device.py +++ b/roborock/devices/device.py @@ -197,12 +197,14 @@ async def connect(self) -> None: if self._unsub: raise ValueError("Already connected to the device") unsub = await self._channel.subscribe(self._on_message) - if self.v1_properties is not None: - try: + try: + if self.v1_properties is not None: await self.v1_properties.discover_features() - except RoborockException: - unsub() - raise + elif self.b01_q10_properties is not None: + await self.b01_q10_properties.start() + except RoborockException: + unsub() + raise self._logger.info("Connected to device") self._unsub = unsub @@ -214,6 +216,8 @@ async def close(self) -> None: await self._connect_task except asyncio.CancelledError: pass + if self.b01_q10_properties is not None: + await self.b01_q10_properties.close() if self._unsub: self._unsub() self._unsub = None diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index a11e13dd..659f5536 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -2,99 +2,53 @@ from __future__ import annotations -import asyncio import logging -from collections.abc import Iterable +from collections.abc import AsyncGenerator from typing import Any from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException -from roborock.protocols.b01_q10_protocol import ( - ParamsType, - decode_rpc_response, - encode_mqtt_payload, -) -from roborock.roborock_message import RoborockMessage +from roborock.protocols.b01_q10_protocol import ParamsType, encode_mqtt_payload +from roborock.protocols.b01_q10_protocol import decode_rpc_response _LOGGER = logging.getLogger(__name__) -_TIMEOUT = 10.0 -async def send_command( +async def stream_decoded_responses( mqtt_channel: MqttChannel, - command: B01_Q10_DP, - params: ParamsType, -) -> None: - """Send a command on the MQTT channel, without waiting for a response""" - _LOGGER.debug("Sending B01 MQTT command: cmd=%s params=%s", command, params) - roborock_message = encode_mqtt_payload(command, params) - _LOGGER.debug("Sending MQTT message: %s", roborock_message) - try: - await mqtt_channel.publish(roborock_message) - except RoborockException as ex: - _LOGGER.debug( - "Error sending B01 decoded command (method=%s params=%s): %s", - command, - params, - ex, - ) - raise - +) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: + """Stream decoded DPS messages received via MQTT.""" -async def send_decoded_command( - mqtt_channel: MqttChannel, - command: B01_Q10_DP, - params: ParamsType, - expected_dps: Iterable[B01_Q10_DP] | None = None, -) -> dict[B01_Q10_DP, Any]: - """Send a command and await the first decoded response. - - Q10 responses are not correlated with a message id, so we filter on - expected datapoints when provided. - """ - roborock_message = encode_mqtt_payload(command, params) - future: asyncio.Future[dict[B01_Q10_DP, Any]] = asyncio.get_running_loop().create_future() - - expected_set = set(expected_dps) if expected_dps is not None else None - - def find_response(response_message: RoborockMessage) -> None: + async for response_message in mqtt_channel.subscribe_stream(): try: decoded_dps = decode_rpc_response(response_message) except RoborockException as ex: _LOGGER.debug( - "Failed to decode B01 Q10 RPC response (expecting %s): %s: %s", - command, + "Failed to decode B01 RPC response: %s: %s", response_message, ex, ) - return - if expected_set and not any(dps in decoded_dps for dps in expected_set): - return - if not future.done(): - future.set_result(decoded_dps) + continue + yield decoded_dps - unsub = await mqtt_channel.subscribe(find_response) +async def send_command( + mqtt_channel: MqttChannel, + command: B01_Q10_DP, + params: ParamsType, +) -> None: + """Send a command on the MQTT channel, without waiting for a response.""" + _LOGGER.debug("Sending B01 MQTT command: cmd=%s params=%s", command, params) + roborock_message = encode_mqtt_payload(command, params) _LOGGER.debug("Sending MQTT message: %s", roborock_message) try: await mqtt_channel.publish(roborock_message) - return await asyncio.wait_for(future, timeout=_TIMEOUT) - except TimeoutError as ex: - raise RoborockException(f"B01 Q10 command timed out after {_TIMEOUT}s ({command})") from ex except RoborockException as ex: - _LOGGER.warning( - "Error sending B01 Q10 decoded command (%s): %s", - command, - ex, - ) - raise - except Exception as ex: - _LOGGER.exception( - "Error sending B01 Q10 decoded command (%s): %s", + _LOGGER.debug( + "Error sending B01 decoded command (method=%s params=%s): %s", command, + params, ex, ) raise - finally: - unsub() diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 9ff7d1de..cddfaeba 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -1,5 +1,10 @@ """Traits for Q10 B01 devices.""" +import asyncio +import logging + +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses from roborock.devices.traits import Trait from roborock.devices.transport.mqtt_channel import MqttChannel @@ -9,9 +14,10 @@ __all__ = [ "Q10PropertiesApi", - "StatusTrait", ] +_LOGGER = logging.getLogger(__name__) + class Q10PropertiesApi(Trait): """API for interacting with B01 devices.""" @@ -20,16 +26,42 @@ class Q10PropertiesApi(Trait): """Trait for sending commands to Q10 devices.""" status: StatusTrait - """Trait for querying Q10 device status.""" + """Trait for managing the status of Q10 devices.""" vacuum: VacuumTrait """Trait for sending vacuum related commands to Q10 devices.""" def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" + self._channel = channel self.command = CommandTrait(channel) - self.status = StatusTrait(channel) self.vacuum = VacuumTrait(self.command) + self.status = StatusTrait() + self._subscribe_task: asyncio.Task[None] | None = None + + async def start(self) -> None: + """Start any necessary subscriptions for the trait.""" + self._subscribe_task = asyncio.create_task(self._subscribe_loop()) + + async def close(self) -> None: + """Close any resources held by the trait.""" + if self._subscribe_task is not None: + self._subscribe_task.cancel() + try: + await self._subscribe_task + except asyncio.CancelledError: + pass + self._subscribe_task = None + + async def refresh(self) -> None: + """Refresh all traits.""" + await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) + + async def _subscribe_loop(self) -> None: + """Persistent loop to listen for status updates.""" + async for decoded_dps in stream_decoded_responses(self._channel): + _LOGGER.debug("Received Q10 status update: %s", decoded_dps) + self.status.update_from_dps(decoded_dps) def create(channel: MqttChannel) -> Q10PropertiesApi: diff --git a/roborock/devices/traits/b01/q10/common.py b/roborock/devices/traits/b01/q10/common.py new file mode 100644 index 00000000..62ef66ef --- /dev/null +++ b/roborock/devices/traits/b01/q10/common.py @@ -0,0 +1,40 @@ +"""Common utilities for Q10 traits. + +This module provides infrastructure for mapping Roborock Data Points (DPS) to +Python dataclass fields and handling the lifecycle of data updates from the +device. +""" + +import dataclasses +from typing import Any + +from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP +from roborock.data.containers import RoborockBase + + +class DpsDataConverter: + """Utility to handle the transformation and merging of DPS data into models.""" + + def __init__(self, dps_type_map: dict[B01_Q10_DP, type], dps_field_map: dict[B01_Q10_DP, str]): + """Initialize the converter for a specific RoborockBase-derived class.""" + self._dps_type_map = dps_type_map + self._dps_field_map = dps_field_map + + @classmethod + def from_dataclass(cls, dataclass_type: type[RoborockBase]): + """Initialize the converter for a specific RoborockBase-derived class.""" + dps_type_map: dict[B01_Q10_DP, type] = {} + dps_field_map: dict[B01_Q10_DP, str] = {} + for field_obj in dataclasses.fields(dataclass_type): + if field_obj.metadata and "dps" in field_obj.metadata: + dps_id = field_obj.metadata["dps"] + dps_type_map[dps_id] = field_obj.type + dps_field_map[dps_id] = field_obj.name + return cls(dps_type_map, dps_field_map) + + def update_from_dps(self, target: RoborockBase, decoded_dps: dict[B01_Q10_DP, Any]) -> None: + """Convert and merge raw DPS data into the target object.""" + conversions = RoborockBase.convert_dict(self._dps_type_map, decoded_dps) + for dps_id, value in conversions.items(): + field_name = self._dps_field_map[dps_id] + setattr(target, field_name, value) diff --git a/roborock/devices/traits/b01/q10/status.py b/roborock/devices/traits/b01/q10/status.py index 46c472bb..0fe73221 100644 --- a/roborock/devices/traits/b01/q10/status.py +++ b/roborock/devices/traits/b01/q10/status.py @@ -1,84 +1,20 @@ """Status trait for Q10 B01 devices.""" -from __future__ import annotations +from roborock.data.b01_q10.b01_q10_containers import Q10Status -from typing import Any +from .common import DpsDataConverter -from roborock.data.b01_q10.b01_q10_code_mappings import ( - B01_Q10_DP, - YXDeviceCleanTask, - YXDeviceState, - YXDeviceWorkMode, - YXFanLevel, -) -from roborock.devices.rpc.b01_q10_channel import send_decoded_command -from roborock.devices.transport.mqtt_channel import MqttChannel +_CONVERTER = DpsDataConverter.from_dataclass(Q10Status) -class StatusTrait: - """Trait for requesting and holding Q10 status values.""" +class StatusTrait(Q10Status): + """Trait for managing the status of Q10 Roborock devices. - def __init__(self, channel: MqttChannel) -> None: - self._channel = channel - self._data: dict[B01_Q10_DP, Any] = {} + This is a thin wrapper around Q10Status that provides the Trait interface. + The current values reflect the most recently received data from the device. + New values can be requested through the `Q10PropertiesApi` refresh method. + """ - @property - def data(self) -> dict[B01_Q10_DP, Any]: - """Return the latest raw status data.""" - return self._data - - async def refresh(self) -> dict[B01_Q10_DP, Any]: - """Refresh status values from the device.""" - decoded = await send_decoded_command( - self._channel, - command=B01_Q10_DP.REQUEST_DPS, - params={}, - expected_dps={ - B01_Q10_DP.STATUS, - B01_Q10_DP.BATTERY, - B01_Q10_DP.MAIN_BRUSH_LIFE, - B01_Q10_DP.SIDE_BRUSH_LIFE, - B01_Q10_DP.FILTER_LIFE, - B01_Q10_DP.CLEAN_TIME, - B01_Q10_DP.TOTAL_CLEAN_TIME, - B01_Q10_DP.TOTAL_CLEAN_COUNT, - B01_Q10_DP.CLEAN_AREA, - B01_Q10_DP.TOTAL_CLEAN_AREA, - B01_Q10_DP.CLEAN_PROGRESS, - B01_Q10_DP.FAULT, - }, - ) - self._data = decoded - return decoded - - @property - def state_code(self) -> int | None: - return self._data.get(B01_Q10_DP.STATUS) - - @property - def state(self) -> YXDeviceState | None: - code = self.state_code - return YXDeviceState.from_code_optional(code) if code is not None else None - - @property - def battery(self) -> int | None: - return self._data.get(B01_Q10_DP.BATTERY) - - @property - def fan_level(self) -> YXFanLevel | None: - value = self._data.get(B01_Q10_DP.FAN_LEVEL) - return YXFanLevel.from_code_optional(value) if value is not None else None - - @property - def clean_mode(self) -> YXDeviceWorkMode | None: - value = self._data.get(B01_Q10_DP.CLEAN_MODE) - return YXDeviceWorkMode.from_code_optional(value) if value is not None else None - - @property - def clean_task(self) -> YXDeviceCleanTask | None: - value = self._data.get(B01_Q10_DP.CLEAN_TASK_TYPE) - return YXDeviceCleanTask.from_code_optional(value) if value is not None else None - - @property - def cleaning_progress(self) -> int | None: - return self._data.get(B01_Q10_DP.CLEANING_PROGRESS) + def update_from_dps(self, decoded_dps: dict) -> None: + """Update the trait from raw DPS data.""" + _CONVERTER.update_from_dps(self, decoded_dps) diff --git a/roborock/devices/transport/mqtt_channel.py b/roborock/devices/transport/mqtt_channel.py index 498cef13..249633e1 100644 --- a/roborock/devices/transport/mqtt_channel.py +++ b/roborock/devices/transport/mqtt_channel.py @@ -1,7 +1,8 @@ """Modules for communicating with specific Roborock devices over MQTT.""" +import asyncio import logging -from collections.abc import Callable +from collections.abc import AsyncGenerator, Callable from roborock.callbacks import decoder_callback from roborock.data import HomeDataDevice, RRiot, UserData @@ -73,6 +74,17 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab dispatch = decoder_callback(self._decoder, callback, _LOGGER) return await self._mqtt_session.subscribe(self._subscribe_topic, dispatch) + async def subscribe_stream(self) -> AsyncGenerator[RoborockMessage, None]: + """Subscribe to the device's message stream.""" + message_queue: asyncio.Queue[RoborockMessage] = asyncio.Queue() + unsub = await self.subscribe(message_queue.put_nowait) + try: + while True: + message = await message_queue.get() + yield message + finally: + unsub() + async def publish(self, message: RoborockMessage) -> None: """Publish a command message. diff --git a/tests/devices/rpc/test_b01_q10_channel.py b/tests/devices/rpc/test_b01_q10_channel.py index 21b16dae..74e6f224 100644 --- a/tests/devices/rpc/test_b01_q10_channel.py +++ b/tests/devices/rpc/test_b01_q10_channel.py @@ -1,14 +1,11 @@ """Tests for B01 Q10 channel functions.""" import json -from typing import Any, cast import pytest from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP -from roborock.devices.rpc.b01_q10_channel import send_command, send_decoded_command -from roborock.exceptions import RoborockException -from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from roborock.devices.rpc.b01_q10_channel import send_command from tests.fixtures.channel_fixtures import FakeChannel @@ -17,17 +14,6 @@ def fake_channel_fixture() -> FakeChannel: return FakeChannel() -def build_q10_dps_response(dps: dict[str, Any]) -> RoborockMessage: - """Build a Q10 MQTT response message with DPS data.""" - payload = {"dps": dps} - return RoborockMessage( - protocol=cast(RoborockMessageProtocol, 11), # MQTT protocol for B01 Q10 - payload=json.dumps(payload).encode(), - seq=0, - version=b"B01", - ) - - async def test_send_command(fake_channel: FakeChannel) -> None: """Test sending a command without waiting for response.""" await send_command(fake_channel, B01_Q10_DP.START_CLEAN, {"cmd": 1}) # type: ignore[arg-type] @@ -37,137 +23,3 @@ async def test_send_command(fake_channel: FakeChannel) -> None: assert message.payload is not None payload_data = json.loads(message.payload.decode()) assert payload_data == {"dps": {"201": {"cmd": 1}}} - - -async def test_send_decoded_command_basic(fake_channel: FakeChannel) -> None: - """Test sending a command and receiving a decoded response.""" - # Queue a response - fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) - - result = await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.REQUEST_DPS, - {}, - expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, - ) - - assert B01_Q10_DP.STATUS in result - assert B01_Q10_DP.BATTERY in result - assert result[B01_Q10_DP.STATUS] == 5 - assert result[B01_Q10_DP.BATTERY] == 100 - - -async def test_send_decoded_command_without_expected_dps(fake_channel: FakeChannel) -> None: - """Test send_decoded_command accepts any response when expected_dps is None.""" - # Queue a response with any DPS - fake_channel.response_queue.append(build_q10_dps_response({"123": 2})) - - result = await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.REQUEST_DPS, - {}, - expected_dps=None, - ) - - # Should accept any response - assert B01_Q10_DP.FAN_LEVEL in result - assert result[B01_Q10_DP.FAN_LEVEL] == 2 - - -async def test_send_decoded_command_filters_by_expected_dps(fake_channel: FakeChannel) -> None: - """Test that send_decoded_command filters by expected DPS.""" - # Queue response with expected DPS - fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) - - result = await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.REQUEST_DPS, - {}, - expected_dps={B01_Q10_DP.STATUS}, - ) - - # Should accept response with expected DPS - assert B01_Q10_DP.STATUS in result - assert result[B01_Q10_DP.STATUS] == 5 - - -async def test_send_decoded_command_timeout(fake_channel: FakeChannel) -> None: - """Test that send_decoded_command times out when no matching response.""" - # Don't queue any response - - with pytest.raises(RoborockException, match="B01 Q10 command timed out"): - await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.REQUEST_DPS, - {}, - expected_dps={B01_Q10_DP.STATUS}, - ) - - -async def test_send_decoded_command_ignores_decode_errors(fake_channel: FakeChannel) -> None: - """Test that send_decoded_command ignores non-decodable messages.""" - # Queue a valid response (invalid responses are ignored by not matching expected_dps) - fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) - - result = await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.REQUEST_DPS, - {}, - expected_dps={B01_Q10_DP.STATUS}, - ) - - # Should successfully decode and return valid response - assert B01_Q10_DP.STATUS in result - - -async def test_send_decoded_command_partial_match(fake_channel: FakeChannel) -> None: - """Test that send_decoded_command accepts response with at least one expected DPS.""" - # Queue response with only one of multiple expected DPS - fake_channel.response_queue.append(build_q10_dps_response({"121": 5})) - - result = await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.REQUEST_DPS, - {}, - expected_dps={B01_Q10_DP.STATUS, B01_Q10_DP.BATTERY}, - ) - - # Should accept response with at least one expected DPS - assert B01_Q10_DP.STATUS in result - assert result[B01_Q10_DP.STATUS] == 5 - - -async def test_send_decoded_command_published_message(fake_channel: FakeChannel) -> None: - """Test that send_decoded_command publishes the correct message.""" - fake_channel.response_queue.append(build_q10_dps_response({"121": 5, "122": 100})) - - await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.REQUEST_DPS, - {}, - expected_dps={B01_Q10_DP.STATUS}, - ) - - # Check published message - assert len(fake_channel.published_messages) == 1 - message = fake_channel.published_messages[0] - assert message.payload is not None - payload_data = json.loads(message.payload.decode()) - assert payload_data == {"dps": {"102": {}}} - - -async def test_send_decoded_command_with_params(fake_channel: FakeChannel) -> None: - """Test send_decoded_command with command parameters.""" - fake_channel.response_queue.append(build_q10_dps_response({"121": 3, "122": 100})) - - await send_decoded_command( - fake_channel, # type: ignore[arg-type] - B01_Q10_DP.START_CLEAN, - {"cmd": 1}, - expected_dps={B01_Q10_DP.STATUS}, - ) - - message = fake_channel.published_messages[0] - assert message.payload is not None - payload_data = json.loads(message.payload.decode()) - assert payload_data == {"dps": {"201": {"cmd": 1}}} diff --git a/tests/devices/traits/b01/q10/__init__.py b/tests/devices/traits/b01/q10/__init__.py new file mode 100644 index 00000000..41c447f2 --- /dev/null +++ b/tests/devices/traits/b01/q10/__init__.py @@ -0,0 +1 @@ +"""Tests for the Q10 B01 traits.""" \ No newline at end of file diff --git a/tests/devices/traits/b01/q10/test_status.py b/tests/devices/traits/b01/q10/test_status.py index db1f4bec..a24f65c6 100644 --- a/tests/devices/traits/b01/q10/test_status.py +++ b/tests/devices/traits/b01/q10/test_status.py @@ -1,138 +1,124 @@ -"""Tests for Q10 StatusTrait.""" +"""Tests for the Q10 B01 status trait.""" +import asyncio import json -from typing import Any, cast +import pathlib +from collections.abc import AsyncGenerator +from typing import Any +from unittest.mock import AsyncMock, Mock import pytest from roborock.data.b01_q10.b01_q10_code_mappings import ( - B01_Q10_DP, + YXDeviceCleanTask, YXDeviceState, - YXDeviceWorkMode, YXFanLevel, ) -from roborock.devices.traits.b01.q10.status import StatusTrait +from roborock.devices.traits.b01.q10 import Q10PropertiesApi, create from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol -from tests.fixtures.channel_fixtures import FakeChannel +TEST_DATA_DIR = pathlib.Path("tests/protocols/testdata/b01_q10_protocol") -@pytest.fixture(name="fake_channel") -def fake_channel_fixture() -> FakeChannel: - return FakeChannel() +TESTDATA_DP_STATUS_DP_CLEAN_TASK_TYPE = (TEST_DATA_DIR / "dpStatus-dpCleanTaskType.json").read_bytes() +TESTDATA_DP_REQUEST_DPS = (TEST_DATA_DIR / "dpRequetdps.json").read_bytes() -@pytest.fixture(name="status_trait") -def status_trait_fixture(fake_channel: FakeChannel) -> StatusTrait: - return StatusTrait(fake_channel) # type: ignore[arg-type] +@pytest.fixture +def mock_channel(): + """Fixture for a mocked MQTT channel.""" + mock = AsyncMock() + return mock -def build_q10_response(dps: dict[str, Any]) -> RoborockMessage: - """Build a Q10 MQTT response message.""" - payload = {"dps": dps} - return RoborockMessage( - protocol=cast(RoborockMessageProtocol, 11), # MQTT_PROTO - payload=json.dumps(payload).encode(), - seq=0, - version=b"B01", - ) - - -async def test_status_trait_battery(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test getting battery status.""" - # Queue a response with battery data - fake_channel.response_queue.append(build_q10_response({"122": 85})) - - result = await status_trait.refresh() - - assert status_trait.battery == 85 - assert B01_Q10_DP.BATTERY in result - +@pytest.fixture +def message_queue() -> asyncio.Queue[RoborockMessage]: + """Fixture for a message queue used by the mock stream.""" + return asyncio.Queue() -async def test_status_trait_state(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test getting device state.""" - # CLEANING_STATE = 5 - fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 100})) - result = await status_trait.refresh() +@pytest.fixture +def mock_subscribe_stream(mock_channel: AsyncMock, message_queue: asyncio.Queue[RoborockMessage]) -> Mock: + """Fixture to mock the subscribe_stream method to yield from a queue.""" - assert status_trait.state == YXDeviceState.CLEANING_STATE - assert B01_Q10_DP.STATUS in result + async def mock_stream() -> AsyncGenerator[RoborockMessage, None]: + while True: + yield await message_queue.get() + mock = Mock(return_value=mock_stream()) + mock_channel.subscribe_stream = mock + return mock -async def test_status_trait_fan_level(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test getting fan level.""" - # FAN_LEVEL NORMAL = 2 - fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 100, "123": 2})) - result = await status_trait.refresh() +@pytest.fixture +async def q10_api(mock_channel: AsyncMock, mock_subscribe_stream: Mock) -> AsyncGenerator[Q10PropertiesApi, None]: + """Fixture to create and manage the Q10PropertiesApi.""" + api = create(mock_channel) + await api.start() + yield api + await api.close() - assert status_trait.fan_level == YXFanLevel.NORMAL - assert B01_Q10_DP.FAN_LEVEL in result +def build_message(payload: bytes) -> RoborockMessage: + """Helper to build a RoborockMessage for testing.""" + return RoborockMessage( + protocol=RoborockMessageProtocol.RPC_RESPONSE, + payload=payload, + version=b"B01", + ) -async def test_status_trait_clean_mode(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test getting cleaning mode.""" - # CLEAN_MODE BOTH_WORK = 1 - fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 100, "137": 1})) - - result = await status_trait.refresh() - - assert status_trait.clean_mode == YXDeviceWorkMode.BOTH_WORK - assert B01_Q10_DP.CLEAN_MODE in result - - -async def test_status_trait_cleaning_progress(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test getting cleaning progress.""" - fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 100, "141": 25})) - - result = await status_trait.refresh() - assert status_trait.cleaning_progress == 25 - assert B01_Q10_DP.CLEANING_PROGRESS in result +async def wait_for_attribute_value(obj: Any, attribute: str, value: Any, timeout: float = 2.0) -> None: + """Wait for an attribute on an object to reach a specific value.""" + for _ in range(int(timeout / 0.1)): + if getattr(obj, attribute) == value: + return + await asyncio.sleep(0.1) + pytest.fail(f"Timeout waiting for {attribute} to become {value} on {obj}") -async def test_status_trait_empty_data(status_trait: StatusTrait) -> None: - """Test status trait with no data queued.""" - # Test that properties return None when data is empty - assert status_trait.battery is None - assert status_trait.state is None - assert status_trait.fan_level is None - assert status_trait.clean_mode is None - assert status_trait.cleaning_progress is None +async def test_status_trait_streaming( + q10_api: Q10PropertiesApi, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """Test that the StatusTrait updates its state from streaming messages.""" + message = build_message(TESTDATA_DP_STATUS_DP_CLEAN_TASK_TYPE) + assert q10_api.status.status is None + assert q10_api.status.clean_task_type is None -async def test_status_trait_data_property(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test that data property returns the raw data.""" - test_data = {"121": 5, "122": 100, "123": 2} - fake_channel.response_queue.append(build_q10_response(test_data)) + message_queue.put_nowait(message) - await status_trait.refresh() + await wait_for_attribute_value(q10_api.status, "status", YXDeviceState.CHARGING_STATE) - # Convert string keys to B01_Q10_DP keys - assert B01_Q10_DP.STATUS in status_trait.data - assert B01_Q10_DP.BATTERY in status_trait.data - assert B01_Q10_DP.FAN_LEVEL in status_trait.data + assert q10_api.status.status == YXDeviceState.CHARGING_STATE + assert q10_api.status.clean_task_type == YXDeviceCleanTask.IDLE -async def test_status_trait_unknown_state(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test handling of unknown state code.""" - # Use a code that doesn't map to any state - fake_channel.response_queue.append(build_q10_response({"121": 999, "122": 100})) +async def test_status_trait_refresh( + q10_api: Q10PropertiesApi, + mock_channel: AsyncMock, + message_queue: asyncio.Queue[RoborockMessage], +) -> None: + """Test that the StatusTrait sends a refresh command and updates state.""" + assert q10_api.status.battery is None + assert q10_api.status.status is None + assert q10_api.status.fan_level is None - await status_trait.refresh() + message = build_message(TESTDATA_DP_REQUEST_DPS) - # Should return UNKNOWN or None - assert status_trait.state == YXDeviceState.UNKNOWN or status_trait.state is None + await q10_api.refresh() + mock_channel.publish.assert_called_once() + sent_message = mock_channel.publish.call_args[0][0] + assert sent_message.protocol == RoborockMessageProtocol.RPC_REQUEST + data = json.loads(sent_message.payload) + assert data + assert data.get("dps") + assert data.get("dps").get("102") == {} + message_queue.put_nowait(message) -async def test_status_trait_multiple_refreshes(status_trait: StatusTrait, fake_channel: FakeChannel) -> None: - """Test that multiple refreshes update the status.""" - # First refresh - fake_channel.response_queue.append(build_q10_response({"121": 3, "122": 80})) - await status_trait.refresh() - assert status_trait.battery == 80 + await wait_for_attribute_value(q10_api.status, "battery", 100) - # Second refresh with different battery - fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 60})) - await status_trait.refresh() - assert status_trait.battery == 60 + assert q10_api.status.battery == 100 + assert q10_api.status.status == YXDeviceState.CHARGING_STATE + assert q10_api.status.fan_level == YXFanLevel.NORMAL diff --git a/tests/devices/traits/b01/q10/test_vacuum.py b/tests/devices/traits/b01/q10/test_vacuum.py index d8a8453d..7c9bc27c 100644 --- a/tests/devices/traits/b01/q10/test_vacuum.py +++ b/tests/devices/traits/b01/q10/test_vacuum.py @@ -1,6 +1,6 @@ import json from collections.abc import Awaitable, Callable -from typing import Any, cast +from typing import Any import pytest @@ -8,7 +8,6 @@ from roborock.devices.traits.b01.q10 import Q10PropertiesApi from roborock.devices.traits.b01.q10.status import StatusTrait from roborock.devices.traits.b01.q10.vacuum import VacuumTrait -from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol from tests.fixtures.channel_fixtures import FakeChannel @@ -68,28 +67,3 @@ def test_q10_api_has_vacuum_trait(q10_api: Q10PropertiesApi) -> None: assert isinstance(q10_api.vacuum, VacuumTrait) -async def test_q10_api_status_refresh(q10_api: Q10PropertiesApi, fake_channel: FakeChannel) -> None: - """Test that status trait can be refreshed via Q10PropertiesApi.""" - - def build_q10_response(dps: dict[str, Any]) -> RoborockMessage: - """Build a Q10 MQTT response message.""" - payload = {"dps": dps} - return RoborockMessage( - protocol=cast(RoborockMessageProtocol, 11), - payload=json.dumps(payload).encode(), - seq=0, - version=b"B01", - ) - - # Queue a response with status and battery - fake_channel.response_queue.append(build_q10_response({"121": 5, "122": 100})) - - result = await q10_api.status.refresh() - - # Verify that refresh returned data - assert result is not None - assert len(result) > 0 - - # Verify that properties are accessible - assert q10_api.status.battery == 100 - assert q10_api.status.state is not None From 35b03c341571f0f7841b5062995c750b699130c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20BOU=C3=89?= Date: Sat, 14 Feb 2026 22:36:45 +0100 Subject: [PATCH 9/9] refactor(q10): consolidate imports in b01_q10_channel.py and clean up test_vacuum.py --- roborock/devices/rpc/b01_q10_channel.py | 3 +-- tests/devices/traits/b01/q10/__init__.py | 2 +- tests/devices/traits/b01/q10/test_vacuum.py | 2 -- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/roborock/devices/rpc/b01_q10_channel.py b/roborock/devices/rpc/b01_q10_channel.py index 659f5536..7c018d58 100644 --- a/roborock/devices/rpc/b01_q10_channel.py +++ b/roborock/devices/rpc/b01_q10_channel.py @@ -9,8 +9,7 @@ from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP from roborock.devices.transport.mqtt_channel import MqttChannel from roborock.exceptions import RoborockException -from roborock.protocols.b01_q10_protocol import ParamsType, encode_mqtt_payload -from roborock.protocols.b01_q10_protocol import decode_rpc_response +from roborock.protocols.b01_q10_protocol import ParamsType, decode_rpc_response, encode_mqtt_payload _LOGGER = logging.getLogger(__name__) diff --git a/tests/devices/traits/b01/q10/__init__.py b/tests/devices/traits/b01/q10/__init__.py index 41c447f2..78977420 100644 --- a/tests/devices/traits/b01/q10/__init__.py +++ b/tests/devices/traits/b01/q10/__init__.py @@ -1 +1 @@ -"""Tests for the Q10 B01 traits.""" \ No newline at end of file +"""Tests for the Q10 B01 traits.""" diff --git a/tests/devices/traits/b01/q10/test_vacuum.py b/tests/devices/traits/b01/q10/test_vacuum.py index 7c9bc27c..af908cdc 100644 --- a/tests/devices/traits/b01/q10/test_vacuum.py +++ b/tests/devices/traits/b01/q10/test_vacuum.py @@ -65,5 +65,3 @@ def test_q10_api_has_vacuum_trait(q10_api: Q10PropertiesApi) -> None: """Test that Q10PropertiesApi exposes VacuumTrait.""" assert hasattr(q10_api, "vacuum") assert isinstance(q10_api.vacuum, VacuumTrait) - -