From 68c6e0bb9cd7af0ce5ea05e1447f6141a8d0f702 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:30:08 +0000 Subject: [PATCH 1/9] Fix flaky TestChildProcessCleanup: poll for file growth, clean up proc in finally The three TestChildProcessCleanup tests were flaky on Windows/macOS CI because they used fixed anyio.sleep() durations (0.5s + 0.3s = 800ms) to wait for nested Python interpreter chains to start writing to marker files. On loaded CI runners, two Python startups + subprocess.Popen + open() + first write() can exceed 800ms, causing assert 0 > 0. Secondary bug: when the assertion failed, proc was never terminated (finally only cleaned up tempfiles). The leaked subprocess was GC'd during a later test, triggering PytestUnraisableExceptionWarning and causing knock-on failures in unrelated tests. Changes: - Added _wait_for_file_size() helper: polls until getsize(path) exceeds a threshold, bounded by anyio.fail_after(10). Replaces all startup sleep+assert chains. Raises TimeoutError with a clear message instead of a confusing assert 0 > 0. - Added proc cleanup to each finally block. Uses proc=None tracking after successful in-test termination to avoid redundant double-calls, and anyio.move_on_after() so cleanup timeout never masks the real failure. - Removed the parent_marker check from test_basic_child_process_cleanup. NamedTemporaryFile(delete=False) already creates the file, so the os.path.exists() assertion was a no-op that never verified anything. Child writing proves parent started. - Simplified shutdown verification: terminate_posix_process_tree already polls for process-group death, so the first post-termination sleep(0.5) was redundant. Reduced to a single 0.3s stability check (3x child write interval). Result: 3 tests drop from ~7s to ~1.9s locally, 30/30 pass under 4-worker parallel load (flake-finder). Closes #1775 Github-Issue: #1775 --- tests/client/test_stdio.py | 368 ++++++++++++++++++------------------- 1 file changed, 180 insertions(+), 188 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index f70c24eee..8f5495c4d 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -221,6 +221,21 @@ def sigint_handler(signum, frame): raise +async def _wait_for_file_size(path: str, larger_than: int = 0, *, timeout: float = 10.0) -> int: + """Poll until the file at ``path`` has size strictly greater than ``larger_than``. + + Returns the observed size once the threshold is crossed. Raises ``TimeoutError`` + (via ``anyio.fail_after``) if the file does not grow past the threshold within + ``timeout`` seconds. Used by ``TestChildProcessCleanup`` to deterministically + wait for subprocess chains to start writing, replacing fixed ``sleep()`` durations + that caused flakiness on loaded CI runners. + """ + with anyio.fail_after(timeout): + while os.path.getsize(path) <= larger_than: + await anyio.sleep(0.05) + return os.path.getsize(path) + + class TestChildProcessCleanup: """Tests for child process cleanup functionality using _terminate_process_tree. @@ -255,86 +270,67 @@ async def test_basic_child_process_cleanup(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: marker_file = f.name - # Also create a file to verify parent started - with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: - parent_marker = f.name - - try: - # Parent script that spawns a child process - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import os - - # Mark that parent started - with open({escape_path_for_python(parent_marker)}, 'w') as f: - f.write('parent started\\n') - - # Child script that writes continuously - child_script = f''' - import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"{time.time()}") - f.flush() - time.sleep(0.1) - ''' - - # Start the child process - child = subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent just sleeps + # Parent script that spawns a child process + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + + # Child script that writes continuously + child_script = f''' + import time + with open({escape_path_for_python(marker_file)}, 'a') as f: while True: + f.write(f"{time.time()}") + f.flush() time.sleep(0.1) - """ - ) + ''' - print("\nStarting child process termination test...") + # Start the child process + child = subprocess.Popen([sys.executable, '-c', child_script]) - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - - # Wait for processes to start - await anyio.sleep(0.5) + # Parent just sleeps + while True: + time.sleep(0.1) + """ + ) - # Verify parent started - assert os.path.exists(parent_marker), "Parent process didn't start" + proc = None + try: + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - # Verify child is writing - if os.path.exists(marker_file): # pragma: no branch - initial_size = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size_after_wait = os.path.getsize(marker_file) - assert size_after_wait > initial_size, "Child process should be writing" - print(f"Child is writing (file grew from {initial_size} to {size_after_wait} bytes)") + # Poll for child to start writing (bounded wait replaces fixed sleep(0.5)+sleep(0.3)) + initial_size = await _wait_for_file_size(marker_file, larger_than=0) + # Poll for further growth to confirm the child is actively writing in a loop + grown_size = await _wait_for_file_size(marker_file, larger_than=initial_size) + print(f"Child is writing (file grew from {initial_size} to {grown_size} bytes)") - # Terminate using our function - print("Terminating process and children...") + # Terminate using our function (the behavior under test) await _terminate_process_tree(proc) - - # Verify processes stopped - await anyio.sleep(0.5) - if os.path.exists(marker_file): # pragma: no branch - size_after_cleanup = os.path.getsize(marker_file) - await anyio.sleep(0.5) - final_size = os.path.getsize(marker_file) - - print(f"After cleanup: file size {size_after_cleanup} -> {final_size}") - assert final_size == size_after_cleanup, ( - f"Child process still running! File grew by {final_size - size_after_cleanup} bytes" - ) - - print("SUCCESS: Child process was properly terminated") + proc = None # Successfully terminated; skip redundant cleanup in finally + + # Verify child stopped writing. _terminate_process_tree on POSIX already polls + # for process group death; a single 0.3s check (3x child write interval) suffices. + size_after_term = os.path.getsize(marker_file) + await anyio.sleep(0.3) + final_size = os.path.getsize(marker_file) + assert final_size == size_after_term, ( + f"Child process still running! File grew by {final_size - size_after_term} bytes" + ) finally: - # Clean up files - for f in [marker_file, parent_marker]: - try: - os.unlink(f) - except OSError: # pragma: no cover - pass + # Ensure process tree is terminated even if an assertion above failed, + # preventing leaked subprocesses from causing knock-on failures in later tests. + if proc is not None: # pragma: no cover + # Only reached if an assertion failed before _terminate_process_tree; + # ensures the subprocess tree doesn't leak into later tests. + with anyio.move_on_after(5.0): + await _terminate_process_tree(proc) + try: + os.unlink(marker_file) + except OSError: # pragma: no cover + pass @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") @@ -350,81 +346,80 @@ async def test_nested_process_tree(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f3: grandchild_file = f3.name - try: - # Simple nested process tree test - # We create parent -> child -> grandchild, each writing to a file - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import os - - # Child will spawn grandchild and write to child file - child_script = f'''import subprocess - import sys - import time - - # Grandchild just writes to file - grandchild_script = \"\"\"import time - with open({escape_path_for_python(grandchild_file)}, 'a') as f: - while True: - f.write(f"gc {{time.time()}}") - f.flush() - time.sleep(0.1)\"\"\" - - # Spawn grandchild - subprocess.Popen([sys.executable, '-c', grandchild_script]) - - # Child writes to its file - with open({escape_path_for_python(child_file)}, 'a') as f: - while True: - f.write(f"c {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Spawn child process - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent writes to its file - with open({escape_path_for_python(parent_file)}, 'a') as f: - while True: - f.write(f"p {time.time()}") - f.flush() - time.sleep(0.1) - """ - ) + # Simple nested process tree test + # We create parent -> child -> grandchild, each writing to a file + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import os + + # Child will spawn grandchild and write to child file + child_script = f'''import subprocess + import sys + import time + + # Grandchild just writes to file + grandchild_script = \"\"\"import time + with open({escape_path_for_python(grandchild_file)}, 'a') as f: + while True: + f.write(f"gc {{time.time()}}") + f.flush() + time.sleep(0.1)\"\"\" - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Spawn grandchild + subprocess.Popen([sys.executable, '-c', grandchild_script]) - # Let all processes start - await anyio.sleep(1.0) + # Child writes to its file + with open({escape_path_for_python(child_file)}, 'a') as f: + while True: + f.write(f"c {time.time()}") + f.flush() + time.sleep(0.1)''' - # Verify all are writing - for file_path, name in [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")]: - if os.path.exists(file_path): # pragma: no branch - initial_size = os.path.getsize(file_path) - await anyio.sleep(0.3) - new_size = os.path.getsize(file_path) - assert new_size > initial_size, f"{name} process should be writing" + # Spawn child process + subprocess.Popen([sys.executable, '-c', child_script]) - # Terminate the whole tree - await _terminate_process_tree(proc) + # Parent writes to its file + with open({escape_path_for_python(parent_file)}, 'a') as f: + while True: + f.write(f"p {time.time()}") + f.flush() + time.sleep(0.1) + """ + ) + + files = [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")] + proc = None + try: + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + + # Poll for each level of the tree to start writing (bounded wait). + # Grandchild is deepest, so once it's writing, the whole chain is up. + for file_path, name in files: + size = await _wait_for_file_size(file_path, larger_than=0) + await _wait_for_file_size(file_path, larger_than=size) + print(f"{name} is writing") - # Verify all stopped - await anyio.sleep(0.5) - for file_path, name in [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")]: - if os.path.exists(file_path): # pragma: no branch - size1 = os.path.getsize(file_path) - await anyio.sleep(0.3) - size2 = os.path.getsize(file_path) - assert size1 == size2, f"{name} still writing after cleanup!" + # Terminate the whole tree (the behavior under test) + await _terminate_process_tree(proc) + proc = None # Successfully terminated; skip redundant cleanup in finally - print("SUCCESS: All processes in tree terminated") + # Verify all stopped. Record sizes once, wait 3x write interval, check none grew. + sizes_after_term = {path: os.path.getsize(path) for path, _ in files} + await anyio.sleep(0.3) + for file_path, name in files: + final_size = os.path.getsize(file_path) + assert final_size == sizes_after_term[file_path], f"{name} still writing after cleanup!" finally: - # Clean up all marker files + # Ensure process tree is terminated even if an assertion above failed. + if proc is not None: # pragma: no cover + # Only reached if an assertion failed before _terminate_process_tree; + # ensures the subprocess tree doesn't leak into later tests. + with anyio.move_on_after(5.0): + await _terminate_process_tree(proc) for f in [parent_file, child_file, grandchild_file]: try: os.unlink(f) @@ -442,66 +437,63 @@ async def test_early_parent_exit(self): with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: marker_file = f.name - try: - # Parent that spawns child and waits briefly - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import signal - - # Child that continues running - child_script = f'''import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"child {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Start child in same process group - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent waits a bit then exits on SIGTERM - def handle_term(sig, frame): - sys.exit(0) - - signal.signal(signal.SIGTERM, handle_term) - - # Wait + # Parent that spawns child and exits immediately on SIGTERM + parent_script = textwrap.dedent( + f""" + import subprocess + import sys + import time + import signal + + # Child that continues running + child_script = f'''import time + with open({escape_path_for_python(marker_file)}, 'a') as f: while True: - time.sleep(0.1) - """ - ) + f.write(f"child {time.time()}") + f.flush() + time.sleep(0.1)''' - # Start the parent process - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + # Start child in same process group + subprocess.Popen([sys.executable, '-c', child_script]) - # Let child start writing - await anyio.sleep(0.5) + # Parent exits immediately on SIGTERM (the race this test exercises) + def handle_term(sig, frame): + sys.exit(0) - # Verify child is writing - if os.path.exists(marker_file): # pragma: no branch - size1 = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size2 = os.path.getsize(marker_file) - assert size2 > size1, "Child should be writing" + signal.signal(signal.SIGTERM, handle_term) - # Terminate - this will kill the process group even if parent exits first - await _terminate_process_tree(proc) + # Wait + while True: + time.sleep(0.1) + """ + ) + + proc = None + try: + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + + # Poll for child to start writing (bounded wait) + initial_size = await _wait_for_file_size(marker_file, larger_than=0) + await _wait_for_file_size(marker_file, larger_than=initial_size) - # Verify child stopped - await anyio.sleep(0.5) - if os.path.exists(marker_file): # pragma: no branch - size3 = os.path.getsize(marker_file) - await anyio.sleep(0.3) - size4 = os.path.getsize(marker_file) - assert size3 == size4, "Child should be terminated" + # Terminate — parent exits immediately on SIGTERM, but process group kill + # should still catch the child + await _terminate_process_tree(proc) + proc = None # Successfully terminated; skip redundant cleanup in finally - print("SUCCESS: Child terminated even with parent exit during cleanup") + # Verify child stopped writing + size_after_term = os.path.getsize(marker_file) + await anyio.sleep(0.3) + final_size = os.path.getsize(marker_file) + assert final_size == size_after_term, "Child should be terminated" finally: - # Clean up marker file + # Ensure process tree is terminated even if an assertion above failed. + if proc is not None: # pragma: no cover + # Only reached if an assertion failed before _terminate_process_tree; + # ensures the subprocess tree doesn't leak into later tests. + with anyio.move_on_after(5.0): + await _terminate_process_tree(proc) try: os.unlink(marker_file) except OSError: # pragma: no cover From 840c0db3cda505ae5fddc567893c8fc52dadd3f5 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:48:47 +0000 Subject: [PATCH 2/9] Rewrite TestChildProcessCleanup with socket-based deterministic liveness probe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the file-growth-polling approach with a fundamentally different design: each subprocess in the tree connects a TCP socket back to a listener owned by the test. Liveness and death are then detected via blocking I/O operations that unblock on kernel-level events — zero sleeps, zero polling: 1. await listener.accept() blocks until the subprocess connects (proves it started). 2. After _terminate_process_tree(), await stream.receive(1) raises EndOfStream because the kernel closes all file descriptors of a terminated process. This is direct, OS-guaranteed proof that the child is dead. Compared to the original file-watching approach: - No tempfiles, no file I/O, no path escaping - No fixed sleep() durations (the root cause of flakiness) - No polling loops - Failure modes are clear: accept times out = child never started; receive times out = child survived termination (actual bug) Also fixes the process leak on assertion failure by adding proc cleanup to each finally block (guarded by proc=None after successful in-test termination to avoid noisy double-terminate). Result: 3 tests drop from ~7.1s to ~0.5s, 30/30 pass under 4-worker parallel load. Removes 80 net lines and the tempfile/path-escape deps. Closes #1775 Github-Issue: #1775 --- tests/client/test_stdio.py | 382 +++++++++++++++---------------------- 1 file changed, 155 insertions(+), 227 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 8f5495c4d..c98d6984b 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -1,12 +1,11 @@ import errno -import os import shutil import sys -import tempfile import textwrap import time import anyio +import anyio.abc import pytest from mcp.client.session import ClientSession @@ -20,8 +19,6 @@ from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse -from ..shared.test_win32_utils import escape_path_for_python - # Timeout for cleanup of processes that ignore SIGTERM # This timeout ensures the test fails quickly if the cleanup logic doesn't have # proper fallback mechanisms (SIGINT/SIGKILL) for processes that ignore SIGTERM @@ -221,283 +218,214 @@ def sigint_handler(signum, frame): raise -async def _wait_for_file_size(path: str, larger_than: int = 0, *, timeout: float = 10.0) -> int: - """Poll until the file at ``path`` has size strictly greater than ``larger_than``. +# --------------------------------------------------------------------------- +# TestChildProcessCleanup — socket-based deterministic child liveness probe +# --------------------------------------------------------------------------- +# +# These tests verify that `_terminate_process_tree()` kills the *entire* process +# tree (not just the immediate child), which is critical for cleaning up tools +# like `npx` that spawn their own subprocesses. +# +# Mechanism: each subprocess in the tree connects a TCP socket back to a +# listener owned by the test. We then use two kernel-guaranteed blocking-I/O +# signals — neither requires any `sleep()` or polling loop: +# +# 1. `await listener.accept()` blocks until the subprocess connects, +# proving it is running. +# 2. After `_terminate_process_tree()`, `await stream.receive(1)` raises +# `EndOfStream` (or `ConnectionResetError` on some platforms) because the +# kernel closes all file descriptors — including sockets — when a process +# terminates. This is the direct, OS-level proof that the child is dead. +# +# This replaces an older file-growth-watching approach whose fixed `sleep()` +# durations raced against slow Python interpreter startup on loaded CI runners. + + +def _connect_back_script(port: int) -> str: + """Return a ``python -c`` script body that connects to the given port, + sends ``b'alive'``, then blocks forever. Used by TestChildProcessCleanup + subprocesses as a liveness probe.""" + return ( + f"import socket, time\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"time.sleep(3600)\n" + ) + + +def _spawn_then_block(child_script: str) -> str: + """Return a ``python -c`` script body that spawns ``child_script`` as a + subprocess, then blocks forever. The ``!r`` injection avoids nested-quote + escaping for arbitrary child script content.""" + return ( + f"import subprocess, sys, time\nsubprocess.Popen([sys.executable, '-c', {child_script!r}])\ntime.sleep(3600)\n" + ) + + +async def _open_liveness_listener() -> tuple[anyio.abc.SocketListener, int]: + """Open a TCP listener on localhost and return it along with its port.""" + multi = await anyio.create_tcp_listener(local_host="127.0.0.1") + sock = multi.listeners[0] + assert isinstance(sock, anyio.abc.SocketListener) + addr = sock.extra(anyio.abc.SocketAttribute.local_address) + # IPv4 local_address is (host: str, port: int) + assert isinstance(addr, tuple) and len(addr) >= 2 and isinstance(addr[1], int) + return sock, addr[1] + - Returns the observed size once the threshold is crossed. Raises ``TimeoutError`` - (via ``anyio.fail_after``) if the file does not grow past the threshold within - ``timeout`` seconds. Used by ``TestChildProcessCleanup`` to deterministically - wait for subprocess chains to start writing, replacing fixed ``sleep()`` durations - that caused flakiness on loaded CI runners. +async def _accept_alive(sock: anyio.abc.SocketListener) -> anyio.abc.SocketStream: + """Accept one connection and assert the peer sent ``b'alive'``. + + Blocks deterministically until a subprocess connects (no polling). The + outer test bounds this with ``anyio.fail_after`` to catch the case where + the subprocess chain failed to start. + """ + stream = await sock.accept() + msg = await stream.receive(5) + assert msg == b"alive", f"expected b'alive', got {msg!r}" + return stream + + +async def _assert_stream_closed(stream: anyio.abc.SocketStream) -> None: + """Assert the peer holding the other end of ``stream`` has terminated. + + When a process dies, the kernel closes all its file descriptors. The next + ``receive()`` on the peer socket returns EOF (raises ``anyio.EndOfStream``) + or, if the process was killed abruptly, the connection is reset (raises + ``ConnectionResetError``). Either is a deterministic, kernel-level signal + that the process is dead — no sleeps or polling required. """ - with anyio.fail_after(timeout): - while os.path.getsize(path) <= larger_than: - await anyio.sleep(0.05) - return os.path.getsize(path) + with anyio.fail_after(5.0): + try: + data = await stream.receive(1) + except (anyio.EndOfStream, ConnectionResetError): + return + pytest.fail( # pragma: no cover + f"subprocess still alive after _terminate_process_tree (received {data!r})" + ) class TestChildProcessCleanup: - """Tests for child process cleanup functionality using _terminate_process_tree. - - These tests verify that child processes are properly terminated when the parent - is killed, addressing the issue where processes like npx spawn child processes - that need to be cleaned up. The tests cover various process tree scenarios: - - - Basic parent-child relationship (single child process) - - Multi-level process trees (parent → child → grandchild) - - Race conditions where parent exits during cleanup - - Note on Windows ResourceWarning: - On Windows, we may see ResourceWarning about subprocess still running. This is - expected behavior due to how Windows process termination works: - - anyio's process.terminate() calls Windows TerminateProcess() API - - TerminateProcess() immediately kills the process without allowing cleanup - - subprocess.Popen objects in the killed process can't run their cleanup code - - Python detects this during garbage collection and issues a ResourceWarning - - This warning does NOT indicate a process leak - the processes are properly - terminated. It only means the Popen objects couldn't clean up gracefully. - This is a fundamental difference between Windows and Unix process termination. + """Integration tests for ``_terminate_process_tree`` covering basic, + nested, and early-parent-exit process tree scenarios. See module-level + comment above for the socket-based liveness probe mechanism. """ @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_basic_child_process_cleanup(self): - """Test basic parent-child process cleanup. - Parent spawns a single child process that writes continuously to a file. - """ - # Create a marker file for the child process to write to - with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: - marker_file = f.name - - # Parent script that spawns a child process - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - - # Child script that writes continuously - child_script = f''' - import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"{time.time()}") - f.flush() - time.sleep(0.1) - ''' - - # Start the child process - child = subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent just sleeps - while True: - time.sleep(0.1) - """ - ) - + """Parent spawns one child; terminating the tree kills both.""" + sock, port = await _open_liveness_listener() + stream: anyio.abc.SocketStream | None = None proc = None try: + # Parent spawns a child; the child connects back to us. + parent_script = _spawn_then_block(_connect_back_script(port)) proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - # Poll for child to start writing (bounded wait replaces fixed sleep(0.5)+sleep(0.3)) - initial_size = await _wait_for_file_size(marker_file, larger_than=0) - # Poll for further growth to confirm the child is actively writing in a loop - grown_size = await _wait_for_file_size(marker_file, larger_than=initial_size) - print(f"Child is writing (file grew from {initial_size} to {grown_size} bytes)") + # Deterministic: accept() blocks until the child connects. No sleep. + with anyio.fail_after(10.0): + stream = await _accept_alive(sock) - # Terminate using our function (the behavior under test) + # Terminate the process tree (the behavior under test). await _terminate_process_tree(proc) - proc = None # Successfully terminated; skip redundant cleanup in finally - - # Verify child stopped writing. _terminate_process_tree on POSIX already polls - # for process group death; a single 0.3s check (3x child write interval) suffices. - size_after_term = os.path.getsize(marker_file) - await anyio.sleep(0.3) - final_size = os.path.getsize(marker_file) - assert final_size == size_after_term, ( - f"Child process still running! File grew by {final_size - size_after_term} bytes" - ) + proc = None + + # Deterministic: kernel closed child's socket when it died. + await _assert_stream_closed(stream) finally: - # Ensure process tree is terminated even if an assertion above failed, - # preventing leaked subprocesses from causing knock-on failures in later tests. if proc is not None: # pragma: no cover - # Only reached if an assertion failed before _terminate_process_tree; - # ensures the subprocess tree doesn't leak into later tests. with anyio.move_on_after(5.0): await _terminate_process_tree(proc) - try: - os.unlink(marker_file) - except OSError: # pragma: no cover - pass + if stream is not None: # pragma: no branch + await stream.aclose() + await sock.aclose() @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_nested_process_tree(self): - """Test nested process tree cleanup (parent → child → grandchild). - Each level writes to a different file to verify all processes are terminated. - """ - # Create temporary files for each process level - with tempfile.NamedTemporaryFile(mode="w", delete=False) as f1: - parent_file = f1.name - with tempfile.NamedTemporaryFile(mode="w", delete=False) as f2: - child_file = f2.name - with tempfile.NamedTemporaryFile(mode="w", delete=False) as f3: - grandchild_file = f3.name - - # Simple nested process tree test - # We create parent -> child -> grandchild, each writing to a file - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import os - - # Child will spawn grandchild and write to child file - child_script = f'''import subprocess - import sys - import time - - # Grandchild just writes to file - grandchild_script = \"\"\"import time - with open({escape_path_for_python(grandchild_file)}, 'a') as f: - while True: - f.write(f"gc {{time.time()}}") - f.flush() - time.sleep(0.1)\"\"\" - - # Spawn grandchild - subprocess.Popen([sys.executable, '-c', grandchild_script]) - - # Child writes to its file - with open({escape_path_for_python(child_file)}, 'a') as f: - while True: - f.write(f"c {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Spawn child process - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent writes to its file - with open({escape_path_for_python(parent_file)}, 'a') as f: - while True: - f.write(f"p {time.time()}") - f.flush() - time.sleep(0.1) - """ - ) - - files = [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")] + """Parent → child → grandchild; terminating the tree kills all three.""" + sock, port = await _open_liveness_listener() + streams: list[anyio.abc.SocketStream] = [] proc = None try: + # Build a three-level chain: parent spawns child, child spawns + # grandchild. Every level connects back to our socket. + grandchild = _connect_back_script(port) + child = ( + f"import subprocess, sys\n" + f"subprocess.Popen([sys.executable, '-c', {grandchild!r}])\n" + _connect_back_script(port) + ) + parent_script = ( + f"import subprocess, sys\n" + f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" + _connect_back_script(port) + ) proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - # Poll for each level of the tree to start writing (bounded wait). - # Grandchild is deepest, so once it's writing, the whole chain is up. - for file_path, name in files: - size = await _wait_for_file_size(file_path, larger_than=0) - await _wait_for_file_size(file_path, larger_than=size) - print(f"{name} is writing") + # Deterministic: three blocking accepts, one per tree level. + with anyio.fail_after(10.0): + for _ in range(3): + # Append-in-loop intentional: preserves partially-accepted + # streams for cleanup in `finally` if a later accept fails. + streams.append(await _accept_alive(sock)) # noqa: PERF401 - # Terminate the whole tree (the behavior under test) + # Terminate the entire tree. await _terminate_process_tree(proc) - proc = None # Successfully terminated; skip redundant cleanup in finally + proc = None - # Verify all stopped. Record sizes once, wait 3x write interval, check none grew. - sizes_after_term = {path: os.path.getsize(path) for path, _ in files} - await anyio.sleep(0.3) - for file_path, name in files: - final_size = os.path.getsize(file_path) - assert final_size == sizes_after_term[file_path], f"{name} still writing after cleanup!" + # Every level of the tree must be dead: three kernel-level EOFs. + for stream in streams: + await _assert_stream_closed(stream) finally: - # Ensure process tree is terminated even if an assertion above failed. if proc is not None: # pragma: no cover - # Only reached if an assertion failed before _terminate_process_tree; - # ensures the subprocess tree doesn't leak into later tests. with anyio.move_on_after(5.0): await _terminate_process_tree(proc) - for f in [parent_file, child_file, grandchild_file]: - try: - os.unlink(f) - except OSError: # pragma: no cover - pass + for stream in streams: + await stream.aclose() + await sock.aclose() @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_early_parent_exit(self): - """Test cleanup when parent exits during termination sequence. - Tests the race condition where parent might die during our termination - sequence but we can still clean up the children via the process group. + """Parent exits immediately on SIGTERM; process-group termination still + catches the child (exercises the race where the parent dies mid-cleanup). """ - # Create a temporary file for the child - with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: - marker_file = f.name - - # Parent that spawns child and exits immediately on SIGTERM - parent_script = textwrap.dedent( - f""" - import subprocess - import sys - import time - import signal - - # Child that continues running - child_script = f'''import time - with open({escape_path_for_python(marker_file)}, 'a') as f: - while True: - f.write(f"child {time.time()}") - f.flush() - time.sleep(0.1)''' - - # Start child in same process group - subprocess.Popen([sys.executable, '-c', child_script]) - - # Parent exits immediately on SIGTERM (the race this test exercises) - def handle_term(sig, frame): - sys.exit(0) - - signal.signal(signal.SIGTERM, handle_term) - - # Wait - while True: - time.sleep(0.1) - """ - ) - + sock, port = await _open_liveness_listener() + stream: anyio.abc.SocketStream | None = None proc = None try: + # Parent installs a SIGTERM handler that exits immediately, spawns a + # child that connects back to us, then blocks. + child = _connect_back_script(port) + parent_script = ( + f"import signal, subprocess, sys, time\n" + f"signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))\n" + f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" + f"time.sleep(3600)\n" + ) proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - # Poll for child to start writing (bounded wait) - initial_size = await _wait_for_file_size(marker_file, larger_than=0) - await _wait_for_file_size(marker_file, larger_than=initial_size) + # Deterministic: child connected means both parent and child are up. + with anyio.fail_after(10.0): + stream = await _accept_alive(sock) - # Terminate — parent exits immediately on SIGTERM, but process group kill - # should still catch the child + # Parent will sys.exit(0) on SIGTERM, but the process-group kill + # (POSIX killpg / Windows Job Object) must still terminate the child. await _terminate_process_tree(proc) - proc = None # Successfully terminated; skip redundant cleanup in finally + proc = None - # Verify child stopped writing - size_after_term = os.path.getsize(marker_file) - await anyio.sleep(0.3) - final_size = os.path.getsize(marker_file) - assert final_size == size_after_term, "Child should be terminated" + # Child must be dead despite parent's early exit. + await _assert_stream_closed(stream) finally: - # Ensure process tree is terminated even if an assertion above failed. if proc is not None: # pragma: no cover - # Only reached if an assertion failed before _terminate_process_tree; - # ensures the subprocess tree doesn't leak into later tests. with anyio.move_on_after(5.0): await _terminate_process_tree(proc) - try: - os.unlink(marker_file) - except OSError: # pragma: no cover - pass + if stream is not None: # pragma: no branch + await stream.aclose() + await sock.aclose() @pytest.mark.anyio From 324bbc27bda777a85c4dc91c5d81b253dfce16ff Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:38:15 +0000 Subject: [PATCH 3/9] Handle BrokenResourceError in _assert_stream_closed for Windows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Windows, TerminateJobObject causes an abrupt TCP RST (not clean FIN), so anyio raises BrokenResourceError instead of EndOfStream when reading from a socket held by the terminated child. Both exceptions indicate the peer is dead — catch both. Github-Issue: #1775 --- tests/client/test_stdio.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index c98d6984b..3fa9a737c 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -233,9 +233,10 @@ def sigint_handler(signum, frame): # 1. `await listener.accept()` blocks until the subprocess connects, # proving it is running. # 2. After `_terminate_process_tree()`, `await stream.receive(1)` raises -# `EndOfStream` (or `ConnectionResetError` on some platforms) because the -# kernel closes all file descriptors — including sockets — when a process -# terminates. This is the direct, OS-level proof that the child is dead. +# `EndOfStream` (clean close / FIN) or `BrokenResourceError` (abrupt +# close / RST — typical on Windows after TerminateJobObject) because the +# kernel closes all file descriptors when a process terminates. Either +# is the direct, OS-level proof that the child is dead. # # This replaces an older file-growth-watching approach whose fixed `sleep()` # durations raced against slow Python interpreter startup on loaded CI runners. @@ -289,16 +290,21 @@ async def _accept_alive(sock: anyio.abc.SocketListener) -> anyio.abc.SocketStrea async def _assert_stream_closed(stream: anyio.abc.SocketStream) -> None: """Assert the peer holding the other end of ``stream`` has terminated. - When a process dies, the kernel closes all its file descriptors. The next - ``receive()`` on the peer socket returns EOF (raises ``anyio.EndOfStream``) - or, if the process was killed abruptly, the connection is reset (raises - ``ConnectionResetError``). Either is a deterministic, kernel-level signal - that the process is dead — no sleeps or polling required. + When a process dies, the kernel closes its file descriptors including + sockets. The next ``receive()`` on the peer socket unblocks with one of: + + - ``anyio.EndOfStream`` — clean close (FIN), typical after graceful exit + or POSIX ``SIGTERM``. + - ``anyio.BrokenResourceError`` — abrupt close (RST), typical after + Windows ``TerminateJobObject`` or POSIX ``SIGKILL``. + + Either is a deterministic, kernel-level signal that the process is dead — + no sleeps or polling required. """ with anyio.fail_after(5.0): try: data = await stream.receive(1) - except (anyio.EndOfStream, ConnectionResetError): + except (anyio.EndOfStream, anyio.BrokenResourceError): return pytest.fail( # pragma: no cover f"subprocess still alive after _terminate_process_tree (received {data!r})" From 160999f7066205f385d4f1fa712f0cfb27f8e27a Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 11 Mar 2026 18:14:44 +0000 Subject: [PATCH 4/9] Reap process and close pipe transports after _terminate_process_tree MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _terminate_process_tree kills the OS process group / Job Object but does not call process.wait() or close the anyio stdin/stdout pipe wrappers. On Windows, the resulting _WindowsSubprocessTransport / PipeHandle objects are GC'd after the test's filterwarnings scope has exited, triggering PytestUnraisableExceptionWarning in whatever test runs next on the same worker. The old file-watching tests masked this because their ~1.8s of sleep() calls per test gave the asyncio event loop's child watcher time to fire _process_exited() callbacks (which close the transport) during the test. The new socket-based tests are ~0.15s each — too fast for that callback to fire before GC. The SDK's production path (stdio.py:180) avoids this via 'async with process:', whose __aexit__ handles reaping + stream closure. These tests call _terminate_process_tree directly, so they must do the same cleanup explicitly. Added _terminate_and_reap() helper that runs unconditionally in each test's finally: terminate tree (no-op if already dead), await wait() to reap, close stdin/stdout pipe wrappers. Bounded by move_on_after(5). Github-Issue: #1775 --- tests/client/test_stdio.py | 45 ++++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 3fa9a737c..e1d6bcecc 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -15,6 +15,7 @@ _terminate_process_tree, stdio_client, ) +from mcp.os.win32.utilities import FallbackProcess from mcp.shared.exceptions import MCPError from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse @@ -311,6 +312,32 @@ async def _assert_stream_closed(stream: anyio.abc.SocketStream) -> None: ) +async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None: + """Terminate the process tree, then reap and close pipe transports. + + ``_terminate_process_tree`` kills the OS process group / Job Object but does + not call ``process.wait()`` or close stdin/stdout pipe wrappers. On Windows, + the resulting ``_WindowsSubprocessTransport`` / ``PipeHandle`` objects are + then GC'd after the test's ``filterwarnings`` scope has exited, triggering + ``PytestUnraisableExceptionWarning`` in whatever test happens to run next. + + The SDK's production path (``stdio.py``) avoids this via ``async with process:`` + which wraps termination in a context manager whose ``__aexit__`` handles + reaping + stream closure. These tests call ``_terminate_process_tree`` + directly, so they must do the same cleanup explicitly. + + Safe to call on an already-terminated process (termination no-ops, wait + returns immediately). Bounded by ``move_on_after`` to prevent hangs. + """ + with anyio.move_on_after(5.0): + await _terminate_process_tree(proc) + await proc.wait() + if proc.stdin is not None: # pragma: no branch + await proc.stdin.aclose() + if proc.stdout is not None: # pragma: no branch + await proc.stdout.aclose() + + class TestChildProcessCleanup: """Integration tests for ``_terminate_process_tree`` covering basic, nested, and early-parent-exit process tree scenarios. See module-level @@ -335,15 +362,13 @@ async def test_basic_child_process_cleanup(self): # Terminate the process tree (the behavior under test). await _terminate_process_tree(proc) - proc = None # Deterministic: kernel closed child's socket when it died. await _assert_stream_closed(stream) finally: - if proc is not None: # pragma: no cover - with anyio.move_on_after(5.0): - await _terminate_process_tree(proc) + if proc is not None: # pragma: no branch + await _terminate_and_reap(proc) if stream is not None: # pragma: no branch await stream.aclose() await sock.aclose() @@ -378,16 +403,14 @@ async def test_nested_process_tree(self): # Terminate the entire tree. await _terminate_process_tree(proc) - proc = None # Every level of the tree must be dead: three kernel-level EOFs. for stream in streams: await _assert_stream_closed(stream) finally: - if proc is not None: # pragma: no cover - with anyio.move_on_after(5.0): - await _terminate_process_tree(proc) + if proc is not None: # pragma: no branch + await _terminate_and_reap(proc) for stream in streams: await stream.aclose() await sock.aclose() @@ -420,15 +443,13 @@ async def test_early_parent_exit(self): # Parent will sys.exit(0) on SIGTERM, but the process-group kill # (POSIX killpg / Windows Job Object) must still terminate the child. await _terminate_process_tree(proc) - proc = None # Child must be dead despite parent's early exit. await _assert_stream_closed(stream) finally: - if proc is not None: # pragma: no cover - with anyio.move_on_after(5.0): - await _terminate_process_tree(proc) + if proc is not None: # pragma: no branch + await _terminate_and_reap(proc) if stream is not None: # pragma: no branch await stream.aclose() await sock.aclose() From 0e80918c8ed96219d8ab604ec7f3a4f4f6bbdad4 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 11 Mar 2026 18:19:49 +0000 Subject: [PATCH 5/9] Eliminate all coverage pragmas via AsyncExitStack + pytest.raises MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaced 8 coverage pragmas with restructuring: - pytest.fail() # pragma: no cover → pytest.raises() — if the child survives, pytest.raises fails with "DID NOT RAISE" (library handles it) - if proc.stdin is not None: # pragma: no branch → assert proc.stdin is not None — satisfies pyright Optional narrowing; asserts are statements not branches, so coverage doesn't need both paths - if proc is not None: # pragma: no branch (×3 in finally blocks) → AsyncExitStack.push_async_callback(_terminate_and_reap, proc) immediately after proc is created. Cleanup is guaranteed, no None-check needed. - if stream is not None: # pragma: no branch (×2 in finally blocks) → Same: push stream.aclose as a callback right after accept() Also dropped the # noqa: PERF401 — the accept loop now does stream acquisition + cleanup registration + list append (3 ops), which doesn't match PERF401's single-append pattern. Net: all 3 tests are single-indent, zero try/finally boilerplate, zero pragmas, zero noqa. Still 100% coverage, ~0.5s for all 3 tests. Github-Issue: #1775 --- tests/client/test_stdio.py | 76 ++++++++++++++------------------------ 1 file changed, 28 insertions(+), 48 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index e1d6bcecc..f70d5ef6c 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -3,6 +3,7 @@ import sys import textwrap import time +from contextlib import AsyncExitStack import anyio import anyio.abc @@ -302,14 +303,8 @@ async def _assert_stream_closed(stream: anyio.abc.SocketStream) -> None: Either is a deterministic, kernel-level signal that the process is dead — no sleeps or polling required. """ - with anyio.fail_after(5.0): - try: - data = await stream.receive(1) - except (anyio.EndOfStream, anyio.BrokenResourceError): - return - pytest.fail( # pragma: no cover - f"subprocess still alive after _terminate_process_tree (received {data!r})" - ) + with anyio.fail_after(5.0), pytest.raises((anyio.EndOfStream, anyio.BrokenResourceError)): + await stream.receive(1) async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None: @@ -332,10 +327,10 @@ async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None with anyio.move_on_after(5.0): await _terminate_process_tree(proc) await proc.wait() - if proc.stdin is not None: # pragma: no branch - await proc.stdin.aclose() - if proc.stdout is not None: # pragma: no branch - await proc.stdout.aclose() + assert proc.stdin is not None + assert proc.stdout is not None + await proc.stdin.aclose() + await proc.stdout.aclose() class TestChildProcessCleanup: @@ -348,17 +343,19 @@ class TestChildProcessCleanup: @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_basic_child_process_cleanup(self): """Parent spawns one child; terminating the tree kills both.""" - sock, port = await _open_liveness_listener() - stream: anyio.abc.SocketStream | None = None - proc = None - try: + async with AsyncExitStack() as stack: + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + # Parent spawns a child; the child connects back to us. parent_script = _spawn_then_block(_connect_back_script(port)) proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + stack.push_async_callback(_terminate_and_reap, proc) # Deterministic: accept() blocks until the child connects. No sleep. with anyio.fail_after(10.0): stream = await _accept_alive(sock) + stack.push_async_callback(stream.aclose) # Terminate the process tree (the behavior under test). await _terminate_process_tree(proc) @@ -366,21 +363,14 @@ async def test_basic_child_process_cleanup(self): # Deterministic: kernel closed child's socket when it died. await _assert_stream_closed(stream) - finally: - if proc is not None: # pragma: no branch - await _terminate_and_reap(proc) - if stream is not None: # pragma: no branch - await stream.aclose() - await sock.aclose() - @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_nested_process_tree(self): """Parent → child → grandchild; terminating the tree kills all three.""" - sock, port = await _open_liveness_listener() - streams: list[anyio.abc.SocketStream] = [] - proc = None - try: + async with AsyncExitStack() as stack: + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + # Build a three-level chain: parent spawns child, child spawns # grandchild. Every level connects back to our socket. grandchild = _connect_back_script(port) @@ -393,13 +383,15 @@ async def test_nested_process_tree(self): f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" + _connect_back_script(port) ) proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + stack.push_async_callback(_terminate_and_reap, proc) # Deterministic: three blocking accepts, one per tree level. + streams: list[anyio.abc.SocketStream] = [] with anyio.fail_after(10.0): for _ in range(3): - # Append-in-loop intentional: preserves partially-accepted - # streams for cleanup in `finally` if a later accept fails. - streams.append(await _accept_alive(sock)) # noqa: PERF401 + stream = await _accept_alive(sock) + stack.push_async_callback(stream.aclose) + streams.append(stream) # Terminate the entire tree. await _terminate_process_tree(proc) @@ -408,23 +400,16 @@ async def test_nested_process_tree(self): for stream in streams: await _assert_stream_closed(stream) - finally: - if proc is not None: # pragma: no branch - await _terminate_and_reap(proc) - for stream in streams: - await stream.aclose() - await sock.aclose() - @pytest.mark.anyio @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_early_parent_exit(self): """Parent exits immediately on SIGTERM; process-group termination still catches the child (exercises the race where the parent dies mid-cleanup). """ - sock, port = await _open_liveness_listener() - stream: anyio.abc.SocketStream | None = None - proc = None - try: + async with AsyncExitStack() as stack: + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + # Parent installs a SIGTERM handler that exits immediately, spawns a # child that connects back to us, then blocks. child = _connect_back_script(port) @@ -435,10 +420,12 @@ async def test_early_parent_exit(self): f"time.sleep(3600)\n" ) proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) + stack.push_async_callback(_terminate_and_reap, proc) # Deterministic: child connected means both parent and child are up. with anyio.fail_after(10.0): stream = await _accept_alive(sock) + stack.push_async_callback(stream.aclose) # Parent will sys.exit(0) on SIGTERM, but the process-group kill # (POSIX killpg / Windows Job Object) must still terminate the child. @@ -447,13 +434,6 @@ async def test_early_parent_exit(self): # Child must be dead despite parent's early exit. await _assert_stream_closed(stream) - finally: - if proc is not None: # pragma: no branch - await _terminate_and_reap(proc) - if stream is not None: # pragma: no branch - await stream.aclose() - await sock.aclose() - @pytest.mark.anyio async def test_stdio_client_graceful_stdin_exit(): From 54295dd7e750d4018b152daef6da768de4e894a5 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 11 Mar 2026 18:55:38 +0000 Subject: [PATCH 6/9] Remove now-unnecessary filterwarnings("ignore::ResourceWarning") decorators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old tests had filterwarnings("ignore::ResourceWarning" if win32 ...) because _terminate_process_tree didn't call process.wait() or close pipe transports, so GC of the anyio Process wrapper emitted ResourceWarning. _terminate_and_reap now explicitly calls wait() + closes stdin/stdout pipes in the test's AsyncExitStack teardown (i.e., during the test, before GC can fire). The warnings should no longer occur — if they do, CI will tell us and we should fix the root cause rather than suppress. Github-Issue: #1775 --- tests/client/test_stdio.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index f70d5ef6c..b7665d8b7 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -340,7 +340,6 @@ class TestChildProcessCleanup: """ @pytest.mark.anyio - @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_basic_child_process_cleanup(self): """Parent spawns one child; terminating the tree kills both.""" async with AsyncExitStack() as stack: @@ -364,7 +363,6 @@ async def test_basic_child_process_cleanup(self): await _assert_stream_closed(stream) @pytest.mark.anyio - @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_nested_process_tree(self): """Parent → child → grandchild; terminating the tree kills all three.""" async with AsyncExitStack() as stack: @@ -401,7 +399,6 @@ async def test_nested_process_tree(self): await _assert_stream_closed(stream) @pytest.mark.anyio - @pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") async def test_early_parent_exit(self): """Parent exits immediately on SIGTERM; process-group termination still catches the child (exercises the race where the parent dies mid-cleanup). From a0703562708685e15b5e3f90c4bc06004983bdd7 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:07:04 +0000 Subject: [PATCH 7/9] Guard double-terminate on returncode; make _terminate_and_reap idempotent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback: calling _terminate_process_tree on an already-reaped POSIX process hits the fallback path in terminate_posix_process_tree (os.getpgid on a stale PID raises ProcessLookupError), which emits two WARNING logs and an ERROR with traceback. These are visible because pyproject.toml sets log_cli=true. Previously, each test called _terminate_process_tree in the body and the AsyncExitStack called _terminate_and_reap (which re-called _terminate_process_tree) at exit — double-terminate, nine spurious log records per run. Fix: _terminate_and_reap now checks proc.returncode before terminating. On POSIX, the asyncio child watcher sets returncode during _terminate_process_tree's polling-loop yield, so by the time cleanup runs it's populated and the guard skips the noisy re-terminate. On Windows/FallbackProcess (no returncode attribute), getattr returns None and terminate runs — but the Windows path is silent on double-call. Restructured tests to call _terminate_and_reap (which wraps _terminate_process_tree) as the action under test, and let the ExitStack safety-net call it again. Since wait() and stream.aclose() are idempotent, this is safe — and it covers both branches of the guard (first call: returncode is None → terminate; second call: returncode set → skip). 100% coverage, still zero pragmas. Github-Issue: #1775 --- tests/client/test_stdio.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index b7665d8b7..d3396f5d2 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -321,11 +321,17 @@ async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None reaping + stream closure. These tests call ``_terminate_process_tree`` directly, so they must do the same cleanup explicitly. - Safe to call on an already-terminated process (termination no-ops, wait - returns immediately). Bounded by ``move_on_after`` to prevent hangs. + Idempotent: the ``returncode`` guard skips termination if the process has + already been reaped (calling ``_terminate_process_tree`` on a reaped POSIX + process hits the fallback path in ``terminate_posix_process_tree`` and emits + spurious WARNING/ERROR logs, visible because ``log_cli = true``); ``wait()`` + and stream ``aclose()`` are no-ops on subsequent calls. The tests call this + explicitly as the action under test, and ``AsyncExitStack`` calls it again + on exit as a safety net for early failures. Bounded by ``move_on_after``. """ with anyio.move_on_after(5.0): - await _terminate_process_tree(proc) + if getattr(proc, "returncode", None) is None: + await _terminate_process_tree(proc) await proc.wait() assert proc.stdin is not None assert proc.stdout is not None @@ -356,8 +362,9 @@ async def test_basic_child_process_cleanup(self): stream = await _accept_alive(sock) stack.push_async_callback(stream.aclose) - # Terminate the process tree (the behavior under test). - await _terminate_process_tree(proc) + # Terminate, reap and close transports (wraps _terminate_process_tree, + # the behavior under test). + await _terminate_and_reap(proc) # Deterministic: kernel closed child's socket when it died. await _assert_stream_closed(stream) @@ -391,8 +398,8 @@ async def test_nested_process_tree(self): stack.push_async_callback(stream.aclose) streams.append(stream) - # Terminate the entire tree. - await _terminate_process_tree(proc) + # Terminate the entire tree (wraps _terminate_process_tree). + await _terminate_and_reap(proc) # Every level of the tree must be dead: three kernel-level EOFs. for stream in streams: @@ -426,7 +433,7 @@ async def test_early_parent_exit(self): # Parent will sys.exit(0) on SIGTERM, but the process-group kill # (POSIX killpg / Windows Job Object) must still terminate the child. - await _terminate_process_tree(proc) + await _terminate_and_reap(proc) # Child must be dead despite parent's early exit. await _assert_stream_closed(stream) From d8c7de234b2c1016e820c3d33bb674a5b36309c2 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:50:07 +0000 Subject: [PATCH 8/9] Drain stdout to EOF so _ProactorReadPipeTransport closes on Windows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One remaining leak on Windows CI: _ProactorReadPipeTransport (stdout) was never closed, triggering ResourceWarning in a later test. Root cause: anyio's StreamReaderWrapper.aclose() only calls set_exception(ClosedResourceError()) on the Python-level StreamReader — it never touches the underlying transport. On Windows, _ProactorReadPipeTransport starts with _paused=True and only detects pipe EOF when someone reads. The EOF-detection path is what calls transport.close(): _eof_received() → self.close() → _sock = None. Without a read, the transport lives until __del__. Production stdio.py avoids this because its stdout_reader task continuously reads stdout, so EOF is detected naturally. These tests never read stdout, so the transport stays paused with an orphaned overlapped read until GC. Fix: drain stdout with a single receive() call in _terminate_and_reap. The process is already dead, so the read immediately gets EOF (EndOfStream on clean close, BrokenResourceError on abrupt RST, ClosedResourceError on second call when already aclose()'d — all caught by contextlib.suppress). Note: the agent's suggested fix (stack.enter_async_context(proc) for Process.__aexit__) wouldn't help — __aexit__ calls aclose() which has the same stdout.aclose() issue. The drain is what actually closes the transport. Github-Issue: #1775 --- tests/client/test_stdio.py | 45 ++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index d3396f5d2..ce3eb3f60 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -3,7 +3,7 @@ import sys import textwrap import time -from contextlib import AsyncExitStack +from contextlib import AsyncExitStack, suppress import anyio import anyio.abc @@ -308,26 +308,31 @@ async def _assert_stream_closed(stream: anyio.abc.SocketStream) -> None: async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None: - """Terminate the process tree, then reap and close pipe transports. + """Terminate the process tree, reap, and tear down pipe transports. ``_terminate_process_tree`` kills the OS process group / Job Object but does - not call ``process.wait()`` or close stdin/stdout pipe wrappers. On Windows, - the resulting ``_WindowsSubprocessTransport`` / ``PipeHandle`` objects are - then GC'd after the test's ``filterwarnings`` scope has exited, triggering - ``PytestUnraisableExceptionWarning`` in whatever test happens to run next. - - The SDK's production path (``stdio.py``) avoids this via ``async with process:`` - which wraps termination in a context manager whose ``__aexit__`` handles - reaping + stream closure. These tests call ``_terminate_process_tree`` - directly, so they must do the same cleanup explicitly. - - Idempotent: the ``returncode`` guard skips termination if the process has - already been reaped (calling ``_terminate_process_tree`` on a reaped POSIX - process hits the fallback path in ``terminate_posix_process_tree`` and emits - spurious WARNING/ERROR logs, visible because ``log_cli = true``); ``wait()`` - and stream ``aclose()`` are no-ops on subsequent calls. The tests call this - explicitly as the action under test, and ``AsyncExitStack`` calls it again - on exit as a safety net for early failures. Bounded by ``move_on_after``. + not call ``process.wait()`` or clean up the asyncio pipe transports. On + Windows those transports leak and emit ``ResourceWarning`` when GC'd in a + later test, causing ``PytestUnraisableExceptionWarning`` knock-on failures. + + Production ``stdio.py`` avoids this via its ``stdout_reader`` task which + reads stdout to EOF (triggering ``_ProactorReadPipeTransport._eof_received`` + → ``close()``) plus ``async with process:`` which waits and closes stdin. + These tests call ``_terminate_process_tree`` directly, so they replicate + both parts here: ``wait()`` + close stdin + drain stdout to EOF. + + The stdout drain is the non-obvious part: anyio's ``StreamReaderWrapper.aclose()`` + only marks the Python-level reader closed — it never touches the underlying + ``_ProactorReadPipeTransport``. That transport starts paused and only detects + pipe EOF when someone reads, so without a drain it lives until ``__del__``. + + Idempotent: the ``returncode`` guard skips termination if already reaped + (avoids spurious WARNING/ERROR logs from ``terminate_posix_process_tree``'s + fallback path, visible because ``log_cli = true``); ``wait()`` and stream + ``aclose()`` no-op on subsequent calls; the drain raises ``ClosedResourceError`` + on the second call, caught by the suppress. The tests call this explicitly + as the action under test and ``AsyncExitStack`` calls it again on exit as a + safety net. Bounded by ``move_on_after`` to prevent hangs. """ with anyio.move_on_after(5.0): if getattr(proc, "returncode", None) is None: @@ -336,6 +341,8 @@ async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None assert proc.stdin is not None assert proc.stdout is not None await proc.stdin.aclose() + with suppress(anyio.EndOfStream, anyio.BrokenResourceError, anyio.ClosedResourceError): + await proc.stdout.receive(65536) await proc.stdout.aclose() From 0554fe0e3396f400a5fe616dee4158af87dc09ed Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:53:42 +0000 Subject: [PATCH 9/9] Add returncode property to FallbackProcess; simplify returncode guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FallbackProcess stands in for anyio.abc.Process on Windows with SelectorEventLoop, but was missing the returncode property that Process declares abstractly. This meant getattr(proc, "returncode", None) in _terminate_and_reap always returned the default None for FallbackProcess, so the double-terminate guard never fired (the docstring's 'Idempotent' claim was false for half the declared union type). Practical impact was near-zero — FallbackProcess is only active under SelectorEventLoop (not the default since Python 3.8, not used in CI), and terminate_windows_process_tree swallows all exceptions on the second call. But the fix is trivial and brings FallbackProcess closer to the interface it's supposed to implement. With returncode present on both union members, the getattr can become a direct attribute access, which also satisfies pyright without defensive indirection. Github-Issue: #1775 --- src/mcp/os/win32/utilities.py | 5 +++++ tests/client/test_stdio.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/mcp/os/win32/utilities.py b/src/mcp/os/win32/utilities.py index 0e188691f..6f68405f7 100644 --- a/src/mcp/os/win32/utilities.py +++ b/src/mcp/os/win32/utilities.py @@ -123,6 +123,11 @@ def pid(self) -> int: """Return the process ID.""" return self.popen.pid + @property + def returncode(self) -> int | None: + """Return the exit code, or ``None`` if the process has not yet terminated.""" + return self.popen.returncode + # ------------------------ # Updated function diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index ce3eb3f60..06e2cba4b 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -335,7 +335,7 @@ async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None safety net. Bounded by ``move_on_after`` to prevent hangs. """ with anyio.move_on_after(5.0): - if getattr(proc, "returncode", None) is None: + if proc.returncode is None: await _terminate_process_tree(proc) await proc.wait() assert proc.stdin is not None