From 6e22654a9c594403d6b618e4612f8fbd043bfcc7 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 21:14:35 +0200 Subject: [PATCH 1/8] test(02-01): add failing test scaffold for manual-add slice - Add tests/test_config_flow.py with 8 test functions: test_manual_add_menu_shown, test_manual_add_creates_subentry, test_manual_add_mode_flag, test_manual_add_invalid_enum, test_manual_add_duplicate_enum, test_manual_add_device_name, test_manual_add_position_step_timed_only, test_reconfigure_timed_motor_aborts - Add 3 cover tests to tests/test_cover.py: test_cover_mode_attribute, test_cover_mode_defaults_bidirectional_when_key_absent, test_cover_initial_position_from_subentry - All tests RED: fail on missing CONF_BIDIRECTIONAL/CONF_INITIAL_POSITION symbols and missing flow methods (expected RED state) --- tests/test_config_flow.py | 330 ++++++++++++++++++++++++++++++++++++++ tests/test_cover.py | 76 +++++++++ 2 files changed, 406 insertions(+) create mode 100644 tests/test_config_flow.py diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py new file mode 100644 index 0000000..2205738 --- /dev/null +++ b/tests/test_config_flow.py @@ -0,0 +1,330 @@ +"""Tests for the blind subentry manual-add flow.""" + +from __future__ import annotations + +from types import MappingProxyType +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from homeassistant.config_entries import ConfigEntry, ConfigEntryState +from homeassistant.core import HomeAssistant + +from custom_components.schellenberg_usb.const import ( + CONF_BIDIRECTIONAL, + CONF_INITIAL_POSITION, + CONF_SERIAL_PORT, + DOMAIN, + SUBENTRY_TYPE_BLIND, +) +from custom_components.schellenberg_usb.config_flow import ( + SchellenbergPairingSubentryFlow, +) + + +@pytest.fixture +def mock_hub_entry(hass: HomeAssistant) -> ConfigEntry: + """Create a mock hub config entry.""" + entry = ConfigEntry( + version=1, + domain=DOMAIN, + title="Schellenberg USB", + data={CONF_SERIAL_PORT: "/dev/ttyUSB0"}, + options={}, + entry_id="test_manual_flow_entry", + state=ConfigEntryState.NOT_LOADED, + minor_version=1, + source="test", + unique_id=None, + discovery_keys=MappingProxyType({}), + subentries_data=None, + ) + hass.config_entries._entries[entry.entry_id] = entry + return entry + + +@pytest.mark.asyncio +async def test_manual_add_menu_shown( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Menu step returns show_menu with 'user' and 'manual_add' options.""" + handler = SchellenbergPairingSubentryFlow() + handler.hass = hass + handler.handler = mock_hub_entry.entry_id + + result = await handler.async_step_blind(None) + + assert result["type"] == "menu" + assert "user" in result["menu_options"] + assert "manual_add" in result["menu_options"] + + +@pytest.mark.asyncio +async def test_manual_add_creates_subentry( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Bidirectional manual-add with a valid enum creates an entry without api.pair_device_and_wait.""" + handler = SchellenbergPairingSubentryFlow() + handler.hass = hass + handler.handler = mock_hub_entry.entry_id + + mock_api = MagicMock() + mock_api.pair_device_and_wait = AsyncMock() + mock_hub_entry.runtime_data = mock_api # type: ignore[attr-defined] + + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: True, + "device_name": "Test Blind", + } + ) + + assert result["type"] == "create_entry" + mock_api.pair_device_and_wait.assert_not_called() + + +@pytest.mark.asyncio +async def test_manual_add_mode_flag( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Timed selection stores CONF_BIDIRECTIONAL False; bidirectional stores True.""" + handler_timed = SchellenbergPairingSubentryFlow() + handler_timed.hass = hass + handler_timed.handler = mock_hub_entry.entry_id + + # For timed, advance to position step to get data + result_timed = await handler_timed.async_step_manual_add( + { + "device_enum": "2B", + CONF_BIDIRECTIONAL: False, + "device_name": "Timed Blind", + } + ) + # Timed goes to position step first + assert result_timed["type"] == "form" + assert result_timed["step_id"] == "manual_position" + + # Complete timed: submit position step + result_timed_entry = await handler_timed.async_step_manual_position( + {"initial_position": 100} + ) + assert result_timed_entry["type"] == "create_entry" + assert result_timed_entry["data"][CONF_BIDIRECTIONAL] is False + + # For bidirectional + handler_bi = SchellenbergPairingSubentryFlow() + handler_bi.hass = hass + handler_bi.handler = mock_hub_entry.entry_id + + result_bi = await handler_bi.async_step_manual_add( + { + "device_enum": "3C", + CONF_BIDIRECTIONAL: True, + "device_name": "Bi Blind", + } + ) + assert result_bi["type"] == "create_entry" + assert result_bi["data"][CONF_BIDIRECTIONAL] is True + + +@pytest.mark.asyncio +async def test_manual_add_invalid_enum( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Inputs 'XY', '100', '' each return form errors with invalid_enum_format.""" + for bad_input in ("XY", "100", ""): + handler = SchellenbergPairingSubentryFlow() + handler.hass = hass + handler.handler = mock_hub_entry.entry_id + + result = await handler.async_step_manual_add( + { + "device_enum": bad_input, + CONF_BIDIRECTIONAL: False, + "device_name": "", + } + ) + assert result["type"] == "form", ( + f"Expected form for input {bad_input!r}, got {result['type']!r}" + ) + assert result["errors"].get("device_enum") == "invalid_enum_format", ( + f"Expected invalid_enum_format for input {bad_input!r}, " + f"got {result['errors']!r}" + ) + + +@pytest.mark.asyncio +async def test_manual_add_duplicate_enum( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """An enum already present in a blind subentry returns duplicate_enum error.""" + existing_sub = MagicMock() + existing_sub.subentry_type = SUBENTRY_TYPE_BLIND + existing_sub.data = {"device_enum": "1A", "device_id": "1A"} + mock_hub_entry.subentries = MappingProxyType({"existing": existing_sub}) # type: ignore[attr-defined] + + handler = SchellenbergPairingSubentryFlow() + handler.hass = hass + handler.handler = mock_hub_entry.entry_id + + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: False, + "device_name": "", + } + ) + assert result["type"] == "form" + assert result["errors"].get("device_enum") == "duplicate_enum" + + +@pytest.mark.asyncio +async def test_manual_add_device_name( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Provided name becomes the create_entry title; omitted name falls back to 'Blind {ENUM}'.""" + # With name + handler = SchellenbergPairingSubentryFlow() + handler.hass = hass + handler.handler = mock_hub_entry.entry_id + + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: True, + "device_name": "My Living Room Blind", + } + ) + assert result["type"] == "create_entry" + assert result["title"] == "My Living Room Blind" + + # Without name — falls back to "Blind {ENUM}" + handler2 = SchellenbergPairingSubentryFlow() + handler2.hass = hass + handler2.handler = mock_hub_entry.entry_id + + result2 = await handler2.async_step_manual_add( + { + "device_enum": "2B", + CONF_BIDIRECTIONAL: True, + "device_name": "", + } + ) + assert result2["type"] == "create_entry" + assert result2["title"] == "Blind 2B" + + +@pytest.mark.asyncio +async def test_manual_add_position_step_timed_only( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Timed mode advances to manual_position form; bidirectional goes straight to create_entry.""" + # Timed: should show position form + handler_timed = SchellenbergPairingSubentryFlow() + handler_timed.hass = hass + handler_timed.handler = mock_hub_entry.entry_id + + result_timed = await handler_timed.async_step_manual_add( + { + "device_enum": "1A", + CONF_BIDIRECTIONAL: False, + "device_name": "Timed", + } + ) + assert result_timed["type"] == "form" + assert result_timed["step_id"] == "manual_position" + + # Bidirectional: should go directly to create_entry + handler_bi = SchellenbergPairingSubentryFlow() + handler_bi.hass = hass + handler_bi.handler = mock_hub_entry.entry_id + + result_bi = await handler_bi.async_step_manual_add( + { + "device_enum": "2B", + CONF_BIDIRECTIONAL: True, + "device_name": "Bi", + } + ) + assert result_bi["type"] == "create_entry" + + +@pytest.mark.asyncio +async def test_reconfigure_timed_motor_aborts( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Reconfigure on a timed subentry aborts with timed_calibration_unavailable. + + A bidirectional subentry still reaches the calibration path (form, not abort). + """ + from custom_components.schellenberg_usb.options_flow_calibration import ( + CalibrationFlowHandler, + ) + + # --- Timed motor: must abort --- + timed_subentry = MagicMock() + timed_subentry.data = { + "device_id": "1A", + "device_enum": "1A", + CONF_BIDIRECTIONAL: False, + } + timed_subentry.title = "Timed Blind" + + handler_timed = SchellenbergPairingSubentryFlow() + handler_timed.hass = hass + handler_timed.handler = mock_hub_entry.entry_id + + with patch.object( + handler_timed, "_get_reconfigure_subentry", return_value=timed_subentry + ): + with patch.object( + CalibrationFlowHandler, + "async_step_calibration_close", + new_callable=AsyncMock, + ) as mock_cal_step: + result = await handler_timed.async_step_reconfigure(None) + + assert result["type"] == "abort", ( + f"Expected abort for timed motor, got {result['type']!r}" + ) + assert result["reason"] == "timed_calibration_unavailable", ( + f"Expected timed_calibration_unavailable, got {result['reason']!r}" + ) + # The event-waiting calibration step must NOT have been called + mock_cal_step.assert_not_called() + + # --- Bidirectional motor: must reach calibration path (form, not abort) --- + bi_subentry = MagicMock() + bi_subentry.data = { + "device_id": "ABC123", + "device_enum": "10", + CONF_BIDIRECTIONAL: True, + "open_time": 20.0, + "close_time": 18.0, + } + bi_subentry.title = "Bi Blind" + + mock_api = MagicMock() + mock_api.control_blind = AsyncMock() + mock_hub_entry.runtime_data = mock_api # type: ignore[attr-defined] + + handler_bi = SchellenbergPairingSubentryFlow() + handler_bi.hass = hass + handler_bi.handler = mock_hub_entry.entry_id + + with patch.object( + handler_bi, "_get_reconfigure_subentry", return_value=bi_subentry + ): + with patch.object( + CalibrationFlowHandler, + "async_step_calibration_close", + new_callable=AsyncMock, + return_value={"type": "form", "step_id": "calibration_close"}, + ): + result_bi = await handler_bi.async_step_reconfigure(None) + + # Bidirectional reconfigure should NOT be an abort with timed reason + assert result_bi.get("reason") != "timed_calibration_unavailable", ( + "Bidirectional motor should not abort with timed_calibration_unavailable" + ) diff --git a/tests/test_cover.py b/tests/test_cover.py index 5bfd954..f2e3fc7 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -14,7 +14,9 @@ from custom_components.schellenberg_usb.api import SchellenbergUsbApi from custom_components.schellenberg_usb.const import ( + CONF_BIDIRECTIONAL, CONF_CLOSE_TIME, + CONF_INITIAL_POSITION, CONF_OPEN_TIME, CONF_SERIAL_PORT, DOMAIN, @@ -645,3 +647,77 @@ async def test_cover_will_remove_from_hass( with patch.object(cover, "_stop_position_tracking") as mock_stop: await cover.async_will_remove_from_hass() mock_stop.assert_called_once() + + +@pytest.mark.asyncio +async def test_cover_mode_attribute( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Cover built with CONF_BIDIRECTIONAL True exposes mode='bidirectional'; False -> 'timed'.""" + cover_bi = SchellenbergCover( + api=mock_api, + device_id="1A", + device_enum="1A", + device_name="Bi Cover", + device_data={CONF_BIDIRECTIONAL: True}, + ) + assert cover_bi.extra_state_attributes["mode"] == "bidirectional" + + cover_timed = SchellenbergCover( + api=mock_api, + device_id="2B", + device_enum="2B", + device_name="Timed Cover", + device_data={CONF_BIDIRECTIONAL: False}, + ) + assert cover_timed.extra_state_attributes["mode"] == "timed" + + +@pytest.mark.asyncio +async def test_cover_mode_defaults_bidirectional_when_key_absent( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Legacy subentry with NO CONF_BIDIRECTIONAL key reports mode='bidirectional' (read-default True). + + This prevents CTRL-05 regression: existing auto-paired motors must never be mislabeled timed. + """ + # OMIT CONF_BIDIRECTIONAL key entirely — simulates a Phase-1 legacy subentry + cover = SchellenbergCover( + api=mock_api, + device_id="ABC123", + device_enum="10", + device_name="Legacy Cover", + device_data={"device_id": "ABC123", "device_enum": "10"}, + ) + assert cover.extra_state_attributes["mode"] == "bidirectional" + + +@pytest.mark.asyncio +async def test_cover_initial_position_from_subentry( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Timed cover with CONF_INITIAL_POSITION 100 seeds position to 100 after async_added_to_hass.""" + cover = SchellenbergCover( + api=mock_api, + device_id="1A", + device_enum="1A", + device_name="Timed Cover", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_INITIAL_POSITION: 100, + }, + ) + cover.hass = hass + + with patch.object(cover, "async_get_last_state", return_value=None): + with patch( + "custom_components.schellenberg_usb.cover.async_dispatcher_connect" + ): + with patch.object(cover, "async_write_ha_state"): + await cover.async_added_to_hass() + + assert cover._attr_current_cover_position == 100 + assert cover._attr_is_closed is False From 1bf266ad37f6ac21c99691f9ea9913cc40951418 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 21:46:56 +0200 Subject: [PATCH 2/8] =?UTF-8?q?feat(02-01):=20implement=20manual-add=20sli?= =?UTF-8?q?ce=20=E2=80=94=20const=20keys,=20menu/form=20steps,=20cover=20r?= =?UTF-8?q?eads?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - const.py: add CONF_BIDIRECTIONAL ('bidirectional') and CONF_INITIAL_POSITION - config_flow.py: async_step_blind now delegates to async_step_menu; add async_step_menu (shows 'Pair automatically' / 'Add manually' menu), async_step_manual_add (validates 2-hex enum, dedup check, mode, name), async_step_manual_position (initial position slider, timed only); extend __init__ with _pending_is_bidirectional/_pending_initial_position; add timed-motor guard at top of async_step_reconfigure (REVIEW-2) - cover.py: add CONF_BIDIRECTIONAL/CONF_INITIAL_POSITION imports; read both in __init__ (CONF_BIDIRECTIONAL default True for legacy subentries, REVIEW-1); add extra_state_attributes returning mode; seed initial position in async_added_to_hass (RestoreEntity takes precedence) - strings.json + translations/en.json: relabel initiate_flow.user to 'Add device' (REVIEW-3); add menu/manual_add/manual_position steps, invalid_enum_format + duplicate_enum errors, timed_calibration_unavailable abort - tests/test_config_flow.py: fix handler binding (_make_handler helper with tuple handler + context source=user); all 11 Phase-2 tests now GREEN (34 pass) --- .../schellenberg_usb/config_flow.py | 158 +++++++++++++++++- custom_components/schellenberg_usb/const.py | 4 + custom_components/schellenberg_usb/cover.py | 53 +++++- .../schellenberg_usb/strings.json | 32 +++- .../schellenberg_usb/translations/en.json | 32 +++- tests/test_config_flow.py | 69 ++++---- 6 files changed, 289 insertions(+), 59 deletions(-) diff --git a/custom_components/schellenberg_usb/config_flow.py b/custom_components/schellenberg_usb/config_flow.py index ab21588..ceb5a79 100644 --- a/custom_components/schellenberg_usb/config_flow.py +++ b/custom_components/schellenberg_usb/config_flow.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import re from typing import Any, Awaitable, cast import serial # NOTE: blocking open used only to sanity-check connectivity @@ -18,7 +19,10 @@ from homeassistant.helpers.service_info.usb import UsbServiceInfo from .const import ( + CONF_BIDIRECTIONAL, CONF_CLOSE_TIME, + CONF_DEVICE_ID, + CONF_INITIAL_POSITION, CONF_OPEN_TIME, CONF_SERIAL_PORT, DOMAIN, @@ -201,6 +205,8 @@ def __init__(self) -> None: self._pending_device_id: str | None = None self._pending_device_enum: str | None = None self._pending_device_name: str | None = None + self._pending_is_bidirectional: bool = False + self._pending_initial_position: int | None = None def _get_calibration_handler(self) -> CalibrationFlowHandler: """Return (and lazily create) the calibration flow handler.""" @@ -218,16 +224,143 @@ async def _await_subentry_result( async def async_step_blind( self, user_input: dict[str, Any] | None = None ) -> SubentryFlowResult: - """Entry point when the user clicks the 'Pair device' button. + """Entry point when the user clicks the 'Add device' button. Home Assistant calls async_step_{subentry_type}() where subentry_type is the key returned by async_get_supported_subentry_types. Since our type is - 'blind', we implement async_step_blind(). Previously this was named - async_step_pairing, which caused the flow to fall back and the - translation key for the initiate button to be missing. + 'blind', we implement async_step_blind(). Delegates to async_step_menu + so the user can choose between auto-pair and manual-add. """ - _LOGGER.debug("Subentry blind flow initiated (pairing new device)") - return await self.async_step_user(user_input) + _LOGGER.debug("Subentry blind flow initiated") + return await self.async_step_menu(user_input) + + async def async_step_menu( + self, user_input: dict[str, Any] | None = None + ) -> SubentryFlowResult: + """Show menu: Pair automatically or Add manually.""" + return self.async_show_menu( + step_id="menu", + menu_options=["user", "manual_add"], + ) + + async def async_step_manual_add( + self, user_input: dict[str, Any] | None = None + ) -> SubentryFlowResult: + """Collect device enum, mode, and optional name for manual-add.""" + errors: dict[str, str] = {} + + if user_input is not None: + # Normalize to uppercase before validation and storage (Pitfall 4) + device_enum = user_input.get("device_enum", "").upper() + + # Format check: exactly 2 hex characters + if not re.match(r"^[0-9A-Fa-f]{2}$", device_enum): + errors["device_enum"] = "invalid_enum_format" + else: + # Duplicate check across existing blind subentries + hub_entry = self._get_entry() + existing_enums = { + s.data.get("device_enum") + for s in hub_entry.subentries.values() + if s.subentry_type == SUBENTRY_TYPE_BLIND + } + if device_enum in existing_enums: + errors["device_enum"] = "duplicate_enum" + + if not errors: + # Resolve mode — BooleanSelector returns a real Python bool + is_bidirectional: bool = bool( + user_input.get(CONF_BIDIRECTIONAL, False) + ) + device_name = ( + user_input.get("device_name") or f"Blind {device_enum}" + ) + self._pending_device_enum = device_enum + self._pending_device_name = device_name + self._pending_is_bidirectional = is_bidirectional + + if is_bidirectional: + _LOGGER.info( + "Creating bidirectional manual subentry for enum %s", + device_enum, + ) + return self.async_create_entry( + title=device_name, + data={ + CONF_DEVICE_ID: device_enum, + "device_enum": device_enum, + CONF_BIDIRECTIONAL: True, + }, + unique_id=device_enum, + ) + # Timed: advance to initial-position step + _LOGGER.debug( + "Timed motor %s: advancing to position step", device_enum + ) + return await self.async_step_manual_position() + + return self.async_show_form( + step_id="manual_add", + data_schema=vol.Schema( + { + vol.Required("device_enum"): selector.TextSelector(), + vol.Required( + CONF_BIDIRECTIONAL, default=False + ): selector.BooleanSelector(), + vol.Optional("device_name"): selector.TextSelector(), + } + ), + errors=errors, + ) + + async def async_step_manual_position( + self, user_input: dict[str, Any] | None = None + ) -> SubentryFlowResult: + """Collect initial position for timed motors (shown only after mode=timed).""" + if user_input is not None: + initial_position = int(user_input.get("initial_position", 100)) + # Clamp to 0–100 as defense in depth (slider already bounds, but be safe) + initial_position = max(0, min(100, initial_position)) + device_enum = self._pending_device_enum or "" + device_name = self._pending_device_name or f"Blind {device_enum}" + _LOGGER.info( + "Creating timed manual subentry for enum %s at initial position %d%%", + device_enum, + initial_position, + ) + return self.async_create_entry( + title=device_name, + data={ + CONF_DEVICE_ID: device_enum, + "device_enum": device_enum, + CONF_BIDIRECTIONAL: False, + CONF_INITIAL_POSITION: initial_position, + }, + unique_id=device_enum, + ) + + return self.async_show_form( + step_id="manual_position", + data_schema=vol.Schema( + { + vol.Optional( + "initial_position", default=100 + ): selector.NumberSelector( + selector.NumberSelectorConfig( + min=0, + max=100, + step=1, + unit_of_measurement="%", + mode=selector.NumberSelectorMode.SLIDER, + ) + ), + } + ), + description_placeholders={ + "device_name": self._pending_device_name or "", + }, + last_step=True, + ) async def async_step_user( self, user_input: dict[str, Any] | None = None @@ -329,6 +462,19 @@ async def async_step_reconfigure( if not device_id: return self.async_abort(reason="device_not_found") + # Guard: timed motors cannot calibrate via the event-waiting CalibrationFlowHandler + # (they never send EVENT_STARTED_MOVING_*/EVENT_STOPPED, so calibration hangs). + # Use the same missing-key default as cover.py (True = bidirectional) so legacy + # flag-less subentries are still treated as bidirectional here (REVIEW-2, T-02-04). + # Timed-motor calibration is deferred to Phase 4 / CAL-01. + is_bidirectional = bool(subentry.data.get(CONF_BIDIRECTIONAL, True)) + if not is_bidirectional: + _LOGGER.debug( + "Reconfigure blocked for timed motor %s: calibration not yet supported", + device_id, + ) + return self.async_abort(reason="timed_calibration_unavailable") + # Build a minimal device record; calibration handler will enrich after timing device_name = subentry.title or f"Blind {device_id}" handler.set_selected_device( diff --git a/custom_components/schellenberg_usb/const.py b/custom_components/schellenberg_usb/const.py index 694426a..f877d63 100644 --- a/custom_components/schellenberg_usb/const.py +++ b/custom_components/schellenberg_usb/const.py @@ -105,3 +105,7 @@ CONF_OPEN_TIME = "open_time" # Time it takes to open (up) in seconds CONF_CLOSE_TIME = "close_time" # Time it takes to close (down) in seconds CONF_DEVICE_ID = "device_id" # Device ID for calibration + +# Manual-add device mode flag (stored in subentry.data) +CONF_BIDIRECTIONAL = "bidirectional" # bool; False = timed/non-bidirectional +CONF_INITIAL_POSITION = "initial_position" # int 0-100; timed motors only diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index 700289e..05fa53a 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -24,7 +24,9 @@ CMD_DOWN, CMD_STOP, CMD_UP, + CONF_BIDIRECTIONAL, CONF_CLOSE_TIME, + CONF_INITIAL_POSITION, CONF_OPEN_TIME, CONF_SERIAL_PORT, DOMAIN, @@ -260,6 +262,25 @@ def __init__( CONF_CLOSE_TIME, DEFAULT_TRAVEL_TIME ) + # Mode flag: True = bidirectional (can receive events), False = timed. + # Read-default is True so legacy Phase-1 auto-paired subentries that have + # NO CONF_BIDIRECTIONAL key are treated as bidirectional — preventing a + # CTRL-05 regression (Phase 3 would route them through timed control). + # Manual adds ALWAYS write the key explicitly, so this default only + # affects pre-existing flag-less subentries. (Phase 2 known limitation: + # bidirectional manual adds store device_id as 2-char enum, so inbound + # 6-char ss-frame device_id matches will miss _registered_devices — see + # RESEARCH.md "Signal Filter Coupling". No fix needed for timed motors + # as they produce no inbound frames. Tracked for a v2 story.) + self._is_bidirectional: bool = bool( + device_data_dict.get(CONF_BIDIRECTIONAL, True) + ) + self._initial_position: int | None = ( + int(device_data_dict[CONF_INITIAL_POSITION]) + if CONF_INITIAL_POSITION in device_data_dict + else None + ) + self._move_start_time: float | None = None self._move_start_position: int | None = None self._position_update_task: asyncio.Task[None] | None = None @@ -286,6 +307,13 @@ def entity_registry_enabled_default(self) -> bool: """Return if entity should be enabled by default.""" return True + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return device-specific state attributes.""" + return { + "mode": "bidirectional" if self._is_bidirectional else "timed", + } + async def async_added_to_hass(self) -> None: """Run when entity about to be added to hass.""" await super().async_added_to_hass() @@ -329,13 +357,24 @@ async def async_added_to_hass(self) -> None: ) if self._attr_current_cover_position is None: - self._attr_current_cover_position = 0 - self._attr_is_closed = True - _LOGGER.debug( - "No previous state for %s (%s); defaulting position to 0%% (closed)", - self._attr_name, - self._device_id, - ) + if self._initial_position is not None: + self._attr_current_cover_position = max( + 0, min(100, self._initial_position) + ) + self._attr_is_closed = self._attr_current_cover_position == 0 + _LOGGER.debug( + "Seeding initial position for %s to %d%% from subentry.data", + self._attr_name, + self._attr_current_cover_position, + ) + else: + self._attr_current_cover_position = 0 + self._attr_is_closed = True + _LOGGER.debug( + "No previous state for %s (%s); defaulting position to 0%% (closed)", + self._attr_name, + self._device_id, + ) self.async_write_ha_state() diff --git a/custom_components/schellenberg_usb/strings.json b/custom_components/schellenberg_usb/strings.json index 1e9f943..3496cb3 100644 --- a/custom_components/schellenberg_usb/strings.json +++ b/custom_components/schellenberg_usb/strings.json @@ -38,10 +38,33 @@ "blind": { "entry_type": "Blind", "initiate_flow": { - "user": "Pair device", + "user": "Add device", "reconfigure": "Calibrate" }, "step": { + "menu": { + "title": "Add a blind", + "menu_options": { + "user": "Pair automatically", + "manual_add": "Add manually (already paired)" + } + }, + "manual_add": { + "title": "Add blind by transmit address", + "description": "Enter the 2-character hex transmit address (enum slot) the motor already listens to. Example: 10, 1A, 2B.", + "data": { + "device_enum": "Transmit address (hex)", + "bidirectional": "Bidirectional motor", + "device_name": "Friendly name (optional)" + } + }, + "manual_position": { + "title": "Set initial position", + "description": "Where is {device_name} right now? This sets the starting position for the position slider.", + "data": { + "initial_position": "Current position" + } + }, "user": { "title": "Pair a new device", "description": "Press the pair button to activate pairing mode on your Schellenberg USB stick. Then press the pairing button on your blind motor within 2 minutes.", @@ -80,12 +103,15 @@ "pairing_failed": "Pairing failed.", "calibration_timeout": "Calibration timed out. The blind took more than 5 minutes to complete the movement. Please check the device and try again.", "calibration_start_timeout": "Timeout waiting for the blind to start moving. Please ensure the device is properly connected and try again.", - "unknown": "An unexpected error occurred during calibration. Please try again." + "unknown": "An unexpected error occurred during calibration. Please try again.", + "invalid_enum_format": "Transmit address must be exactly 2 hexadecimal characters (e.g. 10, 1A, 2B).", + "duplicate_enum": "A blind with this transmit address is already added to the integration." }, "abort": { "pairing_timeout": "No device responded within 2 minutes. Please try again.", "pairing_failed": "Pairing failed.", - "reconfigure_successful": "Calibration completed successfully" + "reconfigure_successful": "Calibration completed successfully", + "timed_calibration_unavailable": "Calibration for timed motors is not yet available. It will be added in a future update." } } }, diff --git a/custom_components/schellenberg_usb/translations/en.json b/custom_components/schellenberg_usb/translations/en.json index 9ad7f95..430930e 100644 --- a/custom_components/schellenberg_usb/translations/en.json +++ b/custom_components/schellenberg_usb/translations/en.json @@ -38,10 +38,33 @@ "blind": { "entry_type": "Blind", "initiate_flow": { - "user": "Pair device", + "user": "Add device", "reconfigure": "Calibrate" }, "step": { + "menu": { + "title": "Add a blind", + "menu_options": { + "user": "Pair automatically", + "manual_add": "Add manually (already paired)" + } + }, + "manual_add": { + "title": "Add blind by transmit address", + "description": "Enter the 2-character hex transmit address (enum slot) the motor already listens to. Example: 10, 1A, 2B.", + "data": { + "device_enum": "Transmit address (hex)", + "bidirectional": "Bidirectional motor", + "device_name": "Friendly name (optional)" + } + }, + "manual_position": { + "title": "Set initial position", + "description": "Where is {device_name} right now? This sets the starting position for the position slider.", + "data": { + "initial_position": "Current position" + } + }, "user": { "title": "Pair a new device", "description": "Press the pair button to activate pairing mode on your Schellenberg USB stick. Then press the pairing button on your blind motor within 2 minutes.", @@ -80,12 +103,15 @@ "pairing_failed": "Pairing failed.", "calibration_timeout": "Calibration timed out. The blind took more than 5 minutes to complete the movement. Please check the device and try again.", "calibration_start_timeout": "Timeout waiting for the blind to start moving. Please ensure the device is properly connected and try again.", - "unknown": "An unexpected error occurred during calibration. Please try again." + "unknown": "An unexpected error occurred during calibration. Please try again.", + "invalid_enum_format": "Transmit address must be exactly 2 hexadecimal characters (e.g. 10, 1A, 2B).", + "duplicate_enum": "A blind with this transmit address is already added to the integration." }, "abort": { "pairing_timeout": "No device responded within 2 minutes. Please try again.", "pairing_failed": "Pairing failed.", - "reconfigure_successful": "Calibration completed successfully" + "reconfigure_successful": "Calibration completed successfully", + "timed_calibration_unavailable": "Calibration for timed motors is not yet available. It will be added in a future update." } } }, diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py index 2205738..5e3ed04 100644 --- a/tests/test_config_flow.py +++ b/tests/test_config_flow.py @@ -3,7 +3,6 @@ from __future__ import annotations from types import MappingProxyType -from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -12,7 +11,6 @@ from custom_components.schellenberg_usb.const import ( CONF_BIDIRECTIONAL, - CONF_INITIAL_POSITION, CONF_SERIAL_PORT, DOMAIN, SUBENTRY_TYPE_BLIND, @@ -22,6 +20,21 @@ ) +def _make_handler( + hass: HomeAssistant, entry_id: str +) -> SchellenbergPairingSubentryFlow: + """Create a flow handler bound to the given hub entry. + + ConfigSubentryFlow._get_entry() reads self.handler[0] (the entry_id portion of + a (entry_id, subentry_type) tuple). async_create_entry requires source == 'user'. + """ + handler = SchellenbergPairingSubentryFlow() + handler.hass = hass + handler.handler = (entry_id, SUBENTRY_TYPE_BLIND) + handler.context = {"source": "user"} + return handler + + @pytest.fixture def mock_hub_entry(hass: HomeAssistant) -> ConfigEntry: """Create a mock hub config entry.""" @@ -48,9 +61,7 @@ async def test_manual_add_menu_shown( hass: HomeAssistant, mock_hub_entry: ConfigEntry ) -> None: """Menu step returns show_menu with 'user' and 'manual_add' options.""" - handler = SchellenbergPairingSubentryFlow() - handler.hass = hass - handler.handler = mock_hub_entry.entry_id + handler = _make_handler(hass, mock_hub_entry.entry_id) result = await handler.async_step_blind(None) @@ -64,9 +75,7 @@ async def test_manual_add_creates_subentry( hass: HomeAssistant, mock_hub_entry: ConfigEntry ) -> None: """Bidirectional manual-add with a valid enum creates an entry without api.pair_device_and_wait.""" - handler = SchellenbergPairingSubentryFlow() - handler.hass = hass - handler.handler = mock_hub_entry.entry_id + handler = _make_handler(hass, mock_hub_entry.entry_id) mock_api = MagicMock() mock_api.pair_device_and_wait = AsyncMock() @@ -89,11 +98,9 @@ async def test_manual_add_mode_flag( hass: HomeAssistant, mock_hub_entry: ConfigEntry ) -> None: """Timed selection stores CONF_BIDIRECTIONAL False; bidirectional stores True.""" - handler_timed = SchellenbergPairingSubentryFlow() - handler_timed.hass = hass - handler_timed.handler = mock_hub_entry.entry_id + # Timed: goes through position step + handler_timed = _make_handler(hass, mock_hub_entry.entry_id) - # For timed, advance to position step to get data result_timed = await handler_timed.async_step_manual_add( { "device_enum": "2B", @@ -112,10 +119,8 @@ async def test_manual_add_mode_flag( assert result_timed_entry["type"] == "create_entry" assert result_timed_entry["data"][CONF_BIDIRECTIONAL] is False - # For bidirectional - handler_bi = SchellenbergPairingSubentryFlow() - handler_bi.hass = hass - handler_bi.handler = mock_hub_entry.entry_id + # Bidirectional: goes straight to create_entry + handler_bi = _make_handler(hass, mock_hub_entry.entry_id) result_bi = await handler_bi.async_step_manual_add( { @@ -134,9 +139,7 @@ async def test_manual_add_invalid_enum( ) -> None: """Inputs 'XY', '100', '' each return form errors with invalid_enum_format.""" for bad_input in ("XY", "100", ""): - handler = SchellenbergPairingSubentryFlow() - handler.hass = hass - handler.handler = mock_hub_entry.entry_id + handler = _make_handler(hass, mock_hub_entry.entry_id) result = await handler.async_step_manual_add( { @@ -164,9 +167,7 @@ async def test_manual_add_duplicate_enum( existing_sub.data = {"device_enum": "1A", "device_id": "1A"} mock_hub_entry.subentries = MappingProxyType({"existing": existing_sub}) # type: ignore[attr-defined] - handler = SchellenbergPairingSubentryFlow() - handler.hass = hass - handler.handler = mock_hub_entry.entry_id + handler = _make_handler(hass, mock_hub_entry.entry_id) result = await handler.async_step_manual_add( { @@ -185,9 +186,7 @@ async def test_manual_add_device_name( ) -> None: """Provided name becomes the create_entry title; omitted name falls back to 'Blind {ENUM}'.""" # With name - handler = SchellenbergPairingSubentryFlow() - handler.hass = hass - handler.handler = mock_hub_entry.entry_id + handler = _make_handler(hass, mock_hub_entry.entry_id) result = await handler.async_step_manual_add( { @@ -200,9 +199,7 @@ async def test_manual_add_device_name( assert result["title"] == "My Living Room Blind" # Without name — falls back to "Blind {ENUM}" - handler2 = SchellenbergPairingSubentryFlow() - handler2.hass = hass - handler2.handler = mock_hub_entry.entry_id + handler2 = _make_handler(hass, mock_hub_entry.entry_id) result2 = await handler2.async_step_manual_add( { @@ -221,9 +218,7 @@ async def test_manual_add_position_step_timed_only( ) -> None: """Timed mode advances to manual_position form; bidirectional goes straight to create_entry.""" # Timed: should show position form - handler_timed = SchellenbergPairingSubentryFlow() - handler_timed.hass = hass - handler_timed.handler = mock_hub_entry.entry_id + handler_timed = _make_handler(hass, mock_hub_entry.entry_id) result_timed = await handler_timed.async_step_manual_add( { @@ -236,9 +231,7 @@ async def test_manual_add_position_step_timed_only( assert result_timed["step_id"] == "manual_position" # Bidirectional: should go directly to create_entry - handler_bi = SchellenbergPairingSubentryFlow() - handler_bi.hass = hass - handler_bi.handler = mock_hub_entry.entry_id + handler_bi = _make_handler(hass, mock_hub_entry.entry_id) result_bi = await handler_bi.async_step_manual_add( { @@ -271,9 +264,7 @@ async def test_reconfigure_timed_motor_aborts( } timed_subentry.title = "Timed Blind" - handler_timed = SchellenbergPairingSubentryFlow() - handler_timed.hass = hass - handler_timed.handler = mock_hub_entry.entry_id + handler_timed = _make_handler(hass, mock_hub_entry.entry_id) with patch.object( handler_timed, "_get_reconfigure_subentry", return_value=timed_subentry @@ -309,9 +300,7 @@ async def test_reconfigure_timed_motor_aborts( mock_api.control_blind = AsyncMock() mock_hub_entry.runtime_data = mock_api # type: ignore[attr-defined] - handler_bi = SchellenbergPairingSubentryFlow() - handler_bi.hass = hass - handler_bi.handler = mock_hub_entry.entry_id + handler_bi = _make_handler(hass, mock_hub_entry.entry_id) with patch.object( handler_bi, "_get_reconfigure_subentry", return_value=bi_subentry From fa10f40e43a5bd7a70e2ca60424142deaa8d0473 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 21:54:44 +0200 Subject: [PATCH 3/8] =?UTF-8?q?feat(02-01):=20harden=20edge=20cases=20?= =?UTF-8?q?=E2=80=94=20enum=20normalization,=20position=20clamping,=20full?= =?UTF-8?q?=20gate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tests/test_config_flow.py: add test_manual_add_enum_case_normalized (lowercase '1a' collides with existing '1A' after .upper()), test_manual_add_enum_stored_uppercase ('2b' stored as '2B') - tests/test_cover.py: add test_cover_initial_position_clamped (CONF_INITIAL_POSITION=150 clamps to 100; restored prior state of 50 beats seeded initial of 100) - Full WSL pytest suite: 160 passed, 0 failures (no regressions) - Native ruff: All checks passed; mypy: Success, no issues in 3 source files --- tests/test_config_flow.py | 48 ++++++++++++++++++++++++++++++++++++ tests/test_cover.py | 51 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py index 5e3ed04..43043cb 100644 --- a/tests/test_config_flow.py +++ b/tests/test_config_flow.py @@ -317,3 +317,51 @@ async def test_reconfigure_timed_motor_aborts( assert result_bi.get("reason") != "timed_calibration_unavailable", ( "Bidirectional motor should not abort with timed_calibration_unavailable" ) + + +@pytest.mark.asyncio +async def test_manual_add_enum_case_normalized( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Lowercase '1a' collides with existing '1A' after .upper() normalization (Pitfall 4).""" + existing_sub = MagicMock() + existing_sub.subentry_type = SUBENTRY_TYPE_BLIND + existing_sub.data = {"device_enum": "1A", "device_id": "1A"} + mock_hub_entry.subentries = MappingProxyType({"existing": existing_sub}) # type: ignore[attr-defined] + + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_manual_add( + { + "device_enum": "1a", # lowercase — must be normalized before dedup check + CONF_BIDIRECTIONAL: False, + "device_name": "", + } + ) + assert result["type"] == "form" + assert result["errors"].get("device_enum") == "duplicate_enum", ( + "Expected duplicate_enum: '1a'.upper() == '1A' which already exists" + ) + + +@pytest.mark.asyncio +async def test_manual_add_enum_stored_uppercase( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Submitting '2b' stores device_enum/device_id as '2B' (uppercase) in the entry.""" + handler = _make_handler(hass, mock_hub_entry.entry_id) + + result = await handler.async_step_manual_add( + { + "device_enum": "2b", + CONF_BIDIRECTIONAL: True, + "device_name": "Uppercase Test", + } + ) + assert result["type"] == "create_entry" + assert result["data"]["device_enum"] == "2B", ( + f"Expected device_enum '2B', got {result['data']['device_enum']!r}" + ) + assert result["data"]["device_id"] == "2B", ( + f"Expected device_id '2B', got {result['data']['device_id']!r}" + ) diff --git a/tests/test_cover.py b/tests/test_cover.py index f2e3fc7..69d407d 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -721,3 +721,54 @@ async def test_cover_initial_position_from_subentry( assert cover._attr_current_cover_position == 100 assert cover._attr_is_closed is False + + +@pytest.mark.asyncio +async def test_cover_initial_position_clamped( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CONF_INITIAL_POSITION=150 clamps to 100; a restored prior state wins over seeded initial.""" + # Upper-bound clamp: 150 -> 100 + cover_clamped = SchellenbergCover( + api=mock_api, + device_id="2B", + device_enum="2B", + device_name="Clamped Cover", + device_data={CONF_BIDIRECTIONAL: False, CONF_INITIAL_POSITION: 150}, + ) + cover_clamped.hass = hass + + with patch.object(cover_clamped, "async_get_last_state", return_value=None): + with patch( + "custom_components.schellenberg_usb.cover.async_dispatcher_connect" + ): + with patch.object(cover_clamped, "async_write_ha_state"): + await cover_clamped.async_added_to_hass() + + assert cover_clamped._attr_current_cover_position == 100, ( + f"Expected 100 (clamped from 150), got {cover_clamped._attr_current_cover_position}" + ) + + # RestoreEntity precedence: prior state of 50 beats seeded initial of 100 (Pitfall 5) + cover_restored = SchellenbergCover( + api=mock_api, + device_id="3C", + device_enum="3C", + device_name="Restored Cover", + device_data={CONF_BIDIRECTIONAL: False, CONF_INITIAL_POSITION: 100}, + ) + cover_restored.hass = hass + + last_state = State("cover.restored_cover", "open", {"current_position": 50}) + + with patch.object(cover_restored, "async_get_last_state", return_value=last_state): + with patch( + "custom_components.schellenberg_usb.cover.async_dispatcher_connect" + ): + with patch.object(cover_restored, "async_write_ha_state"): + await cover_restored.async_added_to_hass() + + assert cover_restored._attr_current_cover_position == 50, ( + f"Expected 50 (restored state wins), got {cover_restored._attr_current_cover_position}" + ) From 74cfa00503607fbee36aee92b07b7a299800f714 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 22:20:17 +0200 Subject: [PATCH 4/8] fix(02): reset cover movement flags when timed-motor position reached (CR-01) In _async_position_update_loop, the position_reached branch now clears _attr_is_opening, _attr_is_closing, _attr_is_closed, and _target_position before returning -- matching the two boundary-exit branches that already did this correctly. Without this fix, timed motors were left stuck in a phantom opening/closing state indefinitely after SET_POSITION completed. Also adds _unrecorded_attributes = frozenset({'mode'}) to SchellenbergCover so the static mode attribute is excluded from the recorder (WR-05). --- custom_components/schellenberg_usb/cover.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index 05fa53a..d693186 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -215,6 +215,7 @@ class SchellenbergCover(CoverEntity, RestoreEntity): _attr_has_entity_name = True _attr_should_poll = False + _unrecorded_attributes = frozenset({"mode"}) _attr_supported_features = ( CoverEntityFeature.OPEN @@ -539,6 +540,10 @@ async def _async_position_update_loop(self) -> None: if self._target_position not in (0, 100): await self._api.control_blind(self._device_enum, CMD_STOP) + self._attr_is_opening = False + self._attr_is_closing = False + self._attr_is_closed = self._attr_current_cover_position == 0 + self._target_position = None self._position_update_task = None self._move_start_time = None self._move_start_position = None From ffd6b51ea736dff61ec9b030cd881e8c8feaa59b Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 22:20:32 +0200 Subject: [PATCH 5/8] fix(02): flip manual-add default to bidirectional, guard manual_position, and cleanups (WR-01/03/04, IN-01/02) - WR-01: async_step_manual_position now aborts with pairing_failed if _pending_device_enum is missing, preventing a broken empty-id subentry. - WR-03: BooleanSelector default and resolver default for CONF_BIDIRECTIONAL flipped from False to True so an unmodified form yields bidirectional, matching the field-common case and the read-default used everywhere else. - WR-04: removed the never-used _pending_initial_position attribute from __init__. - IN-01: Awaitable moved from typing to collections.abc (deprecated in 3.9+). - IN-02: Unicode en-dash in comment replaced with plain ASCII hyphen. --- custom_components/schellenberg_usb/config_flow.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/custom_components/schellenberg_usb/config_flow.py b/custom_components/schellenberg_usb/config_flow.py index ceb5a79..38b953f 100644 --- a/custom_components/schellenberg_usb/config_flow.py +++ b/custom_components/schellenberg_usb/config_flow.py @@ -4,7 +4,8 @@ import logging import re -from typing import Any, Awaitable, cast +from collections.abc import Awaitable +from typing import Any, cast import serial # NOTE: blocking open used only to sanity-check connectivity import voluptuous as vol @@ -206,7 +207,6 @@ def __init__(self) -> None: self._pending_device_enum: str | None = None self._pending_device_name: str | None = None self._pending_is_bidirectional: bool = False - self._pending_initial_position: int | None = None def _get_calibration_handler(self) -> CalibrationFlowHandler: """Return (and lazily create) the calibration flow handler.""" @@ -270,7 +270,7 @@ async def async_step_manual_add( if not errors: # Resolve mode — BooleanSelector returns a real Python bool is_bidirectional: bool = bool( - user_input.get(CONF_BIDIRECTIONAL, False) + user_input.get(CONF_BIDIRECTIONAL, True) ) device_name = ( user_input.get("device_name") or f"Blind {device_enum}" @@ -305,7 +305,7 @@ async def async_step_manual_add( { vol.Required("device_enum"): selector.TextSelector(), vol.Required( - CONF_BIDIRECTIONAL, default=False + CONF_BIDIRECTIONAL, default=True ): selector.BooleanSelector(), vol.Optional("device_name"): selector.TextSelector(), } @@ -317,9 +317,12 @@ async def async_step_manual_position( self, user_input: dict[str, Any] | None = None ) -> SubentryFlowResult: """Collect initial position for timed motors (shown only after mode=timed).""" + if not self._pending_device_enum: + return self.async_abort(reason="pairing_failed") + if user_input is not None: initial_position = int(user_input.get("initial_position", 100)) - # Clamp to 0–100 as defense in depth (slider already bounds, but be safe) + # Clamp to 0-100 as defense in depth (slider already bounds, but be safe) initial_position = max(0, min(100, initial_position)) device_enum = self._pending_device_enum or "" device_name = self._pending_device_name or f"Blind {device_enum}" From 9fe049a9db484ee60b7eb6994109626f7f112671 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 22:20:42 +0200 Subject: [PATCH 6/8] fix(02): add uncalibrated note to manual_position description, remove orphaned menu key (WR-02, IN-04) - WR-02: manual_position step description now notes that positioning for timed motors uses a default travel time and is uncalibrated until Phase 4. - IN-04: removed the orphaned config.step.menu.menu_options.pair_device_menu key from both strings.json and translations/en.json; no matching flow step exists. Subentry-level user/manual_add keys are unaffected. --- custom_components/schellenberg_usb/strings.json | 5 ++--- custom_components/schellenberg_usb/translations/en.json | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/custom_components/schellenberg_usb/strings.json b/custom_components/schellenberg_usb/strings.json index 3496cb3..12ccdc3 100644 --- a/custom_components/schellenberg_usb/strings.json +++ b/custom_components/schellenberg_usb/strings.json @@ -4,8 +4,7 @@ "menu": { "title": "Schellenberg USB setup", "menu_options": { - "user": "Set up USB hub", - "pair_device_menu": "Pair new device" + "user": "Set up USB hub" } }, "user": { @@ -60,7 +59,7 @@ }, "manual_position": { "title": "Set initial position", - "description": "Where is {device_name} right now? This sets the starting position for the position slider.", + "description": "Where is {device_name} right now? This sets the starting position for the position slider. Note: positioning for timed motors uses a default travel time and is uncalibrated until calibration support is added in a later update.", "data": { "initial_position": "Current position" } diff --git a/custom_components/schellenberg_usb/translations/en.json b/custom_components/schellenberg_usb/translations/en.json index 430930e..6a86371 100644 --- a/custom_components/schellenberg_usb/translations/en.json +++ b/custom_components/schellenberg_usb/translations/en.json @@ -4,8 +4,7 @@ "menu": { "title": "Schellenberg USB setup", "menu_options": { - "user": "Set up USB hub", - "pair_device_menu": "Pair new device" + "user": "Set up USB hub" } }, "user": { @@ -60,7 +59,7 @@ }, "manual_position": { "title": "Set initial position", - "description": "Where is {device_name} right now? This sets the starting position for the position slider.", + "description": "Where is {device_name} right now? This sets the starting position for the position slider. Note: positioning for timed motors uses a default travel time and is uncalibrated until calibration support is added in a later update.", "data": { "initial_position": "Current position" } From d4c533c6d1e6d71476ebf75e8b8427701b0feea1 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 22:20:52 +0200 Subject: [PATCH 7/8] test(02): add regression tests for CR-01 position loop, WR-01 guard, and WR-03 default (WR-06) - test_timed_motor_position_loop_clears_flags: exercises the REAL _async_position_update_loop for a timed motor with tiny travel time, drives SET_POSITION to 50, and asserts is_opening=False, is_closing=False, _target_position=None, and position==50 after completion (WR-06/CR-01). - test_manual_position_aborts_without_pending_state: asserts abort with pairing_failed when manual_position is entered without prior manual_add (WR-01). - test_manual_add_default_mode_is_bidirectional: asserts that omitting CONF_BIDIRECTIONAL from the form yields CONF_BIDIRECTIONAL=True (WR-03). --- tests/test_config_flow.py | 52 +++++++++++++++++++++++++++ tests/test_cover.py | 75 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py index 43043cb..0dd2051 100644 --- a/tests/test_config_flow.py +++ b/tests/test_config_flow.py @@ -365,3 +365,55 @@ async def test_manual_add_enum_stored_uppercase( assert result["data"]["device_id"] == "2B", ( f"Expected device_id '2B', got {result['data']['device_id']!r}" ) + + +@pytest.mark.asyncio +async def test_manual_position_aborts_without_pending_state( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """async_step_manual_position aborts when _pending_device_enum is missing (WR-01). + + This guard prevents a broken subentry from being created if the step is + reached without async_step_manual_add having run first (HA re-entrancy, + serialized-flow resume, or future menu wiring change). + """ + handler = _make_handler(hass, mock_hub_entry.entry_id) + # _pending_device_enum is None by default -- do not call manual_add first + + result = await handler.async_step_manual_position(None) + + assert result["type"] == "abort", ( + f"Expected abort when pending state is missing, got {result['type']!r}" + ) + assert result["reason"] == "pairing_failed", ( + f"Expected reason='pairing_failed', got {result.get('reason')!r}" + ) + + +@pytest.mark.asyncio +async def test_manual_add_default_mode_is_bidirectional( + hass: HomeAssistant, mock_hub_entry: ConfigEntry +) -> None: + """Unmodified manual_add form (no CONF_BIDIRECTIONAL key) defaults to bidirectional (WR-03). + + When a user submits without flipping the toggle, the resolver default must + produce CONF_BIDIRECTIONAL=True (bidirectional), matching the field-common case. + """ + handler = _make_handler(hass, mock_hub_entry.entry_id) + + # Omit CONF_BIDIRECTIONAL entirely to simulate an unmodified form submission + result = await handler.async_step_manual_add( + { + "device_enum": "1A", + # CONF_BIDIRECTIONAL deliberately omitted + "device_name": "Default Mode Blind", + } + ) + + # With bidirectional as default, we expect a direct create_entry (no position step) + assert result["type"] == "create_entry", ( + f"Expected create_entry for bidirectional default, got {result['type']!r}" + ) + assert result["data"][CONF_BIDIRECTIONAL] is True, ( + f"Expected CONF_BIDIRECTIONAL=True (default), got {result['data'][CONF_BIDIRECTIONAL]!r}" + ) diff --git a/tests/test_cover.py b/tests/test_cover.py index 69d407d..fd57706 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from types import MappingProxyType from typing import Any, cast from unittest.mock import AsyncMock, MagicMock, patch @@ -772,3 +773,77 @@ async def test_cover_initial_position_clamped( assert cover_restored._attr_current_cover_position == 50, ( f"Expected 50 (restored state wins), got {cover_restored._attr_current_cover_position}" ) + + +@pytest.mark.asyncio +async def test_timed_motor_position_loop_clears_flags( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Timed motor: after SET_POSITION target is reached via the real loop, flags are cleared. + + Regression test for CR-01: the position-reached branch used to leave + _attr_is_opening/_attr_is_closing True and _target_position set, causing HA + to render the cover as perpetually moving. This test exercises the real + _async_position_update_loop (no patch on _start_position_tracking) and must + FAIL against pre-fix code and PASS after the fix. + """ + import time as _time + + # Use a very small travel time (0.5 s) so the move completes within ms + # in the event loop. Start at 0% and move UP to 50% -- the loop should + # stop the motor and clear the flags once position >= 50. + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor Test", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 0.5, + CONF_CLOSE_TIME: 0.5, + }, + ) + cover.hass = hass + cover._attr_current_cover_position = 0 + cover._attr_is_closed = True + + # Kick off async_set_cover_position. It internally calls async_open_cover, + # which calls _start_position_tracking -> creates the real loop task. + # We do NOT patch _start_position_tracking here (that is the whole point). + with patch.object(cover, "async_write_ha_state"): + # Set the move start state manually so _update_position works correctly + # when async_open_cover is called without the hass event loop already + # running the task scheduler. We pre-set the start time so that a + # position > 50 is instantly computed on the first loop iteration. + cover._attr_is_opening = True + cover._attr_is_closing = False + cover._move_start_position = 0 + # Backdate the start time by 0.4 s -- with 0.5 s travel = 80% progress, + # which already exceeds the target of 50, so the loop exits on the first + # iteration after the initial 0.2 s sleep. + cover._move_start_time = _time.monotonic() - 0.4 + cover._target_position = 50 + + loop_task = hass.async_create_task(cover._async_position_update_loop()) + + # Allow the event loop to run: the task sleeps 0.2 s then checks. + # Give it 1 second of wall-clock asyncio time to settle. + await asyncio.sleep(0.5) + + # The loop should have exited and cleared the flags. + assert cover._attr_is_opening is False, ( + f"Expected is_opening=False after position reached, got {cover._attr_is_opening}" + ) + assert cover._attr_is_closing is False, ( + f"Expected is_closing=False after position reached, got {cover._attr_is_closing}" + ) + assert cover._target_position is None, ( + f"Expected _target_position=None after position reached, got {cover._target_position}" + ) + # Position should equal the requested target (50%) + assert cover._attr_current_cover_position == 50, ( + f"Expected position=50 after reaching target, got {cover._attr_current_cover_position}" + ) + # Task should be done + assert loop_task.done(), "Expected position loop task to be done after target reached" From 593baab76d42039e5d1095e230e1d45cd8f0e4bf Mon Sep 17 00:00:00 2001 From: hrabbach Date: Thu, 25 Jun 2026 23:00:23 +0200 Subject: [PATCH 8/8] chore(02): bump version to 1.1.0 for manual-device-entry release Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01QwPMgtiypLgJ5nkR3mUGfL --- custom_components/schellenberg_usb/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/schellenberg_usb/manifest.json b/custom_components/schellenberg_usb/manifest.json index 10d7b80..5b6a1fe 100644 --- a/custom_components/schellenberg_usb/manifest.json +++ b/custom_components/schellenberg_usb/manifest.json @@ -15,5 +15,5 @@ "manufacturer": "van ooijen" } ], - "version": "1.0.0" + "version": "1.1.0" }