Skip to content
163 changes: 156 additions & 7 deletions custom_components/schellenberg_usb/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from __future__ import annotations

import logging
from typing import Any, Awaitable, cast
import re
from collections.abc import Awaitable
from typing import Any, cast

import serial # NOTE: blocking open used only to sanity-check connectivity
import voluptuous as vol
Expand All @@ -18,7 +20,10 @@
from homeassistant.helpers.service_info.usb import UsbServiceInfo

from .const import (
CONF_BIDIRECTIONAL,
CONF_CLOSE_TIME,
CONF_DEVICE_ID,
CONF_INITIAL_POSITION,
CONF_OPEN_TIME,
CONF_SERIAL_PORT,
DOMAIN,
Expand Down Expand Up @@ -201,6 +206,7 @@ def __init__(self) -> None:
self._pending_device_id: str | None = None
self._pending_device_enum: str | None = None
self._pending_device_name: str | None = None
self._pending_is_bidirectional: bool = False

def _get_calibration_handler(self) -> CalibrationFlowHandler:
"""Return (and lazily create) the calibration flow handler."""
Expand All @@ -218,16 +224,146 @@ async def _await_subentry_result(
async def async_step_blind(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Entry point when the user clicks the 'Pair device' button.
"""Entry point when the user clicks the 'Add device' button.

Home Assistant calls async_step_{subentry_type}() where subentry_type is
the key returned by async_get_supported_subentry_types. Since our type is
'blind', we implement async_step_blind(). Previously this was named
async_step_pairing, which caused the flow to fall back and the
translation key for the initiate button to be missing.
'blind', we implement async_step_blind(). Delegates to async_step_menu
so the user can choose between auto-pair and manual-add.
"""
_LOGGER.debug("Subentry blind flow initiated (pairing new device)")
return await self.async_step_user(user_input)
_LOGGER.debug("Subentry blind flow initiated")
return await self.async_step_menu(user_input)

async def async_step_menu(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Show menu: Pair automatically or Add manually."""
return self.async_show_menu(
step_id="menu",
menu_options=["user", "manual_add"],
)

async def async_step_manual_add(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Collect device enum, mode, and optional name for manual-add."""
errors: dict[str, str] = {}

if user_input is not None:
# Normalize to uppercase before validation and storage (Pitfall 4)
device_enum = user_input.get("device_enum", "").upper()

# Format check: exactly 2 hex characters
if not re.match(r"^[0-9A-Fa-f]{2}$", device_enum):
errors["device_enum"] = "invalid_enum_format"
else:
# Duplicate check across existing blind subentries
hub_entry = self._get_entry()
existing_enums = {
s.data.get("device_enum")
for s in hub_entry.subentries.values()
if s.subentry_type == SUBENTRY_TYPE_BLIND
}
if device_enum in existing_enums:
errors["device_enum"] = "duplicate_enum"

if not errors:
# Resolve mode — BooleanSelector returns a real Python bool
is_bidirectional: bool = bool(
user_input.get(CONF_BIDIRECTIONAL, True)
)
device_name = (
user_input.get("device_name") or f"Blind {device_enum}"
)
self._pending_device_enum = device_enum
self._pending_device_name = device_name
self._pending_is_bidirectional = is_bidirectional

if is_bidirectional:
_LOGGER.info(
"Creating bidirectional manual subentry for enum %s",
device_enum,
)
return self.async_create_entry(
title=device_name,
data={
CONF_DEVICE_ID: device_enum,
"device_enum": device_enum,
CONF_BIDIRECTIONAL: True,
},
unique_id=device_enum,
)
# Timed: advance to initial-position step
_LOGGER.debug(
"Timed motor %s: advancing to position step", device_enum
)
return await self.async_step_manual_position()

return self.async_show_form(
step_id="manual_add",
data_schema=vol.Schema(
{
vol.Required("device_enum"): selector.TextSelector(),
vol.Required(
CONF_BIDIRECTIONAL, default=True
): selector.BooleanSelector(),
vol.Optional("device_name"): selector.TextSelector(),
}
),
errors=errors,
)

async def async_step_manual_position(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Collect initial position for timed motors (shown only after mode=timed)."""
if not self._pending_device_enum:
return self.async_abort(reason="pairing_failed")

if user_input is not None:
initial_position = int(user_input.get("initial_position", 100))
# Clamp to 0-100 as defense in depth (slider already bounds, but be safe)
initial_position = max(0, min(100, initial_position))
device_enum = self._pending_device_enum or ""
device_name = self._pending_device_name or f"Blind {device_enum}"
_LOGGER.info(
"Creating timed manual subentry for enum %s at initial position %d%%",
device_enum,
initial_position,
)
return self.async_create_entry(
title=device_name,
data={
CONF_DEVICE_ID: device_enum,
"device_enum": device_enum,
CONF_BIDIRECTIONAL: False,
CONF_INITIAL_POSITION: initial_position,
},
unique_id=device_enum,
)

return self.async_show_form(
step_id="manual_position",
data_schema=vol.Schema(
{
vol.Optional(
"initial_position", default=100
): selector.NumberSelector(
selector.NumberSelectorConfig(
min=0,
max=100,
step=1,
unit_of_measurement="%",
mode=selector.NumberSelectorMode.SLIDER,
)
),
}
),
description_placeholders={
"device_name": self._pending_device_name or "",
},
last_step=True,
)

async def async_step_user(
self, user_input: dict[str, Any] | None = None
Expand Down Expand Up @@ -329,6 +465,19 @@ async def async_step_reconfigure(
if not device_id:
return self.async_abort(reason="device_not_found")

# Guard: timed motors cannot calibrate via the event-waiting CalibrationFlowHandler
# (they never send EVENT_STARTED_MOVING_*/EVENT_STOPPED, so calibration hangs).
# Use the same missing-key default as cover.py (True = bidirectional) so legacy
# flag-less subentries are still treated as bidirectional here (REVIEW-2, T-02-04).
# Timed-motor calibration is deferred to Phase 4 / CAL-01.
is_bidirectional = bool(subentry.data.get(CONF_BIDIRECTIONAL, True))
if not is_bidirectional:
_LOGGER.debug(
"Reconfigure blocked for timed motor %s: calibration not yet supported",
device_id,
)
return self.async_abort(reason="timed_calibration_unavailable")

# Build a minimal device record; calibration handler will enrich after timing
device_name = subentry.title or f"Blind {device_id}"
handler.set_selected_device(
Expand Down
4 changes: 4 additions & 0 deletions custom_components/schellenberg_usb/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,7 @@
CONF_OPEN_TIME = "open_time" # Time it takes to open (up) in seconds
CONF_CLOSE_TIME = "close_time" # Time it takes to close (down) in seconds
CONF_DEVICE_ID = "device_id" # Device ID for calibration

# Manual-add device mode flag (stored in subentry.data)
CONF_BIDIRECTIONAL = "bidirectional" # bool; False = timed/non-bidirectional
CONF_INITIAL_POSITION = "initial_position" # int 0-100; timed motors only
58 changes: 51 additions & 7 deletions custom_components/schellenberg_usb/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
CMD_DOWN,
CMD_STOP,
CMD_UP,
CONF_BIDIRECTIONAL,
CONF_CLOSE_TIME,
CONF_INITIAL_POSITION,
CONF_OPEN_TIME,
CONF_SERIAL_PORT,
DOMAIN,
Expand Down Expand Up @@ -213,6 +215,7 @@ class SchellenbergCover(CoverEntity, RestoreEntity):

_attr_has_entity_name = True
_attr_should_poll = False
_unrecorded_attributes = frozenset({"mode"})

_attr_supported_features = (
CoverEntityFeature.OPEN
Expand Down Expand Up @@ -260,6 +263,25 @@ def __init__(
CONF_CLOSE_TIME, DEFAULT_TRAVEL_TIME
)

# Mode flag: True = bidirectional (can receive events), False = timed.
# Read-default is True so legacy Phase-1 auto-paired subentries that have
# NO CONF_BIDIRECTIONAL key are treated as bidirectional — preventing a
# CTRL-05 regression (Phase 3 would route them through timed control).
# Manual adds ALWAYS write the key explicitly, so this default only
# affects pre-existing flag-less subentries. (Phase 2 known limitation:
# bidirectional manual adds store device_id as 2-char enum, so inbound
# 6-char ss-frame device_id matches will miss _registered_devices — see
# RESEARCH.md "Signal Filter Coupling". No fix needed for timed motors
# as they produce no inbound frames. Tracked for a v2 story.)
self._is_bidirectional: bool = bool(
device_data_dict.get(CONF_BIDIRECTIONAL, True)
)
self._initial_position: int | None = (
int(device_data_dict[CONF_INITIAL_POSITION])
if CONF_INITIAL_POSITION in device_data_dict
else None
)

self._move_start_time: float | None = None
self._move_start_position: int | None = None
self._position_update_task: asyncio.Task[None] | None = None
Expand All @@ -286,6 +308,13 @@ def entity_registry_enabled_default(self) -> bool:
"""Return if entity should be enabled by default."""
return True

@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return device-specific state attributes."""
return {
"mode": "bidirectional" if self._is_bidirectional else "timed",
}

async def async_added_to_hass(self) -> None:
"""Run when entity about to be added to hass."""
await super().async_added_to_hass()
Expand Down Expand Up @@ -329,13 +358,24 @@ async def async_added_to_hass(self) -> None:
)

if self._attr_current_cover_position is None:
self._attr_current_cover_position = 0
self._attr_is_closed = True
_LOGGER.debug(
"No previous state for %s (%s); defaulting position to 0%% (closed)",
self._attr_name,
self._device_id,
)
if self._initial_position is not None:
self._attr_current_cover_position = max(
0, min(100, self._initial_position)
)
self._attr_is_closed = self._attr_current_cover_position == 0
_LOGGER.debug(
"Seeding initial position for %s to %d%% from subentry.data",
self._attr_name,
self._attr_current_cover_position,
)
else:
self._attr_current_cover_position = 0
self._attr_is_closed = True
_LOGGER.debug(
"No previous state for %s (%s); defaulting position to 0%% (closed)",
self._attr_name,
self._device_id,
)

self.async_write_ha_state()

Expand Down Expand Up @@ -500,6 +540,10 @@ async def _async_position_update_loop(self) -> None:
if self._target_position not in (0, 100):
await self._api.control_blind(self._device_enum, CMD_STOP)

self._attr_is_opening = False
self._attr_is_closing = False
self._attr_is_closed = self._attr_current_cover_position == 0
self._target_position = None
self._position_update_task = None
self._move_start_time = None
self._move_start_position = None
Expand Down
2 changes: 1 addition & 1 deletion custom_components/schellenberg_usb/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
"manufacturer": "van ooijen"
}
],
"version": "1.0.0"
"version": "1.1.0"
}
35 changes: 30 additions & 5 deletions custom_components/schellenberg_usb/strings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
"menu": {
"title": "Schellenberg USB setup",
"menu_options": {
"user": "Set up USB hub",
"pair_device_menu": "Pair new device"
"user": "Set up USB hub"
}
},
"user": {
Expand Down Expand Up @@ -38,10 +37,33 @@
"blind": {
"entry_type": "Blind",
"initiate_flow": {
"user": "Pair device",
"user": "Add device",
"reconfigure": "Calibrate"
},
"step": {
"menu": {
"title": "Add a blind",
"menu_options": {
"user": "Pair automatically",
"manual_add": "Add manually (already paired)"
}
},
"manual_add": {
"title": "Add blind by transmit address",
"description": "Enter the 2-character hex transmit address (enum slot) the motor already listens to. Example: 10, 1A, 2B.",
"data": {
"device_enum": "Transmit address (hex)",
"bidirectional": "Bidirectional motor",
"device_name": "Friendly name (optional)"
}
},
"manual_position": {
"title": "Set initial position",
"description": "Where is {device_name} right now? This sets the starting position for the position slider. Note: positioning for timed motors uses a default travel time and is uncalibrated until calibration support is added in a later update.",
"data": {
"initial_position": "Current position"
}
},
"user": {
"title": "Pair a new device",
"description": "Press the pair button to activate pairing mode on your Schellenberg USB stick. Then press the pairing button on your blind motor within 2 minutes.",
Expand Down Expand Up @@ -80,12 +102,15 @@
"pairing_failed": "Pairing failed.",
"calibration_timeout": "Calibration timed out. The blind took more than 5 minutes to complete the movement. Please check the device and try again.",
"calibration_start_timeout": "Timeout waiting for the blind to start moving. Please ensure the device is properly connected and try again.",
"unknown": "An unexpected error occurred during calibration. Please try again."
"unknown": "An unexpected error occurred during calibration. Please try again.",
"invalid_enum_format": "Transmit address must be exactly 2 hexadecimal characters (e.g. 10, 1A, 2B).",
"duplicate_enum": "A blind with this transmit address is already added to the integration."
},
"abort": {
"pairing_timeout": "No device responded within 2 minutes. Please try again.",
"pairing_failed": "Pairing failed.",
"reconfigure_successful": "Calibration completed successfully"
"reconfigure_successful": "Calibration completed successfully",
"timed_calibration_unavailable": "Calibration for timed motors is not yet available. It will be added in a future update."
}
}
},
Expand Down
Loading