diff --git a/paradox/connections/prt3/protocol.py b/paradox/connections/prt3/protocol.py index a187b460..a525a357 100644 --- a/paradox/connections/prt3/protocol.py +++ b/paradox/connections/prt3/protocol.py @@ -59,7 +59,7 @@ def data_received(self, data: bytes): if cfg.LOGGING_DUMP_PACKETS: logger.debug("PRT3 <- %s", binascii.hexlify(line_with_cr)) - if line.strip(): # skip empty / whitespace-only lines + if line.strip(): # skip empty / whitespace-only lines self.handler.on_message(line_with_cr) def send_message(self, message: bytes): @@ -72,6 +72,11 @@ def send_message(self, message: bytes): self.check_active() if cfg.LOGGING_DUMP_PACKETS: - logger.debug("PRT3 -> %s", binascii.hexlify(message)) + # AA (arm with code) and AD (disarm) embed the user code in the + # payload; show only the command prefix to keep it out of logs. + if message[:2] in (b"AA", b"AD"): + logger.debug("PRT3 -> %s", binascii.hexlify(message[:5])) + else: + logger.debug("PRT3 -> %s", binascii.hexlify(message)) self.transport.write(message) diff --git a/paradox/hardware/prt3/panel.py b/paradox/hardware/prt3/panel.py index 05ceb49c..d61f8049 100644 --- a/paradox/hardware/prt3/panel.py +++ b/paradox/hardware/prt3/panel.py @@ -41,7 +41,6 @@ from paradox.config import config as cfg from paradox.hardware.panel import Panel from paradox.hardware.prt3 import adapter, encoder -from paradox.hardware.prt3.event import EVENT_MAP, PRT3Event from paradox.hardware.prt3.parser import ( PRT3AreaStatus, PRT3BufferFull, @@ -62,19 +61,19 @@ # Maps PAI command string → (encoder_fn, arm_mode_char) # All arm variants that don't require a user code use quick-arm. _QUICK_ARM_MODES = { - "arm": "A", # default arm → away - "arm_away": "A", - "arm_stay": "S", - "arm_force": "F", + "arm": "A", # default arm → away + "arm_away": "A", + "arm_stay": "S", + "arm_force": "F", "arm_instant": "I", - "arm_sleep": "I", # instant arm (no entry delay) — PRT3 closest to HA arm_night + "arm_sleep": "I", # instant arm (no entry delay) — PRT3 closest to HA arm_night } # Panic type → encoder function _PANIC_ENCODERS = { "emergency": encoder.encode_panic_emergency, - "medical": encoder.encode_panic_medical, - "fire": encoder.encode_panic_fire, + "medical": encoder.encode_panic_medical, + "fire": encoder.encode_panic_fire, } @@ -109,6 +108,24 @@ def __init__(self, core): cfg.PRT3_MAX_ZONES * cfg.IO_TIMEOUT, cfg.KEEP_ALIVE_INTERVAL, ) + # Labels are loaded serially at startup; with PRT3_MAX_USERS=999 and + # IO_TIMEOUT=0.5 the worst-case (all timeouts) is ~8 min just for users. + # Warn when the combined worst-case load time exceeds 2 min so it + # doesn't fire on the default 8+96+32 config. + label_load_seconds = ( + cfg.PRT3_MAX_AREAS + cfg.PRT3_MAX_ZONES + cfg.PRT3_MAX_USERS + ) * cfg.IO_TIMEOUT + if label_load_seconds > 120: + logger.warning( + "PRT3: startup label load may take up to %.0f s " + "(%d areas + %d zones + %d users at %.1f s timeout) — " + "consider reducing PRT3_MAX_USERS / PRT3_MAX_ZONES", + label_load_seconds, + cfg.PRT3_MAX_AREAS, + cfg.PRT3_MAX_ZONES, + cfg.PRT3_MAX_USERS, + cfg.IO_TIMEOUT, + ) # ------------------------------------------------------------------ # Message parsing @@ -160,7 +177,10 @@ async def _prt3_send_wait( timed out or hit buffer-full. """ async with self.core.request_lock: - buffer_full_or_match = lambda m: predicate(m) or isinstance(m, PRT3BufferFull) + + def buffer_full_or_match(m): + return predicate(m) or isinstance(m, PRT3BufferFull) + for attempt in range(1, retries + 1): self.core.connection.write(command_bytes) try: @@ -170,7 +190,9 @@ async def _prt3_send_wait( if isinstance(result, PRT3BufferFull): logger.warning( "PRT3: buffer full on attempt %d/%d, retrying: %s", - attempt, retries, command_bytes[:5], # [:5] excludes user code + attempt, + retries, + command_bytes[:5], # [:5] excludes user code ) continue return result @@ -178,7 +200,9 @@ async def _prt3_send_wait( if attempt < retries: logger.warning( "PRT3: timeout on attempt %d/%d, retrying: %s", - attempt, retries, command_bytes[:5], # [:5] excludes user code + attempt, + retries, + command_bytes[:5], # [:5] excludes user code ) return None @@ -210,10 +234,18 @@ async def initialize_communication(self, password) -> bool: """ from paradox.hardware.prt3.parser import PRT3SystemEvent - _any_prt3_msg = lambda m: isinstance( - m, (PRT3CommStatus, PRT3AreaStatus, PRT3ZoneStatus, PRT3LabelReply, - PRT3CommandEcho, PRT3SystemEvent) - ) + def _any_prt3_msg(m): + return isinstance( + m, + ( + PRT3CommStatus, + PRT3AreaStatus, + PRT3ZoneStatus, + PRT3LabelReply, + PRT3CommandEcho, + PRT3SystemEvent, + ), + ) logger.info("PRT3: checking serial link liveness") @@ -242,11 +274,14 @@ async def initialize_communication(self, password) -> bool: if isinstance(msg, PRT3CommStatus) and not msg.ok: logger.error("PRT3: panel communication failure (COMM&fail)") return False - logger.info("PRT3: serial link live (probe response: %s)", type(msg).__name__) + logger.info( + "PRT3: serial link live (probe response: %s)", type(msg).__name__ + ) return True except asyncio.TimeoutError: logger.error( - "PRT3: serial link unresponsive after %.0fs probe", cfg.PRT3_COMM_TIMEOUT + "PRT3: serial link unresponsive after %.0fs probe", + cfg.PRT3_COMM_TIMEOUT, ) return False @@ -273,7 +308,11 @@ async def _load_label_range( msg = await self._prt3_send_wait( cmd, lambda m, ec=expected_cmd, et=element_type, idx=i: ( - (isinstance(m, PRT3LabelReply) and m.element_type == et and m.index == idx) + ( + isinstance(m, PRT3LabelReply) + and m.element_type == et + and m.index == idx + ) or (isinstance(m, PRT3CommandEcho) and m.cmd == ec) ), ) @@ -297,9 +336,15 @@ async def load_labels(self) -> dict: """ logger.info("PRT3: loading labels") replies = ( - await self._load_label_range("area", cfg.PRT3_MAX_AREAS, encoder.encode_area_label_request, "AL") - + await self._load_label_range("zone", cfg.PRT3_MAX_ZONES, encoder.encode_zone_label_request, "ZL") - + await self._load_label_range("user", cfg.PRT3_MAX_USERS, encoder.encode_user_label_request, "UL") + await self._load_label_range( + "area", cfg.PRT3_MAX_AREAS, encoder.encode_area_label_request, "AL" + ) + + await self._load_label_range( + "zone", cfg.PRT3_MAX_ZONES, encoder.encode_zone_label_request, "ZL" + ) + + await self._load_label_range( + "user", cfg.PRT3_MAX_USERS, encoder.encode_user_label_request, "UL" + ) ) labels = adapter.labels_dict_from_replies(replies) logger.info( @@ -407,7 +452,10 @@ def _build_partition_cmd( mode = _QUICK_ARM_MODES[command] if user_code: try: - return encoder.encode_arm(partition, mode, user_code), f"AA{partition:03d}" + return ( + encoder.encode_arm(partition, mode, user_code), + f"AA{partition:03d}", + ) except ValueError as exc: logger.error("PRT3: invalid PRT3_USER_CODE for arm: %s", exc) return None @@ -439,7 +487,7 @@ async def control_partitions(self, partitions: list, command: str) -> bool: lambda m, ec=expected_echo: ( isinstance(m, PRT3CommandEcho) and m.cmd == ec ), - retries=2, + retries=1, # non-idempotent: retry can double-arm if echo was lost ) if msg is None: logger.warning("PRT3: timeout on %s partition %d", command, partition) @@ -448,7 +496,9 @@ async def control_partitions(self, partitions: list, command: str) -> bool: accepted = True else: logger.warning( - "PRT3: %s partition %d rejected by panel (&fail)", command, partition + "PRT3: %s partition %d rejected by panel (&fail)", + command, + partition, ) return accepted @@ -498,10 +548,12 @@ async def send_panic(self, partitions: list, panic_type: str, _code) -> bool: lambda m, ec=expected_echo: ( isinstance(m, PRT3CommandEcho) and m.cmd == ec ), - retries=2, + retries=1, # non-idempotent: retry can re-trigger panic if echo was lost ) if msg is None: - logger.warning("PRT3: timeout on %s panic area %d", panic_type, partition) + logger.warning( + "PRT3: timeout on %s panic area %d", panic_type, partition + ) continue if not msg.ok: logger.warning( diff --git a/paradox/hardware/prt3/parser.py b/paradox/hardware/prt3/parser.py index 43b171b3..32771d9f 100644 --- a/paradox/hardware/prt3/parser.py +++ b/paradox/hardware/prt3/parser.py @@ -46,9 +46,9 @@ [9] L=low_battery / O=no """ +from dataclasses import dataclass import logging import re -from dataclasses import dataclass from typing import Optional, Union logger = logging.getLogger("PAI").getChild(__name__) @@ -58,11 +58,11 @@ # Arm-state constants (value of PRT3AreaStatus.arm_state) # --------------------------------------------------------------------------- -ARM_DISARMED = "disarmed" -ARM_AWAY = "armed_away" -ARM_FORCE = "armed_force" -ARM_STAY = "armed_stay" -ARM_INSTANT = "armed_instant" +ARM_DISARMED = "disarmed" +ARM_AWAY = "armed_away" +ARM_FORCE = "armed_force" +ARM_STAY = "armed_stay" +ARM_INSTANT = "armed_instant" _ARM_STATE_MAP: dict = { "D": ARM_DISARMED, @@ -76,9 +76,9 @@ # Zone open-state constants (value of PRT3ZoneStatus.open_state) # --------------------------------------------------------------------------- -ZONE_CLOSED = "closed" -ZONE_OPEN = "open" -ZONE_TAMPERED = "tampered" +ZONE_CLOSED = "closed" +ZONE_OPEN = "open" +ZONE_TAMPERED = "tampered" ZONE_FIRE_LOOP_TROUBLE = "fire_loop_trouble" _ZONE_OPEN_MAP: dict = { @@ -96,6 +96,7 @@ @dataclass class PRT3CommStatus: """``COMM&ok`` or ``COMM&fail`` — combus/module communication status.""" + ok: bool # True = panel ready @@ -112,7 +113,8 @@ class PRT3CommandEcho: reply. For info commands that fail (e.g. unknown area), this is also returned. """ - cmd: str # exactly the first 5 ASCII chars of the command that was sent + + cmd: str # exactly the first 5 ASCII chars of the command that was sent ok: bool @@ -124,11 +126,12 @@ class PRT3AreaStatus: ready to arm (open zone, active trouble, etc.). The adapter layer should negate this to produce a ``ready`` property. """ + area: int - arm_state: str # one of the ARM_* constants above + arm_state: str # one of the ARM_* constants above in_programming: bool trouble: bool - not_ready: bool # True → area is NOT ready + not_ready: bool # True → area is NOT ready alarm: bool strobe: bool zone_in_memory: bool @@ -137,8 +140,9 @@ class PRT3AreaStatus: @dataclass class PRT3ZoneStatus: """Reply to ``RZ{nnn}`` — zone status flags.""" + zone: int - open_state: str # one of the ZONE_* constants above + open_state: str # one of the ZONE_* constants above alarm: bool fire_alarm: bool supervision_trouble: bool @@ -148,9 +152,10 @@ class PRT3ZoneStatus: @dataclass class PRT3LabelReply: """Reply to ``ZL/AL/UL{nnn}`` — 16-character ASCII label (spaces preserved).""" + element_type: str # "zone", "area", or "user" index: int - label: str # exactly 16 chars; trailing spaces not stripped + label: str # exactly 16 chars; trailing spaces not stripped @dataclass @@ -160,16 +165,18 @@ class PRT3SystemEvent: ``area == 0`` means the event occurred in all enabled areas (global). ``area == 255`` means at least one enabled area (per spec Note 1). """ - group: int # 3-digit event-group code (000-066) - number: int # event-specific identifier: zone, user, door, key, … - area: int # 0 = global / all, 1-8 = specific area, 255 = any + + group: int # 3-digit event-group code (000-066) + number: int # event-specific identifier: zone, user, door, key, … + area: int # 0 = global / all, 1-8 = specific area, 255 = any @dataclass class PRT3PgmEvent: """Virtual PGM activation/deactivation event (v1 scope: parsed, not acted on).""" - pgm: int # 1-30 - on: bool # True if PGMxxON (activated), False if PGMxxOFF (deactivated) + + pgm: int # 1-30 + on: bool # True if PGMxxON (activated), False if PGMxxOFF (deactivated) # Union type exported for type annotations in callers @@ -189,19 +196,19 @@ class PRT3PgmEvent: # --------------------------------------------------------------------------- _RE_SYSTEM_EVENT = re.compile(r"^G(\d{3})N(\d{3})A(\d{3})$") -_RE_PGM_ON = re.compile(r"^PGM(\d{2})ON$") -_RE_PGM_OFF = re.compile(r"^PGM(\d{2})OFF$") +_RE_PGM_ON = re.compile(r"^PGM(\d{2})ON$") +_RE_PGM_OFF = re.compile(r"^PGM(\d{2})OFF$") # Lengths of fully-formed info replies (after \r stripped) -_AREA_STATUS_LEN = 12 # RA + 3-digit area + 7 flags -_ZONE_STATUS_LEN = 10 # RZ + 3-digit zone + 5 flags -_LABEL_LEN = 21 # 2-char type + 3-digit index + 16-char label +_AREA_STATUS_LEN = 12 # RA + 3-digit area + 7 flags +_ZONE_STATUS_LEN = 10 # RZ + 3-digit zone + 5 flags +_LABEL_LEN = 21 # 2-char type + 3-digit index + 16-char label # Lengths of command echoes (after \r stripped) -_ECHO_OK_LEN = 8 # 5-char prefix + "&OK" -_ECHO_FAIL_LEN = 10 # 5-char prefix + "&fail" +_ECHO_OK_LEN = 8 # 5-char prefix + "&OK" +_ECHO_FAIL_LEN = 10 # 5-char prefix + "&fail" -_LABEL_PREFIXES = {"ZL": "zone", "AL": "area", "UL": "user"} +_LABEL_PREFIXES = {"ZL": "zone", "AL": "area", "UL": "user"} # --------------------------------------------------------------------------- @@ -268,7 +275,7 @@ def parse_line(line: str) -> Optional[PRT3Message]: # Note: some panel firmware sends lowercase "&ok"; accept both. if len(line) == _ECHO_OK_LEN and line.upper().endswith("&OK"): return PRT3CommandEcho(cmd=line[:5], ok=True) - if len(line) == _ECHO_FAIL_LEN and line.endswith("&fail"): + if len(line) == _ECHO_FAIL_LEN and line.upper().endswith("&FAIL"): return PRT3CommandEcho(cmd=line[:5], ok=False) logger.warning("PRT3 parser: unrecognised line %r", line) @@ -300,13 +307,14 @@ def _parse_info_reply(line: str) -> Optional[PRT3Message]: def _parse_area_status(line: str) -> Optional[PRT3AreaStatus]: """Parse ``RA{nnn}{7 flags}`` → ``PRT3AreaStatus`` or ``None`` on bad flags.""" - area = int(line[2:5]) - arm_char = line[5] + area = int(line[2:5]) + arm_char = line[5] arm_state = _ARM_STATE_MAP.get(arm_char) if arm_state is None: logger.warning( "PRT3 parser: unknown arm-state char %r in area-status line %r", - arm_char, line, + arm_char, + line, ) return None return PRT3AreaStatus( @@ -323,13 +331,14 @@ def _parse_area_status(line: str) -> Optional[PRT3AreaStatus]: def _parse_zone_status(line: str) -> Optional[PRT3ZoneStatus]: """Parse ``RZ{nnn}{5 flags}`` → ``PRT3ZoneStatus`` or ``None`` on bad flags.""" - zone = int(line[2:5]) - open_char = line[5] + zone = int(line[2:5]) + open_char = line[5] open_state = _ZONE_OPEN_MAP.get(open_char) if open_state is None: logger.warning( "PRT3 parser: unknown zone-open char %r in zone-status line %r", - open_char, line, + open_char, + line, ) return None return PRT3ZoneStatus( @@ -345,6 +354,6 @@ def _parse_zone_status(line: str) -> Optional[PRT3ZoneStatus]: def _parse_label(line: str) -> PRT3LabelReply: """Parse ``ZL/AL/UL{nnn}{16-char label}`` → ``PRT3LabelReply``.""" element_type = _LABEL_PREFIXES[line[:2]] - index = int(line[2:5]) - label = line[5:] # always 16 chars (spec-mandated, spaces padded) + index = int(line[2:5]) + label = line[5:] # always 16 chars (spec-mandated, spaces padded) return PRT3LabelReply(element_type=element_type, index=index, label=label) diff --git a/paradox/paradox.py b/paradox/paradox.py index fbe1177f..b9ab6daf 100644 --- a/paradox/paradox.py +++ b/paradox/paradox.py @@ -57,7 +57,7 @@ def __init__(self, retries=3): self._partition_arm_freeze_until: dict = {} ps.subscribe(self._on_labels_load, "labels_loaded") - ps.subscribe(self._on_definitions_load, "definitons_loaded") + ps.subscribe(self._on_definitions_load, "definitions_loaded") ps.subscribe(self._on_status_update, "status_update") ps.subscribe(self._on_event, "events") ps.subscribe(self._on_property_change, "changes") @@ -146,7 +146,9 @@ async def _prt3_connect(self) -> bool: self.panel = PRT3Panel(self) try: if not await self.panel.initialize_communication(None): - raise ConnectionError("PRT3 serial link unresponsive — no messages received") + raise ConnectionError( + "PRT3 serial link unresponsive — no messages received" + ) # PRT3 has no binary identification exchange; synthesise a # DetectedPanel from the configured port so HA discovery has a # stable device identity to anchor entity unique_ids to. @@ -810,14 +812,16 @@ def handle_error_message(self, message): def handle_prt3_event_message(self, message): """Dispatch an async PRT3 system event into PAI's event pipeline.""" - from paradox.hardware.prt3.parser import PRT3SystemEvent from paradox.hardware.prt3.event import PRT3Event + from paradox.hardware.prt3.parser import PRT3SystemEvent if not isinstance(message, PRT3SystemEvent): return logger.info( "PRT3 system event: G%03dN%03dA%03d", - message.group, message.number, message.area, + message.group, + message.number, + message.area, ) try: evt = PRT3Event.from_prt3(message, label_provider=self.get_label) @@ -843,9 +847,10 @@ def _apply_prt3_event_change(self, evt): for pid in self.storage.get_container("partition").keys(): self.storage.update_container_object("partition", pid, evt.change) else: - element = self.storage.get_container_object(evt.type, evt.id) - if element: - self.storage.update_container_object(evt.type, evt.id, evt.change) + # MemoryStorage.update_container_object auto-creates the element if + # missing — do not guard with get_container_object, or fire-alarm + # events for unlabeled zones beyond PRT3_MAX_ZONES are dropped. + self.storage.update_container_object(evt.type, evt.id, evt.change) async def disconnect(self): logger.info("Disconnecting from the Alarm Panel") diff --git a/tests/connection/prt3/test_protocol.py b/tests/connection/prt3/test_protocol.py index 52714dfe..0583ed7f 100644 --- a/tests/connection/prt3/test_protocol.py +++ b/tests/connection/prt3/test_protocol.py @@ -188,3 +188,59 @@ def test_send_message_delegates_to_transport(): transport.write.assert_called_once_with(b"AQ001A\r") finally: loop.close() + + +@pytest.mark.parametrize( + "cmd_bytes", + [ + b"AA0011234\r", # arm partition 1 with user code 1234 + b"AD0011234\r", # disarm partition 1 with user code 1234 + ], +) +def test_send_message_redacts_user_code_in_dump(monkeypatch, caplog, cmd_bytes): + """AA/AD payloads contain the user code; the dump must not log it.""" + from paradox.config import config as cfg + + monkeypatch.setattr(cfg, "LOGGING_DUMP_PACKETS", True) + + proto, _ = _make_proto() + transport = MagicMock() + proto.transport = transport + import asyncio + + loop = asyncio.new_event_loop() + try: + proto._closed = loop.create_future() + with caplog.at_level("DEBUG", logger="PAI.paradox.connections.prt3.protocol"): + proto.send_message(cmd_bytes) + + joined = " ".join(r.getMessage() for r in caplog.records) + assert "1234" not in joined + assert "31323334" not in joined # hex(b"1234") + assert "" in joined + transport.write.assert_called_once_with(cmd_bytes) + finally: + loop.close() + + +def test_send_message_does_not_redact_non_code_commands(monkeypatch, caplog): + """Commands without user codes (AQ/PE/RA/...) dump in full as before.""" + from paradox.config import config as cfg + + monkeypatch.setattr(cfg, "LOGGING_DUMP_PACKETS", True) + + proto, _ = _make_proto() + transport = MagicMock() + proto.transport = transport + import asyncio + + loop = asyncio.new_event_loop() + try: + proto._closed = loop.create_future() + with caplog.at_level("DEBUG", logger="PAI.paradox.connections.prt3.protocol"): + proto.send_message(b"AQ001A\r") + + joined = " ".join(r.getMessage() for r in caplog.records) + assert "" not in joined + finally: + loop.close() diff --git a/tests/hardware/prt3/test_command_dispatch.py b/tests/hardware/prt3/test_command_dispatch.py index 9f5ec0fa..2cb8a296 100644 --- a/tests/hardware/prt3/test_command_dispatch.py +++ b/tests/hardware/prt3/test_command_dispatch.py @@ -15,7 +15,6 @@ from paradox.data.enums import RunState - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -40,6 +39,7 @@ def _make_paradox(monkeypatch, connection_type="PRT3"): paradox._partition_arm_freeze_until = {} from paradox.data.memory_storage import MemoryStorage + paradox.storage = MemoryStorage() mock_panel = MagicMock() @@ -60,21 +60,17 @@ def _make_paradox(monkeypatch, connection_type="PRT3"): async def test_control_partition_calls_panel(monkeypatch): """control_partition resolves partition from storage and calls panel.""" - from paradox.paradox import Paradox - paradox, mock_panel = _make_paradox(monkeypatch) # Populate storage with a named partition - paradox.storage.get_container("partition")[1] = { - "id": 1, "key": 1, "label": "Home" - } + paradox.storage.get_container("partition")[1] = {"id": 1, "key": 1, "label": "Home"} result = await paradox.control_partition("1", "arm") assert result is True mock_panel.control_partitions.assert_awaited_once() call_args = mock_panel.control_partitions.call_args - assert call_args[0][1] == "arm" # command arg + assert call_args[0][1] == "arm" # command arg paradox.request_status_refresh.assert_called_once() @@ -94,9 +90,7 @@ async def test_control_partition_panel_refuses(monkeypatch): paradox, mock_panel = _make_paradox(monkeypatch) mock_panel.control_partitions = AsyncMock(return_value=False) - paradox.storage.get_container("partition")[1] = { - "id": 1, "key": 1, "label": "Home" - } + paradox.storage.get_container("partition")[1] = {"id": 1, "key": 1, "label": "Home"} result = await paradox.control_partition("1", "disarm") @@ -109,9 +103,7 @@ async def test_control_partition_not_implemented(monkeypatch): paradox, mock_panel = _make_paradox(monkeypatch) mock_panel.control_partitions = AsyncMock(side_effect=NotImplementedError) - paradox.storage.get_container("partition")[1] = { - "id": 1, "key": 1, "label": "Home" - } + paradox.storage.get_container("partition")[1] = {"id": 1, "key": 1, "label": "Home"} result = await paradox.control_partition("1", "arm") assert result is False @@ -183,7 +175,9 @@ async def test_utility_key_transport_success_is_true(monkeypatch): async def test_utility_key_transport_timeout_is_false(monkeypatch): """Timeout (panel never echoed) → False (transport failure).""" paradox, mock_panel = _make_paradox(monkeypatch, connection_type="PRT3") - mock_panel.send_utility_key = AsyncMock(return_value=False) # panel returns False on timeout + mock_panel.send_utility_key = AsyncMock( + return_value=False + ) # panel returns False on timeout assert await paradox.control_utility_key(1) is False @@ -205,10 +199,18 @@ def _make_paradox_with_partitions(monkeypatch): """Paradox instance pre-populated with two partitions in storage.""" paradox, _ = _make_paradox(monkeypatch) paradox.storage.get_container("partition")[1] = { - "id": 1, "key": "home", "label": "Home", "arm": True, "exit_delay": True, + "id": 1, + "key": "home", + "label": "Home", + "arm": True, + "exit_delay": True, } paradox.storage.get_container("partition")[2] = { - "id": 2, "key": "downstairs", "label": "Downstairs", "arm": True, "exit_delay": True, + "id": 2, + "key": "downstairs", + "label": "Downstairs", + "arm": True, + "exit_delay": True, } return paradox @@ -232,8 +234,12 @@ def test_global_disarm_event_clears_exit_delay_on_all_partitions(monkeypatch): p1 = paradox.storage.get_container_object("partition", 1) p2 = paradox.storage.get_container_object("partition", 2) - assert p1["exit_delay"] is False, "partition 1 exit_delay must be cleared by global disarm" - assert p2["exit_delay"] is False, "partition 2 exit_delay must be cleared by global disarm" + assert ( + p1["exit_delay"] is False + ), "partition 1 exit_delay must be cleared by global disarm" + assert ( + p2["exit_delay"] is False + ), "partition 2 exit_delay must be cleared by global disarm" assert p1["arm"] is False, "partition 1 arm must be cleared by global disarm" assert p2["arm"] is False, "partition 2 arm must be cleared by global disarm" @@ -266,8 +272,32 @@ def test_specific_area_disarm_only_updates_that_partition(monkeypatch): p1 = paradox.storage.get_container_object("partition", 1) p2 = paradox.storage.get_container_object("partition", 2) - assert p1["exit_delay"] is True, "partition 1 must NOT be touched by area=2 event" - assert p2["exit_delay"] is False, "partition 2 exit_delay must be cleared by area=2 disarm" + assert p1["exit_delay"] is True, "partition 1 must NOT be touched by area=2 event" + assert ( + p2["exit_delay"] is False + ), "partition 2 exit_delay must be cleared by area=2 disarm" + + +def test_event_for_unmapped_zone_is_not_dropped(monkeypatch): + """A zone-alarm event for a zone beyond PRT3_MAX_ZONES must still apply. + + Regression: a manual existence guard prior to update_container_object + silently dropped events (including fire alarms) for zones that had no + label loaded (e.g. zone 100 when PRT3_MAX_ZONES was 96). MemoryStorage + auto-creates the element, so the guard defeated that protection. + """ + from paradox.hardware.prt3.parser import PRT3SystemEvent + + with patch("paradox.paradox.ps.sendChange"), patch("paradox.paradox.ps.sendEvent"): + paradox, _ = _make_paradox(monkeypatch) + # No zones pre-populated; storage container is empty. + + # G024 = "Zone in alarm" per PRT3 spec; zone 100 is unlabeled. + msg = PRT3SystemEvent(group=24, number=100, area=1) + paradox.handle_prt3_event_message(msg) + + zone = paradox.storage.get_container_object("zone", 100) + assert zone is not None, "zone 100 must be auto-created by the event" # --------------------------------------------------------------------------- @@ -287,13 +317,22 @@ async def test_disarm_optimistically_clears_arm_and_exit_delay(monkeypatch): """ paradox, _ = _make_paradox(monkeypatch, connection_type="PRT3") paradox.storage.get_container("partition")[2] = { - "id": 2, "key": "downstairs", "label": "Downstairs", - "arm": True, "arm_stay": True, "arm_away": False, "arm_force": False, - "exit_delay": True, "entry_delay": False, + "id": 2, + "key": "downstairs", + "label": "Downstairs", + "arm": True, + "arm_stay": True, + "arm_away": False, + "arm_force": False, + "exit_delay": True, + "entry_delay": False, } - with patch("paradox.paradox.ps.sendChange"), patch("paradox.paradox.ps.sendEvent"), \ - patch("paradox.paradox.ps.sendMessage"), patch("paradox.paradox.ps.sendNotification"): + with patch("paradox.paradox.ps.sendChange"), patch( + "paradox.paradox.ps.sendEvent" + ), patch("paradox.paradox.ps.sendMessage"), patch( + "paradox.paradox.ps.sendNotification" + ): result = await paradox.control_partition("2", "disarm") assert result is True @@ -314,12 +353,19 @@ async def test_disarm_freezes_arm_against_stale_ra_poll(monkeypatch): """ paradox, _ = _make_paradox(monkeypatch, connection_type="PRT3") paradox.storage.get_container("partition")[2] = { - "id": 2, "key": "downstairs", "label": "Downstairs", - "arm": True, "arm_stay": True, "exit_delay": True, + "id": 2, + "key": "downstairs", + "label": "Downstairs", + "arm": True, + "arm_stay": True, + "exit_delay": True, } - with patch("paradox.paradox.ps.sendChange"), patch("paradox.paradox.ps.sendEvent"), \ - patch("paradox.paradox.ps.sendMessage"), patch("paradox.paradox.ps.sendNotification"): + with patch("paradox.paradox.ps.sendChange"), patch( + "paradox.paradox.ps.sendEvent" + ), patch("paradox.paradox.ps.sendMessage"), patch( + "paradox.paradox.ps.sendNotification" + ): await paradox.control_partition("2", "disarm") # Simulate an RA poll arriving during the freeze window — should be a no-op @@ -341,15 +387,24 @@ async def test_arm_does_not_optimistically_change_state(monkeypatch): """control_partition('arm_stay') leaves storage alone — G065N001 + RA set state.""" paradox, _ = _make_paradox(monkeypatch, connection_type="PRT3") paradox.storage.get_container("partition")[2] = { - "id": 2, "key": "downstairs", "label": "Downstairs", - "arm": False, "arm_stay": False, "exit_delay": False, + "id": 2, + "key": "downstairs", + "label": "Downstairs", + "arm": False, + "arm_stay": False, + "exit_delay": False, } - with patch("paradox.paradox.ps.sendChange"), patch("paradox.paradox.ps.sendEvent"), \ - patch("paradox.paradox.ps.sendMessage"), patch("paradox.paradox.ps.sendNotification"): + with patch("paradox.paradox.ps.sendChange"), patch( + "paradox.paradox.ps.sendEvent" + ), patch("paradox.paradox.ps.sendMessage"), patch( + "paradox.paradox.ps.sendNotification" + ): result = await paradox.control_partition("2", "arm_stay") assert result is True p2 = paradox.storage.get_container_object("partition", 2) assert p2["arm"] is False, "arm should be set by G065N001/RA, not optimistically" - assert p2["exit_delay"] is False, "exit_delay should be set by G065N001, not optimistically" + assert ( + p2["exit_delay"] is False + ), "exit_delay should be set by G065N001, not optimistically" diff --git a/tests/hardware/prt3/test_panel.py b/tests/hardware/prt3/test_panel.py index 5abcd29a..3529885f 100644 --- a/tests/hardware/prt3/test_panel.py +++ b/tests/hardware/prt3/test_panel.py @@ -20,18 +20,16 @@ from paradox.hardware.prt3.parser import ( ARM_AWAY, ARM_DISARMED, - ARM_STAY, + ZONE_CLOSED, + ZONE_OPEN, PRT3AreaStatus, PRT3BufferFull, PRT3CommandEcho, PRT3CommStatus, PRT3LabelReply, PRT3ZoneStatus, - ZONE_CLOSED, - ZONE_OPEN, ) - # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @@ -88,31 +86,23 @@ def test_parse_message_strips_cr(panel): async def test_initialize_communication_ok(core, panel): - core.connection.wait_for_message = AsyncMock( - return_value=PRT3CommStatus(ok=True) - ) + core.connection.wait_for_message = AsyncMock(return_value=PRT3CommStatus(ok=True)) assert await panel.initialize_communication(None) is True async def test_initialize_communication_fail(core, panel): - core.connection.wait_for_message = AsyncMock( - return_value=PRT3CommStatus(ok=False) - ) + core.connection.wait_for_message = AsyncMock(return_value=PRT3CommStatus(ok=False)) assert await panel.initialize_communication(None) is False async def test_initialize_communication_timeout(core, panel): - core.connection.wait_for_message = AsyncMock( - side_effect=asyncio.TimeoutError - ) + core.connection.wait_for_message = AsyncMock(side_effect=asyncio.TimeoutError) assert await panel.initialize_communication(None) is False async def test_initialize_communication_ignores_password_arg(core, panel): """PRT3 has no password; argument must be accepted but ignored.""" - core.connection.wait_for_message = AsyncMock( - return_value=PRT3CommStatus(ok=True) - ) + core.connection.wait_for_message = AsyncMock(return_value=PRT3CommStatus(ok=True)) assert await panel.initialize_communication("secret") is True @@ -265,7 +255,9 @@ async def test_control_partitions_arm_stay(core, panel, monkeypatch): assert cmd[5:6] == b"S" -async def test_control_partitions_disarm_no_code_returns_false(core, panel, monkeypatch): +async def test_control_partitions_disarm_no_code_returns_false( + core, panel, monkeypatch +): """Disarm without PRT3_USER_CODE configured must return False.""" from paradox.config import config as cfg @@ -349,11 +341,14 @@ async def test_control_outputs_raises_not_implemented(panel): # --------------------------------------------------------------------------- -@pytest.mark.parametrize("panic_type,prefix", [ - ("emergency", b"PE"), - ("medical", b"PM"), - ("fire", b"PF"), -]) +@pytest.mark.parametrize( + "panic_type,prefix", + [ + ("emergency", b"PE"), + ("medical", b"PM"), + ("fire", b"PF"), + ], +) async def test_send_panic_accepted(core, panel, panic_type, prefix): echo_cmd = f"{prefix.decode('ascii')}001" core.connection.wait_for_message = AsyncMock( @@ -382,6 +377,37 @@ async def test_send_panic_unknown_type(panel): assert await panel.send_panic([1], "unknown_panic_type", None) is False +async def test_send_panic_no_retry_on_timeout(core, panel): + """Panic must NOT retry on timeout — commands are not idempotent. + + A retry after a lost echo would re-trigger the panic; this would be + audibly loud and could escalate to monitoring station. + """ + core.connection.wait_for_message = AsyncMock( + side_effect=[asyncio.TimeoutError, PRT3CommandEcho(cmd="PE001", ok=True)] + ) + result = await panel.send_panic([1], "emergency", None) + assert result is False + assert core.connection.write.call_count == 1 + + +async def test_control_partitions_no_retry_on_timeout(core, panel, monkeypatch): + """Arm/disarm must NOT retry on timeout — commands are not idempotent. + + A retry after a lost echo would double-fire AA/AQ/AD and could move the + partition through arm states a second time. + """ + from paradox.config import config as cfg + + monkeypatch.setattr(cfg, "PRT3_USER_CODE", "") + core.connection.wait_for_message = AsyncMock( + side_effect=[asyncio.TimeoutError, PRT3CommandEcho(cmd="AQ001", ok=True)] + ) + result = await panel.control_partitions([1], "arm") + assert result is False + assert core.connection.write.call_count == 1 + + # --------------------------------------------------------------------------- # load_labels() # --------------------------------------------------------------------------- @@ -436,9 +462,7 @@ async def test_load_labels_skips_fail_echo(core, panel, monkeypatch): async def test_send_wait_succeeds_on_first_attempt(core, panel): """No retry needed when the first attempt returns a reply.""" - core.connection.wait_for_message = AsyncMock( - return_value=PRT3CommStatus(ok=True) - ) + core.connection.wait_for_message = AsyncMock(return_value=PRT3CommStatus(ok=True)) result = await panel._prt3_send_wait(b"COMM\r", lambda m: True, retries=2) assert result is not None assert core.connection.write.call_count == 1 @@ -456,9 +480,7 @@ async def test_send_wait_retries_on_timeout(core, panel): async def test_send_wait_returns_none_when_all_retries_exhausted(core, panel): """Returns None only when every attempt times out.""" - core.connection.wait_for_message = AsyncMock( - side_effect=asyncio.TimeoutError - ) + core.connection.wait_for_message = AsyncMock(side_effect=asyncio.TimeoutError) result = await panel._prt3_send_wait(b"COMM\r", lambda m: True, retries=3) assert result is None assert core.connection.write.call_count == 3 @@ -506,7 +528,9 @@ async def test_send_wait_buffer_full_retries_then_succeeds(core, panel): assert core.connection.write.call_count == 2 -async def test_control_partitions_malformed_code_returns_false(core, panel, monkeypatch): +async def test_control_partitions_malformed_code_returns_false( + core, panel, monkeypatch +): """Malformed PRT3_USER_CODE returns False without propagating ValueError.""" from paradox.config import config as cfg @@ -540,9 +564,7 @@ async def test_send_utility_key_rejected_by_panel(core, panel): async def test_send_utility_key_timeout_returns_false(core, panel): - core.connection.wait_for_message = AsyncMock( - side_effect=asyncio.TimeoutError - ) + core.connection.wait_for_message = AsyncMock(side_effect=asyncio.TimeoutError) assert await panel.send_utility_key(1) is False diff --git a/tests/hardware/prt3/test_parser.py b/tests/hardware/prt3/test_parser.py index 157b1ae9..31bac16c 100644 --- a/tests/hardware/prt3/test_parser.py +++ b/tests/hardware/prt3/test_parser.py @@ -22,6 +22,10 @@ ARM_FORCE, ARM_INSTANT, ARM_STAY, + ZONE_CLOSED, + ZONE_FIRE_LOOP_TROUBLE, + ZONE_OPEN, + ZONE_TAMPERED, PRT3AreaStatus, PRT3BufferFull, PRT3CommandEcho, @@ -30,14 +34,10 @@ PRT3PgmEvent, PRT3SystemEvent, PRT3ZoneStatus, - ZONE_CLOSED, - ZONE_FIRE_LOOP_TROUBLE, - ZONE_OPEN, - ZONE_TAMPERED, parse_line, ) -from tests.hardware.prt3 import fixtures as fx +from tests.hardware.prt3 import fixtures as fx # =========================================================================== # COMM status @@ -90,15 +90,15 @@ def test_buffer_full_from_fixture(): @pytest.mark.parametrize( "line, expected_cmd", [ - ("AA001&OK", "AA001"), # arm - ("AQ001&OK", "AQ001"), # quick arm - ("AD001&OK", "AD001"), # disarm - ("PE001&OK", "PE001"), # emergency panic - ("PM001&OK", "PM001"), # medical panic - ("PF001&OK", "PF001"), # fire panic - ("UK001&OK", "UK001"), # utility key 1 - ("UK251&OK", "UK251"), # utility key max - ("AA008&OK", "AA008"), # arm area 8 + ("AA001&OK", "AA001"), # arm + ("AQ001&OK", "AQ001"), # quick arm + ("AD001&OK", "AD001"), # disarm + ("PE001&OK", "PE001"), # emergency panic + ("PM001&OK", "PM001"), # medical panic + ("PF001&OK", "PF001"), # fire panic + ("UK001&OK", "UK001"), # utility key 1 + ("UK251&OK", "UK251"), # utility key max + ("AA008&OK", "AA008"), # arm area 8 ], ) def test_echo_ok(line, expected_cmd): @@ -129,9 +129,9 @@ def test_echo_fail(line, expected_cmd): @pytest.mark.parametrize( "line, expected_cmd", [ - ("AA001&ok", "AA001"), # arm — lowercase firmware variant - ("UK001&ok", "UK001"), # utility key — observed on live Paradox panel - ("AD001&ok", "AD001"), # disarm + ("AA001&ok", "AA001"), # arm — lowercase firmware variant + ("UK001&ok", "UK001"), # utility key — observed on live Paradox panel + ("AD001&ok", "AD001"), # disarm ], ) def test_echo_ok_lowercase(line, expected_cmd): @@ -142,26 +142,56 @@ def test_echo_ok_lowercase(line, expected_cmd): assert result.cmd == expected_cmd +@pytest.mark.parametrize( + "line, expected_cmd", + [ + ("AA001&FAIL", "AA001"), + ("AA001&Fail", "AA001"), + ("AD001&FAIL", "AD001"), + ], +) +def test_echo_fail_case_insensitive(line, expected_cmd): + """Accept &fail in any case for symmetry with &OK/&ok handling.""" + result = parse_line(line) + assert isinstance(result, PRT3CommandEcho) + assert result.ok is False + assert result.cmd == expected_cmd + + def test_echo_cmd_is_exactly_5_chars(): result = parse_line("AA001&OK") assert len(result.cmd) == 5 -@pytest.mark.parametrize("line", [ - fx.ECHO_ARM_OK, fx.ECHO_QUICK_ARM_OK, fx.ECHO_DISARM_OK, - fx.ECHO_PANIC_EMERG_OK, fx.ECHO_PANIC_MED_OK, fx.ECHO_PANIC_FIRE_OK, - fx.ECHO_UTILITY_KEY_OK, - fx.ECHO_ARM_OK_LOWER, fx.ECHO_UTILITY_KEY_OK_LOWER, -]) +@pytest.mark.parametrize( + "line", + [ + fx.ECHO_ARM_OK, + fx.ECHO_QUICK_ARM_OK, + fx.ECHO_DISARM_OK, + fx.ECHO_PANIC_EMERG_OK, + fx.ECHO_PANIC_MED_OK, + fx.ECHO_PANIC_FIRE_OK, + fx.ECHO_UTILITY_KEY_OK, + fx.ECHO_ARM_OK_LOWER, + fx.ECHO_UTILITY_KEY_OK_LOWER, + ], +) def test_echo_ok_from_fixtures(line): result = parse_line(line) assert isinstance(result, PRT3CommandEcho) assert result.ok is True -@pytest.mark.parametrize("line", [ - fx.ECHO_ARM_FAIL, fx.ECHO_DISARM_FAIL, fx.ECHO_STATUS_FAIL, fx.ECHO_LABEL_FAIL, -]) +@pytest.mark.parametrize( + "line", + [ + fx.ECHO_ARM_FAIL, + fx.ECHO_DISARM_FAIL, + fx.ECHO_STATUS_FAIL, + fx.ECHO_LABEL_FAIL, + ], +) def test_echo_fail_from_fixtures(line): result = parse_line(line) assert isinstance(result, PRT3CommandEcho) @@ -302,7 +332,7 @@ def test_zone_in_memory_from_fixture(self): assert r.area == 8 def test_area_number_max(self): - r = parse_line(fx.AREA_MAX_NUMBER) # "RA008DOOOOOO" + r = parse_line(fx.AREA_MAX_NUMBER) # "RA008DOOOOOO" assert r.area == 8 @pytest.mark.parametrize("area_num", [1, 4, 8]) @@ -317,7 +347,7 @@ def test_unknown_arm_char_returns_none(self): assert parse_line("RA001XOOOOOO") is None def test_wrong_length_short_returns_none(self): - assert parse_line("RA001DOOOOO") is None # 11 chars + assert parse_line("RA001DOOOOO") is None # 11 chars def test_wrong_length_long_returns_none(self): assert parse_line("RA001DOOOOOOO") is None # 13 chars @@ -332,7 +362,7 @@ class TestZoneStatus: """RZ{nnn}{5 flags} replies — 10 chars total.""" def test_closed_all_clear(self): - r = parse_line(fx.ZONE_CLOSED_OK) # "RZ001COOOO" + r = parse_line(fx.ZONE_CLOSED_OK) # "RZ001COOOO" assert isinstance(r, PRT3ZoneStatus) assert r.zone == 1 assert r.open_state == ZONE_CLOSED @@ -356,7 +386,7 @@ def test_open_states(self, line, expected_state): assert r.open_state == expected_state def test_alarm_flag(self): - r = parse_line(fx.ZONE_ALARM) # "RZ005OAOOO" + r = parse_line(fx.ZONE_ALARM) # "RZ005OAOOO" assert r.alarm is True assert r.fire_alarm is False @@ -376,7 +406,7 @@ def test_low_battery_flag(self): assert r.supervision_trouble is False def test_all_flags_active(self): - r = parse_line(fx.ZONE_ALL_FLAGS) # "RZ009OAFSL" + r = parse_line(fx.ZONE_ALL_FLAGS) # "RZ009OAFSL" assert r.zone == 9 assert r.open_state == ZONE_OPEN assert r.alarm is True @@ -399,7 +429,7 @@ def test_unknown_open_state_returns_none(self): assert parse_line("RZ001XOOOO") is None def test_wrong_length_short_returns_none(self): - assert parse_line("RZ001COOO") is None # 9 chars + assert parse_line("RZ001COOO") is None # 9 chars def test_wrong_length_long_returns_none(self): assert parse_line("RZ001COOOOO") is None # 11 chars @@ -438,7 +468,7 @@ def test_user_label(self): def test_label_trailing_spaces_preserved(self): # The full 16-char label including trailing spaces must not be stripped r = parse_line(fx.AREA_LABEL_HOME) - assert r.label == "Home " # 4 chars + 12 spaces + assert r.label == "Home " # 4 chars + 12 spaces def test_zone_label_max_index(self): r = parse_line(fx.ZONE_LABEL_MAX_ZONE) @@ -454,8 +484,10 @@ def test_user_label_max_index(self): def test_label_is_always_16_chars(self): for line in [ - fx.ZONE_LABEL_FRONT_DOOR, fx.ZONE_LABEL_BACK_DOOR, - fx.AREA_LABEL_HOME, fx.USER_LABEL_MASTER, + fx.ZONE_LABEL_FRONT_DOOR, + fx.ZONE_LABEL_BACK_DOOR, + fx.AREA_LABEL_HOME, + fx.USER_LABEL_MASTER, ]: r = parse_line(line) assert len(r.label) == 16, f"label length for {line!r}: {len(r.label)}" @@ -478,14 +510,14 @@ class TestSystemEvent: """G{ggg}N{nnn}A{aaa} events — 12 chars.""" def test_zone_open_event(self): - r = parse_line(fx.EVENT_ZONE_OPEN) # G001N005A006 + r = parse_line(fx.EVENT_ZONE_OPEN) # G001N005A006 assert isinstance(r, PRT3SystemEvent) assert r.group == 1 assert r.number == 5 assert r.area == 6 def test_zone_ok_event(self): - r = parse_line(fx.EVENT_ZONE_OK) # G000N005A006 + r = parse_line(fx.EVENT_ZONE_OK) # G000N005A006 assert r.group == 0 assert r.number == 5 assert r.area == 6 @@ -501,7 +533,7 @@ def test_area_255_any_area(self): assert r.area == 255 def test_all_zero_event(self): - r = parse_line(fx.EVENT_ALL_ZERO) # G000N000A000 + r = parse_line(fx.EVENT_ALL_ZERO) # G000N000A000 assert isinstance(r, PRT3SystemEvent) assert r.group == 0 assert r.number == 0 @@ -513,16 +545,19 @@ def test_max_values(self): assert r.number == 999 assert r.area == 255 - @pytest.mark.parametrize("line, group, number, area", [ - ("G001N005A006", 1, 5, 6), - ("G010N001A001", 10, 1, 1), - ("G014N002A002", 14, 2, 2), - ("G024N003A001", 24, 3, 1), - ("G048N001A000", 48, 1, 0), - ("G064N000A001", 64, 0, 1), - ("G002N012A002", 2, 12, 2), - ("G025N007A001", 25, 7, 1), - ]) + @pytest.mark.parametrize( + "line, group, number, area", + [ + ("G001N005A006", 1, 5, 6), + ("G010N001A001", 10, 1, 1), + ("G014N002A002", 14, 2, 2), + ("G024N003A001", 24, 3, 1), + ("G048N001A000", 48, 1, 0), + ("G064N000A001", 64, 0, 1), + ("G002N012A002", 2, 12, 2), + ("G025N007A001", 25, 7, 1), + ], + ) def test_event_fields_parametrized(self, line, group, number, area): r = parse_line(line) assert isinstance(r, PRT3SystemEvent) @@ -531,13 +566,13 @@ def test_event_fields_parametrized(self, line, group, number, area): assert r.area == area def test_wrong_format_too_short_group(self): - assert parse_line("G01N005A006") is None # group only 2 digits + assert parse_line("G01N005A006") is None # group only 2 digits def test_wrong_format_too_short_number(self): - assert parse_line("G001N5A006") is None # number only 1 digit + assert parse_line("G001N5A006") is None # number only 1 digit def test_wrong_format_too_short_area(self): - assert parse_line("G001N005A06") is None # area only 2 digits + assert parse_line("G001N005A06") is None # area only 2 digits def test_wrong_format_too_long_area(self): assert parse_line("G001N005A0060") is None # area 4 digits @@ -552,7 +587,7 @@ class TestPgmEvent: """PGM{nn}ON/OFF events — v1 scope: parsed but not acted on.""" def test_pgm_on(self): - r = parse_line(fx.PGM_01_ON) # "PGM01ON" + r = parse_line(fx.PGM_01_ON) # "PGM01ON" assert isinstance(r, PRT3PgmEvent) assert r.pgm == 1 assert r.on is True @@ -564,7 +599,7 @@ def test_pgm_off(self): assert r.on is False def test_pgm_max_on(self): - r = parse_line(fx.PGM_30_ON) # "PGM30ON" + r = parse_line(fx.PGM_30_ON) # "PGM30ON" assert r.pgm == 30 assert r.on is True @@ -575,11 +610,11 @@ def test_pgm_max_off(self): @pytest.mark.parametrize("pgm_num", [1, 15, 30]) def test_pgm_number_range(self, pgm_num): - on_line = f"PGM{pgm_num:02d}ON" + on_line = f"PGM{pgm_num:02d}ON" off_line = f"PGM{pgm_num:02d}OFF" - r_on = parse_line(on_line) + r_on = parse_line(on_line) r_off = parse_line(off_line) - assert isinstance(r_on, PRT3PgmEvent) and r_on.pgm == pgm_num + assert isinstance(r_on, PRT3PgmEvent) and r_on.pgm == pgm_num assert isinstance(r_off, PRT3PgmEvent) and r_off.pgm == pgm_num def test_pgm_single_digit_returns_none(self): @@ -611,16 +646,16 @@ def test_comm_uppercase_ok_is_unknown(self): assert parse_line("COMM&OK") is None def test_area_status_too_short(self): - assert parse_line("RA001DOOOOO") is None # 11 chars + assert parse_line("RA001DOOOOO") is None # 11 chars def test_area_status_too_long(self): assert parse_line("RA001DOOOOOOO") is None # 13 chars def test_zone_status_too_short(self): - assert parse_line("RZ001COOO") is None # 9 chars + assert parse_line("RZ001COOO") is None # 9 chars def test_zone_status_too_long(self): - assert parse_line("RZ001COOOOO") is None # 11 chars + assert parse_line("RZ001COOOOO") is None # 11 chars def test_label_too_short(self): # 20 chars — one below the required 21 @@ -639,7 +674,7 @@ def test_zone_unknown_open_char(self): assert parse_line("RZ001XOOOO") is None def test_echo_ok_wrong_length_short(self): - assert parse_line("AA01&OK") is None # 7 chars (4-char prefix) + assert parse_line("AA01&OK") is None # 7 chars (4-char prefix) def test_echo_ok_wrong_length_long(self): assert parse_line("AA001 &OK") is None # 9 chars (extra space)