From e8cf708486e1934e787c79bafac1faae0f02320f Mon Sep 17 00:00:00 2001 From: itxaiohanglover <1531137510@qq.com> Date: Fri, 15 May 2026 10:37:33 +0800 Subject: [PATCH] fix: guard transport.close() with try/finally in Query.close() If any cleanup step preceding transport.close() raises an exception (e.g. batcher.close(), task.wait(), message_send.close()), the transport is never closed, leaving the stderr reader task and CLI subprocess leaked. Wrap cleanup in try/finally to ensure transport teardown always runs. Closes #886 --- src/claude_agent_sdk/_internal/query.py | 42 +++++++++++++------------ 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/claude_agent_sdk/_internal/query.py b/src/claude_agent_sdk/_internal/query.py index 7a4f8a44..63018555 100644 --- a/src/claude_agent_sdk/_internal/query.py +++ b/src/claude_agent_sdk/_internal/query.py @@ -856,26 +856,28 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]: async def close(self) -> None: """Close the query and transport.""" self._closed = True - # Final-flush mirror entries before tearing down so .return()/break - # don't drop the current turn when the process exits immediately. - if self._transcript_mirror_batcher is not None: - await self._transcript_mirror_batcher.close() - for task in list(self._child_tasks): - task.cancel() - if self._read_task is not None and not self._read_task.done(): - self._read_task.cancel() - await self._read_task.wait() - self._read_task = None - # The read task's finally closed the send side; repeat here for the - # case where start() was never called. Do NOT close the receive - # side — it belongs to the consumer, and anyio's receive_nowait() - # checks _closed before the buffer, so closing it here would make a - # non-parked consumer drop buffered messages with - # ClosedResourceError. _message_send.close() alone yields - # EndOfStream after the buffer drains; the consumer calls - # close_receive_stream() once it's done iterating (#859). - self._message_send.close() - await self.transport.close() + try: + # Final-flush mirror entries before tearing down so .return()/break + # don't drop the current turn when the process exits immediately. + if self._transcript_mirror_batcher is not None: + await self._transcript_mirror_batcher.close() + for task in list(self._child_tasks): + task.cancel() + if self._read_task is not None and not self._read_task.done(): + self._read_task.cancel() + await self._read_task.wait() + self._read_task = None + # The read task's finally closed the send side; repeat here for the + # case where start() was never called. Do NOT close the receive + # side — it belongs to the consumer, and anyio's receive_nowait() + # checks _closed before the buffer, so closing it here would make a + # non-parked consumer drop buffered messages with + # ClosedResourceError. _message_send.close() alone yields + # EndOfStream after the buffer drains; the consumer calls + # close_receive_stream() once it's done iterating (#859). + self._message_send.close() + finally: + await self.transport.close() def close_receive_stream(self) -> None: """Close the receive side of the message stream.