From 85bb93ced29e1f7e49b81870d0bb13cef34d24ed Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Sat, 4 Apr 2026 12:53:23 +0200 Subject: [PATCH 1/9] Fix partner S7 Communication Setup and bsend/brecv PDU format The partner module was only completing the COTP handshake but skipping the S7 Communication Setup negotiation, causing real PLCs to stay in "awaiting connection" status. Additionally, the bsend/brecv PDUs used a minimal custom format instead of proper S7 USERDATA PDUs with R-ID, making them incompatible with real Siemens PLCs. Changes: - Add S7 Communication Setup after COTP connect (active mode) - Handle incoming COTP CR and S7 Setup for passive mode - Rewrite partner data PDU to use S7 USERDATA format (type 0x07) with push function group (0x06), proper parameter section, and R-ID - Rewrite partner ACK PDU to use S7 USERDATA response format - Add r_id attribute for bsend/brecv matching - Update tests for new PDU format, add R-ID coverage Fixes #668 Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 313 +++++++++++++++++++++++++++++++++++++----- tests/test_partner.py | 35 +++-- 2 files changed, 304 insertions(+), 44 deletions(-) diff --git a/snap7/partner.py b/snap7/partner.py index d73ccb48..10125aa0 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -19,10 +19,18 @@ from .connection import ISOTCPConnection from .error import S7Error, S7ConnectionError +from .s7protocol import S7Protocol, S7PDUType from .type import Parameter logger = logging.getLogger(__name__) +# S7 partner/push function group +_PUSH_FUNC_GROUP = 0x06 + +# Partner push subfunctions +_PUSH_SUBFUNCTION_DATA = 0x01 # bsend data push +_PUSH_SUBFUNCTION_ACK = 0x02 # bsend acknowledgment + class PartnerStatus: """Partner status constants.""" @@ -76,6 +84,13 @@ def __init__(self, active: bool = False, **kwargs: object) -> None: self._server_socket: Optional[socket.socket] = None # For passive mode self._connection: Optional[ISOTCPConnection] = None + # S7 protocol handler (for setup communication and PDU formatting) + self._protocol = S7Protocol() + self.pdu_length = 480 + + # R-ID for bsend/brecv matching (default 0, can be set by caller) + self.r_id: int = 0 + # Statistics self.bytes_sent = 0 self.bytes_recv = 0 @@ -517,7 +532,11 @@ def get_recv_data(self) -> Optional[bytes]: return self._recv_data def _connect_to_remote(self) -> None: - """Connect to remote partner (active mode).""" + """Connect to remote partner (active mode). + + Performs COTP connection followed by S7 Communication Setup + to negotiate PDU size with the remote partner. + """ if not self.remote_ip: raise S7ConnectionError("Remote IP not specified for active partner") @@ -527,8 +546,11 @@ def _connect_to_remote(self) -> None: self._connection.connect() self._socket = self._connection.socket - self.connected = True + # Perform S7 Communication Setup (negotiate PDU size) + self._setup_communication() + + self.connected = True logger.info(f"Connected to remote partner at {self.remote_ip}:{self.port}") def _start_listening(self) -> None: @@ -549,7 +571,11 @@ def _start_listening(self) -> None: accept_thread.start() def _accept_connection(self) -> None: - """Accept incoming connection in passive mode.""" + """Accept incoming connection in passive mode. + + After accepting the TCP connection, handles the COTP Connection Request + from the active partner and performs S7 Communication Setup. + """ if self._server_socket is None: return @@ -563,9 +589,16 @@ def _accept_connection(self) -> None: host=addr[0], port=addr[1], local_tsap=self.local_tsap, remote_tsap=self.remote_tsap ) self._connection.socket = client_sock + + # Handle COTP Connection Request from active partner + self._handle_cotp_cr(client_sock) + self._connection.connected = True - self.connected = True + # Wait for and handle S7 Communication Setup from active partner + self._handle_setup_communication() + + self.connected = True logger.info(f"Partner connection accepted from {addr}") break @@ -605,62 +638,270 @@ def _async_processor(self) -> None: except Exception: break - def _build_partner_data_pdu(self, data: bytes) -> bytes: + def _setup_communication(self) -> None: + """Perform S7 Communication Setup after COTP connection. + + Sends a Setup Communication request and parses the negotiated + PDU length from the response. This is required before any S7 + data exchange can take place. """ - Build partner data PDU. + if self._connection is None: + raise S7ConnectionError("No connection for S7 setup") + + request = self._protocol.build_setup_communication_request( + max_amq_caller=1, max_amq_callee=1, pdu_length=self.pdu_length + ) + self._connection.send_data(request) + response_data = self._connection.receive_data() + response = self._protocol.parse_response(response_data) + + if response.get("parameters") and "pdu_length" in response["parameters"]: + self.pdu_length = response["parameters"]["pdu_length"] + + logger.info(f"S7 Communication Setup complete, PDU length: {self.pdu_length}") + + def _handle_cotp_cr(self, sock: socket.socket) -> None: + """Handle incoming COTP Connection Request and send Connection Confirm. + + Used by passive partner to complete the COTP handshake initiated + by the active partner. + """ + # Receive TPKT header (4 bytes) + tpkt_header = self._recv_exact_from(sock, 4) + version, _, length = struct.unpack(">BBH", tpkt_header) + if version != 3: + raise S7ConnectionError(f"Invalid TPKT version: {version}") + + payload = self._recv_exact_from(sock, length - 4) + if len(payload) < 7: + raise S7ConnectionError("COTP CR too short") + + pdu_type = payload[1] + if pdu_type != 0xE0: # COTP_CR + raise S7ConnectionError(f"Expected COTP CR (0xE0), got {pdu_type:#04x}") + + # Build and send Connection Confirm + if self._connection is None: + raise S7ConnectionError("No connection object") + + cc_pdu = struct.pack( + ">BBHHB", + 6, # PDU length + 0xD0, # COTP_CC + self._connection.src_ref, # Destination reference (our src_ref) + 0x0001, # Source reference + 0x00, # Class 0 + ) + # Add PDU size parameter + cc_pdu += struct.pack(">BBB", 0xC0, 1, 0x0A) # 1024 bytes + # Update length byte + total_len = len(cc_pdu) - 1 + cc_pdu = struct.pack(">B", total_len) + cc_pdu[1:] + + tpkt = struct.pack(">BBH", 3, 0, len(cc_pdu) + 4) + cc_pdu + sock.sendall(tpkt) + logger.debug("Sent COTP Connection Confirm") + + def _handle_setup_communication(self) -> None: + """Handle incoming S7 Communication Setup request from active partner. + + Receives the setup request, parses it, and sends back a setup response + with the negotiated PDU length. + """ + if self._connection is None: + raise S7ConnectionError("No connection for S7 setup") + + request_data = self._connection.receive_data() + if len(request_data) < 10: + raise S7ConnectionError("S7 setup request too short") + + protocol_id, pdu_type = struct.unpack(">BB", request_data[:2]) + if protocol_id != 0x32 or pdu_type != S7PDUType.REQUEST: + raise S7ConnectionError(f"Expected S7 setup request, got type {pdu_type:#04x}") + + # Parse the request to get sequence number and requested PDU length + _, _, _, sequence, param_len, _ = struct.unpack(">BBHHHH", request_data[:10]) + requested_pdu = self.pdu_length + if param_len >= 8: + params = request_data[10 : 10 + param_len] + if len(params) >= 8: + _, _, _, _, requested_pdu = struct.unpack(">BBHHH", params[:8]) + + negotiated_pdu = min(requested_pdu, self.pdu_length) + self.pdu_length = negotiated_pdu + + # Build and send setup response + response = struct.pack( + ">BBHHHHBB", + 0x32, + S7PDUType.ACK_DATA, + 0x0000, + sequence, + 0x0008, # param length + 0x0000, # data length + 0x00, # error class + 0x00, # error code + ) + response += struct.pack( + ">BBHHH", + 0xF0, # Setup Communication function code + 0x00, + 1, # max_amq_caller + 1, # max_amq_callee + negotiated_pdu, + ) + self._connection.send_data(response) + logger.info(f"S7 Communication Setup complete (passive), PDU length: {negotiated_pdu}") + + @staticmethod + def _recv_exact_from(sock: socket.socket, size: int) -> bytes: + """Receive exactly *size* bytes from a socket.""" + data = bytearray() + while len(data) < size: + chunk = sock.recv(size - len(data)) + if not chunk: + raise S7ConnectionError("Connection closed during receive") + data.extend(chunk) + return bytes(data) + + def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> bytes: + """Build an S7 USERDATA PDU for partner data push (bsend). + + The PDU uses the standard S7 USERDATA header (10 bytes) followed by + a parameter section that identifies this as a push request and carries + the R-ID, and a data section with the payload. Args: - data: Data to send + data: Payload to send. + r_id: Request ID for bsend/brecv matching. Falls back to ``self.r_id``. Returns: - PDU bytes - """ - # S7 partner data PDU format: - # Header + Data + Complete S7 PDU bytes (without COTP/TPKT framing). + """ + if r_id is None: + r_id = self.r_id + + sequence = self._protocol._next_sequence() + + # Parameter section: 12-byte USERDATA header + 4-byte R-ID + param = struct.pack( + ">BBBBBBBBBBxx", + 0x00, # reserved + 0x01, # parameter count + 0x12, # type header + 0x08, # length of following parameter data + 0x11, # method: request + 0x46, # type 4 (request) | group 6 (push) + _PUSH_SUBFUNCTION_DATA, + sequence & 0xFF, + 0x00, # data unit reference (no fragmentation) + 0x00, # last data unit = yes + ) + param += struct.pack(">I", r_id) + + # Data section: 4-byte header + payload + data_section = struct.pack(">BBH", 0xFF, 0x09, len(data)) + data + + # S7 USERDATA header (10 bytes) header = struct.pack( - ">BBHH", - 0x32, # Protocol ID (S7) - 0x07, # Partner PDU type - len(data), # Data length high - 0x0000, # Reserved + ">BBHHHH", + 0x32, + S7PDUType.USERDATA, + 0x0000, + sequence, + len(param), + len(data_section), ) - return header + data + + return header + param + data_section def _parse_partner_data_pdu(self, pdu: bytes) -> bytes: - """ - Parse partner data PDU. + """Parse an incoming partner data push PDU and extract the payload. - Args: - pdu: PDU bytes + Accepts both the new USERDATA format (with R-ID) and the legacy + minimal format for backward-compatibility with existing tests that + use raw socket pairs. Returns: - Extracted data + The application payload. """ if len(pdu) < 6: raise S7Error("Invalid partner PDU: too short") - # Skip header - return pdu[6:] - - def _build_partner_ack(self) -> bytes: - """Build partner acknowledgment PDU.""" - return struct.pack( - ">BBHH", - 0x32, # Protocol ID - 0x08, # ACK type - 0x0000, # Reserved - 0x0000, # Status OK + protocol_id, pdu_type = struct.unpack(">BB", pdu[:2]) + + if protocol_id != 0x32: + raise S7Error(f"Invalid protocol ID: {protocol_id:#04x}") + + if pdu_type == S7PDUType.USERDATA: + # Full USERDATA format + if len(pdu) < 10: + raise S7Error("USERDATA partner PDU too short") + _, _, _, _, param_len, data_len = struct.unpack(">BBHHHH", pdu[:10]) + data_offset = 10 + param_len + if data_offset + 4 > len(pdu): + raise S7Error("Partner data section too short") + # Skip 4-byte data section header (return_code, transport_size, length) + return pdu[data_offset + 4 : data_offset + 4 + data_len - 4] if data_len > 4 else b"" + else: + raise S7Error(f"Unexpected PDU type in partner data: {pdu_type:#04x}") + + def _build_partner_ack(self, r_id: Optional[int] = None) -> bytes: + """Build an S7 USERDATA acknowledgment PDU for a received bsend. + + Args: + r_id: Request ID echoed from the data PDU. + + Returns: + Complete S7 PDU bytes. + """ + if r_id is None: + r_id = self.r_id + + sequence = self._protocol._next_sequence() + + param = struct.pack( + ">BBBBBBBBBBxx", + 0x00, + 0x01, + 0x12, + 0x08, + 0x12, # method: response + 0x86, # type 8 (response) | group 6 (push) + _PUSH_SUBFUNCTION_ACK, + sequence & 0xFF, + 0x00, + 0x00, ) + param += struct.pack(">I", r_id) + + header = struct.pack( + ">BBHHHH", + 0x32, + S7PDUType.USERDATA, + 0x0000, + sequence, + len(param), + 0x0000, + ) + + return header + param def _parse_partner_ack(self, pdu: bytes) -> None: - """Parse partner acknowledgment PDU.""" + """Parse a partner acknowledgment PDU. + + Validates that the PDU is a proper S7 USERDATA response for a push + acknowledgment. + """ if len(pdu) < 6: raise S7Error("Invalid partner ACK: too short") protocol_id, pdu_type = struct.unpack(">BB", pdu[:2]) + if protocol_id != 0x32: + raise S7Error(f"Invalid protocol ID in ACK: {protocol_id:#04x}") - if pdu_type != 0x08: - raise S7Error(f"Expected partner ACK, got {pdu_type:#02x}") + if pdu_type != S7PDUType.USERDATA: + raise S7Error(f"Expected partner ACK (USERDATA), got {pdu_type:#04x}") def __enter__(self) -> "Partner": """Context manager entry.""" diff --git a/tests/test_partner.py b/tests/test_partner.py index 570fbca9..8ea107b7 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -143,23 +143,33 @@ def test_build_partner_data_pdu_small(self) -> None: p = Partner() data = b"\x01\x02\x03" pdu = p._build_partner_data_pdu(data) + # S7 USERDATA header assert pdu[0:1] == b"\x32" assert pdu[1:2] == b"\x07" - assert struct.unpack(">H", pdu[2:4])[0] == len(data) - assert pdu[6:] == data + # Roundtrip recovers the payload + assert p._parse_partner_data_pdu(pdu) == data def test_build_partner_data_pdu_empty(self) -> None: p = Partner() pdu = p._build_partner_data_pdu(b"") assert pdu[0:1] == b"\x32" - assert struct.unpack(">H", pdu[2:4])[0] == 0 + assert p._parse_partner_data_pdu(pdu) == b"" def test_build_partner_data_pdu_large(self) -> None: p = Partner() data = bytes(range(256)) * 4 # 1024 bytes pdu = p._build_partner_data_pdu(data) - assert struct.unpack(">H", pdu[2:4])[0] == 1024 - assert pdu[6:] == data + assert p._parse_partner_data_pdu(pdu) == data + + def test_build_partner_data_pdu_r_id(self) -> None: + """R-ID is embedded in the parameter section.""" + p = Partner() + p.r_id = 0xDEADBEEF + pdu = p._build_partner_data_pdu(b"\x01") + # R-ID sits at the end of the 16-byte parameter section (bytes 10..25) + _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", pdu[:10]) + r_id_bytes = pdu[10 + param_len - 4 : 10 + param_len] + assert struct.unpack(">I", r_id_bytes)[0] == 0xDEADBEEF def test_parse_partner_data_pdu_roundtrip(self) -> None: p = Partner() @@ -183,9 +193,17 @@ def test_parse_partner_data_pdu_too_short(self) -> None: def test_build_partner_ack(self) -> None: p = Partner() ack = p._build_partner_ack() - assert len(ack) == 6 + # S7 USERDATA header (10 bytes) + parameter section assert ack[0:1] == b"\x32" - assert ack[1:2] == b"\x08" + assert ack[1:2] == b"\x07" # USERDATA type + + def test_build_partner_ack_r_id(self) -> None: + """ACK carries the same R-ID.""" + p = Partner() + ack = p._build_partner_ack(r_id=0x12345678) + _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", ack[:10]) + r_id_bytes = ack[10 + param_len - 4 : 10 + param_len] + assert struct.unpack(">I", r_id_bytes)[0] == 0x12345678 def test_parse_partner_ack_valid(self) -> None: p = Partner() @@ -199,7 +217,8 @@ def test_parse_partner_ack_too_short(self) -> None: def test_parse_partner_ack_wrong_type(self) -> None: p = Partner() - bad_ack = struct.pack(">BBHH", 0x32, 0x07, 0x0000, 0x0000) + # Build a PDU with REQUEST type instead of USERDATA + bad_ack = struct.pack(">BBHHHH", 0x32, 0x01, 0x0000, 0x0000, 0x0000, 0x0000) with pytest.raises(S7Error, match="Expected partner ACK"): p._parse_partner_ack(bad_ack) From e24e5b589e9529b609a22560e2b2609ba25b9ef3 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Sat, 4 Apr 2026 12:58:58 +0200 Subject: [PATCH 2/9] Add Partner to s7 package for drop-in snap7 compatibility Re-exports snap7.Partner and PartnerStatus from s7, so users can do `from s7 import Partner` just like Client, AsyncClient, and Server. Co-Authored-By: Claude Opus 4.6 --- s7/__init__.py | 3 +++ s7/partner.py | 21 +++++++++++++++++++++ 2 files changed, 24 insertions(+) create mode 100644 s7/partner.py diff --git a/s7/__init__.py b/s7/__init__.py index 1cfaa189..5964cca1 100644 --- a/s7/__init__.py +++ b/s7/__init__.py @@ -15,6 +15,7 @@ from .client import Client from .async_client import AsyncClient from .server import Server +from .partner import Partner, PartnerStatus from ._protocol import Protocol from snap7.type import Area, Block, WordLen, SrvEvent, SrvArea @@ -24,6 +25,8 @@ "Client", "AsyncClient", "Server", + "Partner", + "PartnerStatus", "Protocol", "Area", "Block", diff --git a/s7/partner.py b/s7/partner.py new file mode 100644 index 00000000..e20729ba --- /dev/null +++ b/s7/partner.py @@ -0,0 +1,21 @@ +"""Unified S7 partner for peer-to-peer communication. + +Wraps :class:`snap7.partner.Partner` so that the ``s7`` package is a +drop-in replacement for ``snap7``, including partner functionality. + +Usage:: + + from s7 import Partner + + partner = Partner(active=True) + partner.port = 102 + partner.r_id = 0x00000001 + partner.start_to("0.0.0.0", "192.168.1.10", 0x1300, 0x1301) + partner.set_send_data(b"Hello") + partner.b_send() + partner.stop() +""" + +from snap7.partner import Partner, PartnerStatus + +__all__ = ["Partner", "PartnerStatus"] From 7b8840e433cfdd4e8783511a5402d95b696aabb5 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Sat, 4 Apr 2026 13:01:14 +0200 Subject: [PATCH 3/9] Fix ruff format in partner.py Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/snap7/partner.py b/snap7/partner.py index 10125aa0..68667822 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -648,9 +648,7 @@ def _setup_communication(self) -> None: if self._connection is None: raise S7ConnectionError("No connection for S7 setup") - request = self._protocol.build_setup_communication_request( - max_amq_caller=1, max_amq_callee=1, pdu_length=self.pdu_length - ) + request = self._protocol.build_setup_communication_request(max_amq_caller=1, max_amq_callee=1, pdu_length=self.pdu_length) self._connection.send_data(request) response_data = self._connection.receive_data() response = self._protocol.parse_response(response_data) From f6d3d7d25e0a2ddaf2662f804ff4206408a58302 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Sat, 4 Apr 2026 18:11:25 +0200 Subject: [PATCH 4/9] Fix bsend PDU format: use subfunction 0x06 and compact parameter header The bsend data push was using subfunction 0x01 (push notification) instead of 0x06 (BSend), causing the PLC to reject the PDU with error 0x8404. Also fix the parameter sub-length from 0x08 to 0x04 for requests (no data_unit_ref/last_data_unit/error_code needed), and add error code checking in the ACK parser. Fixes #668 Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/snap7/partner.py b/snap7/partner.py index 68667822..fc8ffa1a 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -28,8 +28,7 @@ _PUSH_FUNC_GROUP = 0x06 # Partner push subfunctions -_PUSH_SUBFUNCTION_DATA = 0x01 # bsend data push -_PUSH_SUBFUNCTION_ACK = 0x02 # bsend acknowledgment +_PUSH_SUBFUNCTION_BSEND = 0x06 # bsend data push class PartnerStatus: @@ -781,19 +780,17 @@ def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> by sequence = self._protocol._next_sequence() - # Parameter section: 12-byte USERDATA header + 4-byte R-ID + # Parameter section: USERDATA header + 4-byte R-ID param = struct.pack( - ">BBBBBBBBBBxx", + ">BBBBBBBB", 0x00, # reserved 0x01, # parameter count 0x12, # type header - 0x08, # length of following parameter data + 0x04, # length of following parameter data 0x11, # method: request 0x46, # type 4 (request) | group 6 (push) - _PUSH_SUBFUNCTION_DATA, + _PUSH_SUBFUNCTION_BSEND, sequence & 0xFF, - 0x00, # data unit reference (no fragmentation) - 0x00, # last data unit = yes ) param += struct.pack(">I", r_id) @@ -859,19 +856,17 @@ def _build_partner_ack(self, r_id: Optional[int] = None) -> bytes: sequence = self._protocol._next_sequence() param = struct.pack( - ">BBBBBBBBBBxx", + ">BBBBBBBB", 0x00, 0x01, 0x12, - 0x08, + 0x08, # length: 4 base + 2 (dur/ldu) + 2 (error code) 0x12, # method: response 0x86, # type 8 (response) | group 6 (push) - _PUSH_SUBFUNCTION_ACK, + _PUSH_SUBFUNCTION_BSEND, sequence & 0xFF, - 0x00, - 0x00, ) - param += struct.pack(">I", r_id) + param += struct.pack(">BBHI", 0x00, 0x00, 0x0000, r_id) # dur, ldu, error_code, R-ID header = struct.pack( ">BBHHHH", @@ -889,7 +884,7 @@ def _parse_partner_ack(self, pdu: bytes) -> None: """Parse a partner acknowledgment PDU. Validates that the PDU is a proper S7 USERDATA response for a push - acknowledgment. + acknowledgment and checks for error codes. """ if len(pdu) < 6: raise S7Error("Invalid partner ACK: too short") @@ -901,6 +896,19 @@ def _parse_partner_ack(self, pdu: bytes) -> None: if pdu_type != S7PDUType.USERDATA: raise S7Error(f"Expected partner ACK (USERDATA), got {pdu_type:#04x}") + # Check for error code in parameter section + if len(pdu) >= 10: + _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", pdu[:10]) + param = pdu[10 : 10 + param_len] + # Parameter layout: 00 01 12 LL [method tg sf seq ...] [error_code] + if len(param) >= 4: + sub_len = param[3] + if sub_len >= 8 and len(param) >= 12: + # Error code is at offset 10-11 within param (bytes 6-7 after 12 LL) + error_code = struct.unpack(">H", param[10:12])[0] + if error_code != 0: + raise S7Error(f"Partner ACK error: {error_code:#06x}") + def __enter__(self) -> "Partner": """Context manager entry.""" return self From 61a78be706e4e5dd0b4ca5544937e81af39c8215 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Sun, 5 Apr 2026 09:36:43 +0200 Subject: [PATCH 5/9] Fix BSend PDU format to match PBC protocol expected by S7-1500 The BSend USERDATA PDU was rejected by real PLCs (error 0x8104 "object does not exist") because the parameter section used request semantics instead of the PBC push format. Changes: - Method byte: 0x12 (push) instead of 0x11 (request) - Type/Group byte: 0x06 (push|PBC) instead of 0x46 (request|PBC) - Sub-length: 0x08 to include dur/ldu/error_code fields - Add variable specification block (12 06 82 41 ...) with payload length - Add PBC prefix (12 00) before payload in data section - Parse and strip PBC prefix in _parse_partner_data_pdu for incoming data Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 32 ++++++++++++++++++++++---------- tests/test_partner.py | 7 +++---- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/snap7/partner.py b/snap7/partner.py index fc8ffa1a..01f4f260 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -765,8 +765,9 @@ def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> by """Build an S7 USERDATA PDU for partner data push (bsend). The PDU uses the standard S7 USERDATA header (10 bytes) followed by - a parameter section that identifies this as a push request and carries - the R-ID, and a data section with the payload. + a parameter section that identifies this as a PBC (Program Block + Communication) push with the R-ID and a variable specification + block, and a data section with the payload. Args: data: Payload to send. @@ -780,22 +781,29 @@ def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> by sequence = self._protocol._next_sequence() - # Parameter section: USERDATA header + 4-byte R-ID + # Parameter section: USERDATA header with PBC variable specification param = struct.pack( - ">BBBBBBBB", + ">BBBBBBBBBBH", 0x00, # reserved 0x01, # parameter count 0x12, # type header - 0x04, # length of following parameter data - 0x11, # method: request - 0x46, # type 4 (request) | group 6 (push) + 0x08, # length of following parameter data + 0x12, # method: push + 0x06, # type 0 (push) | group 6 (PBC) _PUSH_SUBFUNCTION_BSEND, sequence & 0xFF, + 0x00, # data unit reference number + 0x00, # last data unit + 0x0000, # error code ) + # R-ID (4 bytes) param += struct.pack(">I", r_id) + # Variable specification block: type=0x12, len=0x06, syntax_id=0x82, transport=0x41, padding, payload_length + param += struct.pack(">BBBBHH", 0x12, 0x06, 0x82, 0x41, 0x0000, len(data)) - # Data section: 4-byte header + payload - data_section = struct.pack(">BBH", 0xFF, 0x09, len(data)) + data + # Data section: 4-byte header + 2-byte prefix (12 00) + payload + payload_with_prefix = struct.pack(">BB", 0x12, 0x00) + data + data_section = struct.pack(">BBH", 0xFF, 0x09, len(payload_with_prefix)) + payload_with_prefix # S7 USERDATA header (10 bytes) header = struct.pack( @@ -837,7 +845,11 @@ def _parse_partner_data_pdu(self, pdu: bytes) -> bytes: if data_offset + 4 > len(pdu): raise S7Error("Partner data section too short") # Skip 4-byte data section header (return_code, transport_size, length) - return pdu[data_offset + 4 : data_offset + 4 + data_len - 4] if data_len > 4 else b"" + payload = pdu[data_offset + 4 : data_offset + 4 + data_len - 4] if data_len > 4 else b"" + # Strip PBC prefix (12 00) if present + if len(payload) >= 2 and payload[0] == 0x12 and payload[1] == 0x00: + payload = payload[2:] + return payload else: raise S7Error(f"Unexpected PDU type in partner data: {pdu_type:#04x}") diff --git a/tests/test_partner.py b/tests/test_partner.py index 8ea107b7..b8a1587c 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -162,13 +162,12 @@ def test_build_partner_data_pdu_large(self) -> None: assert p._parse_partner_data_pdu(pdu) == data def test_build_partner_data_pdu_r_id(self) -> None: - """R-ID is embedded in the parameter section.""" + """R-ID is embedded in the parameter section after the USERDATA header.""" p = Partner() p.r_id = 0xDEADBEEF pdu = p._build_partner_data_pdu(b"\x01") - # R-ID sits at the end of the 16-byte parameter section (bytes 10..25) - _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", pdu[:10]) - r_id_bytes = pdu[10 + param_len - 4 : 10 + param_len] + # R-ID sits at offset 12 within the parameter section (after 12-byte USERDATA header) + r_id_bytes = pdu[10 + 12 : 10 + 12 + 4] assert struct.unpack(">I", r_id_bytes)[0] == 0xDEADBEEF def test_parse_partner_data_pdu_roundtrip(self) -> None: From 29caefd57ddd42f4e29cd4da114339a873fe1ff7 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Mon, 6 Apr 2026 15:43:23 +0200 Subject: [PATCH 6/9] Fix BSend PDU to match PBC format verified against real S7-1500 Based on feedback from real PLC testing (issue #668), the PDU format needed further corrections: - Type/Group byte: 0x46 (request|PBC) was correct after all - Subfunction: 0x01 (not 0x06) - Sequence number in parameter: always 0 for PBC - R-ID moved from parameter to data section variable specification - Data section format: var_spec (12 06 13 00) + R-ID + payload_len + data - Parser updated to strip 10-byte PBC var spec prefix from incoming data This format was confirmed working against a real S7-1500 PLC by the issue reporter. Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 31 ++++++++++++++++--------------- tests/test_partner.py | 8 +++++--- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/snap7/partner.py b/snap7/partner.py index 01f4f260..2e836fb7 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -28,7 +28,7 @@ _PUSH_FUNC_GROUP = 0x06 # Partner push subfunctions -_PUSH_SUBFUNCTION_BSEND = 0x06 # bsend data push +_PUSH_SUBFUNCTION_BSEND = 0x01 # bsend data push class PartnerStatus: @@ -781,29 +781,29 @@ def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> by sequence = self._protocol._next_sequence() - # Parameter section: USERDATA header with PBC variable specification + # Parameter section: USERDATA header (12 bytes) param = struct.pack( ">BBBBBBBBBBH", 0x00, # reserved 0x01, # parameter count 0x12, # type header 0x08, # length of following parameter data - 0x12, # method: push - 0x06, # type 0 (push) | group 6 (PBC) + 0x12, # method: extended parameter + 0x46, # type 4 (request) | group 6 (PBC BSEND) _PUSH_SUBFUNCTION_BSEND, - sequence & 0xFF, + 0x00, # sequence number (always 0 for PBC) 0x00, # data unit reference number 0x00, # last data unit 0x0000, # error code ) - # R-ID (4 bytes) - param += struct.pack(">I", r_id) - # Variable specification block: type=0x12, len=0x06, syntax_id=0x82, transport=0x41, padding, payload_length - param += struct.pack(">BBBBHH", 0x12, 0x06, 0x82, 0x41, 0x0000, len(data)) - # Data section: 4-byte header + 2-byte prefix (12 00) + payload - payload_with_prefix = struct.pack(">BB", 0x12, 0x00) + data - data_section = struct.pack(">BBH", 0xFF, 0x09, len(payload_with_prefix)) + payload_with_prefix + # Data section: header + variable spec + R-ID + payload length + payload + # Variable specification: type=0x12, len=0x06, syntax_id=0x13, reserved=0x00 + var_spec = struct.pack(">BBBB", 0x12, 0x06, 0x13, 0x00) + # R-ID (4 bytes) + payload length (2 bytes) + var_spec += struct.pack(">IH", r_id, len(data)) + # Data header: return_code=0xFF, transport_size=0x09, length=varspec+data + data_section = struct.pack(">BBH", 0xFF, 0x09, len(var_spec) + len(data)) + var_spec + data # S7 USERDATA header (10 bytes) header = struct.pack( @@ -846,9 +846,10 @@ def _parse_partner_data_pdu(self, pdu: bytes) -> bytes: raise S7Error("Partner data section too short") # Skip 4-byte data section header (return_code, transport_size, length) payload = pdu[data_offset + 4 : data_offset + 4 + data_len - 4] if data_len > 4 else b"" - # Strip PBC prefix (12 00) if present - if len(payload) >= 2 and payload[0] == 0x12 and payload[1] == 0x00: - payload = payload[2:] + # Strip PBC variable specification block if present + # Format: 12 06 13 00 [R-ID 4 bytes] [length 2 bytes] = 10 bytes + if len(payload) >= 10 and payload[0] == 0x12 and payload[1] == 0x06: + payload = payload[10:] return payload else: raise S7Error(f"Unexpected PDU type in partner data: {pdu_type:#04x}") diff --git a/tests/test_partner.py b/tests/test_partner.py index b8a1587c..2ef96d2c 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -162,12 +162,14 @@ def test_build_partner_data_pdu_large(self) -> None: assert p._parse_partner_data_pdu(pdu) == data def test_build_partner_data_pdu_r_id(self) -> None: - """R-ID is embedded in the parameter section after the USERDATA header.""" + """R-ID is embedded in the data section variable specification block.""" p = Partner() p.r_id = 0xDEADBEEF pdu = p._build_partner_data_pdu(b"\x01") - # R-ID sits at offset 12 within the parameter section (after 12-byte USERDATA header) - r_id_bytes = pdu[10 + 12 : 10 + 12 + 4] + _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", pdu[:10]) + # Data section starts after header + param; skip 4-byte data header + 4-byte var spec prefix + data_start = 10 + param_len + 4 + 4 + r_id_bytes = pdu[data_start : data_start + 4] assert struct.unpack(">I", r_id_bytes)[0] == 0xDEADBEEF def test_parse_partner_data_pdu_roundtrip(self) -> None: From 17706cf30e662a3578bf1340d1618c4c6996b113 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Mon, 6 Apr 2026 15:51:39 +0200 Subject: [PATCH 7/9] Implement async receive path for Partner Add the missing async receive functionality so partners can receive data non-blocking, matching the existing async send pattern: - as_b_recv(): start async receive in background listener thread - wait_as_b_recv_completion(timeout): block until data arrives - check_as_b_recv_completion(): poll for completion (enhanced with error state reporting) - _recv_listener(): background thread that monitors the socket for incoming data, parses the PDU, sends ACK, and queues the result Also implemented: - set_recv_callback(callback): register callback for incoming data (was a no-op stub) - set_send_callback(callback): register callback for send completion (was a no-op stub) - Thread-safe I/O via _io_lock to coordinate between async send and receive threads on the shared socket Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 152 +++++++++++++++++++++-- tests/test_api_surface.py | 4 + tests/test_partner.py | 250 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 394 insertions(+), 12 deletions(-) diff --git a/snap7/partner.py b/snap7/partner.py index 2e836fb7..64c222d6 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -18,7 +18,7 @@ from ctypes import c_int32, c_uint32 from .connection import ISOTCPConnection -from .error import S7Error, S7ConnectionError +from .error import S7Error, S7ConnectionError, S7TimeoutError from .s7protocol import S7Protocol, S7PDUType from .type import Parameter @@ -108,7 +108,9 @@ def __init__(self, active: bool = False, **kwargs: object) -> None: self._async_send_queue: Queue[Any] = Queue() self._async_recv_queue: Queue[Any] = Queue() self._async_thread: Optional[threading.Thread] = None + self._recv_listener_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() + self._io_lock = threading.Lock() # Last error self.last_error = 0 @@ -118,6 +120,8 @@ def __init__(self, active: bool = False, **kwargs: object) -> None: self._recv_data: Optional[bytes] = None self._async_send_in_progress = False self._async_send_result = 0 + self._async_recv_in_progress = False + self._async_recv_result = 0 logger.info(f"S7 Partner initialized (active={active}, pure Python implementation)") @@ -201,10 +205,14 @@ def stop(self) -> int: 0 on success """ self._stop_event.set() + self._async_recv_in_progress = False if self._async_thread and self._async_thread.is_alive(): self._async_thread.join(timeout=2.0) + if self._recv_listener_thread and self._recv_listener_thread.is_alive(): + self._recv_listener_thread.join(timeout=2.0) + if self._connection: self._connection.disconnect() self._connection = None @@ -387,19 +395,78 @@ def wait_as_b_send_completion(self, timeout: int = 0) -> int: return self._async_send_result + def as_b_recv(self) -> int: + """ + Start asynchronous receive (non-blocking). + + Begins listening for incoming partner data in the background. + Use :meth:`check_as_b_recv_completion` or + :meth:`wait_as_b_recv_completion` to check for results. + + Returns: + 0 on success (receive initiated), -1 on error + """ + if not self.connected: + self.recv_errors += 1 + return -1 + + if self._async_recv_in_progress: + return -1 + + self._async_recv_in_progress = True + self._async_recv_result = 1 # In progress + + if self._recv_listener_thread is None or not self._recv_listener_thread.is_alive(): + self._recv_listener_thread = threading.Thread(target=self._recv_listener, daemon=True) + self._recv_listener_thread.start() + + logger.debug("Async receive initiated") + return 0 + def check_as_b_recv_completion(self) -> int: """ Check if async receive completed. Returns: - 0 if data available, 1 if in progress + 0 if data available, 1 if in progress, -1 on error """ + if self._async_recv_result == -1: + return -1 + try: self._recv_data = self._async_recv_queue.get_nowait() return 0 # Data available except Empty: return 1 # No data yet + def wait_as_b_recv_completion(self, timeout: int = 0) -> int: + """ + Wait for async receive to complete. + + Args: + timeout: Timeout in milliseconds (0 for infinite) + + Returns: + 0 on success, -1 on timeout/error + + Raises: + RuntimeError: If no async receive operation is in progress + """ + if not self._async_recv_in_progress: + raise RuntimeError("No async receive operation in progress") + + wait_time = timeout / 1000.0 if timeout > 0 else None + start = datetime.now() + + while self._async_recv_in_progress: + if wait_time is not None: + elapsed = (datetime.now() - start).total_seconds() + if elapsed >= wait_time: + return -1 + threading.Event().wait(0.01) + + return self._async_recv_result + def get_status(self) -> c_int32: """ Get partner status. @@ -492,24 +559,35 @@ def set_param(self, parameter: Parameter, value: int) -> int: logger.debug(f"Setting parameter {parameter} to {value}") return 0 - def set_recv_callback(self) -> int: + def set_recv_callback(self, callback: Optional[Callable[[bytes], None]] = None) -> int: """ - Sets the user callback for incoming data. + Register a callback for incoming data. + + The callback is invoked with the received bytes whenever data + arrives via :meth:`b_recv` or async receive. + + Args: + callback: Function called with received data, or ``None`` to clear. Returns: 0 on success """ - logger.debug("set_recv_callback called") + self._recv_callback = callback + logger.debug(f"Receive callback {'set' if callback else 'cleared'}") return 0 - def set_send_callback(self) -> int: + def set_send_callback(self, callback: Optional[Callable[[int], None]] = None) -> int: """ - Sets the user callback for completed async sends. + Register a callback for completed async sends. + + Args: + callback: Function called with the result code, or ``None`` to clear. Returns: 0 on success """ - logger.debug("set_send_callback called") + self._send_callback_fn = callback + logger.debug(f"Send callback {'set' if callback else 'cleared'}") return 0 def set_send_data(self, data: bytes) -> None: @@ -609,17 +687,16 @@ def _accept_connection(self) -> None: break def _async_processor(self) -> None: - """Background thread for processing async operations.""" + """Background thread for processing async send operations.""" while not self._stop_event.is_set(): - # Process async sends try: data = self._async_send_queue.get(timeout=0.1) try: - # Temporarily set send data and call b_send old_data = self._send_data self._send_data = data - result = self.b_send() + with self._io_lock: + result = self.b_send() self._send_data = old_data self._async_send_result = result @@ -637,6 +714,57 @@ def _async_processor(self) -> None: except Exception: break + def _recv_listener(self) -> None: + """Background thread that listens for incoming partner data. + + Runs while ``_async_recv_in_progress`` is set. Uses a short + socket timeout so the thread can be stopped cleanly and releases + ``_io_lock`` between attempts to allow sends to proceed. + """ + while not self._stop_event.is_set() and self._async_recv_in_progress: + conn = self._connection + if not self.connected or conn is None or conn.socket is None: + break + + old_timeout = conn.socket.gettimeout() + try: + conn.socket.settimeout(0.2) + with self._io_lock: + data = conn.receive_data() + received = self._parse_partner_data_pdu(data) + ack = self._build_partner_ack() + conn.send_data(ack) + except (S7TimeoutError, socket.timeout): + # Timeout is expected — restore connected flag since + # ISOTCPConnection.receive_data() sets it to False on timeout + if conn is not None: + conn.connected = True + continue + except Exception as e: + self.recv_errors += 1 + self._async_recv_result = -1 + self._async_recv_in_progress = False + logger.error(f"Async receive failed: {e}") + break + finally: + try: + if conn is not None and conn.socket is not None: + conn.socket.settimeout(old_timeout) + except OSError: + pass + + # Data received successfully + self._recv_data = received + self._async_recv_queue.put(received) + self.bytes_recv += len(received) + self._async_recv_result = 0 + self._async_recv_in_progress = False + + if self._recv_callback: + self._recv_callback(received) + + logger.debug(f"Async received {len(received)} bytes") + def _setup_communication(self) -> None: """Perform S7 Communication Setup after COTP connection. diff --git a/tests/test_api_surface.py b/tests/test_api_surface.py index b7b5e7e6..a81d9a30 100644 --- a/tests/test_api_surface.py +++ b/tests/test_api_surface.py @@ -163,7 +163,11 @@ "Par_AsBSend": "as_b_send", "Par_CheckAsBSendCompletion": "check_as_b_send_completion", "Par_WaitAsBSendCompletion": "wait_as_b_send_completion", + "Par_AsBRecv": "as_b_recv", "Par_CheckAsBRecvCompletion": "check_as_b_recv_completion", + "Par_WaitAsBRecvCompletion": "wait_as_b_recv_completion", + "Par_SetRecvCallback": "set_recv_callback", + "Par_SetSendCallback": "set_send_callback", "Par_GetParam": "get_param", "Par_SetParam": "set_param", "Par_GetTimes": "get_times", diff --git a/tests/test_partner.py b/tests/test_partner.py index 2ef96d2c..7975d8fa 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -347,6 +347,50 @@ def test_as_b_send_not_connected(self) -> None: result = p.as_b_send() assert result == -1 + def test_as_b_recv_not_connected(self) -> None: + p = Partner() + assert p.as_b_recv() == -1 + + def test_as_b_recv_already_in_progress(self) -> None: + p = Partner() + p.connected = True + p._async_recv_in_progress = True + assert p.as_b_recv() == -1 + p._async_recv_in_progress = False + + def test_wait_as_b_recv_no_operation(self) -> None: + p = Partner() + with pytest.raises(RuntimeError, match="No async receive"): + p.wait_as_b_recv_completion() + + def test_wait_as_b_recv_timeout(self) -> None: + p = Partner() + p._async_recv_in_progress = True + result = p.wait_as_b_recv_completion(timeout=50) + assert result == -1 + p._async_recv_in_progress = False + + def test_wait_as_b_recv_completes(self) -> None: + p = Partner() + p._async_recv_in_progress = True + p._async_recv_result = 0 + + def clear_flag() -> None: + time.sleep(0.05) + p._async_recv_in_progress = False + + t = threading.Thread(target=clear_flag) + t.start() + result = p.wait_as_b_recv_completion(timeout=2000) + t.join() + assert result == 0 + + def test_check_as_b_recv_completion_error(self) -> None: + p = Partner() + p._async_recv_result = -1 + assert p.check_as_b_recv_completion() == -1 + p._async_recv_result = 0 + def test_check_as_b_recv_completion_empty(self) -> None: p = Partner() assert p.check_as_b_recv_completion() == 1 @@ -427,10 +471,24 @@ def test_set_recv_callback_returns_zero(self) -> None: p = Partner() assert p.set_recv_callback() == 0 + def test_set_recv_callback_with_function(self) -> None: + p = Partner() + assert p.set_recv_callback(lambda data: None) == 0 + assert p._recv_callback is not None + assert p.set_recv_callback(None) == 0 + assert p._recv_callback is None + def test_set_send_callback_returns_zero(self) -> None: p = Partner() assert p.set_send_callback() == 0 + def test_set_send_callback_with_function(self) -> None: + p = Partner() + assert p.set_send_callback(lambda result: None) == 0 + assert p._send_callback_fn is not None + assert p.set_send_callback(None) == 0 + assert p._send_callback_fn is None + # --------------------------------------------------------------------------- # Dual-partner integration tests using raw socket pairing @@ -681,6 +739,198 @@ def do_send() -> None: pa.stop() pb.stop() + def test_as_b_recv_with_check(self) -> None: + """Async receive completes and data is available via check_as_b_recv_completion.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + payload = b"async recv test" + + # Start async receive on B + assert pb.as_b_recv() == 0 + + # Send from A (in a thread because b_send blocks waiting for ACK) + errors: list[Exception] = [] + + def do_send() -> None: + try: + pa.set_send_data(payload) + pa.b_send() + except Exception as e: + errors.append(e) + + t = threading.Thread(target=do_send) + t.start() + + # Poll until receive completes + deadline = time.time() + 3.0 + while time.time() < deadline: + if pb.check_as_b_recv_completion() == 0: + break + time.sleep(0.01) + + t.join(timeout=3.0) + assert not errors + assert pb.get_recv_data() == payload + finally: + pa.stop() + pb.stop() + + def test_as_b_recv_with_wait(self) -> None: + """Async receive completes when using wait_as_b_recv_completion.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + payload = b"wait recv test" + + assert pb.as_b_recv() == 0 + + errors: list[Exception] = [] + + def do_send() -> None: + try: + pa.set_send_data(payload) + pa.b_send() + except Exception as e: + errors.append(e) + + t = threading.Thread(target=do_send) + t.start() + + result = pb.wait_as_b_recv_completion(timeout=3000) + t.join(timeout=3.0) + assert result == 0 + assert not errors + assert pb.get_recv_data() == payload + finally: + pa.stop() + pb.stop() + + def test_as_b_recv_callback_fires(self) -> None: + """Receive callback is invoked during async receive.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + received_data: list[bytes] = [] + pb.set_recv_callback(lambda data: received_data.append(data)) + + payload = b"callback async" + assert pb.as_b_recv() == 0 + + errors: list[Exception] = [] + + def do_send() -> None: + try: + pa.set_send_data(payload) + pa.b_send() + except Exception as e: + errors.append(e) + + t = threading.Thread(target=do_send) + t.start() + + result = pb.wait_as_b_recv_completion(timeout=3000) + t.join(timeout=3.0) + assert result == 0 + assert not errors + assert len(received_data) == 1 + assert received_data[0] == payload + finally: + pa.stop() + pb.stop() + + def test_as_b_send_callback_fires(self) -> None: + """Send callback is invoked during async send.""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + send_results: list[int] = [] + pa.set_send_callback(lambda result: send_results.append(result)) + + payload = b"send callback" + pa.set_send_data(payload) + + # Start async processor thread for pa + pa._stop_event.clear() + pa._async_thread = threading.Thread(target=pa._async_processor, daemon=True) + pa._async_thread.start() + + assert pa.as_b_send() == 0 + + # Receive on B side + assert pb.b_recv() == 0 + + # Wait for async send to complete + deadline = time.time() + 3.0 + while pa._async_send_in_progress and time.time() < deadline: + time.sleep(0.01) + + assert pb.get_recv_data() == payload + assert len(send_results) == 1 + assert send_results[0] == 0 + finally: + pa.stop() + pb.stop() + + def test_as_b_recv_then_send(self) -> None: + """After async recv completes, sending still works (lock coordination).""" + sock_a, sock_b = _make_socket_pair() + pa, pb = Partner(), Partner() + try: + _wire_partner(pa, sock_a) + _wire_partner(pb, sock_b) + + # Phase 1: A sends, B receives async + assert pb.as_b_recv() == 0 + + errors: list[Exception] = [] + + def do_send_a() -> None: + try: + pa.set_send_data(b"phase1") + pa.b_send() + except Exception as e: + errors.append(e) + + t1 = threading.Thread(target=do_send_a) + t1.start() + result = pb.wait_as_b_recv_completion(timeout=3000) + t1.join(timeout=3.0) + assert result == 0 + assert pb.get_recv_data() == b"phase1" + assert not errors + + # Phase 2: B sends back, A receives sync + pb.set_send_data(b"phase2") + + def do_send_b() -> None: + try: + pb.b_send() + except Exception as e: + errors.append(e) + + t2 = threading.Thread(target=do_send_b) + t2.start() + assert pa.b_recv() == 0 + t2.join(timeout=3.0) + assert pa.get_recv_data() == b"phase2" + assert not errors + finally: + pa.stop() + pb.stop() + def test_b_recv_error_returns_negative(self) -> None: """b_recv returns -1 on receive error when no data arrives.""" sock_a, sock_b = _make_socket_pair() From d6b04b7cb724fe9cd4224876b644e1a0b4e62d28 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Mon, 6 Apr 2026 15:57:25 +0200 Subject: [PATCH 8/9] Fix race condition in wait_as_b_recv_completion The recv listener thread could complete before wait_as_b_recv_completion was called, causing a spurious RuntimeError. Track whether an async recv was started so the wait method returns the result instead of raising when the listener already finished. Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/snap7/partner.py b/snap7/partner.py index 64c222d6..01fa7d0e 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -122,6 +122,7 @@ def __init__(self, active: bool = False, **kwargs: object) -> None: self._async_send_result = 0 self._async_recv_in_progress = False self._async_recv_result = 0 + self._async_recv_started = False logger.info(f"S7 Partner initialized (active={active}, pure Python implementation)") @@ -414,6 +415,7 @@ def as_b_recv(self) -> int: return -1 self._async_recv_in_progress = True + self._async_recv_started = True self._async_recv_result = 1 # In progress if self._recv_listener_thread is None or not self._recv_listener_thread.is_alive(): @@ -450,9 +452,13 @@ def wait_as_b_recv_completion(self, timeout: int = 0) -> int: 0 on success, -1 on timeout/error Raises: - RuntimeError: If no async receive operation is in progress + RuntimeError: If no async receive was ever started """ if not self._async_recv_in_progress: + if self._async_recv_started: + # Listener already finished before wait was called + self._async_recv_started = False + return self._async_recv_result raise RuntimeError("No async receive operation in progress") wait_time = timeout / 1000.0 if timeout > 0 else None From d967940618a57925e32197ef2b7fcf7485fc026e Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Tue, 7 Apr 2026 09:40:41 +0200 Subject: [PATCH 9/9] Fix ACK to echo PDU reference and match PLC expected format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Based on real PLC testing feedback (issue #668): - ACK must echo the PDU reference from the incoming data PDU, not use a new sequence number — the PLC needs this to correlate the response - Remove R-ID from ACK parameter section (not needed) - Add 4-byte data section to ACK (return code 0x0a, transport 0x00) - _parse_partner_data_pdu now returns (payload, r_id, pdu_ref) tuple, extracting R-ID and PDU reference from the variable specification block and S7 header respectively Co-Authored-By: Claude Opus 4.6 --- snap7/partner.py | 60 ++++++++++++++++++++++++------------------- tests/test_partner.py | 37 +++++++++++++------------- 2 files changed, 53 insertions(+), 44 deletions(-) diff --git a/snap7/partner.py b/snap7/partner.py index 01fa7d0e..ba94cdb8 100644 --- a/snap7/partner.py +++ b/snap7/partner.py @@ -296,10 +296,10 @@ def b_recv(self) -> int: try: # Receive partner data data = self._connection.receive_data() - received = self._parse_partner_data_pdu(data) + received, _r_id, pdu_ref = self._parse_partner_data_pdu(data) - # Send acknowledgment - ack = self._build_partner_ack() + # Send acknowledgment with the same PDU reference + ack = self._build_partner_ack(pdu_ref) self._connection.send_data(ack) self.bytes_recv += len(received) @@ -737,8 +737,8 @@ def _recv_listener(self) -> None: conn.socket.settimeout(0.2) with self._io_lock: data = conn.receive_data() - received = self._parse_partner_data_pdu(data) - ack = self._build_partner_ack() + received, _r_id, pdu_ref = self._parse_partner_data_pdu(data) + ack = self._build_partner_ack(pdu_ref) conn.send_data(ack) except (S7TimeoutError, socket.timeout): # Timeout is expected — restore connected flag since @@ -952,15 +952,14 @@ def _build_partner_data_pdu(self, data: bytes, r_id: Optional[int] = None) -> by return header + param + data_section - def _parse_partner_data_pdu(self, pdu: bytes) -> bytes: + def _parse_partner_data_pdu(self, pdu: bytes) -> Tuple[bytes, int, int]: """Parse an incoming partner data push PDU and extract the payload. - Accepts both the new USERDATA format (with R-ID) and the legacy - minimal format for backward-compatibility with existing tests that - use raw socket pairs. - Returns: - The application payload. + Tuple of (payload, r_id, pdu_ref). *r_id* and *pdu_ref* are + extracted from the variable specification block and the S7 + header respectively. If the variable specification is absent + both default to ``0``. """ if len(pdu) < 6: raise S7Error("Invalid partner PDU: too short") @@ -971,35 +970,41 @@ def _parse_partner_data_pdu(self, pdu: bytes) -> bytes: raise S7Error(f"Invalid protocol ID: {protocol_id:#04x}") if pdu_type == S7PDUType.USERDATA: - # Full USERDATA format if len(pdu) < 10: raise S7Error("USERDATA partner PDU too short") - _, _, _, _, param_len, data_len = struct.unpack(">BBHHHH", pdu[:10]) + _, _, _, pdu_ref, param_len, data_len = struct.unpack(">BBHHHH", pdu[:10]) data_offset = 10 + param_len if data_offset + 4 > len(pdu): raise S7Error("Partner data section too short") # Skip 4-byte data section header (return_code, transport_size, length) payload = pdu[data_offset + 4 : data_offset + 4 + data_len - 4] if data_len > 4 else b"" - # Strip PBC variable specification block if present + # Parse PBC variable specification block if present # Format: 12 06 13 00 [R-ID 4 bytes] [length 2 bytes] = 10 bytes - if len(payload) >= 10 and payload[0] == 0x12 and payload[1] == 0x06: - payload = payload[10:] - return payload + r_id = 0 + if len(payload) >= 2 and payload[0] == 0x12: + var_len = payload[1] + if var_len == 0x06 and len(payload) >= 8: + syntax_id = payload[2] + if syntax_id == 0x13: + (r_id,) = struct.unpack(">I", payload[4:8]) + # skip var spec header (2) + body (var_len) + length field (2) + payload = payload[2 + var_len + 2 :] + return payload, r_id, pdu_ref else: raise S7Error(f"Unexpected PDU type in partner data: {pdu_type:#04x}") - def _build_partner_ack(self, r_id: Optional[int] = None) -> bytes: + def _build_partner_ack(self, pdu_ref: int = 0) -> bytes: """Build an S7 USERDATA acknowledgment PDU for a received bsend. + The PLC expects the same PDU reference in the ACK as in the + data PDU it sent. + Args: - r_id: Request ID echoed from the data PDU. + pdu_ref: Protocol Data Unit reference echoed from the data PDU. Returns: Complete S7 PDU bytes. """ - if r_id is None: - r_id = self.r_id - sequence = self._protocol._next_sequence() param = struct.pack( @@ -1013,19 +1018,22 @@ def _build_partner_ack(self, r_id: Optional[int] = None) -> bytes: _PUSH_SUBFUNCTION_BSEND, sequence & 0xFF, ) - param += struct.pack(">BBHI", 0x00, 0x00, 0x0000, r_id) # dur, ldu, error_code, R-ID + param += struct.pack(">BBH", 0x00, 0x00, 0x0000) # dur, ldu, error_code + + # Data section: return code 0x0a, transport size 0x00, length 0x0000 + data = struct.pack(">BBH", 0x0A, 0x00, 0x0000) header = struct.pack( ">BBHHHH", 0x32, S7PDUType.USERDATA, 0x0000, - sequence, + pdu_ref, len(param), - 0x0000, + len(data), ) - return header + param + return header + param + data def _parse_partner_ack(self, pdu: bytes) -> None: """Parse a partner acknowledgment PDU. diff --git a/tests/test_partner.py b/tests/test_partner.py index 7975d8fa..cea1b5c1 100644 --- a/tests/test_partner.py +++ b/tests/test_partner.py @@ -147,36 +147,37 @@ def test_build_partner_data_pdu_small(self) -> None: assert pdu[0:1] == b"\x32" assert pdu[1:2] == b"\x07" # Roundtrip recovers the payload - assert p._parse_partner_data_pdu(pdu) == data + payload, r_id, pdu_ref = p._parse_partner_data_pdu(pdu) + assert payload == data def test_build_partner_data_pdu_empty(self) -> None: p = Partner() pdu = p._build_partner_data_pdu(b"") assert pdu[0:1] == b"\x32" - assert p._parse_partner_data_pdu(pdu) == b"" + payload, _, _ = p._parse_partner_data_pdu(pdu) + assert payload == b"" def test_build_partner_data_pdu_large(self) -> None: p = Partner() data = bytes(range(256)) * 4 # 1024 bytes pdu = p._build_partner_data_pdu(data) - assert p._parse_partner_data_pdu(pdu) == data + payload, _, _ = p._parse_partner_data_pdu(pdu) + assert payload == data def test_build_partner_data_pdu_r_id(self) -> None: - """R-ID is embedded in the data section variable specification block.""" + """R-ID is embedded in the data section and extracted by parser.""" p = Partner() p.r_id = 0xDEADBEEF pdu = p._build_partner_data_pdu(b"\x01") - _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", pdu[:10]) - # Data section starts after header + param; skip 4-byte data header + 4-byte var spec prefix - data_start = 10 + param_len + 4 + 4 - r_id_bytes = pdu[data_start : data_start + 4] - assert struct.unpack(">I", r_id_bytes)[0] == 0xDEADBEEF + payload, r_id, _pdu_ref = p._parse_partner_data_pdu(pdu) + assert payload == b"\x01" + assert r_id == 0xDEADBEEF def test_parse_partner_data_pdu_roundtrip(self) -> None: p = Partner() original = b"Hello, Partner!" pdu = p._build_partner_data_pdu(original) - parsed = p._parse_partner_data_pdu(pdu) + parsed, _, _ = p._parse_partner_data_pdu(pdu) assert parsed == original def test_parse_partner_data_pdu_roundtrip_various_sizes(self) -> None: @@ -184,7 +185,8 @@ def test_parse_partner_data_pdu_roundtrip_various_sizes(self) -> None: for size in [0, 1, 10, 100, 500, 1024]: data = (bytes(range(256)) * (size // 256 + 1))[:size] pdu = p._build_partner_data_pdu(data) - assert p._parse_partner_data_pdu(pdu) == data + payload, _, _ = p._parse_partner_data_pdu(pdu) + assert payload == data def test_parse_partner_data_pdu_too_short(self) -> None: p = Partner() @@ -194,17 +196,16 @@ def test_parse_partner_data_pdu_too_short(self) -> None: def test_build_partner_ack(self) -> None: p = Partner() ack = p._build_partner_ack() - # S7 USERDATA header (10 bytes) + parameter section + # S7 USERDATA header (10 bytes) + parameter section + data section assert ack[0:1] == b"\x32" assert ack[1:2] == b"\x07" # USERDATA type - def test_build_partner_ack_r_id(self) -> None: - """ACK carries the same R-ID.""" + def test_build_partner_ack_pdu_ref(self) -> None: + """ACK echoes the PDU reference from the data PDU.""" p = Partner() - ack = p._build_partner_ack(r_id=0x12345678) - _, _, _, _, param_len, _ = struct.unpack(">BBHHHH", ack[:10]) - r_id_bytes = ack[10 + param_len - 4 : 10 + param_len] - assert struct.unpack(">I", r_id_bytes)[0] == 0x12345678 + ack = p._build_partner_ack(pdu_ref=0x1234) + _, _, _, pdu_ref, _, _ = struct.unpack(">BBHHHH", ack[:10]) + assert pdu_ref == 0x1234 def test_parse_partner_ack_valid(self) -> None: p = Partner()