Skip to content
51 changes: 50 additions & 1 deletion roborock/data/b01_q10/b01_q10_containers.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,102 @@
from ..containers import RoborockBase
"""Data container classes for Q10 B01 devices.

Many of these classes use the `field(metadata={"dps": ...})` convention to map
fields to device Data Points (DPS). This metadata is utilized by the
`update_from_dps` helper in `roborock.devices.traits.b01.q10.common` to
automatically update objects from raw device responses.
"""

from dataclasses import dataclass, field

from ..containers import RoborockBase
from .b01_q10_code_mappings import (
B01_Q10_DP,
YXBackType,
YXDeviceCleanTask,
YXDeviceState,
YXDeviceWorkMode,
YXFanLevel,
YXWaterLevel,
)


@dataclass
class dpCleanRecord(RoborockBase):
op: str
result: int
id: str
data: list


@dataclass
class dpMultiMap(RoborockBase):
op: str
result: int
data: list


@dataclass
class dpGetCarpet(RoborockBase):
op: str
result: int
data: str


@dataclass
class dpSelfIdentifyingCarpet(RoborockBase):
op: str
result: int
data: str


@dataclass
class dpNetInfo(RoborockBase):
wifiName: str
ipAdress: str
mac: str
signal: int


@dataclass
class dpNotDisturbExpand(RoborockBase):
disturb_dust_enable: int
disturb_light: int
disturb_resume_clean: int
disturb_voice: int


@dataclass
class dpCurrentCleanRoomIds(RoborockBase):
room_id_list: list


@dataclass
class dpVoiceVersion(RoborockBase):
version: int


@dataclass
class dpTimeZone(RoborockBase):
timeZoneCity: str
timeZoneSec: int


@dataclass
class Q10Status(RoborockBase):
"""Status for Q10 devices.

Fields are mapped to DPS values using metadata. Objects of this class can be
automatically updated using the `update_from_dps` helper.
"""

clean_time: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_TIME})
clean_area: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_AREA})
battery: int | None = field(default=None, metadata={"dps": B01_Q10_DP.BATTERY})
status: YXDeviceState | None = field(default=None, metadata={"dps": B01_Q10_DP.STATUS})
fan_level: YXFanLevel | None = field(default=None, metadata={"dps": B01_Q10_DP.FAN_LEVEL})
water_level: YXWaterLevel | None = field(default=None, metadata={"dps": B01_Q10_DP.WATER_LEVEL})
clean_count: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_COUNT})
clean_mode: YXDeviceWorkMode | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_MODE})
clean_task_type: YXDeviceCleanTask | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_TASK_TYPE})
back_type: YXBackType | None = field(default=None, metadata={"dps": B01_Q10_DP.BACK_TYPE})
cleaning_progress: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEANING_PROGRESS})
19 changes: 16 additions & 3 deletions roborock/data/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ def from_dict(cls, data: dict[str, Any]):
if not isinstance(data, dict):
return None
field_types = {field.name: field.type for field in dataclasses.fields(cls)}
result: dict[str, Any] = {}
normalized_data: dict[str, Any] = {}
for orig_key, value in data.items():
key = _decamelize(orig_key)
if (field_type := field_types.get(key)) is None:
if field_types.get(key) is None:
if (log_key := f"{cls.__name__}.{key}") not in RoborockBase._missing_logged:
_LOGGER.debug(
"Key '%s' (decamelized: '%s') not found in %s fields, skipping",
Expand All @@ -104,6 +104,19 @@ def from_dict(cls, data: dict[str, Any]):
)
RoborockBase._missing_logged.add(log_key)
continue
normalized_data[key] = value

result = RoborockBase.convert_dict(field_types, normalized_data)
return cls(**result)

@staticmethod
def convert_dict(types_map: dict[Any, type], data: dict[Any, Any]) -> dict[Any, Any]:
"""Convert a dictionary of values based on a schema map of types."""
result: dict[Any, Any] = {}
for key, value in data.items():
if key not in types_map:
continue
field_type = types_map[key]
if value == "None" or value is None:
result[key] = None
continue
Expand All @@ -124,7 +137,7 @@ def from_dict(cls, data: dict[str, Any]):
_LOGGER.exception(f"Failed to convert {key} with value {value} to type {field_type}")
continue

return cls(**result)
return result

def as_dict(self) -> dict:
return asdict(
Expand Down
14 changes: 9 additions & 5 deletions roborock/devices/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,14 @@ async def connect(self) -> None:
if self._unsub:
raise ValueError("Already connected to the device")
unsub = await self._channel.subscribe(self._on_message)
if self.v1_properties is not None:
try:
try:
if self.v1_properties is not None:
await self.v1_properties.discover_features()
except RoborockException:
unsub()
raise
elif self.b01_q10_properties is not None:
await self.b01_q10_properties.start()
except RoborockException:
Comment on lines 199 to +205
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For B01/Q10 devices, connect() subscribes to _on_message (which currently only logs) and then Q10PropertiesApi.start() creates a second subscription via subscribe_stream(). This means every MQTT message will be decoded/dispatched twice, increasing CPU/log noise and complicating future message routing. Consider skipping the _on_message subscription for B01/Q10 (or making _on_message forward into the Q10 stream/decoder so only one subscription exists).

Copilot uses AI. Check for mistakes.
unsub()
raise
self._logger.info("Connected to device")
self._unsub = unsub

Expand All @@ -214,6 +216,8 @@ async def close(self) -> None:
await self._connect_task
except asyncio.CancelledError:
pass
if self.b01_q10_properties is not None:
await self.b01_q10_properties.close()
if self._unsub:
self._unsub()
self._unsub = None
Expand Down
27 changes: 22 additions & 5 deletions roborock/devices/rpc/b01_q10_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,41 @@
from __future__ import annotations

import logging
from collections.abc import AsyncGenerator
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q10_protocol import (
ParamsType,
encode_mqtt_payload,
)
from roborock.protocols.b01_q10_protocol import ParamsType, decode_rpc_response, encode_mqtt_payload

_LOGGER = logging.getLogger(__name__)


async def stream_decoded_responses(
mqtt_channel: MqttChannel,
) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]:
"""Stream decoded DPS messages received via MQTT."""

async for response_message in mqtt_channel.subscribe_stream():
try:
decoded_dps = decode_rpc_response(response_message)
except RoborockException as ex:
_LOGGER.debug(
"Failed to decode B01 RPC response: %s: %s",
response_message,
ex,
)
continue
yield decoded_dps


async def send_command(
mqtt_channel: MqttChannel,
command: B01_Q10_DP,
params: ParamsType,
) -> None:
Comment on lines +17 to 39
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description mentions adding a send_decoded_command() helper in b01_q10_channel.py that awaits decoded responses with optional DPS filtering, but this file currently only adds stream_decoded_responses() plus send_command(). Either implement the described helper (similar to roborock/devices/rpc/b01_q7_channel.py) or update the PR description/API to match what’s actually being introduced.

Copilot uses AI. Check for mistakes.
"""Send a command on the MQTT channel, without waiting for a response"""
"""Send a command on the MQTT channel, without waiting for a response."""
_LOGGER.debug("Sending B01 MQTT command: cmd=%s params=%s", command, params)
roborock_message = encode_mqtt_payload(command, params)
_LOGGER.debug("Sending MQTT message: %s", roborock_message)
Expand Down
38 changes: 38 additions & 0 deletions roborock/devices/traits/b01/q10/__init__.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,67 @@
"""Traits for Q10 B01 devices."""

import asyncio
import logging

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel

from .command import CommandTrait
from .status import StatusTrait
from .vacuum import VacuumTrait

__all__ = [
"Q10PropertiesApi",
]

_LOGGER = logging.getLogger(__name__)


class Q10PropertiesApi(Trait):
"""API for interacting with B01 devices."""

command: CommandTrait
"""Trait for sending commands to Q10 devices."""

status: StatusTrait
"""Trait for managing the status of Q10 devices."""

vacuum: VacuumTrait
"""Trait for sending vacuum related commands to Q10 devices."""

def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self._channel = channel
self.command = CommandTrait(channel)
self.vacuum = VacuumTrait(self.command)
self.status = StatusTrait()
self._subscribe_task: asyncio.Task[None] | None = None

async def start(self) -> None:
"""Start any necessary subscriptions for the trait."""
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start() overwrites _subscribe_task unconditionally. If start() is called twice, this will leak the first task and create multiple concurrent subscription loops updating the same status object. Add a guard (no-op if already started) or cancel/await the existing task before creating a new one.

Suggested change
"""Start any necessary subscriptions for the trait."""
"""Start any necessary subscriptions for the trait."""
if self._subscribe_task is not None and not self._subscribe_task.done():
# Subscription loop already running; avoid starting another one.
return

Copilot uses AI. Check for mistakes.
self._subscribe_task = asyncio.create_task(self._subscribe_loop())

async def close(self) -> None:
"""Close any resources held by the trait."""
if self._subscribe_task is not None:
self._subscribe_task.cancel()
try:
await self._subscribe_task
except asyncio.CancelledError:
pass
self._subscribe_task = None

Comment on lines +48 to +55
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() only suppresses CancelledError. If _subscribe_task has already exited with an exception (e.g., subscribe_stream() raises during disconnect), await self._subscribe_task will re-raise and make Device.close() fail. Consider checking task.done()/task.exception() and suppressing/logging non-cancellation exceptions during shutdown.

Suggested change
if self._subscribe_task is not None:
self._subscribe_task.cancel()
try:
await self._subscribe_task
except asyncio.CancelledError:
pass
self._subscribe_task = None
if self._subscribe_task is None:
return
# Take a local reference and clear the attribute early to avoid
# reusing a task that is in the process of shutting down.
task = self._subscribe_task
self._subscribe_task = None
# If the task is still running, request cancellation.
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
# Expected during normal shutdown when the task is cancelled.
pass
except Exception:
# Suppress unexpected exceptions from the background task during
# shutdown, but log them for debugging purposes.
_LOGGER.exception(
"Error while waiting for subscribe task to finish during close()"
)

Copilot uses AI. Check for mistakes.
async def refresh(self) -> None:
"""Refresh all traits."""
await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})

async def _subscribe_loop(self) -> None:
"""Persistent loop to listen for status updates."""
async for decoded_dps in stream_decoded_responses(self._channel):
_LOGGER.debug("Received Q10 status update: %s", decoded_dps)
self.status.update_from_dps(decoded_dps)


def create(channel: MqttChannel) -> Q10PropertiesApi:
Expand Down
40 changes: 40 additions & 0 deletions roborock/devices/traits/b01/q10/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Common utilities for Q10 traits.

This module provides infrastructure for mapping Roborock Data Points (DPS) to
Python dataclass fields and handling the lifecycle of data updates from the
device.
"""

import dataclasses
from typing import Any

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.data.containers import RoborockBase


class DpsDataConverter:
"""Utility to handle the transformation and merging of DPS data into models."""

def __init__(self, dps_type_map: dict[B01_Q10_DP, type], dps_field_map: dict[B01_Q10_DP, str]):
"""Initialize the converter for a specific RoborockBase-derived class."""
self._dps_type_map = dps_type_map
self._dps_field_map = dps_field_map

@classmethod
def from_dataclass(cls, dataclass_type: type[RoborockBase]):
"""Initialize the converter for a specific RoborockBase-derived class."""
dps_type_map: dict[B01_Q10_DP, type] = {}
dps_field_map: dict[B01_Q10_DP, str] = {}
for field_obj in dataclasses.fields(dataclass_type):
if field_obj.metadata and "dps" in field_obj.metadata:
dps_id = field_obj.metadata["dps"]
dps_type_map[dps_id] = field_obj.type
dps_field_map[dps_id] = field_obj.name
return cls(dps_type_map, dps_field_map)

def update_from_dps(self, target: RoborockBase, decoded_dps: dict[B01_Q10_DP, Any]) -> None:
"""Convert and merge raw DPS data into the target object."""
conversions = RoborockBase.convert_dict(self._dps_type_map, decoded_dps)
for dps_id, value in conversions.items():
field_name = self._dps_field_map[dps_id]
setattr(target, field_name, value)
20 changes: 20 additions & 0 deletions roborock/devices/traits/b01/q10/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Status trait for Q10 B01 devices."""

from roborock.data.b01_q10.b01_q10_containers import Q10Status

from .common import DpsDataConverter

_CONVERTER = DpsDataConverter.from_dataclass(Q10Status)


class StatusTrait(Q10Status):
"""Trait for managing the status of Q10 Roborock devices.

This is a thin wrapper around Q10Status that provides the Trait interface.
The current values reflect the most recently received data from the device.
New values can be requested through the `Q10PropertiesApi` refresh method.
"""

def update_from_dps(self, decoded_dps: dict) -> None:
"""Update the trait from raw DPS data."""
_CONVERTER.update_from_dps(self, decoded_dps)
14 changes: 13 additions & 1 deletion roborock/devices/transport/mqtt_channel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Modules for communicating with specific Roborock devices over MQTT."""

import asyncio
import logging
from collections.abc import Callable
from collections.abc import AsyncGenerator, Callable

from roborock.callbacks import decoder_callback
from roborock.data import HomeDataDevice, RRiot, UserData
Expand Down Expand Up @@ -73,6 +74,17 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
dispatch = decoder_callback(self._decoder, callback, _LOGGER)
return await self._mqtt_session.subscribe(self._subscribe_topic, dispatch)

async def subscribe_stream(self) -> AsyncGenerator[RoborockMessage, None]:
"""Subscribe to the device's message stream."""
message_queue: asyncio.Queue[RoborockMessage] = asyncio.Queue()
unsub = await self.subscribe(message_queue.put_nowait)
try:
while True:
message = await message_queue.get()
yield message
finally:
unsub()
Comment on lines +77 to +86
Copy link

Copilot AI Feb 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe_stream() buffers all inbound messages in an unbounded asyncio.Queue(). If the consumer is slower than the producer (or stops iterating without closing), this can grow without bound and increase memory usage. Consider adding a reasonable maxsize plus an overflow strategy (e.g., drop oldest/newest with a debug log), or implement backpressure by awaiting put() in an async callback path.

Copilot uses AI. Check for mistakes.

async def publish(self, message: RoborockMessage) -> None:
"""Publish a command message.

Expand Down
Loading
Loading