From 28efdc416a41fefe86b4112ea34cbec0c8b61a66 Mon Sep 17 00:00:00 2001 From: Mukunda Rao Katta Date: Mon, 27 Apr 2026 20:37:28 -0700 Subject: [PATCH] fix(query): guard Query.close() cleanup with try/finally Wraps the pre-transport cleanup in close() with try/finally so transport.close() always runs. Without this, a raise from _transcript_mirror_batcher.close() or _read_task.wait() leaked the CLI subprocess and the stderr reader task. Adds a regression test that injects a mirror-batcher whose close() raises and asserts transport.close() is still invoked. Fixes #886 --- src/claude_agent_sdk/_internal/query.py | 43 ++++++++------- tests/test_query_close_cleanup.py | 72 +++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 19 deletions(-) create mode 100644 tests/test_query_close_cleanup.py diff --git a/src/claude_agent_sdk/_internal/query.py b/src/claude_agent_sdk/_internal/query.py index 0843453e..f981bc11 100644 --- a/src/claude_agent_sdk/_internal/query.py +++ b/src/claude_agent_sdk/_internal/query.py @@ -807,25 +807,30 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]: async def close(self) -> None: """Close the query and transport.""" self._closed = True - # Final-flush mirror entries before tearing down so .return()/break - # don't drop the current turn when the process exits immediately. - if self._transcript_mirror_batcher is not None: - await self._transcript_mirror_batcher.close() - for task in list(self._child_tasks): - task.cancel() - if self._read_task is not None and not self._read_task.done(): - self._read_task.cancel() - await self._read_task.wait() - self._read_task = None - # The read task's finally closed the send side; repeat here for the - # case where start() was never called. Do NOT close the receive - # side — it belongs to the consumer, and anyio's receive_nowait() - # checks _closed before the buffer, so closing it here would make a - # non-parked consumer drop buffered messages with - # ClosedResourceError. _message_send.close() alone yields - # EndOfStream after the buffer drains. - self._message_send.close() - await self.transport.close() + # Wrap pre-transport cleanup in try/finally so transport.close() always + # runs. Without this guard, a raise from _transcript_mirror_batcher.close() + # or _read_task.wait() leaks the CLI subprocess and stderr reader task. + try: + # Final-flush mirror entries before tearing down so .return()/break + # don't drop the current turn when the process exits immediately. + if self._transcript_mirror_batcher is not None: + await self._transcript_mirror_batcher.close() + for task in list(self._child_tasks): + task.cancel() + if self._read_task is not None and not self._read_task.done(): + self._read_task.cancel() + await self._read_task.wait() + self._read_task = None + # The read task's finally closed the send side; repeat here for the + # case where start() was never called. Do NOT close the receive + # side — it belongs to the consumer, and anyio's receive_nowait() + # checks _closed before the buffer, so closing it here would make a + # non-parked consumer drop buffered messages with + # ClosedResourceError. _message_send.close() alone yields + # EndOfStream after the buffer drains. + self._message_send.close() + finally: + await self.transport.close() # Make Query an async iterator def __aiter__(self) -> AsyncIterator[dict[str, Any]]: diff --git a/tests/test_query_close_cleanup.py b/tests/test_query_close_cleanup.py new file mode 100644 index 00000000..0093c88e --- /dev/null +++ b/tests/test_query_close_cleanup.py @@ -0,0 +1,72 @@ +"""Regression test for issue #886. + +`Query.close()` did not guard the pre-transport cleanup with try/finally, +so if any earlier cleanup step raised, ``transport.close()`` would never +run — leaking the CLI subprocess and the stderr reader task. +""" + +from unittest.mock import AsyncMock, Mock + +import anyio +import pytest + +from claude_agent_sdk._internal.query import Query + + +def _make_mock_transport(): + """Mock transport with awaitable lifecycle methods and an empty read stream.""" + mock_transport = AsyncMock() + + async def mock_receive(): + if False: # pragma: no cover - empty generator + yield {} + + mock_transport.read_messages = mock_receive + mock_transport.connect = AsyncMock() + mock_transport.close = AsyncMock() + mock_transport.end_input = AsyncMock() + mock_transport.write = AsyncMock() + mock_transport.is_ready = Mock(return_value=True) + return mock_transport + + +class _RaisingMirrorBatcher: + """Minimal stand-in for TranscriptMirrorBatcher whose close() raises. + + Only ``close()`` is called from ``Query.close()``; the other methods are + here so the type-stub set_transcript_mirror_batcher accepts it. + """ + + async def close(self) -> None: + raise RuntimeError("simulated mirror-batcher failure") + + async def flush(self) -> None: # pragma: no cover - not exercised + return None + + def enqueue(self, file_path, entries) -> None: # pragma: no cover + return None + + +def test_close_runs_transport_close_when_mirror_batcher_close_raises(): + """transport.close() must still run when an earlier cleanup step raises. + + Regression for issue #886: previously, a raise from + _transcript_mirror_batcher.close() short-circuited the rest of close() + and skipped transport.close(), leaking the CLI subprocess. + """ + + async def _test(): + mock_transport = _make_mock_transport() + q = Query(transport=mock_transport, is_streaming_mode=True) + q.set_transcript_mirror_batcher(_RaisingMirrorBatcher()) # type: ignore[arg-type] + + await q.start() + + with pytest.raises(RuntimeError, match="simulated mirror-batcher failure"): + await q.close() + + # The whole point of the fix: transport.close() must run even when + # an earlier cleanup step raised. + mock_transport.close.assert_called_once() + + anyio.run(_test)