From c3bf7f7c0614a11de5c3fec62f66b195a8570237 Mon Sep 17 00:00:00 2001
From: Eric Lee
Date: Fri, 15 May 2026 03:01:36 -0700
Subject: [PATCH] fix(esc): cancel streaming API call when abort signal trips
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Even after PRs #135/#140/#141/#143 wired ESC through subagents, headless
SIGINT, the non-optional ``tool_context.abort_controller`` contract, and
ripgrep mid-search, the user-visible ESC latency was still 20+ seconds
when the model emitted a multi-tool_use response.

Root cause: the provider's streaming HTTP read had no way to observe the
abort controller. ``query.py`` passed no abort plumbing to
``chat_stream_response``; the only existing interrupt point was the
``on_text_chunk`` callback, which never fires for a turn that emits
tool_use blocks without intervening text. The model could spend ~20s
generating eight parallel ``Write`` blocks, and only after the stream
returned naturally would the outer query loop check the abort signal and
yield "Interrupted by user" for all eight.

Thread an ``AbortSignal`` through ``chat_stream_response``. In the
Anthropic provider, register a listener that calls
``stream.response.close()`` (the same close pattern the existing
``StreamWatchdog`` uses for idle timeout). Closing the underlying HTTP
response causes the SDK's blocking socket read to raise in the consumer
thread; the provider catches it, detects the abort via the signal state
(not the exception class — different SDK versions raise different
classes), and re-raises ``AbortError``.

Three defenses close the remaining race windows:

* Pre-call fast-path bails when the signal was already tripped at call
  entry — skips the round-trip entirely.
* Register-then-recheck after entering the stream context — closes the
  sub-microsecond window between an ``aborted`` read and an
  ``add_listener`` append where ``_fire`` could snapshot the listener
  list and silently drop our newly-appended listener.
* Post-with-block recheck — surfaces a signal that fires between
  ``__exit__`` and ``return``.

Plumbing:

* ``query.py``: ``_call_model_sync`` grows an ``abort_signal`` keyword
  and forwards it to the provider; the call site passes
  ``params.abort_controller.signal``. A new ``except AbortError: raise``
  in ``_call_model_sync`` and ``except AbortError: pass`` in the query
  loop route the cancel to the existing post-API abort-handling block in
  one place.
* ``agent_loop.py``: ``_call_provider_for_turn`` grows ``cancel_signal``
  and forwards it as ``abort_signal=`` to the provider; ``run_agent_loop``
  passes it through.
* ``minimax_provider.py`` / ``openai_compatible.py``: pre-call fast-path
  for parity (a mid-stream listener for these providers is a future PR —
  the same underlying anthropic SDK for Minimax makes it a small lift).

Five regression tests pin the contract: the pre-aborted fast-path skips
``_ensure_client``, a mid-stream abort closes the stream and raises
``AbortError`` in under 1 s (vs the 20 s+ symptom this PR fixes),
unaborted streams return normally, ``abort_signal=None`` is a no-op for
legacy callers, and the listener detaches after normal completion.
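For reviewers, a minimal sketch of the register-then-recheck ordering
described above. It is illustrative only: ``_Signal`` below is a toy
stand-in, not ``src.utils.abort_controller.AbortSignal``; the only
behaviors carried over from the real code are the sticky ``aborted``
flag, ``add_listener(fn, once=True)``, and ``_fire`` snapshotting the
listener list before iterating.

    class _Signal:
        """Toy stand-in for AbortSignal: just enough to show the ordering."""

        def __init__(self) -> None:
            self._listeners: list = []
            self.aborted = False

        def add_listener(self, fn, once: bool = True):
            # `once` is accepted for signature parity and ignored here.
            self._listeners.append(fn)
            return fn

        def _fire(self) -> None:
            self.aborted = True               # sticky: set before the snapshot
            for fn in list(self._listeners):  # snapshot: later appends are dropped
                fn()

    def begin_consuming(signal, close_stream) -> None:
        # Register FIRST, then re-check. If a concurrent _fire() took its
        # snapshot before our append, `aborted` is already True here and we
        # close the stream ourselves; close_stream must be idempotent because
        # both paths can run.
        signal.add_listener(close_stream, once=True)
        if signal.aborted:
            close_stream()

    if __name__ == "__main__":
        closed = []
        sig = _Signal()
        begin_consuming(sig, lambda: closed.append("via listener"))
        sig._fire()
        assert closed == ["via listener"]

        closed2 = []
        pre = _Signal()
        pre._fire()                           # fired before registration
        begin_consuming(pre, lambda: closed2.append("via recheck"))
        assert closed2 == ["via recheck"]

The naive check-then-register ordering can lose a fire that lands between
the two statements; with register-then-recheck, either the listener made
the snapshot or the post-append read of ``aborted`` sees True, so the
idempotent close runs in every interleaving.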
Co-Authored-By: Claude Opus 4.7
---
 src/providers/anthropic_provider.py |  93 ++++++++++-
 src/providers/base.py               |  14 +-
 src/providers/minimax_provider.py   |  10 ++
 src/providers/openai_compatible.py  |   9 ++
 src/query/query.py                  |  31 +++-
 src/tool_system/agent_loop.py       |   9 ++
 tests/test_provider_abort_signal.py | 236 ++++++++++++++++++++++++++++
 7 files changed, 394 insertions(+), 8 deletions(-)
 create mode 100644 tests/test_provider_abort_signal.py

diff --git a/src/providers/anthropic_provider.py b/src/providers/anthropic_provider.py
index 1619f1d..128732a 100644
--- a/src/providers/anthropic_provider.py
+++ b/src/providers/anthropic_provider.py
@@ -3,10 +3,13 @@
 from __future__ import annotations

 import sys
-from typing import Generator, Optional, Any
+from typing import Generator, Optional, Any, TYPE_CHECKING

 from .base import BaseProvider, ChatResponse, MessageInput, TextChunkCallback

+if TYPE_CHECKING:
+    from src.utils.abort_controller import AbortSignal
+

 # WI-4.4 (ch17 Phase 4): defer the ``import anthropic`` call. The SDK
 # alone is ~150-200ms to import (verified by ``my-docs/profiler-baseline.md``:
@@ -248,6 +251,7 @@ def chat_stream_response(
         messages: list[MessageInput],
         tools: Optional[list[dict[str, Any]]] = None,
         on_text_chunk: TextChunkCallback | None = None,
+        abort_signal: "AbortSignal | None" = None,
         **kwargs
     ) -> ChatResponse:
         """Stream Anthropic text chunks and return the final structured response.
@@ -257,9 +261,26 @@
         ``CLAUDE_STREAM_IDLE_TIMEOUT_MS`` (default 90 s). On timeout the
         iterator raises; we catch it and fall back to the non-streaming
         ``chat()`` path so the user gets an answer rather than a hung session.
+
+        ESC-cancellation: when ``abort_signal`` is provided, a listener is
+        registered that calls ``stream.response.close()`` when the signal
+        fires. The close interrupts the SDK's blocking socket read so the
+        ``for text in stream.text_stream`` iterator raises immediately —
+        without it, ESC during a tool-use-only response (no intervening
+        text chunks for ``on_text_chunk`` to observe) waits for the model
+        to finish generating before the outer query loop can bail. We
+        translate the raise into ``AbortError`` so callers can distinguish
+        a user-initiated cancel from the watchdog's idle-timeout fallback.
         """
+        from src.utils.abort_controller import AbortError
         from src.utils.stream_watchdog import StreamWatchdog

+        # Fast-path: if abort fired before we even build the request, don't
+        # spend the round-trip — raise directly so the caller's cancel
+        # boundary unwinds at the same place the mid-stream path lands.
+        if abort_signal is not None and abort_signal.aborted:
+            raise AbortError(abort_signal.reason or "user_interrupt")
+
         model = self._get_model(**kwargs)
         max_tokens = kwargs.get("max_tokens", 4096)
         system = kwargs.pop("system", None)
@@ -294,6 +315,7 @@ def _fallback_to_chat() -> ChatResponse:
         streamed_text = ""
         watchdog_fired = False
         final_message = None
+        abort_listener: Any = None
         try:
             with client.messages.stream(
                 model=model,
@@ -303,6 +325,44 @@
                 **extra_kwargs,
                 **{k: v for k, v in kwargs.items() if k not in ["model", "max_tokens", "tools"]},
             ) as stream:
+                # Register the abort listener BEFORE the iterator pulls
+                # its first chunk, so a signal that fires between context
+                # entry and the first ``text_stream.__next__`` still wins
+                # the race. Mirrors ``StreamWatchdog``'s close pattern:
+                # close the underlying HTTP response from another thread,
+                # which raises in the consumer thread on the next pull.
+                if abort_signal is not None:
+                    def _close_stream_on_abort() -> None:
+                        try:
+                            response = getattr(stream, "response", None)
+                            if response is not None:
+                                close = getattr(response, "close", None)
+                                if callable(close):
+                                    close()
+                        except Exception:
+                            # Best-effort — never let the close
+                            # propagate out of the listener thread.
+                            pass
+
+                    # Register-then-recheck (NOT check-then-register):
+                    # the naive ordering has a sub-microsecond race
+                    # where another thread can call ``_fire`` between
+                    # our ``aborted`` read and the ``add_listener``
+                    # append. ``_fire`` snapshots the listener list
+                    # before iterating, so any listener appended after
+                    # that snapshot is silently dropped.
+                    # Register-then-recheck closes the gap: ``aborted``
+                    # is sticky-True after ``_fire`` runs, so the
+                    # post-add read catches any concurrent fire, and
+                    # ``_close_stream_on_abort`` is idempotent so a
+                    # double-call (listener fires AND we call directly)
+                    # is harmless.
+                    abort_listener = abort_signal.add_listener(
+                        _close_stream_on_abort, once=True,
+                    )
+                    if abort_signal.aborted:
+                        _close_stream_on_abort()
+
                 watchdog = StreamWatchdog(stream)
                 watchdog.arm()
                 try:
@@ -329,10 +389,22 @@
             watchdog_fired = watchdog.fired
             watchdog.disarm()
         except Exception as streaming_exc:
-            # WI-5.2 fallback path: stream interrupted. If our watchdog
-            # triggered the interruption, fall back to non-streaming so
-            # the user still gets an answer. If the failure is something
-            # else (network/auth/etc.), re-raise the original.
+            # Abort path: the abort listener closed the stream's response,
+            # which raised in the consumer thread. Translate to
+            # ``AbortError`` so the query loop's
+            # ``except AbortError: raise`` cancel boundary unwinds
+            # cleanly. We check the signal AFTER the catch (not the
+            # exception type) because the SDK can raise several different
+            # exception classes depending on which socket operation was
+            # in flight when we closed; the abort_signal state is the
+            # authoritative source of truth.
+            if abort_signal is not None and abort_signal.aborted:
+                raise AbortError(abort_signal.reason or "user_interrupt") from streaming_exc
+
+            # WI-5.2 fallback path: stream interrupted by the idle
+            # watchdog. Fall back to non-streaming so the user still
+            # gets an answer. If the failure is something else
+            # (network/auth/etc.), re-raise the original.
             if watchdog_fired:
                 try:
                     return _fallback_to_chat()
@@ -344,6 +416,17 @@
                     # error and re-raised only the streaming one.
                     raise fallback_exc from streaming_exc
             raise
+        finally:
+            # Always detach the abort listener so it doesn't pin the
+            # provider alive past one call.
+            if abort_listener is not None and abort_signal is not None:
+                abort_signal.remove_listener(abort_listener)
+
+        # Stream completed normally but abort may have fired between
+        # ``stream.__exit__`` and here. Surface it now so the caller
+        # bails at the same place every other path does.
+        if abort_signal is not None and abort_signal.aborted:
+            raise AbortError(abort_signal.reason or "user_interrupt")

         if watchdog_fired:
             # Stream got interrupted but no exception escaped the
diff --git a/src/providers/base.py b/src/providers/base.py
index f972e5e..8e40cf9 100644
--- a/src/providers/base.py
+++ b/src/providers/base.py
@@ -4,7 +4,10 @@

 from abc import ABC, abstractmethod
 from dataclasses import dataclass
-from typing import Any, Callable, Generator, Optional, TypeAlias
+from typing import Any, Callable, Generator, Optional, TYPE_CHECKING, TypeAlias
+
+if TYPE_CHECKING:
+    from src.utils.abort_controller import AbortSignal


 @dataclass
@@ -95,10 +98,19 @@ def chat_stream_response(
         messages: list[MessageInput],
         tools: Optional[list[dict[str, Any]]] = None,
         on_text_chunk: TextChunkCallback | None = None,
+        abort_signal: "AbortSignal | None" = None,
         **kwargs
     ) -> ChatResponse:
         """Stream a response while also returning the final structured ChatResponse.

+        When ``abort_signal`` is provided, a provider implementation should
+        register a listener on it that forcibly closes the underlying HTTP
+        stream when the signal fires. Without this, a tripped abort can only
+        be observed between chunks via ``on_text_chunk`` — which never fires
+        for a turn that emits tool_use blocks without intervening text, so
+        ESC ends up waiting for the model to finish generating before the
+        outer query loop can bail.
+
         Providers may override this to support tool-aware streaming. The default
         implementation signals that rich streamed responses are unavailable.
         """
diff --git a/src/providers/minimax_provider.py b/src/providers/minimax_provider.py
index 04cc06e..a103d0e 100644
--- a/src/providers/minimax_provider.py
+++ b/src/providers/minimax_provider.py
@@ -165,8 +165,18 @@ def chat_stream_response(
         messages: list[MessageInput],
         tools: Optional[list[dict[str, Any]]] = None,
         on_text_chunk: TextChunkCallback | None = None,
+        abort_signal: Any = None,
         **kwargs
     ) -> ChatResponse:
+        # Pre-call fast-path: matches AnthropicProvider. A signal that
+        # tripped at a turn boundary skips the API round-trip entirely.
+        # Mid-stream cancellation isn't implemented yet — that needs the
+        # same response-close listener pattern AnthropicProvider uses,
+        # which the Minimax/anthropic-compatible SDK should support
+        # (it's the same underlying ``anthropic`` package) — future PR.
+        if abort_signal is not None and getattr(abort_signal, "aborted", False):
+            from src.utils.abort_controller import AbortError
+            raise AbortError(getattr(abort_signal, "reason", None) or "user_interrupt")
         model = self._get_model(**kwargs)
         max_tokens = kwargs.get("max_tokens", 4096)
         system = kwargs.pop("system", None)
diff --git a/src/providers/openai_compatible.py b/src/providers/openai_compatible.py
index a25a051..49b685e 100644
--- a/src/providers/openai_compatible.py
+++ b/src/providers/openai_compatible.py
@@ -325,9 +325,18 @@ def chat_stream_response(
         messages: list[MessageInput],
         tools: Optional[list[dict[str, Any]]] = None,
         on_text_chunk: TextChunkCallback | None = None,
+        abort_signal: Any = None,
         **kwargs
     ) -> ChatResponse:
         """Stream OpenAI-compatible chunks while rebuilding the final response."""

+        # Pre-call fast-path: matches AnthropicProvider. A signal that
+        # tripped at a turn boundary skips the API round-trip entirely.
+        # Mid-stream cancellation isn't implemented yet — that needs a
+        # response-close listener around the OpenAI SDK's stream
+        # iterator — future PR.
+        if abort_signal is not None and getattr(abort_signal, "aborted", False):
+            from src.utils.abort_controller import AbortError
+            raise AbortError(getattr(abort_signal, "reason", None) or "user_interrupt")
         model = self._get_model(**kwargs)
         provider_messages = self._prepare_messages(messages)
diff --git a/src/query/query.py b/src/query/query.py
index 1960739..67aa9d6 100644
--- a/src/query/query.py
+++ b/src/query/query.py
@@ -21,7 +21,7 @@
 from ..tool_system.context import ToolContext
 from ..tool_system.protocol import ToolCall, ToolResult
 from ..tool_system.registry import ToolRegistry
-from ..utils.abort_controller import AbortController
+from ..utils.abort_controller import AbortController, AbortError
 from ..providers.base import BaseProvider, ChatResponse

 from .config import QueryConfig, build_query_config
@@ -282,6 +282,7 @@ async def _call_model_sync(
     system_prompt: str,
     tools: Tools,
     max_output_tokens_override: int | None = None,
+    abort_signal: Any = None,
 ) -> tuple[list[AssistantMessage], list[ToolUseBlock]]:

     from ..types.messages import normalize_messages_for_api
@@ -383,11 +384,27 @@ async def _call_model_sync(
     logger.warning("[DIAG] _call_model_sync: calling provider (streaming)...")
     try:
         try:
-            response = provider.chat_stream_response(api_messages, **call_kwargs)
+            # ``abort_signal`` reaches the provider so a tripped controller
+            # can close the streaming HTTP response immediately rather than
+            # waiting for the model to finish generating. Without this
+            # plumb, ESC during a tool-use-only response (no intermediate
+            # text chunks for an ``on_text_chunk`` to observe) waits the
+            # full model latency before the outer query loop bails.
+            response = provider.chat_stream_response(
+                api_messages, abort_signal=abort_signal, **call_kwargs,
+            )
         except (NotImplementedError, AttributeError):
             if _diag:
                 logger.warning("[DIAG] _call_model_sync: streaming not supported, falling back to chat()")
             response = provider.chat(api_messages, **call_kwargs)
+        except AbortError:
+            # User-initiated cancel — propagate so the query loop's
+            # ``except AbortError: pass`` boundary unwinds to the
+            # post-API abort-check block. We do NOT route this through
+            # the error-message classification below: a future addition
+            # to those substring checks could accidentally match an abort
+            # reason and convert the cancel into a model-error reply.
+            raise
     except Exception as e:
         if _diag:
             logger.warning("[DIAG] _call_model_sync: EXCEPTION after %.1fs: %s", time.monotonic() - _t0, e)
@@ -866,6 +883,7 @@ async def query(
                 system_prompt=params.system_prompt,
                 tools=params.tools,
                 max_output_tokens_override=max_output_tokens_override,
+                abort_signal=params.abort_controller.signal,
             )
             assistant_messages = returned_assistants
             tool_use_blocks = returned_tool_blocks
@@ -886,6 +904,15 @@ async def query(
                 if not withheld:
                     yield msg

+        except AbortError:
+            # The provider's abort listener closed the streaming HTTP
+            # response mid-flight (ESC pressed while the model was still
+            # generating). The signal is already tripped, so let the
+            # ``if params.abort_controller.signal.aborted`` block right
+            # below us do the cancellation processing in exactly one
+            # place — anything we did here would duplicate that work.
+            pass
+
         except Exception as e:
             logger.error("Query error: %s", e)
             error_message = str(e)
diff --git a/src/tool_system/agent_loop.py b/src/tool_system/agent_loop.py
index 7485c31..c2cc1c5 100644
--- a/src/tool_system/agent_loop.py
+++ b/src/tool_system/agent_loop.py
@@ -147,16 +147,24 @@ def _call_provider_for_turn(
     call_kwargs: dict[str, Any],
     stream: bool,
     on_text_chunk: TextChunkHandler | None,
+    cancel_signal: AbortSignal | None = None,
 ) -> tuple[Any, bool]:
     """Call the provider, preferring structured streaming when available.

     Returns (response, streamed_live_text).
+
+    ``cancel_signal`` is forwarded to the provider so a tripped signal can
+    close the streaming HTTP response immediately rather than waiting for
+    the model to finish generating. Without this plumb, ESC during a
+    tool-use-only turn (no intervening text chunks for ``on_text_chunk``
+    to observe) waits the full model latency before the agent loop bails.
     """
     if stream:
         try:
             response = provider.chat_stream_response(
                 api_messages,
                 on_text_chunk=on_text_chunk,
+                abort_signal=cancel_signal,
                 **call_kwargs,
             )
             if not isinstance(response, ChatResponse):
@@ -344,6 +352,7 @@ def _check_cancel() -> None:
             call_kwargs=call_kwargs,
             stream=stream,
             on_text_chunk=on_text_chunk,
+            cancel_signal=cancel_signal,
         )

         turn_count += 1
diff --git a/tests/test_provider_abort_signal.py b/tests/test_provider_abort_signal.py
new file mode 100644
index 0000000..1cca598
--- /dev/null
+++ b/tests/test_provider_abort_signal.py
@@ -0,0 +1,236 @@
+"""Regression tests for abort-signal-aware streaming in the Anthropic provider.
+
+Before this fix, ``provider.chat_stream_response`` had no way to observe
+a tripped abort controller. The agent loop's ``cancel_signal`` was only
+checked between chunks via the optional ``on_text_chunk`` callback — but
+``on_text_chunk`` never fires for a turn that emits tool_use blocks
+without intervening text. The result: ESC during a multi-tool-use
+response (e.g. the model emitting eight parallel ``Write`` calls) waited
+the full model latency before the outer query loop's abort check fired,
+producing the "ESC takes 20+ seconds" symptom on the default REPL UI.
+
+The fix threads an ``AbortSignal`` through ``chat_stream_response``. The
+Anthropic provider registers a listener that calls
+``stream.response.close()`` when the signal fires; the SDK's blocking
+socket read raises in the consumer thread, the provider catches it,
+detects the abort via the signal state (not the exception type — the
+SDK can raise several different classes depending on which syscall was
+in flight), and re-raises ``AbortError``. The query loop's existing
+abort-aware exception handler then routes through the same cancellation
+processing as any other in-flight cancel.
+
+These tests pin the provider-level contract using a synthetic stream
+object that mimics the SDK's surface (``__enter__`` / ``__exit__`` /
+``text_stream`` / ``response.close()``). We don't exercise the real
+Anthropic SDK — that would require a live API key.
+"""
+from __future__ import annotations
+
+import threading
+import time
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from src.providers.anthropic_provider import AnthropicProvider
+from src.utils.abort_controller import AbortController, AbortError
+
+
+class _FakeStream:
+    """Minimal stand-in for the Anthropic SDK's ``messages.stream`` ctx manager.
+
+    Yields text chunks one at a time with a configurable delay so we can
+    simulate a slow streaming response that ESC needs to cancel.
+ """ + + def __init__(self, chunks: list[str], per_chunk_delay_s: float = 0.0): + self._chunks = list(chunks) + self._delay = per_chunk_delay_s + self._closed = threading.Event() + self.response = MagicMock() + # The provider expects ``response.close`` to be callable. + self.response.close.side_effect = self._closed.set + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + @property + def text_stream(self): + for chunk in self._chunks: + if self._closed.is_set(): + # SDK's iterator would raise once the underlying HTTP + # response is closed — model that here so the provider + # observes the same signal it would in production. + raise ConnectionError("response closed mid-stream") + if self._delay > 0: + # Wait in small slices so a stream.close() landing from + # another thread can interrupt promptly. + deadline = time.monotonic() + self._delay + while time.monotonic() < deadline: + if self._closed.is_set(): + raise ConnectionError("response closed mid-stream") + time.sleep(0.005) + yield chunk + + def get_final_message(self): + # Build a minimal "final message" shape the provider's + # ``_build_chat_response`` accepts; only called on the success + # path, not after a close. + m = MagicMock() + m.content = [] + m.usage.input_tokens = 1 + m.usage.output_tokens = 1 + m.model = "test" + m.stop_reason = "end_turn" + return m + + +def _provider_with_stream(stream) -> AnthropicProvider: + """Build a provider whose ``client.messages.stream`` returns ``stream``. + + We patch ``_ensure_client`` rather than the underlying SDK so the + test stays insulated from the lazy-import / module-getattr dance in + ``anthropic_provider.py``. + """ + provider = AnthropicProvider(api_key="test", model="claude-sonnet-4-6") + client = MagicMock() + client.messages.stream.return_value = stream + provider._ensure_client = lambda: client # type: ignore[method-assign] + return provider + + +def test_pre_aborted_signal_short_circuits_before_request(monkeypatch) -> None: + """A tripped controller before the call bypasses the request entirely. + + Without this fast-path, an abort fired between turn boundaries + would still spend the API round-trip before the outer loop could + bail. The fast-path matches the same shape every other abort + boundary in the codebase: detect, raise ``AbortError``, let the + cancel boundary unwind. + """ + controller = AbortController() + controller.abort("user_interrupt") + + provider = AnthropicProvider(api_key="test", model="claude-sonnet-4-6") + # ``_ensure_client`` must NOT be called — the fast-path bails first. + provider._ensure_client = MagicMock( # type: ignore[method-assign] + side_effect=AssertionError("client should not be built when abort is pre-tripped") + ) + + with pytest.raises(AbortError): + provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + abort_signal=controller.signal, + ) + provider._ensure_client.assert_not_called() + + +def test_mid_stream_abort_closes_stream_and_raises_abort_error() -> None: + """ESC mid-stream → ``response.close()`` → iterator raises → ``AbortError``. + + This is the regression: previously the provider would propagate the + iterator's exception generically, and the outer query loop would + treat the closed-stream raise as a model error rather than a user + cancel. The fix detects the abort via the signal state (which is + authoritative even when the SDK exception class varies across + SDK versions / socket libraries) and re-raises ``AbortError``. 
+ """ + controller = AbortController() + # 5 chunks with 100ms delay each → 500ms total without abort. + stream = _FakeStream(["a", "b", "c", "d", "e"], per_chunk_delay_s=0.10) + provider = _provider_with_stream(stream) + + def _trip_after_first_chunk() -> None: + # Sleep long enough that the provider has entered the stream + # context and pulled at least one chunk, then fire the abort. + time.sleep(0.15) + controller.abort("user_interrupt") + + threading.Thread(target=_trip_after_first_chunk, daemon=True).start() + + start = time.monotonic() + with pytest.raises(AbortError): + provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + abort_signal=controller.signal, + ) + elapsed = time.monotonic() - start + + # The 5-chunk stream would take 500ms to complete. With abort, the + # listener fires ~150ms in and the next iteration sees a closed + # response and raises. Total elapsed should be well under 350ms; + # 1s is comfortable CI headroom while still failing loudly if a + # regression makes us wait out the full stream. + assert elapsed < 1.0, f"abort took {elapsed:.2f}s — expected <1s" + # The stream's response.close() must actually have been called — + # this is what propagates the cancel into the SDK's blocking read. + stream.response.close.assert_called() + + +def test_uncancelled_stream_returns_normally() -> None: + """A never-tripped signal preserves existing streaming semantics.""" + controller = AbortController() # never aborted + stream = _FakeStream(["hello ", "world"], per_chunk_delay_s=0.0) + provider = _provider_with_stream(stream) + + seen_chunks: list[str] = [] + + def _on_text(chunk: str) -> None: + seen_chunks.append(chunk) + + response = provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + on_text_chunk=_on_text, + abort_signal=controller.signal, + ) + + # The chunks arrived through the callback (and the close listener + # never fired, so the stream completes naturally). + assert seen_chunks == ["hello ", "world"] + stream.response.close.assert_not_called() + # The provider returned the final structured response shape. + assert response is not None + + +def test_no_abort_signal_param_preserves_legacy_callers() -> None: + """SDK consumers that don't pass ``abort_signal`` see no behavior change. + + The parameter is keyword-only with default ``None`` so existing + callers don't break. With ``None`` no listener is registered, no + close fires, no AbortError can be raised by the provider — the + stream completes naturally just like before this PR. + """ + stream = _FakeStream(["ok"], per_chunk_delay_s=0.0) + provider = _provider_with_stream(stream) + + response = provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + # No abort_signal — verifies the default works. + ) + assert response is not None + stream.response.close.assert_not_called() + + +def test_listener_detached_after_normal_completion() -> None: + """The abort listener must not pin the provider alive across calls. + + Concrete failure mode the cleanup guards against: a long-lived + AbortController (e.g. the REPL engine's controller, reused across + many turns) would otherwise accumulate one dead listener per + streaming call, and each abort would invoke N stream-close + callbacks against long-gone streams. 
+ """ + controller = AbortController() + stream = _FakeStream(["ok"], per_chunk_delay_s=0.0) + provider = _provider_with_stream(stream) + + provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + abort_signal=controller.signal, + ) + + # No listeners should remain attached after the call completes. + assert controller.signal._listeners == []