diff --git a/custom_components/schellenberg_usb/config_flow.py b/custom_components/schellenberg_usb/config_flow.py index ab21588..38b953f 100644 --- a/custom_components/schellenberg_usb/config_flow.py +++ b/custom_components/schellenberg_usb/config_flow.py @@ -3,7 +3,9 @@ from __future__ import annotations import logging -from typing import Any, Awaitable, cast +import re +from collections.abc import Awaitable +from typing import Any, cast import serial # NOTE: blocking open used only to sanity-check connectivity import voluptuous as vol @@ -18,7 +20,10 @@ from homeassistant.helpers.service_info.usb import UsbServiceInfo from .const import ( + CONF_BIDIRECTIONAL, CONF_CLOSE_TIME, + CONF_DEVICE_ID, + CONF_INITIAL_POSITION, CONF_OPEN_TIME, CONF_SERIAL_PORT, DOMAIN, @@ -201,6 +206,7 @@ def __init__(self) -> None: self._pending_device_id: str | None = None self._pending_device_enum: str | None = None self._pending_device_name: str | None = None + self._pending_is_bidirectional: bool = False def _get_calibration_handler(self) -> CalibrationFlowHandler: """Return (and lazily create) the calibration flow handler.""" @@ -218,16 +224,146 @@ async def _await_subentry_result( async def async_step_blind( self, user_input: dict[str, Any] | None = None ) -> SubentryFlowResult: - """Entry point when the user clicks the 'Pair device' button. + """Entry point when the user clicks the 'Add device' button. Home Assistant calls async_step_{subentry_type}() where subentry_type is the key returned by async_get_supported_subentry_types. Since our type is - 'blind', we implement async_step_blind(). Previously this was named - async_step_pairing, which caused the flow to fall back and the - translation key for the initiate button to be missing. + 'blind', we implement async_step_blind(). Delegates to async_step_menu + so the user can choose between auto-pair and manual-add. """ - _LOGGER.debug("Subentry blind flow initiated (pairing new device)") - return await self.async_step_user(user_input) + _LOGGER.debug("Subentry blind flow initiated") + return await self.async_step_menu(user_input) + + async def async_step_menu( + self, user_input: dict[str, Any] | None = None + ) -> SubentryFlowResult: + """Show menu: Pair automatically or Add manually.""" + return self.async_show_menu( + step_id="menu", + menu_options=["user", "manual_add"], + ) + + async def async_step_manual_add( + self, user_input: dict[str, Any] | None = None + ) -> SubentryFlowResult: + """Collect device enum, mode, and optional name for manual-add.""" + errors: dict[str, str] = {} + + if user_input is not None: + # Normalize to uppercase before validation and storage (Pitfall 4) + device_enum = user_input.get("device_enum", "").upper() + + # Format check: exactly 2 hex characters + if not re.match(r"^[0-9A-Fa-f]{2}$", device_enum): + errors["device_enum"] = "invalid_enum_format" + else: + # Duplicate check across existing blind subentries + hub_entry = self._get_entry() + existing_enums = { + s.data.get("device_enum") + for s in hub_entry.subentries.values() + if s.subentry_type == SUBENTRY_TYPE_BLIND + } + if device_enum in existing_enums: + errors["device_enum"] = "duplicate_enum" + + if not errors: + # Resolve mode — BooleanSelector returns a real Python bool + is_bidirectional: bool = bool( + user_input.get(CONF_BIDIRECTIONAL, True) + ) + device_name = ( + user_input.get("device_name") or f"Blind {device_enum}" + ) + self._pending_device_enum = device_enum + self._pending_device_name = device_name + self._pending_is_bidirectional = is_bidirectional + + if is_bidirectional: + _LOGGER.info( + "Creating bidirectional manual subentry for enum %s", + device_enum, + ) + return self.async_create_entry( + title=device_name, + data={ + CONF_DEVICE_ID: device_enum, + "device_enum": device_enum, + CONF_BIDIRECTIONAL: True, + }, + unique_id=device_enum, + ) + # Timed: advance to initial-position step + _LOGGER.debug( + "Timed motor %s: advancing to position step", device_enum + ) + return await self.async_step_manual_position() + + return self.async_show_form( + step_id="manual_add", + data_schema=vol.Schema( + { + vol.Required("device_enum"): selector.TextSelector(), + vol.Required( + CONF_BIDIRECTIONAL, default=True + ): selector.BooleanSelector(), + vol.Optional("device_name"): selector.TextSelector(), + } + ), + errors=errors, + ) + + async def async_step_manual_position( + self, user_input: dict[str, Any] | None = None + ) -> SubentryFlowResult: + """Collect initial position for timed motors (shown only after mode=timed).""" + if not self._pending_device_enum: + return self.async_abort(reason="pairing_failed") + + if user_input is not None: + initial_position = int(user_input.get("initial_position", 100)) + # Clamp to 0-100 as defense in depth (slider already bounds, but be safe) + initial_position = max(0, min(100, initial_position)) + device_enum = self._pending_device_enum or "" + device_name = self._pending_device_name or f"Blind {device_enum}" + _LOGGER.info( + "Creating timed manual subentry for enum %s at initial position %d%%", + device_enum, + initial_position, + ) + return self.async_create_entry( + title=device_name, + data={ + CONF_DEVICE_ID: device_enum, + "device_enum": device_enum, + CONF_BIDIRECTIONAL: False, + CONF_INITIAL_POSITION: initial_position, + }, + unique_id=device_enum, + ) + + return self.async_show_form( + step_id="manual_position", + data_schema=vol.Schema( + { + vol.Optional( + "initial_position", default=100 + ): selector.NumberSelector( + selector.NumberSelectorConfig( + min=0, + max=100, + step=1, + unit_of_measurement="%", + mode=selector.NumberSelectorMode.SLIDER, + ) + ), + } + ), + description_placeholders={ + "device_name": self._pending_device_name or "", + }, + last_step=True, + ) async def async_step_user( self, user_input: dict[str, Any] | None = None @@ -329,6 +465,19 @@ async def async_step_reconfigure( if not device_id: return self.async_abort(reason="device_not_found") + # Guard: timed motors cannot calibrate via the event-waiting CalibrationFlowHandler + # (they never send EVENT_STARTED_MOVING_*/EVENT_STOPPED, so calibration hangs). + # Use the same missing-key default as cover.py (True = bidirectional) so legacy + # flag-less subentries are still treated as bidirectional here (REVIEW-2, T-02-04). + # Timed-motor calibration is deferred to Phase 4 / CAL-01. + is_bidirectional = bool(subentry.data.get(CONF_BIDIRECTIONAL, True)) + if not is_bidirectional: + _LOGGER.debug( + "Reconfigure blocked for timed motor %s: calibration not yet supported", + device_id, + ) + return self.async_abort(reason="timed_calibration_unavailable") + # Build a minimal device record; calibration handler will enrich after timing device_name = subentry.title or f"Blind {device_id}" handler.set_selected_device( diff --git a/custom_components/schellenberg_usb/const.py b/custom_components/schellenberg_usb/const.py index 694426a..f877d63 100644 --- a/custom_components/schellenberg_usb/const.py +++ b/custom_components/schellenberg_usb/const.py @@ -105,3 +105,7 @@ CONF_OPEN_TIME = "open_time" # Time it takes to open (up) in seconds CONF_CLOSE_TIME = "close_time" # Time it takes to close (down) in seconds CONF_DEVICE_ID = "device_id" # Device ID for calibration + +# Manual-add device mode flag (stored in subentry.data) +CONF_BIDIRECTIONAL = "bidirectional" # bool; False = timed/non-bidirectional +CONF_INITIAL_POSITION = "initial_position" # int 0-100; timed motors only diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index 700289e..d693186 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -24,7 +24,9 @@ CMD_DOWN, CMD_STOP, CMD_UP, + CONF_BIDIRECTIONAL, CONF_CLOSE_TIME, + CONF_INITIAL_POSITION, CONF_OPEN_TIME, CONF_SERIAL_PORT, DOMAIN, @@ -213,6 +215,7 @@ class SchellenbergCover(CoverEntity, RestoreEntity): _attr_has_entity_name = True _attr_should_poll = False + _unrecorded_attributes = frozenset({"mode"}) _attr_supported_features = ( CoverEntityFeature.OPEN @@ -260,6 +263,25 @@ def __init__( CONF_CLOSE_TIME, DEFAULT_TRAVEL_TIME ) + # Mode flag: True = bidirectional (can receive events), False = timed. + # Read-default is True so legacy Phase-1 auto-paired subentries that have + # NO CONF_BIDIRECTIONAL key are treated as bidirectional — preventing a + # CTRL-05 regression (Phase 3 would route them through timed control). + # Manual adds ALWAYS write the key explicitly, so this default only + # affects pre-existing flag-less subentries. (Phase 2 known limitation: + # bidirectional manual adds store device_id as 2-char enum, so inbound + # 6-char ss-frame device_id matches will miss _registered_devices — see + # RESEARCH.md "Signal Filter Coupling". No fix needed for timed motors + # as they produce no inbound frames. Tracked for a v2 story.) + self._is_bidirectional: bool = bool( + device_data_dict.get(CONF_BIDIRECTIONAL, True) + ) + self._initial_position: int | None = ( + int(device_data_dict[CONF_INITIAL_POSITION]) + if CONF_INITIAL_POSITION in device_data_dict + else None + ) + self._move_start_time: float | None = None self._move_start_position: int | None = None self._position_update_task: asyncio.Task[None] | None = None @@ -286,6 +308,13 @@ def entity_registry_enabled_default(self) -> bool: """Return if entity should be enabled by default.""" return True + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return device-specific state attributes.""" + return { + "mode": "bidirectional" if self._is_bidirectional else "timed", + } + async def async_added_to_hass(self) -> None: """Run when entity about to be added to hass.""" await super().async_added_to_hass() @@ -329,13 +358,24 @@ async def async_added_to_hass(self) -> None: ) if self._attr_current_cover_position is None: - self._attr_current_cover_position = 0 - self._attr_is_closed = True - _LOGGER.debug( - "No previous state for %s (%s); defaulting position to 0%% (closed)", - self._attr_name, - self._device_id, - ) + if self._initial_position is not None: + self._attr_current_cover_position = max( + 0, min(100, self._initial_position) + ) + self._attr_is_closed = self._attr_current_cover_position == 0 + _LOGGER.debug( + "Seeding initial position for %s to %d%% from subentry.data", + self._attr_name, + self._attr_current_cover_position, + ) + else: + self._attr_current_cover_position = 0 + self._attr_is_closed = True + _LOGGER.debug( + "No previous state for %s (%s); defaulting position to 0%% (closed)", + self._attr_name, + self._device_id, + ) self.async_write_ha_state() @@ -500,6 +540,10 @@ async def _async_position_update_loop(self) -> None: if self._target_position not in (0, 100): await self._api.control_blind(self._device_enum, CMD_STOP) + self._attr_is_opening = False + self._attr_is_closing = False + self._attr_is_closed = self._attr_current_cover_position == 0 + self._target_position = None self._position_update_task = None self._move_start_time = None self._move_start_position = None diff --git a/custom_components/schellenberg_usb/manifest.json b/custom_components/schellenberg_usb/manifest.json index 10d7b80..5b6a1fe 100644 --- a/custom_components/schellenberg_usb/manifest.json +++ b/custom_components/schellenberg_usb/manifest.json @@ -15,5 +15,5 @@ "manufacturer": "van ooijen" } ], - "version": "1.0.0" + "version": "1.1.0" } diff --git a/custom_components/schellenberg_usb/strings.json b/custom_components/schellenberg_usb/strings.json index 1e9f943..12ccdc3 100644 --- a/custom_components/schellenberg_usb/strings.json +++ b/custom_components/schellenberg_usb/strings.json @@ -4,8 +4,7 @@ "menu": { "title": "Schellenberg USB setup", "menu_options": { - "user": "Set up USB hub", - "pair_device_menu": "Pair new device" + "user": "Set up USB hub" } }, "user": { @@ -38,10 +37,33 @@ "blind": { "entry_type": "Blind", "initiate_flow": { - "user": "Pair device", + "user": "Add device", "reconfigure": "Calibrate" }, "step": { + "menu": { + "title": "Add a blind", + "menu_options": { + "user": "Pair automatically", + "manual_add": "Add manually (already paired)" + } + }, + "manual_add": { + "title": "Add blind by transmit address", + "description": "Enter the 2-character hex transmit address (enum slot) the motor already listens to. Example: 10, 1A, 2B.", + "data": { + "device_enum": "Transmit address (hex)", + "bidirectional": "Bidirectional motor", + "device_name": "Friendly name (optional)" + } + }, + "manual_position": { + "title": "Set initial position", + "description": "Where is {device_name} right now? This sets the starting position for the position slider. Note: positioning for timed motors uses a default travel time and is uncalibrated until calibration support is added in a later update.", + "data": { + "initial_position": "Current position" + } + }, "user": { "title": "Pair a new device", "description": "Press the pair button to activate pairing mode on your Schellenberg USB stick. Then press the pairing button on your blind motor within 2 minutes.", @@ -80,12 +102,15 @@ "pairing_failed": "Pairing failed.", "calibration_timeout": "Calibration timed out. The blind took more than 5 minutes to complete the movement. Please check the device and try again.", "calibration_start_timeout": "Timeout waiting for the blind to start moving. Please ensure the device is properly connected and try again.", - "unknown": "An unexpected error occurred during calibration. Please try again." + "unknown": "An unexpected error occurred during calibration. Please try again.", + "invalid_enum_format": "Transmit address must be exactly 2 hexadecimal characters (e.g. 10, 1A, 2B).", + "duplicate_enum": "A blind with this transmit address is already added to the integration." }, "abort": { "pairing_timeout": "No device responded within 2 minutes. Please try again.", "pairing_failed": "Pairing failed.", - "reconfigure_successful": "Calibration completed successfully" + "reconfigure_successful": "Calibration completed successfully", + "timed_calibration_unavailable": "Calibration for timed motors is not yet available. It will be added in a future update." } } }, diff --git a/custom_components/schellenberg_usb/translations/en.json b/custom_components/schellenberg_usb/translations/en.json index 9ad7f95..6a86371 100644 --- a/custom_components/schellenberg_usb/translations/en.json +++ b/custom_components/schellenberg_usb/translations/en.json @@ -4,8 +4,7 @@ "menu": { "title": "Schellenberg USB setup", "menu_options": { - "user": "Set up USB hub", - "pair_device_menu": "Pair new device" + "user": "Set up USB hub" } }, "user": { @@ -38,10 +37,33 @@ "blind": { "entry_type": "Blind", "initiate_flow": { - "user": "Pair device", + "user": "Add device", "reconfigure": "Calibrate" }, "step": { + "menu": { + "title": "Add a blind", + "menu_options": { + "user": "Pair automatically", + "manual_add": "Add manually (already paired)" + } + }, + "manual_add": { + "title": "Add blind by transmit address", + "description": "Enter the 2-character hex transmit address (enum slot) the motor already listens to. Example: 10, 1A, 2B.", + "data": { + "device_enum": "Transmit address (hex)", + "bidirectional": "Bidirectional motor", + "device_name": "Friendly name (optional)" + } + }, + "manual_position": { + "title": "Set initial position", + "description": "Where is {device_name} right now? This sets the starting position for the position slider. Note: positioning for timed motors uses a default travel time and is uncalibrated until calibration support is added in a later update.", + "data": { + "initial_position": "Current position" + } + }, "user": { "title": "Pair a new device", "description": "Press the pair button to activate pairing mode on your Schellenberg USB stick. Then press the pairing button on your blind motor within 2 minutes.", @@ -80,12 +102,15 @@ "pairing_failed": "Pairing failed.", "calibration_timeout": "Calibration timed out. The blind took more than 5 minutes to complete the movement. Please check the device and try again.", "calibration_start_timeout": "Timeout waiting for the blind to start moving. Please ensure the device is properly connected and try again.", - "unknown": "An unexpected error occurred during calibration. Please try again." + "unknown": "An unexpected error occurred during calibration. Please try again.", + "invalid_enum_format": "Transmit address must be exactly 2 hexadecimal characters (e.g. 10, 1A, 2B).", + "duplicate_enum": "A blind with this transmit address is already added to the integration." }, "abort": { "pairing_timeout": "No device responded within 2 minutes. Please try again.", "pairing_failed": "Pairing failed.", - "reconfigure_successful": "Calibration completed successfully" + "reconfigure_successful": "Calibration completed successfully", + "timed_calibration_unavailable": "Calibration for timed motors is not yet available. It will be added in a future update." } } }, diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py new file mode 100644 index 0000000..0dd2051 --- /dev/null +++ b/tests/test_config_flow.py @@ -0,0 +1,419 @@ +"""Tests for the blind subentry manual-add flow.""" + +from __future__ import annotations + +from types import MappingProxyType +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from homeassistant.config_entries import ConfigEntry, ConfigEntryState +from homeassistant.core import HomeAssistant + +from custom_components.schellenberg_usb.const import ( + CONF_BIDIRECTIONAL, + CONF_SERIAL_PORT, + DOMAIN, + SUBENTRY_TYPE_BLIND, +) +from custom_components.schellenberg_usb.config_flow import ( + SchellenbergPairingSubentryFlow, +) + + +def _make_handler( + hass: HomeAssistant, entry_id: str +) -> SchellenbergPairingSubentryFlow: + """Create a flow handler bound to the given hub entry. + + ConfigSubentryFlow._get_entry() reads self.handler[0] (the entry_id portion of + a (entry_id, subentry_type) tuple). async_create_entry requires source == 'user'. + """ + handler = SchellenbergPairingSubentryFlow() + handler.hass = hass + handler.handler = (entry_id, SUBENTRY_TYPE_BLIND) + handler.context = {"source": "user"} + return handler + + +@pytest.fixture +def mock_hub_entry(hass: HomeAssistant) -> ConfigEntry: + """Create a mock hub config entry.""" + entry = ConfigEntry( + version=1, + domain=DOMAIN, + title="Schellenberg USB", + data={CONF_SERIAL_PORT: "/dev/ttyUSB0"}, + options={}, + entry_id="test_manual_flow_entry", + state=ConfigEntryState.NOT_LOADED, + minor_version=1, + source="test", + unique_id=None, + discovery_keys=MappingProxyType({}), + subentries_data=None, + ) + hass.config_entries._entries[entry.entry_id] = entry + return entry + + +@pytest.mark.asyncio +async def test_manual_add_menu_shown( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Menu step returns show_menu with 'user' and 'manual_add' options.""" + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_blind(None) + + assert result["type"] == "menu" + assert "user" in result["menu_options"] + assert "manual_add" in result["menu_options"] + + +@pytest.mark.asyncio +async def test_manual_add_creates_subentry( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Bidirectional manual-add with a valid enum creates an entry without api.pair_device_and_wait.""" + handler = _make_handler(hass, mock_hub_entry.entry_id) + + mock_api = MagicMock() + mock_api.pair_device_and_wait = AsyncMock() + mock_hub_entry.runtime_data = mock_api # type: ignore[attr-defined] + + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: True, + "device_name": "Test Blind", + } + ) + + assert result["type"] == "create_entry" + mock_api.pair_device_and_wait.assert_not_called() + + +@pytest.mark.asyncio +async def test_manual_add_mode_flag( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Timed selection stores CONF_BIDIRECTIONAL False; bidirectional stores True.""" + # Timed: goes through position step + handler_timed = _make_handler(hass, mock_hub_entry.entry_id) + + result_timed = await handler_timed.async_step_manual_add( + { + "device_enum": "2B", + CONF_BIDIRECTIONAL: False, + "device_name": "Timed Blind", + } + ) + # Timed goes to position step first + assert result_timed["type"] == "form" + assert result_timed["step_id"] == "manual_position" + + # Complete timed: submit position step + result_timed_entry = await handler_timed.async_step_manual_position( + {"initial_position": 100} + ) + assert result_timed_entry["type"] == "create_entry" + assert result_timed_entry["data"][CONF_BIDIRECTIONAL] is False + + # Bidirectional: goes straight to create_entry + handler_bi = _make_handler(hass, mock_hub_entry.entry_id) + + result_bi = await handler_bi.async_step_manual_add( + { + "device_enum": "3C", + CONF_BIDIRECTIONAL: True, + "device_name": "Bi Blind", + } + ) + assert result_bi["type"] == "create_entry" + assert result_bi["data"][CONF_BIDIRECTIONAL] is True + + +@pytest.mark.asyncio +async def test_manual_add_invalid_enum( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Inputs 'XY', '100', '' each return form errors with invalid_enum_format.""" + for bad_input in ("XY", "100", ""): + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_manual_add( + { + "device_enum": bad_input, + CONF_BIDIRECTIONAL: False, + "device_name": "", + } + ) + assert result["type"] == "form", ( + f"Expected form for input {bad_input!r}, got {result['type']!r}" + ) + assert result["errors"].get("device_enum") == "invalid_enum_format", ( + f"Expected invalid_enum_format for input {bad_input!r}, " + f"got {result['errors']!r}" + ) + + +@pytest.mark.asyncio +async def test_manual_add_duplicate_enum( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """An enum already present in a blind subentry returns duplicate_enum error.""" + existing_sub = MagicMock() + existing_sub.subentry_type = SUBENTRY_TYPE_BLIND + existing_sub.data = {"device_enum": "1A", "device_id": "1A"} + mock_hub_entry.subentries = MappingProxyType({"existing": existing_sub}) # type: ignore[attr-defined] + + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: False, + "device_name": "", + } + ) + assert result["type"] == "form" + assert result["errors"].get("device_enum") == "duplicate_enum" + + +@pytest.mark.asyncio +async def test_manual_add_device_name( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Provided name becomes the create_entry title; omitted name falls back to 'Blind {ENUM}'.""" + # With name + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: True, + "device_name": "My Living Room Blind", + } + ) + assert result["type"] == "create_entry" + assert result["title"] == "My Living Room Blind" + + # Without name — falls back to "Blind {ENUM}" + handler2 = _make_handler(hass, mock_hub_entry.entry_id) + + result2 = await handler2.async_step_manual_add( + { + "device_enum": "2B", + CONF_BIDIRECTIONAL: True, + "device_name": "", + } + ) + assert result2["type"] == "create_entry" + assert result2["title"] == "Blind 2B" + + +@pytest.mark.asyncio +async def test_manual_add_position_step_timed_only( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Timed mode advances to manual_position form; bidirectional goes straight to create_entry.""" + # Timed: should show position form + handler_timed = _make_handler(hass, mock_hub_entry.entry_id) + + result_timed = await handler_timed.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: False, + "device_name": "Timed", + } + ) + assert result_timed["type"] == "form" + assert result_timed["step_id"] == "manual_position" + + # Bidirectional: should go directly to create_entry + handler_bi = _make_handler(hass, mock_hub_entry.entry_id) + + result_bi = await handler_bi.async_step_manual_add( + { + "device_enum": "2B", + CONF_BIDIRECTIONAL: True, + "device_name": "Bi", + } + ) + assert result_bi["type"] == "create_entry" + + +@pytest.mark.asyncio +async def test_reconfigure_timed_motor_aborts( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Reconfigure on a timed subentry aborts with timed_calibration_unavailable. + + A bidirectional subentry still reaches the calibration path (form, not abort). + """ + from custom_components.schellenberg_usb.options_flow_calibration import ( + CalibrationFlowHandler, + ) + + # --- Timed motor: must abort --- + timed_subentry = MagicMock() + timed_subentry.data = { + "device_id": "1A", + "device_enum": "1A", + CONF_BIDIRECTIONAL: False, + } + timed_subentry.title = "Timed Blind" + + handler_timed = _make_handler(hass, mock_hub_entry.entry_id) + + with patch.object( + handler_timed, "_get_reconfigure_subentry", return_value=timed_subentry + ): + with patch.object( + CalibrationFlowHandler, + "async_step_calibration_close", + new_callable=AsyncMock, + ) as mock_cal_step: + result = await handler_timed.async_step_reconfigure(None) + + assert result["type"] == "abort", ( + f"Expected abort for timed motor, got {result['type']!r}" + ) + assert result["reason"] == "timed_calibration_unavailable", ( + f"Expected timed_calibration_unavailable, got {result['reason']!r}" + ) + # The event-waiting calibration step must NOT have been called + mock_cal_step.assert_not_called() + + # --- Bidirectional motor: must reach calibration path (form, not abort) --- + bi_subentry = MagicMock() + bi_subentry.data = { + "device_id": "ABC123", + "device_enum": "10", + CONF_BIDIRECTIONAL: True, + "open_time": 20.0, + "close_time": 18.0, + } + bi_subentry.title = "Bi Blind" + + mock_api = MagicMock() + mock_api.control_blind = AsyncMock() + mock_hub_entry.runtime_data = mock_api # type: ignore[attr-defined] + + handler_bi = _make_handler(hass, mock_hub_entry.entry_id) + + with patch.object( + handler_bi, "_get_reconfigure_subentry", return_value=bi_subentry + ): + with patch.object( + CalibrationFlowHandler, + "async_step_calibration_close", + new_callable=AsyncMock, + return_value={"type": "form", "step_id": "calibration_close"}, + ): + result_bi = await handler_bi.async_step_reconfigure(None) + + # Bidirectional reconfigure should NOT be an abort with timed reason + assert result_bi.get("reason") != "timed_calibration_unavailable", ( + "Bidirectional motor should not abort with timed_calibration_unavailable" + ) + + +@pytest.mark.asyncio +async def test_manual_add_enum_case_normalized( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Lowercase '1a' collides with existing '1A' after .upper() normalization (Pitfall 4).""" + existing_sub = MagicMock() + existing_sub.subentry_type = SUBENTRY_TYPE_BLIND + existing_sub.data = {"device_enum": "1A", "device_id": "1A"} + mock_hub_entry.subentries = MappingProxyType({"existing": existing_sub}) # type: ignore[attr-defined] + + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_manual_add( + { + "device_enum": "1a", # lowercase — must be normalized before dedup check + CONF_BIDIRECTIONAL: False, + "device_name": "", + } + ) + assert result["type"] == "form" + assert result["errors"].get("device_enum") == "duplicate_enum", ( + "Expected duplicate_enum: '1a'.upper() == '1A' which already exists" + ) + + +@pytest.mark.asyncio +async def test_manual_add_enum_stored_uppercase( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Submitting '2b' stores device_enum/device_id as '2B' (uppercase) in the entry.""" + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_manual_add( + { + "device_enum": "2b", + CONF_BIDIRECTIONAL: True, + "device_name": "Uppercase Test", + } + ) + assert result["type"] == "create_entry" + assert result["data"]["device_enum"] == "2B", ( + f"Expected device_enum '2B', got {result['data']['device_enum']!r}" + ) + assert result["data"]["device_id"] == "2B", ( + f"Expected device_id '2B', got {result['data']['device_id']!r}" + ) + + +@pytest.mark.asyncio +async def test_manual_position_aborts_without_pending_state( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """async_step_manual_position aborts when _pending_device_enum is missing (WR-01). + + This guard prevents a broken subentry from being created if the step is + reached without async_step_manual_add having run first (HA re-entrancy, + serialized-flow resume, or future menu wiring change). + """ + handler = _make_handler(hass, mock_hub_entry.entry_id) + # _pending_device_enum is None by default -- do not call manual_add first + + result = await handler.async_step_manual_position(None) + + assert result["type"] == "abort", ( + f"Expected abort when pending state is missing, got {result['type']!r}" + ) + assert result["reason"] == "pairing_failed", ( + f"Expected reason='pairing_failed', got {result.get('reason')!r}" + ) + + +@pytest.mark.asyncio +async def test_manual_add_default_mode_is_bidirectional( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Unmodified manual_add form (no CONF_BIDIRECTIONAL key) defaults to bidirectional (WR-03). + + When a user submits without flipping the toggle, the resolver default must + produce CONF_BIDIRECTIONAL=True (bidirectional), matching the field-common case. + """ + handler = _make_handler(hass, mock_hub_entry.entry_id) + + # Omit CONF_BIDIRECTIONAL entirely to simulate an unmodified form submission + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + # CONF_BIDIRECTIONAL deliberately omitted + "device_name": "Default Mode Blind", + } + ) + + # With bidirectional as default, we expect a direct create_entry (no position step) + assert result["type"] == "create_entry", ( + f"Expected create_entry for bidirectional default, got {result['type']!r}" + ) + assert result["data"][CONF_BIDIRECTIONAL] is True, ( + f"Expected CONF_BIDIRECTIONAL=True (default), got {result['data'][CONF_BIDIRECTIONAL]!r}" + ) diff --git a/tests/test_cover.py b/tests/test_cover.py index 5bfd954..fd57706 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from types import MappingProxyType from typing import Any, cast from unittest.mock import AsyncMock, MagicMock, patch @@ -14,7 +15,9 @@ from custom_components.schellenberg_usb.api import SchellenbergUsbApi from custom_components.schellenberg_usb.const import ( + CONF_BIDIRECTIONAL, CONF_CLOSE_TIME, + CONF_INITIAL_POSITION, CONF_OPEN_TIME, CONF_SERIAL_PORT, DOMAIN, @@ -645,3 +648,202 @@ async def test_cover_will_remove_from_hass( with patch.object(cover, "_stop_position_tracking") as mock_stop: await cover.async_will_remove_from_hass() mock_stop.assert_called_once() + + +@pytest.mark.asyncio +async def test_cover_mode_attribute( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Cover built with CONF_BIDIRECTIONAL True exposes mode='bidirectional'; False -> 'timed'.""" + cover_bi = SchellenbergCover( + api=mock_api, + device_id="1A", + device_enum="1A", + device_name="Bi Cover", + device_data={CONF_BIDIRECTIONAL: True}, + ) + assert cover_bi.extra_state_attributes["mode"] == "bidirectional" + + cover_timed = SchellenbergCover( + api=mock_api, + device_id="2B", + device_enum="2B", + device_name="Timed Cover", + device_data={CONF_BIDIRECTIONAL: False}, + ) + assert cover_timed.extra_state_attributes["mode"] == "timed" + + +@pytest.mark.asyncio +async def test_cover_mode_defaults_bidirectional_when_key_absent( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Legacy subentry with NO CONF_BIDIRECTIONAL key reports mode='bidirectional' (read-default True). + + This prevents CTRL-05 regression: existing auto-paired motors must never be mislabeled timed. + """ + # OMIT CONF_BIDIRECTIONAL key entirely — simulates a Phase-1 legacy subentry + cover = SchellenbergCover( + api=mock_api, + device_id="ABC123", + device_enum="10", + device_name="Legacy Cover", + device_data={"device_id": "ABC123", "device_enum": "10"}, + ) + assert cover.extra_state_attributes["mode"] == "bidirectional" + + +@pytest.mark.asyncio +async def test_cover_initial_position_from_subentry( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Timed cover with CONF_INITIAL_POSITION 100 seeds position to 100 after async_added_to_hass.""" + cover = SchellenbergCover( + api=mock_api, + device_id="1A", + device_enum="1A", + device_name="Timed Cover", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_INITIAL_POSITION: 100, + }, + ) + cover.hass = hass + + with patch.object(cover, "async_get_last_state", return_value=None): + with patch( + "custom_components.schellenberg_usb.cover.async_dispatcher_connect" + ): + with patch.object(cover, "async_write_ha_state"): + await cover.async_added_to_hass() + + assert cover._attr_current_cover_position == 100 + assert cover._attr_is_closed is False + + +@pytest.mark.asyncio +async def test_cover_initial_position_clamped( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CONF_INITIAL_POSITION=150 clamps to 100; a restored prior state wins over seeded initial.""" + # Upper-bound clamp: 150 -> 100 + cover_clamped = SchellenbergCover( + api=mock_api, + device_id="2B", + device_enum="2B", + device_name="Clamped Cover", + device_data={CONF_BIDIRECTIONAL: False, CONF_INITIAL_POSITION: 150}, + ) + cover_clamped.hass = hass + + with patch.object(cover_clamped, "async_get_last_state", return_value=None): + with patch( + "custom_components.schellenberg_usb.cover.async_dispatcher_connect" + ): + with patch.object(cover_clamped, "async_write_ha_state"): + await cover_clamped.async_added_to_hass() + + assert cover_clamped._attr_current_cover_position == 100, ( + f"Expected 100 (clamped from 150), got {cover_clamped._attr_current_cover_position}" + ) + + # RestoreEntity precedence: prior state of 50 beats seeded initial of 100 (Pitfall 5) + cover_restored = SchellenbergCover( + api=mock_api, + device_id="3C", + device_enum="3C", + device_name="Restored Cover", + device_data={CONF_BIDIRECTIONAL: False, CONF_INITIAL_POSITION: 100}, + ) + cover_restored.hass = hass + + last_state = State("cover.restored_cover", "open", {"current_position": 50}) + + with patch.object(cover_restored, "async_get_last_state", return_value=last_state): + with patch( + "custom_components.schellenberg_usb.cover.async_dispatcher_connect" + ): + with patch.object(cover_restored, "async_write_ha_state"): + await cover_restored.async_added_to_hass() + + assert cover_restored._attr_current_cover_position == 50, ( + f"Expected 50 (restored state wins), got {cover_restored._attr_current_cover_position}" + ) + + +@pytest.mark.asyncio +async def test_timed_motor_position_loop_clears_flags( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Timed motor: after SET_POSITION target is reached via the real loop, flags are cleared. + + Regression test for CR-01: the position-reached branch used to leave + _attr_is_opening/_attr_is_closing True and _target_position set, causing HA + to render the cover as perpetually moving. This test exercises the real + _async_position_update_loop (no patch on _start_position_tracking) and must + FAIL against pre-fix code and PASS after the fix. + """ + import time as _time + + # Use a very small travel time (0.5 s) so the move completes within ms + # in the event loop. Start at 0% and move UP to 50% -- the loop should + # stop the motor and clear the flags once position >= 50. + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor Test", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 0.5, + CONF_CLOSE_TIME: 0.5, + }, + ) + cover.hass = hass + cover._attr_current_cover_position = 0 + cover._attr_is_closed = True + + # Kick off async_set_cover_position. It internally calls async_open_cover, + # which calls _start_position_tracking -> creates the real loop task. + # We do NOT patch _start_position_tracking here (that is the whole point). + with patch.object(cover, "async_write_ha_state"): + # Set the move start state manually so _update_position works correctly + # when async_open_cover is called without the hass event loop already + # running the task scheduler. We pre-set the start time so that a + # position > 50 is instantly computed on the first loop iteration. + cover._attr_is_opening = True + cover._attr_is_closing = False + cover._move_start_position = 0 + # Backdate the start time by 0.4 s -- with 0.5 s travel = 80% progress, + # which already exceeds the target of 50, so the loop exits on the first + # iteration after the initial 0.2 s sleep. + cover._move_start_time = _time.monotonic() - 0.4 + cover._target_position = 50 + + loop_task = hass.async_create_task(cover._async_position_update_loop()) + + # Allow the event loop to run: the task sleeps 0.2 s then checks. + # Give it 1 second of wall-clock asyncio time to settle. + await asyncio.sleep(0.5) + + # The loop should have exited and cleared the flags. + assert cover._attr_is_opening is False, ( + f"Expected is_opening=False after position reached, got {cover._attr_is_opening}" + ) + assert cover._attr_is_closing is False, ( + f"Expected is_closing=False after position reached, got {cover._attr_is_closing}" + ) + assert cover._target_position is None, ( + f"Expected _target_position=None after position reached, got {cover._target_position}" + ) + # Position should equal the requested target (50%) + assert cover._attr_current_cover_position == 50, ( + f"Expected position=50 after reaching target, got {cover._attr_current_cover_position}" + ) + # Task should be done + assert loop_task.done(), "Expected position loop task to be done after target reached"