Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions src/claude_agent_sdk/_internal/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,25 +807,30 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
async def close(self) -> None:
"""Close the query and transport."""
self._closed = True
# Final-flush mirror entries before tearing down so .return()/break
# don't drop the current turn when the process exits immediately.
if self._transcript_mirror_batcher is not None:
await self._transcript_mirror_batcher.close()
for task in list(self._child_tasks):
task.cancel()
if self._read_task is not None and not self._read_task.done():
self._read_task.cancel()
await self._read_task.wait()
self._read_task = None
# The read task's finally closed the send side; repeat here for the
# case where start() was never called. Do NOT close the receive
# side — it belongs to the consumer, and anyio's receive_nowait()
# checks _closed before the buffer, so closing it here would make a
# non-parked consumer drop buffered messages with
# ClosedResourceError. _message_send.close() alone yields
# EndOfStream after the buffer drains.
self._message_send.close()
await self.transport.close()
# Wrap pre-transport cleanup in try/finally so transport.close() always
# runs. Without this guard, a raise from _transcript_mirror_batcher.close()
# or _read_task.wait() leaks the CLI subprocess and stderr reader task.
try:
# Final-flush mirror entries before tearing down so .return()/break
# don't drop the current turn when the process exits immediately.
if self._transcript_mirror_batcher is not None:
await self._transcript_mirror_batcher.close()
for task in list(self._child_tasks):
task.cancel()
if self._read_task is not None and not self._read_task.done():
self._read_task.cancel()
await self._read_task.wait()
self._read_task = None
# The read task's finally closed the send side; repeat here for the
# case where start() was never called. Do NOT close the receive
# side — it belongs to the consumer, and anyio's receive_nowait()
# checks _closed before the buffer, so closing it here would make a
# non-parked consumer drop buffered messages with
# ClosedResourceError. _message_send.close() alone yields
# EndOfStream after the buffer drains.
self._message_send.close()
finally:
await self.transport.close()

# Make Query an async iterator
def __aiter__(self) -> AsyncIterator[dict[str, Any]]:
Expand Down
72 changes: 72 additions & 0 deletions tests/test_query_close_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Regression test for issue #886.

`Query.close()` did not guard the pre-transport cleanup with try/finally,
so if any earlier cleanup step raised, ``transport.close()`` would never
run — leaking the CLI subprocess and the stderr reader task.
"""

from unittest.mock import AsyncMock, Mock

import anyio
import pytest

from claude_agent_sdk._internal.query import Query


def _make_mock_transport():
"""Mock transport with awaitable lifecycle methods and an empty read stream."""
mock_transport = AsyncMock()

async def mock_receive():
if False: # pragma: no cover - empty generator
yield {}

mock_transport.read_messages = mock_receive
mock_transport.connect = AsyncMock()
mock_transport.close = AsyncMock()
mock_transport.end_input = AsyncMock()
mock_transport.write = AsyncMock()
mock_transport.is_ready = Mock(return_value=True)
return mock_transport


class _RaisingMirrorBatcher:
"""Minimal stand-in for TranscriptMirrorBatcher whose close() raises.

Only ``close()`` is called from ``Query.close()``; the other methods are
here so the type-stub set_transcript_mirror_batcher accepts it.
"""

async def close(self) -> None:
raise RuntimeError("simulated mirror-batcher failure")

async def flush(self) -> None: # pragma: no cover - not exercised
return None

def enqueue(self, file_path, entries) -> None: # pragma: no cover
return None


def test_close_runs_transport_close_when_mirror_batcher_close_raises():
"""transport.close() must still run when an earlier cleanup step raises.

Regression for issue #886: previously, a raise from
_transcript_mirror_batcher.close() short-circuited the rest of close()
and skipped transport.close(), leaking the CLI subprocess.
"""

async def _test():
mock_transport = _make_mock_transport()
q = Query(transport=mock_transport, is_streaming_mode=True)
q.set_transcript_mirror_batcher(_RaisingMirrorBatcher()) # type: ignore[arg-type]

await q.start()

with pytest.raises(RuntimeError, match="simulated mirror-batcher failure"):
await q.close()

# The whole point of the fix: transport.close() must run even when
# an earlier cleanup step raised.
mock_transport.close.assert_called_once()

anyio.run(_test)