From 4e9b5b33f48aa78648ace1a1f25499d0c403643a Mon Sep 17 00:00:00 2001 From: deacon Date: Mon, 16 Mar 2026 09:30:40 -0400 Subject: [PATCH] fix: replace deprecated socket send/recv with modern asyncio streams (#51) Caldera 5.0.0's contact_tcp.py calls writer.get_extra_info('socket') which returns a TransportSocket object that lacks send()/recv() methods, causing 'TransportSocket object has no attribute send'. This adds: - c_connection.py: async Connection wrapper around StreamReader/StreamWriter - c_session.py: Session class that caldera's contact_tcp.py imports - tcp_patch.py: runtime patch applied at plugin load to fix the TCP handler's accept/refresh/send methods to use the async Connection wrapper instead of the broken TransportSocket The patch auto-detects whether caldera has already been updated (main branch uses its own TCPSession) and is a no-op in that case. --- app/c_connection.py | 26 ++++++ app/c_session.py | 29 +++++++ app/tcp_patch.py | 127 ++++++++++++++++++++++++++++ hook.py | 9 ++ tests/__init__.py | 0 tests/test_c_connection.py | 55 ++++++++++++ tests/test_c_session.py | 27 ++++++ tests/test_tcp_patch.py | 167 +++++++++++++++++++++++++++++++++++++ 8 files changed, 440 insertions(+) create mode 100644 app/c_connection.py create mode 100644 app/c_session.py create mode 100644 app/tcp_patch.py create mode 100644 tests/__init__.py create mode 100644 tests/test_c_connection.py create mode 100644 tests/test_c_session.py create mode 100644 tests/test_tcp_patch.py diff --git a/app/c_connection.py b/app/c_connection.py new file mode 100644 index 0000000..fe3a5ed --- /dev/null +++ b/app/c_connection.py @@ -0,0 +1,26 @@ +"""Async connection wrapper for TCP sessions. + +Wraps asyncio StreamReader/StreamWriter to provide a simple async +send/recv interface, replacing the deprecated use of raw sockets +obtained via writer.get_extra_info('socket') which returns a +TransportSocket that lacks send()/recv() methods. + +See: https://github.com/mitre/manx/issues/51 +""" + +from app.utility.base_object import BaseObject + + +class Connection(BaseObject): + + def __init__(self, reader, writer): + super().__init__() + self.reader = reader + self.writer = writer + + async def recv(self, num_bytes): + return await self.reader.read(num_bytes) + + async def send(self, data): + self.writer.write(data) + await self.writer.drain() diff --git a/app/c_session.py b/app/c_session.py new file mode 100644 index 0000000..00b6741 --- /dev/null +++ b/app/c_session.py @@ -0,0 +1,29 @@ +"""TCP session model for the manx plugin. + +Imported by caldera's contact_tcp.py (v5.0.0) as: + from plugins.manx.app.c_session import Session + +See: https://github.com/mitre/manx/issues/51 +""" + +from app.utility.base_object import BaseObject + + +class Session(BaseObject): + + @property + def unique(self): + return self.hash('%s' % self.paw) + + def __init__(self, id, paw, connection): + super().__init__() + self.id = id + self.paw = paw + self.connection = connection + + def store(self, ram): + existing = self.retrieve(ram['sessions'], self.unique) + if not existing: + ram['sessions'].append(self) + return self.retrieve(ram['sessions'], self.unique) + return existing diff --git a/app/tcp_patch.py b/app/tcp_patch.py new file mode 100644 index 0000000..1479759 --- /dev/null +++ b/app/tcp_patch.py @@ -0,0 +1,127 @@ +"""Patch for caldera's TcpSessionHandler to fix deprecated socket API. + +Caldera 5.0.0's contact_tcp.py uses writer.get_extra_info('socket') which +returns a TransportSocket object that lacks send()/recv() methods, causing: + + AttributeError: 'TransportSocket' object has no attribute 'send' + +This module patches the TCP handler at runtime to use asyncio +StreamReader/StreamWriter via the Connection wrapper instead. + +If caldera has already been updated to use its own TCPSession class (main +branch), the patch detects this and is a no-op. + +See: https://github.com/mitre/manx/issues/51 +""" + +import json +import socket +import time + +from typing import Tuple + +from plugins.manx.app.c_connection import Connection +from plugins.manx.app.c_session import Session + + +def _needs_patching(handler): + """Check if the handler uses the old broken socket API. + + Returns False if the handler already uses modern stream-based sessions + (e.g., caldera main branch with TCPSession). + """ + # If the accept method references 'get_extra_info', it needs patching. + # If it's already been patched or uses TCPSession, skip. + import inspect + try: + source = inspect.getsource(handler.accept) + return 'get_extra_info' in source + except (OSError, TypeError): + # Can't inspect, assume patching is safe (it's idempotent) + return True + + +async def _patched_accept(self, reader, writer): + """Replacement for TcpSessionHandler.accept that uses Connection wrapper.""" + try: + profile = await self._handshake(reader) + except Exception as e: + self.log.debug('Handshake failed: %s' % e) + return + connection = Connection(reader, writer) + profile['executors'] = [e for e in profile['executors'].split(',') if e] + profile['contact'] = 'tcp' + agent, _ = await self.services.get('contact_svc').handle_heartbeat(**profile) + new_session = Session(id=self.generate_number(size=6), paw=agent.paw, connection=connection) + self.sessions.append(new_session) + await self.send(new_session.id, agent.paw, timeout=5) + + +async def _patched_refresh(self): + """Replacement for TcpSessionHandler.refresh that uses async send.""" + index = 0 + while index < len(self.sessions): + session = self.sessions[index] + try: + await session.connection.send(str.encode(' ')) + except (socket.error, OSError, ConnectionError): + self.log.debug( + 'Error occurred when refreshing session %s. Removing from session pool.', + session.id + ) + del self.sessions[index] + else: + index += 1 + + +async def _patched_send(self, session_id: int, cmd: str, timeout: int = 60) -> Tuple[int, str, str, str]: + """Replacement for TcpSessionHandler.send that uses async Connection.""" + try: + conn = next(i.connection for i in self.sessions if i.id == int(session_id)) + await conn.send(str.encode(' ')) + time.sleep(0.01) + await conn.send(str.encode('%s\n' % cmd)) + response = await _patched_attempt_connection(self, session_id, conn, timeout=timeout) + response = json.loads(response) + return response['status'], response['pwd'], response['response'], response.get('agent_reported_time', '') + except Exception as e: + self.log.exception(e) + return 1, '~$ ', str(e), '' + + +async def _patched_attempt_connection(self, session_id, connection, timeout): + """Replacement for TcpSessionHandler._attempt_connection using async recv.""" + buffer = 4096 + data = b'' + waited_seconds = 0 + time.sleep(0.1) # initial wait for fast operations. + while True: + try: + part = await connection.recv(buffer) + data += part + if len(part) < buffer: + break + except BlockingIOError as err: + if waited_seconds < timeout: + time.sleep(1) + waited_seconds += 1 + else: + self.log.error("Timeout reached for session %s", session_id) + return json.dumps(dict(status=1, pwd='~$ ', response=str(err))) + return str(data, 'utf-8') + + +def patch_tcp_handler(handler): + """Apply the modern async stream patches to a TcpSessionHandler instance. + + This is safe to call even if the handler has already been updated: + it checks whether patching is needed first. + """ + if not _needs_patching(handler): + return + + import types + handler.accept = types.MethodType(_patched_accept, handler) + handler.refresh = types.MethodType(_patched_refresh, handler) + handler.send = types.MethodType(_patched_send, handler) + handler._attempt_connection = types.MethodType(_patched_attempt_connection, handler) diff --git a/hook.py b/hook.py index abe0429..8072a89 100644 --- a/hook.py +++ b/hook.py @@ -1,6 +1,7 @@ from app.utility.base_world import BaseWorld from plugins.manx.app.h_terminal import Handle from plugins.manx.app.term_api import TermApi +from plugins.manx.app.tcp_patch import patch_tcp_handler name = 'Terminal' description = 'A toolset which supports terminal access' @@ -13,6 +14,14 @@ async def enable(services): app = services.get('app_svc').application term_api = TermApi(services) + # Patch the TCP handler to fix deprecated socket API (issue #51). + # Caldera 5.0.0 uses writer.get_extra_info('socket') which returns a + # TransportSocket that lacks send()/recv(). This patch replaces the + # handler methods to use asyncio StreamReader/StreamWriter instead. + tcp_contacts = [c for c in services.get('contact_svc').contacts if c.name == 'tcp'] + if tcp_contacts: + patch_tcp_handler(tcp_contacts[0].tcp_handler) + udp_contact = [c for c in services.get('contact_svc').contacts if c.name == 'websocket'] udp_contact[0].handler.handles.append(Handle(tag='manx')) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_c_connection.py b/tests/test_c_connection.py new file mode 100644 index 0000000..7136fd1 --- /dev/null +++ b/tests/test_c_connection.py @@ -0,0 +1,55 @@ +"""Tests for the Connection wrapper class.""" + +import asyncio +from unittest import mock + +import pytest + +from plugins.manx.app.c_connection import Connection + + +@pytest.fixture +def mock_reader(): + reader = mock.AsyncMock() + return reader + + +@pytest.fixture +def mock_writer(): + writer = mock.Mock() + writer.write = mock.Mock() + writer.drain = mock.AsyncMock() + return writer + + +@pytest.fixture +def connection(mock_reader, mock_writer): + return Connection(mock_reader, mock_writer) + + +class TestConnection: + + @pytest.mark.asyncio + async def test_recv_delegates_to_reader(self, connection, mock_reader): + mock_reader.read.return_value = b'hello' + result = await connection.recv(1024) + assert result == b'hello' + mock_reader.read.assert_awaited_once_with(1024) + + @pytest.mark.asyncio + async def test_send_writes_and_drains(self, connection, mock_writer): + await connection.send(b'world') + mock_writer.write.assert_called_once_with(b'world') + mock_writer.drain.assert_awaited_once() + + @pytest.mark.asyncio + async def test_send_empty_data(self, connection, mock_writer): + await connection.send(b'') + mock_writer.write.assert_called_once_with(b'') + mock_writer.drain.assert_awaited_once() + + @pytest.mark.asyncio + async def test_recv_empty(self, connection, mock_reader): + mock_reader.read.return_value = b'' + result = await connection.recv(4096) + assert result == b'' diff --git a/tests/test_c_session.py b/tests/test_c_session.py new file mode 100644 index 0000000..9015e41 --- /dev/null +++ b/tests/test_c_session.py @@ -0,0 +1,27 @@ +"""Tests for the Session class.""" + +from unittest import mock + +from plugins.manx.app.c_session import Session + + +class TestSession: + + def test_session_creation(self): + conn = mock.Mock() + session = Session(id=123, paw='test_paw', connection=conn) + assert session.id == 123 + assert session.paw == 'test_paw' + assert session.connection is conn + + def test_unique_property(self): + conn = mock.Mock() + session = Session(id=1, paw='abc', connection=conn) + # unique should be deterministic for the same paw + assert session.unique == session.unique + + def test_different_paws_different_unique(self): + conn = mock.Mock() + s1 = Session(id=1, paw='paw1', connection=conn) + s2 = Session(id=2, paw='paw2', connection=conn) + assert s1.unique != s2.unique diff --git a/tests/test_tcp_patch.py b/tests/test_tcp_patch.py new file mode 100644 index 0000000..2866513 --- /dev/null +++ b/tests/test_tcp_patch.py @@ -0,0 +1,167 @@ +"""Tests for the TCP handler patch.""" + +import json +import socket +from unittest import mock + +import pytest + +from plugins.manx.app.c_connection import Connection +from plugins.manx.app.c_session import Session +from plugins.manx.app.tcp_patch import ( + patch_tcp_handler, + _patched_accept, + _patched_refresh, + _patched_send, +) + + +def _make_handler(sessions=None): + """Create a mock TcpSessionHandler.""" + handler = mock.Mock() + handler.sessions = sessions or [] + handler.log = mock.Mock() + handler.services = mock.Mock() + handler.generate_number = mock.Mock(return_value=123456) + return handler + + +class TestPatchedRefresh: + + @pytest.mark.asyncio + async def test_removes_dead_sessions(self): + handler = _make_handler() + + dead_conn = mock.AsyncMock() + dead_conn.send.side_effect = OSError('connection reset') + dead_session = mock.Mock() + dead_session.connection = dead_conn + dead_session.id = 1 + + live_conn = mock.AsyncMock() + live_session = mock.Mock() + live_session.connection = live_conn + live_session.id = 2 + + handler.sessions = [dead_session, live_session] + await _patched_refresh(handler) + + assert len(handler.sessions) == 1 + assert handler.sessions[0].id == 2 + + @pytest.mark.asyncio + async def test_keeps_all_live_sessions(self): + handler = _make_handler() + + sessions = [] + for i in range(3): + conn = mock.AsyncMock() + s = mock.Mock() + s.connection = conn + s.id = i + sessions.append(s) + + handler.sessions = sessions + await _patched_refresh(handler) + assert len(handler.sessions) == 3 + + @pytest.mark.asyncio + async def test_removes_all_dead_sessions(self): + handler = _make_handler() + + sessions = [] + for i in range(3): + conn = mock.AsyncMock() + conn.send.side_effect = ConnectionError() + s = mock.Mock() + s.connection = conn + s.id = i + sessions.append(s) + + handler.sessions = sessions + await _patched_refresh(handler) + assert len(handler.sessions) == 0 + + +class TestPatchedSend: + + @pytest.mark.asyncio + async def test_send_returns_parsed_response(self): + handler = _make_handler() + conn = mock.AsyncMock() + session = Session(id=42, paw='testpaw', connection=conn) + handler.sessions = [session] + + response_data = { + 'status': 0, + 'pwd': '/home/test', + 'response': 'root', + 'agent_reported_time': '2024-01-01T00:00:00Z' + } + + # Mock _attempt_connection on the handler + async def mock_attempt(self_unused, sid, c, timeout): + return json.dumps(response_data) + + import plugins.manx.app.tcp_patch as tcp_patch_mod + original = tcp_patch_mod._patched_attempt_connection + tcp_patch_mod._patched_attempt_connection = mock_attempt + try: + status, pwd, response, ts = await _patched_send(handler, 42, 'whoami') + assert status == 0 + assert pwd == '/home/test' + assert response == 'root' + assert ts == '2024-01-01T00:00:00Z' + finally: + tcp_patch_mod._patched_attempt_connection = original + + @pytest.mark.asyncio + async def test_send_handles_exception(self): + handler = _make_handler() + conn = mock.AsyncMock() + conn.send.side_effect = Exception('broken') + session = Session(id=42, paw='testpaw', connection=conn) + handler.sessions = [session] + + status, pwd, response, ts = await _patched_send(handler, 42, 'whoami') + assert status == 1 + assert pwd == '~$ ' + assert 'broken' in response + + +class TestPatchTcpHandler: + + def test_patches_handler_with_old_api(self): + """Test that patch_tcp_handler replaces methods on a handler + whose accept method references get_extra_info.""" + handler = _make_handler() + + # Create a fake accept method that contains 'get_extra_info' + async def old_accept(reader, writer): + connection = writer.get_extra_info('socket') + + handler.accept = old_accept + handler.refresh = mock.AsyncMock() + handler.send = mock.AsyncMock() + handler._attempt_connection = mock.AsyncMock() + + patch_tcp_handler(handler) + + # After patching, the methods should be bound methods of the patched functions + assert 'patched' in handler.accept.__func__.__name__ + + def test_skips_handler_without_old_api(self): + """Test that patch_tcp_handler is a no-op when handler already uses modern API.""" + handler = _make_handler() + + async def modern_accept(reader, writer): + # No get_extra_info here + pass + + handler.accept = modern_accept + original_accept = handler.accept + + patch_tcp_handler(handler) + + # Should not have been replaced + assert handler.accept is original_accept