From e5a3a63ec8cfce2e832ce7f75d93e87f9ca1fd56 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 11:18:27 +0200 Subject: [PATCH 1/8] test(03-01): add immediate-dispatch tests for timed open and close (CTRL-01) - Add test_timed_open_sends_command_immediately: asserts CMD_UP awaited once on async_open_cover for CONF_BIDIRECTIONAL=False cover, no event needed - Add test_timed_close_sends_command_immediately: asserts CMD_DOWN awaited once on async_close_cover for timed motor - Add CMD_UP, CMD_DOWN, CMD_STOP to const import block for use in tests --- tests/test_cover.py | 221 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) diff --git a/tests/test_cover.py b/tests/test_cover.py index fd57706..60b6b68 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -15,6 +15,9 @@ from custom_components.schellenberg_usb.api import SchellenbergUsbApi from custom_components.schellenberg_usb.const import ( + CMD_DOWN, + CMD_STOP, + CMD_UP, CONF_BIDIRECTIONAL, CONF_CLOSE_TIME, CONF_INITIAL_POSITION, @@ -847,3 +850,221 @@ async def test_timed_motor_position_loop_clears_flags( ) # Task should be done assert loop_task.done(), "Expected position loop task to be done after target reached" + + +# --------------------------------------------------------------------------- +# CTRL-01: Timed motor open/close dispatch immediately, no event wait +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_timed_open_sends_command_immediately( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CTRL-01: timed (non-bidirectional) open dispatches CMD_UP immediately. + + No inbound device event is needed — control_blind must be awaited exactly + once with CMD_UP upon async_open_cover, with no _handle_event involvement. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover.hass = hass + cover._attr_current_cover_position = 0 + + with patch.object(cover, "_start_position_tracking"): + with patch.object(cover, "async_write_ha_state"): + await cover.async_open_cover() + + assert cover._attr_is_opening is True + assert cover._attr_is_closing is False + _async_mock(mock_api.control_blind).assert_awaited_once_with( + cover._device_enum, CMD_UP + ) + + +@pytest.mark.asyncio +async def test_timed_close_sends_command_immediately( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CTRL-01: timed motor close dispatches CMD_DOWN immediately. + + No inbound device event is needed — control_blind must be awaited exactly + once with CMD_DOWN upon async_close_cover, with no _handle_event involvement. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover.hass = hass + cover._attr_current_cover_position = 100 + + with patch.object(cover, "_start_position_tracking"): + with patch.object(cover, "async_write_ha_state"): + await cover.async_close_cover() + + assert cover._attr_is_opening is False + assert cover._attr_is_closing is True + _async_mock(mock_api.control_blind).assert_awaited_once_with( + cover._device_enum, CMD_DOWN + ) + + +# --------------------------------------------------------------------------- +# CTRL-02 / D-01: Stop freezes the position estimate for timed motors +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_timed_stop_freezes_at_estimate( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-01: async_stop_cover freezes the position estimate (no endstop snap). + + A timed motor mid-open: stop must record the interpolated position at the + moment of the stop call — not snap to 0 or 100. CMD_STOP must be sent + exactly once. + """ + import time as _time + + # travel time: 1.0 s → 50% elapsed after 0.5 s from position 0 + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 1.0, + CONF_CLOSE_TIME: 1.0, + }, + ) + cover.hass = hass + cover._attr_is_opening = True + cover._attr_is_closing = False + cover._move_start_position = 0 + # Backdate start by 0.5 s: 0.5/1.0 * 100 = 50% change → position ~50 + cover._move_start_time = _time.monotonic() - 0.5 + + with patch.object(cover, "_stop_position_tracking"): + with patch.object(cover, "async_write_ha_state"): + await cover.async_stop_cover() + + assert cover._attr_is_opening is False + assert cover._attr_is_closing is False + frozen_pos = cover._attr_current_cover_position + assert frozen_pos is not None + assert 0 < frozen_pos < 100, ( + f"Expected mid estimate, got {frozen_pos}" + ) + _async_mock(mock_api.control_blind).assert_awaited_once_with( + cover._device_enum, CMD_STOP + ) + + +# --------------------------------------------------------------------------- +# CTRL-02 / D-02: Full run to completion resets position to 100% / 0% +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_timed_full_open_resets_to_100( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-02: full open run (target=None) resets position to 100 via endstop branch. + + _target_position must be None so the loop takes the endstop-completion + branch (cover.py:553-580), NOT the partial-move target-reached branch + (cover.py:526-551). _move_start_time is backdated beyond CONF_OPEN_TIME + so that _update_position drives position to 100 on the first loop tick. + """ + import time as _time + + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 0.2, + CONF_CLOSE_TIME: 0.2, + }, + ) + cover.hass = hass + cover._attr_is_opening = True + cover._attr_is_closing = False + # Explicitly set None so the endstop branch (not partial-move) is taken + cover._target_position = None + cover._move_start_position = 0 + # Backdate beyond travel time → elapsed/travel >= 1.0 → position = 100 + cover._move_start_time = _time.monotonic() - 0.5 + + with patch.object(cover, "async_write_ha_state"): + loop_task = hass.async_create_task(cover._async_position_update_loop()) + await asyncio.sleep(0.5) + + assert cover._attr_current_cover_position == 100, ( + f"Expected 100 after full open, got {cover._attr_current_cover_position}" + ) + assert cover._attr_is_opening is False + assert cover._attr_is_closing is False + assert loop_task.done(), "Expected loop task done after endstop reset" + + +@pytest.mark.asyncio +async def test_timed_full_close_resets_to_0( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-02: full close run (target=None) resets position to 0 via endstop branch. + + _target_position must be None so the loop takes the endstop-completion + branch (cover.py:553-566), NOT the partial-move target-reached branch. + _move_start_time is backdated beyond CONF_CLOSE_TIME so the loop exits on + the first tick. + """ + import time as _time + + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 0.2, + CONF_CLOSE_TIME: 0.2, + }, + ) + cover.hass = hass + cover._attr_is_opening = False + cover._attr_is_closing = True + # Explicitly set None so the endstop branch (not partial-move) is taken + cover._target_position = None + cover._move_start_position = 100 + # Backdate beyond travel time → elapsed/travel >= 1.0 → position = 0 + cover._move_start_time = _time.monotonic() - 0.5 + + with patch.object(cover, "async_write_ha_state"): + loop_task = hass.async_create_task(cover._async_position_update_loop()) + await asyncio.sleep(0.5) + + assert cover._attr_current_cover_position == 0, ( + f"Expected 0 after full close, got {cover._attr_current_cover_position}" + ) + assert cover._attr_is_opening is False + assert cover._attr_is_closing is False + assert cover._attr_is_closed is True + assert loop_task.done(), "Expected loop task done after endstop reset" From 0f7127fcd73260dc7d9b4b8bb51ec268a3f732e0 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 11:46:13 +0200 Subject: [PATCH 2/8] feat(03-02): add _is_calibrated flag and calibrated attribute (D-06/D-07) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - __init__: compute _is_calibrated from VALUE-presence of CONF_OPEN_TIME and CONF_CLOSE_TIME (is not None) — key-present-but-None stays uncalibrated (REVIEW-01) - _unrecorded_attributes: add 'calibrated' alongside 'mode' (D-07, T-03-04) - extra_state_attributes: expose calibrated key for timed motors only (D-07) - _handle_calibration_completed: set _is_calibrated=True before async_write_ha_state (REVIEW-05) - tests: 4 failing-first tests (RED confirmed) then GREEN for D-06/D-07 behaviours --- custom_components/schellenberg_usb/cover.py | 20 ++- tests/test_cover.py | 185 ++++++++++++++++++++ 2 files changed, 203 insertions(+), 2 deletions(-) diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index d693186..2fa58e8 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -215,7 +215,7 @@ class SchellenbergCover(CoverEntity, RestoreEntity): _attr_has_entity_name = True _attr_should_poll = False - _unrecorded_attributes = frozenset({"mode"}) + _unrecorded_attributes = frozenset({"mode", "calibrated"}) _attr_supported_features = ( CoverEntityFeature.OPEN @@ -282,6 +282,15 @@ def __init__( else None ) + # Calibrated = real open AND close times explicitly present (non-None). + # The DEFAULT_TRAVEL_TIME fallback does NOT count as calibrated (D-06). + # Value-presence check (is not None), not key-presence: a key present + # but explicitly set to None must not be treated as calibrated (REVIEW-01). + self._is_calibrated: bool = ( + device_data_dict.get(CONF_OPEN_TIME) is not None + and device_data_dict.get(CONF_CLOSE_TIME) is not None + ) + self._move_start_time: float | None = None self._move_start_position: int | None = None self._position_update_task: asyncio.Task[None] | None = None @@ -311,9 +320,12 @@ def entity_registry_enabled_default(self) -> bool: @property def extra_state_attributes(self) -> dict[str, Any]: """Return device-specific state attributes.""" - return { + attrs: dict[str, Any] = { "mode": "bidirectional" if self._is_bidirectional else "timed", } + if not self._is_bidirectional: + attrs["calibrated"] = self._is_calibrated + return attrs async def async_added_to_hass(self) -> None: """Run when entity about to be added to hass.""" @@ -436,6 +448,10 @@ def _handle_calibration_completed( self._attr_current_cover_position = 0 self._attr_is_closed = True + # Flip calibrated flag so the attribute reflects live state (REVIEW-05). + # Must run BEFORE async_write_ha_state() so the pushed state is correct. + self._is_calibrated = True + _LOGGER.info( "Device %s calibration updated: open_time=%.2fs, close_time=%.2fs. " "Cover position set to fully closed (0%%)", diff --git a/tests/test_cover.py b/tests/test_cover.py index 60b6b68..9a7b6ca 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -1068,3 +1068,188 @@ async def test_timed_full_close_resets_to_0( assert cover._attr_is_closing is False assert cover._attr_is_closed is True assert loop_task.done(), "Expected loop task done after endstop reset" + + +# --------------------------------------------------------------------------- +# 03-02: Calibrated flag (D-06 / REVIEW-01) and calibrated attribute (D-07) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_timed_calibrated_flag_requires_both_times( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """_is_calibrated is True only when both CONF_OPEN_TIME and CONF_CLOSE_TIME + are present with non-None values (D-06). + + DEFAULT_TRAVEL_TIME fallback must NOT set the flag. One time only is also + not enough — both must be present and non-None. + """ + # No travel times at all → uncalibrated + cover_uncal = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Uncalibrated", + device_data={CONF_BIDIRECTIONAL: False}, + ) + assert cover_uncal._is_calibrated is False + + # Both times present → calibrated + cover_cal = SchellenbergCover( + api=mock_api, + device_id="TM02", + device_enum="11", + device_name="Timed Calibrated", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 30.0, + CONF_CLOSE_TIME: 35.0, + }, + ) + assert cover_cal._is_calibrated is True + + # Only open time present → still uncalibrated + cover_open_only = SchellenbergCover( + api=mock_api, + device_id="TM03", + device_enum="12", + device_name="Open Only", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 30.0, + }, + ) + assert cover_open_only._is_calibrated is False + + +@pytest.mark.asyncio +async def test_timed_calibrated_flag_rejects_none_values( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Keys present but values None must NOT count as calibrated (REVIEW-01). + + Value-presence check (`is not None`), not key-presence check (`in dict`). + """ + # Both keys present but both None → uncalibrated + cover_both_none = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Both None", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: None, + CONF_CLOSE_TIME: None, + }, + ) + assert cover_both_none._is_calibrated is False + + # One None, one real value → still uncalibrated + cover_one_none = SchellenbergCover( + api=mock_api, + device_id="TM02", + device_enum="11", + device_name="One None", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: None, + CONF_CLOSE_TIME: 12.0, + }, + ) + assert cover_one_none._is_calibrated is False + + +@pytest.mark.asyncio +async def test_timed_calibrated_attribute_in_state( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """extra_state_attributes exposes calibrated for timed motors (D-07). + + Uncalibrated timed motor has calibrated=False; calibrated timed motor has + calibrated=True. Mode remains 'timed' in both cases. + """ + cover_uncal = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Uncalibrated", + device_data={CONF_BIDIRECTIONAL: False}, + ) + attrs_uncal = cover_uncal.extra_state_attributes + assert attrs_uncal["mode"] == "timed" + assert attrs_uncal["calibrated"] is False + + cover_cal = SchellenbergCover( + api=mock_api, + device_id="TM02", + device_enum="11", + device_name="Calibrated", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 30.0, + CONF_CLOSE_TIME: 35.0, + }, + ) + attrs_cal = cover_cal.extra_state_attributes + assert attrs_cal["mode"] == "timed" + assert attrs_cal["calibrated"] is True + + +@pytest.mark.asyncio +async def test_bidir_has_no_calibrated_attribute( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """Bidirectional covers must NOT expose a 'calibrated' key (D-07).""" + cover_bidir = SchellenbergCover( + api=mock_api, + device_id="BD01", + device_enum="10", + device_name="Bidirectional", + device_data={CONF_BIDIRECTIONAL: True}, + ) + attrs = cover_bidir.extra_state_attributes + assert attrs["mode"] == "bidirectional" + assert "calibrated" not in attrs + + +# --------------------------------------------------------------------------- +# 03-02: Uncalibrated set-position no-op gate (D-05) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_timed_set_position_noop_when_uncalibrated( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-05: set_position on an uncalibrated timed motor is a silent no-op. + + Neither async_open_cover nor async_close_cover must be called. + current_position must remain unchanged. No exception must be raised. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover.hass = hass + cover._attr_current_cover_position = 50 + + with patch.object( + cover, "async_open_cover", new_callable=AsyncMock + ) as mock_open: + with patch.object( + cover, "async_close_cover", new_callable=AsyncMock + ) as mock_close: + await cover.async_set_cover_position(**{ATTR_POSITION: 80}) + + mock_open.assert_not_called() + mock_close.assert_not_called() + assert cover._attr_current_cover_position == 50 From 79bdc766cbcf3db20fbbe275ce417fd052ec07b8 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 11:52:36 +0200 Subject: [PATCH 3/8] feat(03-02): add uncalibrated set-position no-op gate (D-05) - async_set_cover_position: early-return guard at method top under `if not self._is_bidirectional and not self._is_calibrated:` with a single debug log; SET_POSITION feature retained (slider stays visible) - test: failing-first (RED confirmed) then GREEN for D-05 behaviour --- custom_components/schellenberg_usb/cover.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index 2fa58e8..4544cf1 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -683,6 +683,12 @@ async def async_stop_cover(self, **kwargs: Any) -> None: async def async_set_cover_position(self, **kwargs: Any) -> None: """Move the cover to a specific position.""" + if not self._is_bidirectional and not self._is_calibrated: + _LOGGER.debug( + "Timed motor %s: set-position ignored (not calibrated yet)", + self._attr_name, + ) + return target_position = kwargs[ATTR_POSITION] if self._attr_current_cover_position is None: From 17b7c6bb23e32c2a8766323a7aa73ebcf49fa52e Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 12:18:59 +0200 Subject: [PATCH 4/8] feat(03-03): D-08 mid-move endstop snap + D-11 _handle_event guard (TDD GREEN) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract _restore_position_from_last_state helper (REVIEW-02): the generic recorded-position restore logic now lives in one place, called from both the bidirectional and timed-idle branches — no duplication, no drift risk. - Restructure async_added_to_hass: timed branch (not _is_bidirectional) snaps opening→100%% and closing→0%% on restart (D-08); bidirectional branch calls the shared helper unchanged (D-10). - Add if not self._is_bidirectional: return guard at the top of _handle_event (D-11/REVIEW-04) — stray inbound events on timed motors are a structural no-op, never mutating state or writing HA state. - Tests: test_timed_restart_opening_snaps_to_100, test_timed_restart_closing_snaps_to_0, test_timed_handle_event_ignored (all GREEN); bidirectional restore canary passes. --- custom_components/schellenberg_usb/cover.py | 110 ++++++++++++++----- tests/test_cover.py | 115 ++++++++++++++++++++ 2 files changed, 195 insertions(+), 30 deletions(-) diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index 4544cf1..8192a29 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -327,6 +327,50 @@ def extra_state_attributes(self) -> dict[str, Any]: attrs["calibrated"] = self._is_calibrated return attrs + def _restore_position_from_last_state( + self, last_state: Any + ) -> None: + """Restore cover position from a HA last-known state. + + Contains the generic recorded-position restore logic: raw_position + extraction, int coercion, state-string fallback, clamp, is_closed, + and debug log. Called from both the bidirectional and timed-idle + branches so the logic lives in exactly one place (REVIEW-02). + """ + restored_position: int | None = None + raw_position = ( + last_state.attributes.get("current_position") + if "current_position" in last_state.attributes + else last_state.attributes.get(ATTR_POSITION) + ) + + if isinstance(raw_position, (int, float)): + restored_position = int(raw_position) + elif raw_position is not None: + try: + restored_position = int(str(raw_position)) + except ValueError: + restored_position = None + + if restored_position is None: + if last_state.state == "open": + restored_position = 100 + elif last_state.state == "closed": + restored_position = 0 + + if restored_position is not None: + self._attr_current_cover_position = max( + 0, min(100, restored_position) + ) + self._attr_is_closed = self._attr_current_cover_position == 0 + _LOGGER.debug( + "Restored position for %s (%s) to %d%% (raw=%s)", + self._attr_name, + self._device_id, + self._attr_current_cover_position, + raw_position, + ) + async def async_added_to_hass(self) -> None: """Run when entity about to be added to hass.""" await super().async_added_to_hass() @@ -336,38 +380,37 @@ async def async_added_to_hass(self) -> None: # Restore the last known state last_state = await self.async_get_last_state() - if last_state: - restored_position: int | None = None - raw_position = ( - last_state.attributes.get("current_position") - if "current_position" in last_state.attributes - else last_state.attributes.get(ATTR_POSITION) - ) - - if isinstance(raw_position, (int, float)): - restored_position = int(raw_position) - elif raw_position is not None: - try: - restored_position = int(str(raw_position)) - except ValueError: - restored_position = None - - if restored_position is None: - if last_state.state == "open": - restored_position = 100 - elif last_state.state == "closed": - restored_position = 0 - - if restored_position is not None: - self._attr_current_cover_position = max(0, min(100, restored_position)) - self._attr_is_closed = self._attr_current_cover_position == 0 + if last_state and not self._is_bidirectional: + # D-08: timed motor mid-move restart → snap to destination endstop. + # This branch runs before the recorded-position restore so a stale + # mid-move current_position attribute is discarded (plan key-link). + if last_state.state == "opening": + self._attr_current_cover_position = 100 + self._attr_is_closed = False _LOGGER.debug( - "Restored position for %s (%s) to %d%% (raw=%s)", + "Timed motor %s was opening at restart," + " snapping to 100%%", + self._attr_name, + ) + elif last_state.state == "closing": + self._attr_current_cover_position = 0 + self._attr_is_closed = True + _LOGGER.debug( + "Timed motor %s was closing at restart," + " snapping to 0%%", self._attr_name, - self._device_id, - self._attr_current_cover_position, - raw_position, ) + else: + # D-09: idle timed motor → recorded position wins. + # The helper handles raw_position extraction, is None sentinel, + # and clamp. A real recorded 0%% is preserved (not overridden). + # Missing-data fallback (initial_position / 100) is layered in + # Task 2 after this call. + self._restore_position_from_last_state(last_state) + + elif last_state and self._is_bidirectional: + # Bidirectional path: use the shared helper (REVIEW-02 — no copy). + self._restore_position_from_last_state(last_state) if self._attr_current_cover_position is None: if self._initial_position is not None: @@ -384,7 +427,8 @@ async def async_added_to_hass(self) -> None: self._attr_current_cover_position = 0 self._attr_is_closed = True _LOGGER.debug( - "No previous state for %s (%s); defaulting position to 0%% (closed)", + "No previous state for %s (%s);" + " defaulting position to 0%% (closed)", self._attr_name, self._device_id, ) @@ -470,6 +514,12 @@ async def async_will_remove_from_hass(self) -> None: @callback def _handle_event(self, event: str) -> None: """Handle events from the USB stick for this device.""" + # D-11 / REVIEW-04: timed motors produce no inbound frames; any stray + # event must not mutate state. This guard makes D-11 structurally + # self-enforcing — the whole event body is skipped for timed motors. + if not self._is_bidirectional: + return + _LOGGER.info( "Device %s (%s) received activity event: %s", self._attr_name, diff --git a/tests/test_cover.py b/tests/test_cover.py index 9a7b6ca..18adfd6 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -1253,3 +1253,118 @@ async def test_timed_set_position_noop_when_uncalibrated( mock_open.assert_not_called() mock_close.assert_not_called() assert cover._attr_current_cover_position == 50 + + +# --------------------------------------------------------------------------- +# 03-03: Timed motor restart restore (D-08, D-09, D-11) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_timed_restart_opening_snaps_to_100( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-08: timed motor was opening at HA restart → restore to 100%. + + The stale mid-move current_position (60%) must be discarded; the + endstop snap to 100% wins. Bidirectional guard ensures this branch + only fires for timed motors. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover.hass = hass + + last_state = State( + "cover.timed_motor", "opening", {"current_position": 60} + ) + with patch.object(cover, "async_get_last_state", return_value=last_state): + with patch( + "custom_components.schellenberg_usb.cover" + ".async_dispatcher_connect" + ): + with patch.object(cover, "async_write_ha_state"): + await cover.async_added_to_hass() + + assert cover._attr_current_cover_position == 100 + assert cover._attr_is_closed is False + + +@pytest.mark.asyncio +async def test_timed_restart_closing_snaps_to_0( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-08: timed motor was closing at HA restart → restore to 0%. + + The stale mid-move current_position (40%) must be discarded; the + endstop snap to 0% wins. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover.hass = hass + + last_state = State( + "cover.timed_motor", "closing", {"current_position": 40} + ) + with patch.object(cover, "async_get_last_state", return_value=last_state): + with patch( + "custom_components.schellenberg_usb.cover" + ".async_dispatcher_connect" + ): + with patch.object(cover, "async_write_ha_state"): + await cover.async_added_to_hass() + + assert cover._attr_current_cover_position == 0 + assert cover._attr_is_closed is True + + +@pytest.mark.asyncio +async def test_timed_handle_event_ignored( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-11 / REVIEW-04: a stray device event on a timed motor is a no-op. + + _handle_event must early-return for timed motors before any state + mutation. Neither is_opening nor is_closing must be set; position + must remain unchanged. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover.hass = hass + cover._attr_current_cover_position = 75 + cover._attr_is_opening = False + cover._attr_is_closing = False + + with patch.object(cover, "async_write_ha_state") as mock_write: + cover._handle_event(EVENT_STARTED_MOVING_UP) + + # No state mutation, no HA state write + assert cover._attr_is_opening is False + assert cover._attr_is_closing is False + assert cover._attr_current_cover_position == 75 + mock_write.assert_not_called() + + with patch.object(cover, "async_write_ha_state") as mock_write2: + cover._handle_event(EVENT_STOPPED) + + assert cover._attr_is_opening is False + assert cover._attr_is_closing is False + assert cover._attr_current_cover_position == 75 + mock_write2.assert_not_called() From d73f31fc51ae5e5fd6fe82da319e69fa82964542 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 12:25:52 +0200 Subject: [PATCH 5/8] feat(03-03): D-09 idle restore + no-prior fallback 100%% for timed motors (TDD GREEN) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Timed-idle restart branch calls _restore_position_from_last_state (shared helper) — a real recorded 0%% is preserved via 'is None' sentinel, never replaced by initial_position (Pitfall 1 / REVIEW-02 — no inline duplication). - No-prior-state fallback for timed motors is now 100%% (assume open, D-09), NOT the existing bidirectional 0%% default; the existing else:0 is gated on self._is_bidirectional so bidirectional behavior is unchanged (D-10). - Tests: test_timed_restart_idle_restores_position (real-0%% key guard), test_timed_restart_no_prior_state_uses_initial_position, test_timed_restart_no_prior_no_initial_defaults_to_100 (all GREEN). --- custom_components/schellenberg_usb/cover.py | 14 +- tests/test_cover.py | 135 ++++++++++++++++++++ 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index 8192a29..04a982c 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -423,7 +423,8 @@ async def async_added_to_hass(self) -> None: self._attr_name, self._attr_current_cover_position, ) - else: + elif self._is_bidirectional: + # Bidirectional: default to 0 (closed) — existing behavior. self._attr_current_cover_position = 0 self._attr_is_closed = True _LOGGER.debug( @@ -432,6 +433,17 @@ async def async_added_to_hass(self) -> None: self._attr_name, self._device_id, ) + else: + # D-09: timed motor with no prior state → assume open (100%%). + # Never collapse missing data to 0 (SC#4 slider regression). + self._attr_current_cover_position = 100 + self._attr_is_closed = False + _LOGGER.debug( + "No previous state for timed motor %s (%s);" + " defaulting to 100%% (assume open)", + self._attr_name, + self._device_id, + ) self.async_write_ha_state() diff --git a/tests/test_cover.py b/tests/test_cover.py index 18adfd6..9fa384f 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -1368,3 +1368,138 @@ async def test_timed_handle_event_ignored( assert cover._attr_is_closing is False assert cover._attr_current_cover_position == 75 mock_write2.assert_not_called() + + +@pytest.mark.asyncio +async def test_timed_restart_idle_restores_position( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-09: idle timed motor restart restores the recorded position as-is. + + A real recorded 0%% (cover fully closed) must survive restart — it must + NOT be replaced by initial_position or the 100%% fallback (Pitfall 1: + the falsy-0 trap). Also verifies a non-zero idle case (30%%). + """ + # Case 1: real recorded 0%% — closed cover stays closed + cover_zero = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False, CONF_INITIAL_POSITION: 50}, + ) + cover_zero.hass = hass + + last_state_zero = State( + "cover.timed_motor", "closed", {"current_position": 0} + ) + with patch.object( + cover_zero, "async_get_last_state", return_value=last_state_zero + ): + with patch( + "custom_components.schellenberg_usb.cover" + ".async_dispatcher_connect" + ): + with patch.object(cover_zero, "async_write_ha_state"): + await cover_zero.async_added_to_hass() + + assert cover_zero._attr_current_cover_position == 0, ( + "Real recorded 0%% must survive — must not be replaced by" + f" initial_position; got {cover_zero._attr_current_cover_position}" + ) + assert cover_zero._attr_is_closed is True + + # Case 2: non-zero idle restore (30%%) + cover_thirty = SchellenbergCover( + api=mock_api, + device_id="TM02", + device_enum="11", + device_name="Timed Motor 30", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover_thirty.hass = hass + + last_state_thirty = State( + "cover.timed_motor_30", "open", {"current_position": 30} + ) + with patch.object( + cover_thirty, + "async_get_last_state", + return_value=last_state_thirty, + ): + with patch( + "custom_components.schellenberg_usb.cover" + ".async_dispatcher_connect" + ): + with patch.object(cover_thirty, "async_write_ha_state"): + await cover_thirty.async_added_to_hass() + + assert cover_thirty._attr_current_cover_position == 30 + assert cover_thirty._attr_is_closed is False + + +@pytest.mark.asyncio +async def test_timed_restart_no_prior_state_uses_initial_position( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-09: timed motor with no prior state uses CONF_INITIAL_POSITION. + + When async_get_last_state returns None AND CONF_INITIAL_POSITION is + set, position must be seeded to that value (not 0 and not 100). + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False, CONF_INITIAL_POSITION: 70}, + ) + cover.hass = hass + + with patch.object(cover, "async_get_last_state", return_value=None): + with patch( + "custom_components.schellenberg_usb.cover" + ".async_dispatcher_connect" + ): + with patch.object(cover, "async_write_ha_state"): + await cover.async_added_to_hass() + + assert cover._attr_current_cover_position == 70 + + +@pytest.mark.asyncio +async def test_timed_restart_no_prior_no_initial_defaults_to_100( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """D-09: timed motor with no prior state and no initial_position defaults + to 100%% (assume open), never 0. + + SC#4: the slider must not jump to 0%% after restart. This is the key + regression guard — the existing bidirectional default of 0 must NOT + apply to timed motors. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={CONF_BIDIRECTIONAL: False}, + ) + cover.hass = hass + + with patch.object(cover, "async_get_last_state", return_value=None): + with patch( + "custom_components.schellenberg_usb.cover" + ".async_dispatcher_connect" + ): + with patch.object(cover, "async_write_ha_state"): + await cover.async_added_to_hass() + + assert cover._attr_current_cover_position == 100, ( + "Timed motor with no prior state must default to 100%% (assume open)," + f" not 0; got {cover._attr_current_cover_position}" + ) + assert cover._attr_is_closed is False From 22839dd57552ca87f1c4e83c97be222638ce8d68 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 13:18:42 +0200 Subject: [PATCH 6/8] test(03-04): add CTRL-05 bidirectional zero-regression tests --- tests/test_cover.py | 207 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) diff --git a/tests/test_cover.py b/tests/test_cover.py index 9fa384f..bcbcae5 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -1503,3 +1503,210 @@ async def test_timed_restart_no_prior_no_initial_defaults_to_100( f" not 0; got {cover._attr_current_cover_position}" ) assert cover._attr_is_closed is False + + +# --------------------------------------------------------------------------- +# 03-04 / CTRL-05: Bidirectional zero-regression tests +# Prove every Phase 3 gate is timed-only; bidirectional path is unaffected. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_bidir_open_close_stop_regression( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CTRL-05: bidirectional open/close/stop dispatch and freeze-stop unchanged. + + Uses a device_data with NO CONF_BIDIRECTIONAL key to exercise the + read-default True legacy path (SC#5 safety case). Verifies CMD_UP / + CMD_DOWN / CMD_STOP are dispatched exactly once for open / close / stop, + and that stop calls _stop_position_tracking before mutating position + (freeze-at-estimate, not snap). + """ + # --- open --- + cover_open = SchellenbergCover( + api=mock_api, + device_id="BD01", + device_enum="10", + device_name="Bidir Open", + # Deliberately omit CONF_BIDIRECTIONAL: exercises read-default True + device_data={"device_id": "BD01", "device_enum": "10"}, + ) + cover_open.hass = hass + cover_open._attr_current_cover_position = 0 + + with patch.object(cover_open, "_start_position_tracking"): + with patch.object(cover_open, "async_write_ha_state"): + await cover_open.async_open_cover() + + assert cover_open._attr_is_opening is True + assert cover_open._attr_is_closing is False + _async_mock(mock_api.control_blind).assert_awaited_once_with( + cover_open._device_enum, CMD_UP + ) + + # --- close --- + _async_mock(mock_api.control_blind).reset_mock() + cover_close = SchellenbergCover( + api=mock_api, + device_id="BD02", + device_enum="11", + device_name="Bidir Close", + device_data={"device_id": "BD02", "device_enum": "11"}, + ) + cover_close.hass = hass + cover_close._attr_current_cover_position = 100 + + with patch.object(cover_close, "_start_position_tracking"): + with patch.object(cover_close, "async_write_ha_state"): + await cover_close.async_close_cover() + + assert cover_close._attr_is_opening is False + assert cover_close._attr_is_closing is True + _async_mock(mock_api.control_blind).assert_awaited_once_with( + cover_close._device_enum, CMD_DOWN + ) + + # --- stop (freeze-at-estimate) --- + _async_mock(mock_api.control_blind).reset_mock() + cover_stop = SchellenbergCover( + api=mock_api, + device_id="BD03", + device_enum="12", + device_name="Bidir Stop", + device_data={"device_id": "BD03", "device_enum": "12"}, + ) + cover_stop.hass = hass + cover_stop._attr_is_opening = True + cover_stop._attr_current_cover_position = 50 + + stop_order: list[str] = [] + + def _record_stop() -> None: + stop_order.append("stop_tracking") + cover_stop._attr_current_cover_position = 50 # mock freeze + + with patch.object( + cover_stop, "_stop_position_tracking", side_effect=_record_stop + ): + with patch.object(cover_stop, "_update_position"): + with patch.object(cover_stop, "async_write_ha_state"): + await cover_stop.async_stop_cover() + + # _stop_position_tracking must be called before CMD_STOP dispatch + assert stop_order == ["stop_tracking"], ( + "stop_tracking must be called before position mutation" + ) + assert cover_stop._attr_is_opening is False + assert cover_stop._attr_is_closing is False + _async_mock(mock_api.control_blind).assert_awaited_once_with( + cover_stop._device_enum, CMD_STOP + ) + + +@pytest.mark.asyncio +async def test_bidir_set_position_not_gated( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CTRL-05 / D-05: bidirectional set-position is NOT gated on calibration. + + A bidirectional cover with NO calibration times (_is_calibrated False) + must still perform a real move — async_open_cover must be awaited when + the target exceeds the current position. The D-05 uncalibrated gate is + timed-only (`not self._is_bidirectional and not self._is_calibrated`). + """ + cover = SchellenbergCover( + api=mock_api, + device_id="BD01", + device_enum="10", + device_name="Bidir Cover", + # No CONF_OPEN_TIME / CONF_CLOSE_TIME → _is_calibrated False + device_data={CONF_BIDIRECTIONAL: True}, + ) + cover.hass = hass + cover._attr_current_cover_position = 50 + + assert cover._is_bidirectional is True + assert cover._is_calibrated is False + + with patch.object( + cover, "async_open_cover", new_callable=AsyncMock + ) as mock_open: + await cover.async_set_cover_position(**{ATTR_POSITION: 80}) + + # Gate must NOT have fired — open was called + mock_open.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_bidir_restore_not_snapped_to_endstop( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CTRL-05 / D-08 / REVIEW-02: bidirectional mid-move restore is NOT endstop-snapped. + + A bidirectional cover recorded as 'opening' with current_position=60 must + restore to 60, NOT to 100. The D-08 endstop snap is timed-only; the + bidirectional branch calls the shared _restore_position_from_last_state + helper (REVIEW-02 canary — logic lives in exactly one place). + """ + cover = SchellenbergCover( + api=mock_api, + device_id="BD01", + device_enum="10", + device_name="Bidir Cover", + device_data={CONF_BIDIRECTIONAL: True}, + ) + cover.hass = hass + + last_state = State( + "cover.bidir_cover", "opening", {"current_position": 60} + ) + with patch.object(cover, "async_get_last_state", return_value=last_state): + with patch( + "custom_components.schellenberg_usb.cover" + ".async_dispatcher_connect" + ): + with patch.object(cover, "async_write_ha_state"): + await cover.async_added_to_hass() + + assert cover._attr_current_cover_position == 60, ( + "Bidirectional mid-move restore must use recorded position (60)," + f" not the endstop snap (100); got {cover._attr_current_cover_position}" + ) + assert cover._attr_is_closed is False + + +@pytest.mark.asyncio +async def test_legacy_no_bidirectional_key_defaults_bidirectional( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CTRL-05 / REVIEW-06: subentry with NO CONF_BIDIRECTIONAL key is bidirectional. + + A cover built from a legacy Phase-1 subentry (no CONF_BIDIRECTIONAL key + at all) must have _is_bidirectional True (read-default at cover.py:276) + AND its extra_state_attributes must expose mode='bidirectional' with NO + 'calibrated' key. Dedicated test per REVIEW-06 — not folded into another. + """ + cover = SchellenbergCover( + api=mock_api, + device_id="ABC123", + device_enum="10", + device_name="Legacy Cover", + # Intentionally omit CONF_BIDIRECTIONAL — simulates Phase-1 subentry + device_data={"device_id": "ABC123", "device_enum": "10"}, + ) + + assert cover._is_bidirectional is True, ( + "No-key subentry must default to bidirectional (read-default True)" + ) + attrs = cover.extra_state_attributes + assert attrs["mode"] == "bidirectional", ( + f"Expected mode='bidirectional', got mode='{attrs['mode']}'" + ) + assert "calibrated" not in attrs, ( + f"Bidirectional cover must NOT expose 'calibrated'; attrs={attrs}" + ) From 43b5cc04e097fde6a94e881dac9ae9f55faf5e08 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 14:51:06 +0200 Subject: [PATCH 7/8] fix(03): apply code review findings (CR-01 stale-target + WR-01..04) --- custom_components/schellenberg_usb/cover.py | 68 +++++++++++++++------ tests/test_cover.py | 56 +++++++++++++++++ 2 files changed, 106 insertions(+), 18 deletions(-) diff --git a/custom_components/schellenberg_usb/cover.py b/custom_components/schellenberg_usb/cover.py index 04a982c..241a703 100644 --- a/custom_components/schellenberg_usb/cover.py +++ b/custom_components/schellenberg_usb/cover.py @@ -66,9 +66,14 @@ async def _get_cal_store(hass: HomeAssistant) -> tuple[Store, dict[str, Any]]: if cache is None: try: cache = await store.async_load() or {} - except Exception: - # If the JSON is corrupted, don't break the integration setup - _LOGGER.exception("Failed to load calibration store, starting with empty data") + except Exception: # noqa: BLE001 + # If the JSON is corrupted, don't break the integration setup. + # Broad catch is intentional: Store.async_load can surface a + # range of deserialization/OS errors on a corrupt record, and + # setup must degrade to an empty cache rather than fail. + _LOGGER.exception( + "Failed to load calibration store, starting with empty data" + ) cache = {} data[_DATA_CACHE] = cache @@ -256,11 +261,16 @@ def __init__( # Position calculation attributes - use calibration times if available device_data_dict = dict(device_data) if device_data is not None else {} - self._travel_time_open: float = device_data_dict.get( - CONF_OPEN_TIME, DEFAULT_TRAVEL_TIME + # Coerce None/0.0 from persisted/merged data to the default: a + # partial/corrupt calibration record can store None for a time + # (and .get(key, default) returns the stored None when the key is + # present), and a 0-second travel time would divide-by-zero + # downstream — both must fall back to DEFAULT_TRAVEL_TIME (WR-03). + self._travel_time_open: float = ( + device_data_dict.get(CONF_OPEN_TIME) or DEFAULT_TRAVEL_TIME ) - self._travel_time_close: float = device_data_dict.get( - CONF_CLOSE_TIME, DEFAULT_TRAVEL_TIME + self._travel_time_close: float = ( + device_data_dict.get(CONF_CLOSE_TIME) or DEFAULT_TRAVEL_TIME ) # Mode flag: True = bidirectional (can receive events), False = timed. @@ -622,7 +632,6 @@ async def _async_position_update_loop(self) -> None: self._attr_is_closing = False self._attr_is_closed = self._attr_current_cover_position == 0 self._target_position = None - self._position_update_task = None self._move_start_time = None self._move_start_position = None self.async_write_ha_state() @@ -635,7 +644,6 @@ async def _async_position_update_loop(self) -> None: and self._attr_current_cover_position <= 0 ): self._attr_current_cover_position = 0 - self._position_update_task = None self._attr_is_opening = False self._attr_is_closing = False self._move_start_time = None @@ -649,7 +657,6 @@ async def _async_position_update_loop(self) -> None: and self._attr_current_cover_position >= 100 ): self._attr_current_cover_position = 100 - self._position_update_task = None self._attr_is_opening = False self._attr_is_closing = False self._move_start_time = None @@ -663,8 +670,13 @@ async def _async_position_update_loop(self) -> None: except asyncio.CancelledError: _LOGGER.debug("Position tracking cancelled for device %s", self._attr_name) - self._position_update_task = None raise + finally: + # Clear the handle only if it still points at THIS task, so a + # concurrent _start_position_tracking() that already swapped in a + # new task isn't clobbered by this one's exit (WR-02). + if self._position_update_task is asyncio.current_task(): + self._position_update_task = None def _update_position(self) -> None: """Calculate and update the position based on travel time.""" @@ -689,7 +701,9 @@ def _update_position(self) -> None: else: return - self._attr_current_cover_position = max(0, min(100, int(new_pos))) + self._attr_current_cover_position = max( + 0, min(100, int(round(new_pos))) + ) self._attr_is_closed = self._attr_current_cover_position == 0 _LOGGER.debug( @@ -700,9 +714,18 @@ def _update_position(self) -> None: travel_time, ) - async def async_open_cover(self, **kwargs: Any) -> None: - """Open the cover.""" + async def async_open_cover( + self, target: int | None = None, **kwargs: Any + ) -> None: + """Open the cover. + + ``target`` is the partial-move target for a set-position driven + move; a direct Open (the HA open button) passes ``None``, which + clears any stale set-position target so the cover runs to the + endstop instead of stopping at a leftover partial target (CR-01). + """ _LOGGER.debug("Opening cover %s (enum=%s)", self._attr_name, self._device_enum) + self._target_position = target self._attr_is_opening = True self._attr_is_closing = False self._move_start_time = time.monotonic() @@ -715,9 +738,18 @@ async def async_open_cover(self, **kwargs: Any) -> None: self.async_write_ha_state() await self._api.control_blind(self._device_enum, CMD_UP) - async def async_close_cover(self, **kwargs: Any) -> None: - """Close cover.""" + async def async_close_cover( + self, target: int | None = None, **kwargs: Any + ) -> None: + """Close cover. + + ``target`` is the partial-move target for a set-position driven + move; a direct Close (the HA close button) passes ``None``, which + clears any stale set-position target so the cover runs to the + endstop instead of stopping at a leftover partial target (CR-01). + """ _LOGGER.debug("Closing cover %s (enum=%s)", self._attr_name, self._device_enum) + self._target_position = target self._attr_is_opening = False self._attr_is_closing = True self._move_start_time = time.monotonic() @@ -777,13 +809,13 @@ async def async_set_cover_position(self, **kwargs: Any) -> None: self._attr_name, target_position, ) - await self.async_open_cover() + await self.async_open_cover(target=target_position) else: _LOGGER.info( "Moving cover %s DOWN to reach target %d%%", self._attr_name, target_position, ) - await self.async_close_cover() + await self.async_close_cover(target=target_position) # The position tracking loop will automatically send the stop command # when the target position is reached. \ No newline at end of file diff --git a/tests/test_cover.py b/tests/test_cover.py index bcbcae5..fc9328a 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -1070,6 +1070,62 @@ async def test_timed_full_close_resets_to_0( assert loop_task.done(), "Expected loop task done after endstop reset" +# --------------------------------------------------------------------------- +# CR-01: A direct Open/Close after an in-flight set_position must NOT inherit +# the stale partial-move target and stop early. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_open_after_set_position_clears_stale_target( + hass: HomeAssistant, + mock_api: SchellenbergUsbApi, +) -> None: + """CR-01: full Open after an interrupted set_position runs to the endstop. + + Sequence: set_position(50) leaves _target_position=50 while the move is + in flight; the user then presses the full Open button (async_open_cover + with no target). The stale target must be cleared so the loop takes the + endstop branch and drives to 100 — NOT the partial-move branch that would + snap to 50 and stop early. + """ + import time as _time + + cover = SchellenbergCover( + api=mock_api, + device_id="TM01", + device_enum="10", + device_name="Timed Motor", + device_data={ + CONF_BIDIRECTIONAL: False, + CONF_OPEN_TIME: 0.2, + CONF_CLOSE_TIME: 0.2, + }, + ) + cover.hass = hass + cover._attr_current_cover_position = 0 + # Simulate an in-flight set_position(50) leaving a stale partial target. + cover._target_position = 50 + + with patch.object(cover, "async_write_ha_state"): + # Full Open button: no target → must clear the stale 50. + await cover.async_open_cover() + assert cover._target_position is None, ( + "async_open_cover must clear the stale set-position target" + ) + # Backdate so the loop drives straight past 100 on the first tick. + cover._move_start_time = _time.monotonic() - 0.5 + await asyncio.sleep(0.5) + + assert cover._attr_current_cover_position == 100, ( + "Expected full Open to reach 100 (not stop at stale target 50), " + f"got {cover._attr_current_cover_position}" + ) + assert cover._attr_is_opening is False + assert cover._target_position is None + mock_api.control_blind.assert_any_call("10", CMD_UP) + + # --------------------------------------------------------------------------- # 03-02: Calibrated flag (D-06 / REVIEW-01) and calibrated attribute (D-07) # --------------------------------------------------------------------------- From 7350a92d285a747545ff78576e5850f3d910ade0 Mon Sep 17 00:00:00 2001 From: hrabbach Date: Fri, 26 Jun 2026 15:34:35 +0200 Subject: [PATCH 8/8] =?UTF-8?q?chore(release):=20bump=20manifest=20to=20v1?= =?UTF-8?q?.2.0=20(Phase=203=20=E2=80=94=20timed=20motor=20control)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- custom_components/schellenberg_usb/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/schellenberg_usb/manifest.json b/custom_components/schellenberg_usb/manifest.json index aca54ef..305873f 100644 --- a/custom_components/schellenberg_usb/manifest.json +++ b/custom_components/schellenberg_usb/manifest.json @@ -15,5 +15,5 @@ "manufacturer": "van ooijen" } ], - "version": "1.1.2" + "version": "1.2.0" }