Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions src/claude_agent_sdk/_internal/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,26 +856,28 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
async def close(self) -> None:
"""Close the query and transport."""
self._closed = True
# Final-flush mirror entries before tearing down so .return()/break
# don't drop the current turn when the process exits immediately.
if self._transcript_mirror_batcher is not None:
await self._transcript_mirror_batcher.close()
for task in list(self._child_tasks):
task.cancel()
if self._read_task is not None and not self._read_task.done():
self._read_task.cancel()
await self._read_task.wait()
self._read_task = None
# The read task's finally closed the send side; repeat here for the
# case where start() was never called. Do NOT close the receive
# side — it belongs to the consumer, and anyio's receive_nowait()
# checks _closed before the buffer, so closing it here would make a
# non-parked consumer drop buffered messages with
# ClosedResourceError. _message_send.close() alone yields
# EndOfStream after the buffer drains; the consumer calls
# close_receive_stream() once it's done iterating (#859).
self._message_send.close()
await self.transport.close()
try:
# Final-flush mirror entries before tearing down so .return()/break
# don't drop the current turn when the process exits immediately.
if self._transcript_mirror_batcher is not None:
await self._transcript_mirror_batcher.close()
for task in list(self._child_tasks):
task.cancel()
if self._read_task is not None and not self._read_task.done():
self._read_task.cancel()
await self._read_task.wait()
self._read_task = None
# The read task's finally closed the send side; repeat here for the
# case where start() was never called. Do NOT close the receive
# side — it belongs to the consumer, and anyio's receive_nowait()
# checks _closed before the buffer, so closing it here would make a
# non-parked consumer drop buffered messages with
# ClosedResourceError. _message_send.close() alone yields
# EndOfStream after the buffer drains; the consumer calls
# close_receive_stream() once it's done iterating (#859).
self._message_send.close()
finally:
await self.transport.close()

def close_receive_stream(self) -> None:
"""Close the receive side of the message stream.
Expand Down