From 05a2a41c7d40f8ff98b21dc75250038b4703d676 Mon Sep 17 00:00:00 2001 From: deacon Date: Mon, 16 Mar 2026 09:24:49 -0400 Subject: [PATCH] Fix deprecated socket API usage (closes #51) - Add Connection class wrapping asyncio StreamReader/StreamWriter to replace removed TransportSocket.send/recv (Python 3.12+) - Update h_terminal.py to use modern aiohttp WebSocket API: socket.receive() instead of socket.recv(), socket.send_str() instead of socket.send() - Add unit tests for Connection class and WebSocket API usage --- app/c_connection.py | 26 ++++++++ app/h_terminal.py | 5 +- tests/__init__.py | 0 tests/test_connection.py | 128 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 app/c_connection.py create mode 100644 tests/__init__.py create mode 100644 tests/test_connection.py diff --git a/app/c_connection.py b/app/c_connection.py new file mode 100644 index 0000000..12dbed7 --- /dev/null +++ b/app/c_connection.py @@ -0,0 +1,26 @@ +class Connection: + """Wraps asyncio StreamReader/StreamWriter to provide send/recv interface. + + Python 3.12 removed the deprecated asyncio.Transport send/recv methods. + This class provides a compatible interface using the modern + StreamReader/StreamWriter API. + """ + + def __init__(self, reader, writer): + self.reader = reader + self.writer = writer + + async def recv(self, num_bytes): + """Read up to num_bytes from the stream.""" + return await self.reader.read(num_bytes) + + async def send(self, data): + """Write data to the stream and flush.""" + if isinstance(data, str): + data = data.encode() + self.writer.write(data) + await self.writer.drain() + + def close(self): + """Close the underlying writer.""" + self.writer.close() diff --git a/app/h_terminal.py b/app/h_terminal.py index a845711..bb2a235 100644 --- a/app/h_terminal.py +++ b/app/h_terminal.py @@ -13,11 +13,12 @@ def __init__(self, tag): @staticmethod async def run(socket, path, services): session_id = path.split('/')[2] - cmd = await socket.recv() + msg = await socket.receive() + cmd = msg.data if hasattr(msg, 'data') else str(msg) handler = services.get('term_svc').socket_conn.tcp_handler paw = next(i.paw for i in handler.sessions if i.id == int(session_id)) services.get('contact_svc').report['websocket'].append( dict(paw=paw, date=datetime.now(timezone.utc).strftime(BaseWorld.TIME_FORMAT), cmd=cmd) ) status, pwd, reply, response_time = await handler.send(session_id, cmd) - await socket.send(json.dumps(dict(response=reply.strip(), pwd=pwd, status=status, response_time=response_time))) + await socket.send_str(json.dumps(dict(response=reply.strip(), pwd=pwd, status=status, response_time=response_time))) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_connection.py b/tests/test_connection.py new file mode 100644 index 0000000..dc4d8e2 --- /dev/null +++ b/tests/test_connection.py @@ -0,0 +1,128 @@ +import asyncio +import unittest + + +class TestConnection(unittest.TestCase): + """Tests for the Connection class that wraps asyncio streams.""" + + def test_connection_init(self): + """Connection stores reader and writer references.""" + from app.c_connection import Connection + + mock_reader = object() + mock_writer = object() + conn = Connection(mock_reader, mock_writer) + self.assertIs(conn.reader, mock_reader) + self.assertIs(conn.writer, mock_writer) + + def test_recv(self): + """Connection.recv reads from the underlying StreamReader.""" + from app.c_connection import Connection + + class MockReader: + def __init__(self): + self.read_called_with = None + + async def read(self, n): + self.read_called_with = n + return b"hello" + + class MockWriter: + pass + + reader = MockReader() + conn = Connection(reader, MockWriter()) + result = asyncio.get_event_loop().run_until_complete(conn.recv(1024)) + self.assertEqual(result, b"hello") + self.assertEqual(reader.read_called_with, 1024) + + def test_send_bytes(self): + """Connection.send writes bytes to the underlying StreamWriter.""" + from app.c_connection import Connection + + class MockReader: + pass + + class MockWriter: + def __init__(self): + self.written = None + self.drained = False + + def write(self, data): + self.written = data + + async def drain(self): + self.drained = True + + writer = MockWriter() + conn = Connection(MockReader(), writer) + asyncio.get_event_loop().run_until_complete(conn.send(b"world")) + self.assertEqual(writer.written, b"world") + self.assertTrue(writer.drained) + + def test_send_str_encodes_to_bytes(self): + """Connection.send encodes str to bytes before writing.""" + from app.c_connection import Connection + + class MockReader: + pass + + class MockWriter: + def __init__(self): + self.written = None + + def write(self, data): + self.written = data + + async def drain(self): + pass + + writer = MockWriter() + conn = Connection(MockReader(), writer) + asyncio.get_event_loop().run_until_complete(conn.send("text")) + self.assertEqual(writer.written, b"text") + + def test_close(self): + """Connection.close closes the underlying writer.""" + from app.c_connection import Connection + + class MockReader: + pass + + class MockWriter: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + + writer = MockWriter() + conn = Connection(MockReader(), writer) + conn.close() + self.assertTrue(writer.closed) + + +class TestHandleWebSocket(unittest.TestCase): + """Tests for the updated WebSocket API usage in h_terminal.py Handle.""" + + def test_handle_uses_receive_and_send_str(self): + """Verify Handle.run uses socket.receive() and socket.send_str().""" + import ast + with open("app/h_terminal.py") as f: + source = f.read() + + # Ensure deprecated methods are not used + self.assertNotIn("socket.recv()", source, + "Should use socket.receive() instead of deprecated socket.recv()") + self.assertNotIn("await socket.send(", source, + "Should use socket.send_str() instead of deprecated socket.send()") + + # Ensure modern methods are used + self.assertIn("socket.receive()", source, + "Should call socket.receive() for reading WebSocket messages") + self.assertIn("socket.send_str(", source, + "Should call socket.send_str() for sending WebSocket messages") + + +if __name__ == "__main__": + unittest.main()