diff --git a/s7/__init__.py b/s7/__init__.py index 1cfaa189..5964cca1 100644 --- a/s7/__init__.py +++ b/s7/__init__.py @@ -15,6 +15,7 @@ from .client import Client from .async_client import AsyncClient from .server import Server +from .partner import Partner, PartnerStatus from ._protocol import Protocol from snap7.type import Area, Block, WordLen, SrvEvent, SrvArea @@ -24,6 +25,8 @@ "Client", "AsyncClient", "Server", + "Partner", + "PartnerStatus", "Protocol", "Area", "Block", diff --git a/s7/partner.py b/s7/partner.py new file mode 100644 index 00000000..e20729ba --- /dev/null +++ b/s7/partner.py @@ -0,0 +1,21 @@ +"""Unified S7 partner for peer-to-peer communication. + +Wraps :class:`snap7.partner.Partner` so that the ``s7`` package is a +drop-in replacement for ``snap7``, including partner functionality. + +Usage:: + + from s7 import Partner + + partner = Partner(active=True) + partner.port = 102 + partner.r_id = 0x00000001 + partner.start_to("0.0.0.0", "192.168.1.10", 0x1300, 0x1301) + partner.set_send_data(b"Hello") + partner.b_send() + partner.stop() +""" + +from snap7.partner import Partner, PartnerStatus + +__all__ = ["Partner", "PartnerStatus"] diff --git a/snap7/partner.py b/snap7/partner.py index d73ccb48..ba94cdb8 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -18,11 +18,18 @@ from ctypes import c_int32, c_uint32 from .connection import ISOTCPConnection -from .error import S7Error, S7ConnectionError +from .error import S7Error, S7ConnectionError, S7TimeoutError +from .s7protocol import S7Protocol, S7PDUType from .type import Parameter logger = logging.getLogger(__name__) +# S7 partner/push function group +_PUSH_FUNC_GROUP = 0x06 + +# Partner push subfunctions +_PUSH_SUBFUNCTION_BSEND = 0x01 # bsend data push + class PartnerStatus: """Partner status constants.""" @@ -76,6 +83,13 @@ def __init__(self, active: bool = False, **kwargs: object) -> None: self._server_socket: Optional[socket.socket] = None # For passive mode self._connection: Optional[ISOTCPConnection] = None + # S7 protocol handler (for setup communication and PDU formatting) + self._protocol = S7Protocol() + self.pdu_length = 480 + + # R-ID for bsend/brecv matching (default 0, can be set by caller) + self.r_id: int = 0 + # Statistics self.bytes_sent = 0 self.bytes_recv = 0 @@ -94,7 +108,9 @@ def __init__(self, active: bool = False, **kwargs: object) -> None: self._async_send_queue: Queue[Any] = Queue() self._async_recv_queue: Queue[Any] = Queue() self._async_thread: Optional[threading.Thread] = None + self._recv_listener_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() + self._io_lock = threading.Lock() # Last error self.last_error = 0 @@ -104,6 +120,9 @@ def __init__(self, active: bool = False, **kwargs: object) -> None: self._recv_data: Optional[bytes] = None self._async_send_in_progress = False self._async_send_result = 0 + self._async_recv_in_progress = False + self._async_recv_result = 0 + self._async_recv_started = False logger.info(f"S7 Partner initialized (active={active}, pure Python implementation)") @@ -187,10 +206,14 @@ def stop(self) -> int: 0 on success """ self._stop_event.set() + self._async_recv_in_progress = False if self._async_thread and self._async_thread.is_alive(): self._async_thread.join(timeout=2.0) + if self._recv_listener_thread and self._recv_listener_thread.is_alive(): + self._recv_listener_thread.join(timeout=2.0) + if self._connection: self._connection.disconnect() self._connection = None @@ -273,10 +296,10 @@ def b_recv(self) -> int: try: # Receive partner data data = self._connection.receive_data() - received = self._parse_partner_data_pdu(data) + received, _r_id, pdu_ref = self._parse_partner_data_pdu(data) - # Send acknowledgment - ack = self._build_partner_ack() + # Send acknowledgment with the same PDU reference + ack = self._build_partner_ack(pdu_ref) self._connection.send_data(ack) self.bytes_recv += len(received) @@ -373,19 +396,83 @@ def wait_as_b_send_completion(self, timeout: int = 0) -> int: return self._async_send_result + def as_b_recv(self) -> int: + """ + Start asynchronous receive (non-blocking). + + Begins listening for incoming partner data in the background. + Use :meth:`check_as_b_recv_completion` or + :meth:`wait_as_b_recv_completion` to check for results. + + Returns: + 0 on success (receive initiated), -1 on error + """ + if not self.connected: + self.recv_errors += 1 + return -1 + + if self._async_recv_in_progress: + return -1 + + self._async_recv_in_progress = True + self._async_recv_started = True + self._async_recv_result = 1 # In progress + + if self._recv_listener_thread is None or not self._recv_listener_thread.is_alive(): + self._recv_listener_thread = threading.Thread(target=self._recv_listener, daemon=True) + self._recv_listener_thread.start() + + logger.debug("Async receive initiated") + return 0 + def check_as_b_recv_completion(self) -> int: """ Check if async receive completed. Returns: - 0 if data available, 1 if in progress + 0 if data available, 1 if in progress, -1 on error """ + if self._async_recv_result == -1: + return -1 + try: self._recv_data = self._async_recv_queue.get_nowait() return 0 # Data available except Empty: return 1 # No data yet + def wait_as_b_recv_completion(self, timeout: int = 0) -> int: + """ + Wait for async receive to complete. + + Args: + timeout: Timeout in milliseconds (0 for infinite) + + Returns: + 0 on success, -1 on timeout/error + + Raises: + RuntimeError: If no async receive was ever started + """ + if not self._async_recv_in_progress: + if self._async_recv_started: + # Listener already finished before wait was called + self._async_recv_started = False + return self._async_recv_result + raise RuntimeError("No async receive operation in progress") + + wait_time = timeout / 1000.0 if timeout > 0 else None + start = datetime.now() + + while self._async_recv_in_progress: + if wait_time is not None: + elapsed = (datetime.now() - start).total_seconds() + if elapsed >= wait_time: + return -1 + threading.Event().wait(0.01) + + return self._async_recv_result + def get_status(self) -> c_int32: """ Get partner status. @@ -478,24 +565,35 @@ def set_param(self, parameter: Parameter, value: int) -> int: logger.debug(f"Setting parameter {parameter} to {value}") return 0 - def set_recv_callback(self) -> int: + def set_recv_callback(self, callback: Optional[Callable[[bytes], None]] = None) -> int: """ - Sets the user callback for incoming data. + Register a callback for incoming data. + + The callback is invoked with the received bytes whenever data + arrives via :meth:`b_recv` or async receive. + + Args: + callback: Function called with received data, or ``None`` to clear. Returns: 0 on success """ - logger.debug("set_recv_callback called") + self._recv_callback = callback + logger.debug(f"Receive callback {'set' if callback else 'cleared'}") return 0 - def set_send_callback(self) -> int: + def set_send_callback(self, callback: Optional[Callable[[int], None]] = None) -> int: """ - Sets the user callback for completed async sends. + Register a callback for completed async sends. + + Args: + callback: Function called with the result code, or ``None`` to clear. Returns: 0 on success """ - logger.debug("set_send_callback called") + self._send_callback_fn = callback + logger.debug(f"Send callback {'set' if callback else 'cleared'}") return 0 def set_send_data(self, data: bytes) -> None: @@ -517,7 +615,11 @@ def get_recv_data(self) -> Optional[bytes]: return self._recv_data def _connect_to_remote(self) -> None: - """Connect to remote partner (active mode).""" + """Connect to remote partner (active mode). + + Performs COTP connection followed by S7 Communication Setup + to negotiate PDU size with the remote partner. + """ if not self.remote_ip: raise S7ConnectionError("Remote IP not specified for active partner") @@ -527,8 +629,11 @@ def _connect_to_remote(self) -> None: self._connection.connect() self._socket = self._connection.socket - self.connected = True + # Perform S7 Communication Setup (negotiate PDU size) + self._setup_communication() + + self.connected = True logger.info(f"Connected to remote partner at {self.remote_ip}:{self.port}") def _start_listening(self) -> None: @@ -549,7 +654,11 @@ def _start_listening(self) -> None: accept_thread.start() def _accept_connection(self) -> None: - """Accept incoming connection in passive mode.""" + """Accept incoming connection in passive mode. + + After accepting the TCP connection, handles the COTP Connection Request + from the active partner and performs S7 Communication Setup. + """ if self._server_socket is None: return @@ -563,9 +672,16 @@ def _accept_connection(self) -> None: host=addr[0], port=addr[1], local_tsap=self.local_tsap, remote_tsap=self.remote_tsap ) self._connection.socket = client_sock + + # Handle COTP Connection Request from active partner + self._handle_cotp_cr(client_sock) + self._connection.connected = True - self.connected = True + # Wait for and handle S7 Communication Setup from active partner + self._handle_setup_communication() + + self.connected = True logger.info(f"Partner connection accepted from {addr}") break @@ -577,17 +693,16 @@ def _accept_connection(self) -> None: break def _async_processor(self) -> None: - """Background thread for processing async operations.""" + """Background thread for processing async send operations.""" while not self._stop_event.is_set(): - # Process async sends try: data = self._async_send_queue.get(timeout=0.1) try: - # Temporarily set send data and call b_send old_data = self._send_data self._send_data = data - result = self.b_send() + with self._io_lock: + result = self.b_send() self._send_data = old_data self._async_send_result = result @@ -605,62 +720,349 @@ def _async_processor(self) -> None: except Exception: break - def _build_partner_data_pdu(self, data: bytes) -> bytes: + def _recv_listener(self) -> None: + """Background thread that listens for incoming partner data. + + Runs while ``_async_recv_in_progress`` is set. Uses a short + socket timeout so the thread can be stopped cleanly and releases + ``_io_lock`` between attempts to allow sends to proceed. + """ + while not self._stop_event.is_set() and self._async_recv_in_progress: + conn = self._connection + if not self.connected or conn is None or conn.socket is None: + break + + old_timeout = conn.socket.gettimeout() + try: + conn.socket.settimeout(0.2) + with self._io_lock: + data = conn.receive_data() + received, _r_id, pdu_ref = self._parse_partner_data_pdu(data) + ack = self._build_partner_ack(pdu_ref) + conn.send_data(ack) + except (S7TimeoutError, socket.timeout): + # Timeout is expected — restore connected flag since + # ISOTCPConnection.receive_data() sets it to False on timeout + if conn is not None: + conn.connected = True + continue + except Exception as e: + self.recv_errors += 1 + self._async_recv_result = -1 + self._async_recv_in_progress = False + logger.error(f"Async receive failed: {e}") + break + finally: + try: + if conn is not None and conn.socket is not None: + conn.socket.settimeout(old_timeout) + except OSError: + pass + + # Data received successfully + self._recv_data = received + self._async_recv_queue.put(received) + self.bytes_recv += len(received) + self._async_recv_result = 0 + self._async_recv_in_progress = False + + if self._recv_callback: + self._recv_callback(received) + + logger.debug(f"Async received {len(received)} bytes") + + def _setup_communication(self) -> None: + """Perform S7 Communication Setup after COTP connection. + + Sends a Setup Communication request and parses the negotiated + PDU length from the response. This is required before any S7 + data exchange can take place. + """ + if self._connection is None: + raise S7ConnectionError("No connection for S7 setup") + + request = self._protocol.build_setup_communication_request(max_amq_caller=1, max_amq_callee=1, pdu_length=self.pdu_length) + self._connection.send_data(request) + response_data = self._connection.receive_data() + response = self._protocol.parse_response(response_data) + + if response.get("parameters") and "pdu_length" in response["parameters"]: + self.pdu_length = response["parameters"]["pdu_length"] + + logger.info(f"S7 Communication Setup complete, PDU length: {self.pdu_length}") + + def _handle_cotp_cr(self, sock: socket.socket) -> None: + """Handle incoming COTP Connection Request and send Connection Confirm. + + Used by passive partner to complete the COTP handshake initiated + by the active partner. + """ + # Receive TPKT header (4 bytes) + tpkt_header = self._recv_exact_from(sock, 4) + version, _, length = struct.unpack(">BBH", tpkt_header) + if version != 3: + raise S7ConnectionError(f"Invalid TPKT version: {version}") + + payload = self._recv_exact_from(sock, length - 4) + if len(payload) < 7: + raise S7ConnectionError("COTP CR too short") + + pdu_type = payload[1] + if pdu_type != 0xE0: # COTP_CR + raise S7ConnectionError(f"Expected COTP CR (0xE0), got {pdu_type:#04x}") + + # Build and send Connection Confirm + if self._connection is None: + raise S7ConnectionError("No connection object") + + cc_pdu = struct.pack( + ">BBHHB", + 6, # PDU length + 0xD0, # COTP_CC + self._connection.src_ref, # Destination reference (our src_ref) + 0x0001, # Source reference + 0x00, # Class 0 + ) + # Add PDU size parameter + cc_pdu += struct.pack(">BBB", 0xC0, 1, 0x0A) # 1024 bytes + # Update length byte + total_len = len(cc_pdu) - 1 + cc_pdu = struct.pack(">B", total_len) + cc_pdu[1:] + + tpkt = struct.pack(">BBH", 3, 0, len(cc_pdu) + 4) + cc_pdu + sock.sendall(tpkt) + logger.debug("Sent COTP Connection Confirm") + + def _handle_setup_communication(self) -> None: + """Handle incoming S7 Communication Setup request from active partner. + + Receives the setup request, parses it, and sends back a setup response + with the negotiated PDU length. """ - Build partner data PDU. + if self._connection is None: + raise S7ConnectionError("No connection for S7 setup") + + request_data = self._connection.receive_data() + if len(request_data) < 10: + raise S7ConnectionError("S7 setup request too short") + + protocol_id, pdu_type = struct.unpack(">BB", request_data[:2]) + if protocol_id != 0x32 or pdu_type != S7PDUType.REQUEST: + raise S7ConnectionError(f"Expected S7 setup request, got type {pdu_type:#04x}") + + # Parse the request to get sequence number and requested PDU length + _, _, _, sequence, param_len, _ = struct.unpack(">BBHHHH", request_data[:10]) + requested_pdu = self.pdu_length + if param_len >= 8: + params = request_data[10 : 10 + param_len] + if len(params) >= 8: + _, _, _, _, requested_pdu = struct.unpack(">BBHHH", params[:8]) + + negotiated_pdu = min(requested_pdu, self.pdu_length) + self.pdu_length = negotiated_pdu + + # Build and send setup response + response = struct.pack( + ">BBHHHHBB", + 0x32, + S7PDUType.ACK_DATA, + 0x0000, + sequence, + 0x0008, # param length + 0x0000, # data length + 0x00, # error class + 0x00, # error code + ) + response += struct.pack( + ">BBHHH", + 0xF0, # Setup Communication function code + 0x00, + 1, # max_amq_caller + 1, # max_amq_callee + negotiated_pdu, + ) + self._connection.send_data(response) + logger.info(f"S7 Communication Setup complete (passive), PDU length: {negotiated_pdu}") + + @staticmethod + def _recv_exact_from(sock: socket.socket, size: int) -> bytes: + """Receive exactly *size* bytes from a socket.""" + data = bytearray() + while len(data) < size: + chunk = sock.recv(size - len(data)) + if not chunk: + raise S7ConnectionError("Connection closed during receive") + data.extend(chunk) + return bytes(data) + + def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> bytes: + """Build an S7 USERDATA PDU for partner data push (bsend). + + The PDU uses the standard S7 USERDATA header (10 bytes) followed by + a parameter section that identifies this as a PBC (Program Block + Communication) push with the R-ID and a variable specification + block, and a data section with the payload. Args: - data: Data to send + data: Payload to send. + r_id: Request ID for bsend/brecv matching. Falls back to ``self.r_id``. Returns: - PDU bytes + Complete S7 PDU bytes (without COTP/TPKT framing). """ - # S7 partner data PDU format: - # Header + Data + if r_id is None: + r_id = self.r_id + + sequence = self._protocol._next_sequence() + + # Parameter section: USERDATA header (12 bytes) + param = struct.pack( + ">BBBBBBBBBBH", + 0x00, # reserved + 0x01, # parameter count + 0x12, # type header + 0x08, # length of following parameter data + 0x12, # method: extended parameter + 0x46, # type 4 (request) | group 6 (PBC BSEND) + _PUSH_SUBFUNCTION_BSEND, + 0x00, # sequence number (always 0 for PBC) + 0x00, # data unit reference number + 0x00, # last data unit + 0x0000, # error code + ) + + # Data section: header + variable spec + R-ID + payload length + payload + # Variable specification: type=0x12, len=0x06, syntax_id=0x13, reserved=0x00 + var_spec = struct.pack(">BBBB", 0x12, 0x06, 0x13, 0x00) + # R-ID (4 bytes) + payload length (2 bytes) + var_spec += struct.pack(">IH", r_id, len(data)) + # Data header: return_code=0xFF, transport_size=0x09, length=varspec+data + data_section = struct.pack(">BBH", 0xFF, 0x09, len(var_spec) + len(data)) + var_spec + data + + # S7 USERDATA header (10 bytes) header = struct.pack( - ">BBHH", - 0x32, # Protocol ID (S7) - 0x07, # Partner PDU type - len(data), # Data length high - 0x0000, # Reserved + ">BBHHHH", + 0x32, + S7PDUType.USERDATA, + 0x0000, + sequence, + len(param), + len(data_section), ) - return header + data - def _parse_partner_data_pdu(self, pdu: bytes) -> bytes: - """ - Parse partner data PDU. + return header + param + data_section - Args: - pdu: PDU bytes + def _parse_partner_data_pdu(self, pdu: bytes) -> Tuple[bytes, int, int]: + """Parse an incoming partner data push PDU and extract the payload. Returns: - Extracted data + Tuple of (payload, r_id, pdu_ref). *r_id* and *pdu_ref* are + extracted from the variable specification block and the S7 + header respectively. If the variable specification is absent + both default to ``0``. """ if len(pdu) < 6: raise S7Error("Invalid partner PDU: too short") - # Skip header - return pdu[6:] - - def _build_partner_ack(self) -> bytes: - """Build partner acknowledgment PDU.""" - return struct.pack( - ">BBHH", - 0x32, # Protocol ID - 0x08, # ACK type - 0x0000, # Reserved - 0x0000, # Status OK + protocol_id, pdu_type = struct.unpack(">BB", pdu[:2]) + + if protocol_id != 0x32: + raise S7Error(f"Invalid protocol ID: {protocol_id:#04x}") + + if pdu_type == S7PDUType.USERDATA: + if len(pdu) < 10: + raise S7Error("USERDATA partner PDU too short") + _, _, _, pdu_ref, param_len, data_len = struct.unpack(">BBHHHH", pdu[:10]) + data_offset = 10 + param_len + if data_offset + 4 > len(pdu): + raise S7Error("Partner data section too short") + # Skip 4-byte data section header (return_code, transport_size, length) + payload = pdu[data_offset + 4 : data_offset + 4 + data_len - 4] if data_len > 4 else b"" + # Parse PBC variable specification block if present + # Format: 12 06 13 00 [R-ID 4 bytes] [length 2 bytes] = 10 bytes + r_id = 0 + if len(payload) >= 2 and payload[0] == 0x12: + var_len = payload[1] + if var_len == 0x06 and len(payload) >= 8: + syntax_id = payload[2] + if syntax_id == 0x13: + (r_id,) = struct.unpack(">I", payload[4:8]) + # skip var spec header (2) + body (var_len) + length field (2) + payload = payload[2 + var_len + 2 :] + return payload, r_id, pdu_ref + else: + raise S7Error(f"Unexpected PDU type in partner data: {pdu_type:#04x}") + + def _build_partner_ack(self, pdu_ref: int = 0) -> bytes: + """Build an S7 USERDATA acknowledgment PDU for a received bsend. + + The PLC expects the same PDU reference in the ACK as in the + data PDU it sent. + + Args: + pdu_ref: Protocol Data Unit reference echoed from the data PDU. + + Returns: + Complete S7 PDU bytes. + """ + sequence = self._protocol._next_sequence() + + param = struct.pack( + ">BBBBBBBB", + 0x00, + 0x01, + 0x12, + 0x08, # length: 4 base + 2 (dur/ldu) + 2 (error code) + 0x12, # method: response + 0x86, # type 8 (response) | group 6 (push) + _PUSH_SUBFUNCTION_BSEND, + sequence & 0xFF, + ) + param += struct.pack(">BBH", 0x00, 0x00, 0x0000) # dur, ldu, error_code + + # Data section: return code 0x0a, transport size 0x00, length 0x0000 + data = struct.pack(">BBH", 0x0A, 0x00, 0x0000) + + header = struct.pack( + ">BBHHHH", + 0x32, + S7PDUType.USERDATA, + 0x0000, + pdu_ref, + len(param), + len(data), ) + return header + param + data + def _parse_partner_ack(self, pdu: bytes) -> None: - """Parse partner acknowledgment PDU.""" + """Parse a partner acknowledgment PDU. + + Validates that the PDU is a proper S7 USERDATA response for a push + acknowledgment and checks for error codes. + """ if len(pdu) < 6: raise S7Error("Invalid partner ACK: too short") protocol_id, pdu_type = struct.unpack(">BB", pdu[:2]) - - if pdu_type != 0x08: - raise S7Error(f"Expected partner ACK, got {pdu_type:#02x}") + if protocol_id != 0x32: + raise S7Error(f"Invalid protocol ID in ACK: {protocol_id:#04x}") + + if pdu_type != S7PDUType.USERDATA: + raise S7Error(f"Expected partner ACK (USERDATA), got {pdu_type:#04x}") + + # Check for error code in parameter section + if len(pdu) >= 10: + _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", pdu[:10]) + param = pdu[10 : 10 + param_len] + # Parameter layout: 00 01 12 LL [method tg sf seq ...] [error_code] + if len(param) >= 4: + sub_len = param[3] + if sub_len >= 8 and len(param) >= 12: + # Error code is at offset 10-11 within param (bytes 6-7 after 12 LL) + error_code = struct.unpack(">H", param[10:12])[0] + if error_code != 0: + raise S7Error(f"Partner ACK error: {error_code:#06x}") def __enter__(self) -> "Partner": """Context manager entry.""" diff --git a/tests/test_api_surface.py b/tests/test_api_surface.py index b7b5e7e6..a81d9a30 100644 --- a/tests/test_api_surface.py +++ b/tests/test_api_surface.py @@ -163,7 +163,11 @@ "Par_AsBSend": "as_b_send", "Par_CheckAsBSendCompletion": "check_as_b_send_completion", "Par_WaitAsBSendCompletion": "wait_as_b_send_completion", + "Par_AsBRecv": "as_b_recv", "Par_CheckAsBRecvCompletion": "check_as_b_recv_completion", + "Par_WaitAsBRecvCompletion": "wait_as_b_recv_completion", + "Par_SetRecvCallback": "set_recv_callback", + "Par_SetSendCallback": "set_send_callback", "Par_GetParam": "get_param", "Par_SetParam": "set_param", "Par_GetTimes": "get_times", diff --git a/tests/test_partner.py b/tests/test_partner.py index 570fbca9..cea1b5c1 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -143,29 +143,41 @@ def test_build_partner_data_pdu_small(self) -> None: p = Partner() data = b"\x01\x02\x03" pdu = p._build_partner_data_pdu(data) + # S7 USERDATA header assert pdu[0:1] == b"\x32" assert pdu[1:2] == b"\x07" - assert struct.unpack(">H", pdu[2:4])[0] == len(data) - assert pdu[6:] == data + # Roundtrip recovers the payload + payload, r_id, pdu_ref = p._parse_partner_data_pdu(pdu) + assert payload == data def test_build_partner_data_pdu_empty(self) -> None: p = Partner() pdu = p._build_partner_data_pdu(b"") assert pdu[0:1] == b"\x32" - assert struct.unpack(">H", pdu[2:4])[0] == 0 + payload, _, _ = p._parse_partner_data_pdu(pdu) + assert payload == b"" def test_build_partner_data_pdu_large(self) -> None: p = Partner() data = bytes(range(256)) * 4 # 1024 bytes pdu = p._build_partner_data_pdu(data) - assert struct.unpack(">H", pdu[2:4])[0] == 1024 - assert pdu[6:] == data + payload, _, _ = p._parse_partner_data_pdu(pdu) + assert payload == data + + def test_build_partner_data_pdu_r_id(self) -> None: + """R-ID is embedded in the data section and extracted by parser.""" + p = Partner() + p.r_id = 0xDEADBEEF + pdu = p._build_partner_data_pdu(b"\x01") + payload, r_id, _pdu_ref = p._parse_partner_data_pdu(pdu) + assert payload == b"\x01" + assert r_id == 0xDEADBEEF def test_parse_partner_data_pdu_roundtrip(self) -> None: p = Partner() original = b"Hello, Partner!" pdu = p._build_partner_data_pdu(original) - parsed = p._parse_partner_data_pdu(pdu) + parsed, _, _ = p._parse_partner_data_pdu(pdu) assert parsed == original def test_parse_partner_data_pdu_roundtrip_various_sizes(self) -> None: @@ -173,7 +185,8 @@ def test_parse_partner_data_pdu_roundtrip_various_sizes(self) -> None: for size in [0, 1, 10, 100, 500, 1024]: data = (bytes(range(256)) * (size // 256 + 1))[:size] pdu = p._build_partner_data_pdu(data) - assert p._parse_partner_data_pdu(pdu) == data + payload, _, _ = p._parse_partner_data_pdu(pdu) + assert payload == data def test_parse_partner_data_pdu_too_short(self) -> None: p = Partner() @@ -183,9 +196,16 @@ def test_parse_partner_data_pdu_too_short(self) -> None: def test_build_partner_ack(self) -> None: p = Partner() ack = p._build_partner_ack() - assert len(ack) == 6 + # S7 USERDATA header (10 bytes) + parameter section + data section assert ack[0:1] == b"\x32" - assert ack[1:2] == b"\x08" + assert ack[1:2] == b"\x07" # USERDATA type + + def test_build_partner_ack_pdu_ref(self) -> None: + """ACK echoes the PDU reference from the data PDU.""" + p = Partner() + ack = p._build_partner_ack(pdu_ref=0x1234) + _, _, _, pdu_ref, _, _ = struct.unpack(">BBHHHH", ack[:10]) + assert pdu_ref == 0x1234 def test_parse_partner_ack_valid(self) -> None: p = Partner() @@ -199,7 +219,8 @@ def test_parse_partner_ack_too_short(self) -> None: def test_parse_partner_ack_wrong_type(self) -> None: p = Partner() - bad_ack = struct.pack(">BBHH", 0x32, 0x07, 0x0000, 0x0000) + # Build a PDU with REQUEST type instead of USERDATA + bad_ack = struct.pack(">BBHHHH", 0x32, 0x01, 0x0000, 0x0000, 0x0000, 0x0000) with pytest.raises(S7Error, match="Expected partner ACK"): p._parse_partner_ack(bad_ack) @@ -327,6 +348,50 @@ def test_as_b_send_not_connected(self) -> None: result = p.as_b_send() assert result == -1 + def test_as_b_recv_not_connected(self) -> None: + p = Partner() + assert p.as_b_recv() == -1 + + def test_as_b_recv_already_in_progress(self) -> None: + p = Partner() + p.connected = True + p._async_recv_in_progress = True + assert p.as_b_recv() == -1 + p._async_recv_in_progress = False + + def test_wait_as_b_recv_no_operation(self) -> None: + p = Partner() + with pytest.raises(RuntimeError, match="No async receive"): + p.wait_as_b_recv_completion() + + def test_wait_as_b_recv_timeout(self) -> None: + p = Partner() + p._async_recv_in_progress = True + result = p.wait_as_b_recv_completion(timeout=50) + assert result == -1 + p._async_recv_in_progress = False + + def test_wait_as_b_recv_completes(self) -> None: + p = Partner() + p._async_recv_in_progress = True + p._async_recv_result = 0 + + def clear_flag() -> None: + time.sleep(0.05) + p._async_recv_in_progress = False + + t = threading.Thread(target=clear_flag) + t.start() + result = p.wait_as_b_recv_completion(timeout=2000) + t.join() + assert result == 0 + + def test_check_as_b_recv_completion_error(self) -> None: + p = Partner() + p._async_recv_result = -1 + assert p.check_as_b_recv_completion() == -1 + p._async_recv_result = 0 + def test_check_as_b_recv_completion_empty(self) -> None: p = Partner() assert p.check_as_b_recv_completion() == 1 @@ -407,10 +472,24 @@ def test_set_recv_callback_returns_zero(self) -> None: p = Partner() assert p.set_recv_callback() == 0 + def test_set_recv_callback_with_function(self) -> None: + p = Partner() + assert p.set_recv_callback(lambda data: None) == 0 + assert p._recv_callback is not None + assert p.set_recv_callback(None) == 0 + assert p._recv_callback is None + def test_set_send_callback_returns_zero(self) -> None: p = Partner() assert p.set_send_callback() == 0 + def test_set_send_callback_with_function(self) -> None: + p = Partner() + assert p.set_send_callback(lambda result: None) == 0 + assert p._send_callback_fn is not None + assert p.set_send_callback(None) == 0 + assert p._send_callback_fn is None + # --------------------------------------------------------------------------- # Dual-partner integration tests using raw socket pairing @@ -661,6 +740,198 @@ def do_send() -> None: pa.stop() pb.stop() + def test_as_b_recv_with_check(self) -> None: + """Async receive completes and data is available via check_as_b_recv_completion.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + payload = b"async recv test" + + # Start async receive on B + assert pb.as_b_recv() == 0 + + # Send from A (in a thread because b_send blocks waiting for ACK) + errors: list[Exception] = [] + + def do_send() -> None: + try: + pa.set_send_data(payload) + pa.b_send() + except Exception as e: + errors.append(e) + + t = threading.Thread(target=do_send) + t.start() + + # Poll until receive completes + deadline = time.time() + 3.0 + while time.time() < deadline: + if pb.check_as_b_recv_completion() == 0: + break + time.sleep(0.01) + + t.join(timeout=3.0) + assert not errors + assert pb.get_recv_data() == payload + finally: + pa.stop() + pb.stop() + + def test_as_b_recv_with_wait(self) -> None: + """Async receive completes when using wait_as_b_recv_completion.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + payload = b"wait recv test" + + assert pb.as_b_recv() == 0 + + errors: list[Exception] = [] + + def do_send() -> None: + try: + pa.set_send_data(payload) + pa.b_send() + except Exception as e: + errors.append(e) + + t = threading.Thread(target=do_send) + t.start() + + result = pb.wait_as_b_recv_completion(timeout=3000) + t.join(timeout=3.0) + assert result == 0 + assert not errors + assert pb.get_recv_data() == payload + finally: + pa.stop() + pb.stop() + + def test_as_b_recv_callback_fires(self) -> None: + """Receive callback is invoked during async receive.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + received_data: list[bytes] = [] + pb.set_recv_callback(lambda data: received_data.append(data)) + + payload = b"callback async" + assert pb.as_b_recv() == 0 + + errors: list[Exception] = [] + + def do_send() -> None: + try: + pa.set_send_data(payload) + pa.b_send() + except Exception as e: + errors.append(e) + + t = threading.Thread(target=do_send) + t.start() + + result = pb.wait_as_b_recv_completion(timeout=3000) + t.join(timeout=3.0) + assert result == 0 + assert not errors + assert len(received_data) == 1 + assert received_data[0] == payload + finally: + pa.stop() + pb.stop() + + def test_as_b_send_callback_fires(self) -> None: + """Send callback is invoked during async send.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + send_results: list[int] = [] + pa.set_send_callback(lambda result: send_results.append(result)) + + payload = b"send callback" + pa.set_send_data(payload) + + # Start async processor thread for pa + pa._stop_event.clear() + pa._async_thread = threading.Thread(target=pa._async_processor, daemon=True) + pa._async_thread.start() + + assert pa.as_b_send() == 0 + + # Receive on B side + assert pb.b_recv() == 0 + + # Wait for async send to complete + deadline = time.time() + 3.0 + while pa._async_send_in_progress and time.time() < deadline: + time.sleep(0.01) + + assert pb.get_recv_data() == payload + assert len(send_results) == 1 + assert send_results[0] == 0 + finally: + pa.stop() + pb.stop() + + def test_as_b_recv_then_send(self) -> None: + """After async recv completes, sending still works (lock coordination).""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + # Phase 1: A sends, B receives async + assert pb.as_b_recv() == 0 + + errors: list[Exception] = [] + + def do_send_a() -> None: + try: + pa.set_send_data(b"phase1") + pa.b_send() + except Exception as e: + errors.append(e) + + t1 = threading.Thread(target=do_send_a) + t1.start() + result = pb.wait_as_b_recv_completion(timeout=3000) + t1.join(timeout=3.0) + assert result == 0 + assert pb.get_recv_data() == b"phase1" + assert not errors + + # Phase 2: B sends back, A receives sync + pb.set_send_data(b"phase2") + + def do_send_b() -> None: + try: + pb.b_send() + except Exception as e: + errors.append(e) + + t2 = threading.Thread(target=do_send_b) + t2.start() + assert pa.b_recv() == 0 + t2.join(timeout=3.0) + assert pa.get_recv_data() == b"phase2" + assert not errors + finally: + pa.stop() + pb.stop() + def test_b_recv_error_returns_negative(self) -> None: """b_recv returns -1 on receive error when no data arrives.""" sock_a, sock_b = _make_socket_pair()