diff --git a/src/providers/minimax_provider.py b/src/providers/minimax_provider.py
index a103d0e..def28e5 100644
--- a/src/providers/minimax_provider.py
+++ b/src/providers/minimax_provider.py
@@ -168,14 +168,21 @@ def chat_stream_response(
         abort_signal: Any = None,
         **kwargs
     ) -> ChatResponse:
+        """Stream Minimax response with abort-signal-aware cancellation.
+
+        Minimax wraps the anthropic SDK against its compatible endpoint,
+        so the response-close listener pattern AnthropicProvider uses
+        works here too. Same contract: pre-call fast-path, register-
+        then-recheck listener that closes the underlying HTTP response,
+        signal-state-authoritative abort detection in the exception
+        handler, post-with-block recheck, ``finally`` detaches the
+        listener.
+        """
+        from src.utils.abort_controller import AbortError
+
         # Pre-call fast-path: matches AnthropicProvider. A signal that
         # tripped at a turn boundary skips the API round-trip entirely.
-        # Mid-stream cancellation isn't implemented yet — that needs the
-        # same response-close listener pattern AnthropicProvider uses,
-        # which the Minimax/anthropic-compatible SDK should support
-        # (it's the same underlying ``anthropic`` package) — future PR.
         if abort_signal is not None and getattr(abort_signal, "aborted", False):
-            from src.utils.abort_controller import AbortError
             raise AbortError(getattr(abort_signal, "reason", None) or "user_interrupt")
 
         model = self._get_model(**kwargs)
         max_tokens = kwargs.get("max_tokens", 4096)
@@ -188,24 +195,72 @@ def chat_stream_response(
             extra_kwargs["tools"] = tools
 
         streamed_text = ""
-        with client.messages.stream(
-            model=model,
-            max_tokens=max_tokens,
-            messages=minimax_messages,
-            **({"system": system} if system else {}),
-            **extra_kwargs,
-            **{k: v for k, v in kwargs.items() if k not in ["model", "max_tokens", "tools"]},
-        ) as stream:
-            for text in stream.text_stream:
-                if not text:
-                    continue
-                streamed_text += text
-                if on_text_chunk is not None:
-                    on_text_chunk(text)
-            try:
-                final_message = stream.get_final_message()
-            except Exception:
-                final_message = None
+        final_message: Any = None
+        abort_listener: Any = None
+        try:
+            with client.messages.stream(
+                model=model,
+                max_tokens=max_tokens,
+                messages=minimax_messages,
+                **({"system": system} if system else {}),
+                **extra_kwargs,
+                **{k: v for k, v in kwargs.items() if k not in ["model", "max_tokens", "tools"]},
+            ) as stream:
+                if abort_signal is not None:
+                    def _close_stream_on_abort() -> None:
+                        try:
+                            response = getattr(stream, "response", None)
+                            if response is not None:
+                                close = getattr(response, "close", None)
+                                if callable(close):
+                                    close()
+                        except Exception:
+                            pass
+
+                    # Register-then-recheck: see AnthropicProvider for the
+                    # full race analysis. ``_fire`` snapshots the listener
+                    # list before iterating, so a listener appended after
+                    # the snapshot is silently dropped; the post-add
+                    # ``aborted`` read closes the gap.
+                    abort_listener = abort_signal.add_listener(
+                        _close_stream_on_abort, once=True,
+                    )
+                    if abort_signal.aborted:
+                        _close_stream_on_abort()
+
+                for text in stream.text_stream:
+                    if not text:
+                        continue
+                    streamed_text += text
+                    if on_text_chunk is not None:
+                        on_text_chunk(text)
+                try:
+                    final_message = stream.get_final_message()
+                except Exception:
+                    final_message = None
+        except Exception as streaming_exc:
+            # Abort path: signal state is authoritative — different SDK
+            # versions raise different exception types when the response
+            # is closed mid-read.
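+            # (Illustratively: a bare ``ConnectionError`` from the
+            # transport, or the SDK's ``anthropic.APIConnectionError``
+            # wrapper — examples, not an exhaustive list.)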
+            if abort_signal is not None and getattr(abort_signal, "aborted", False):
+                raise AbortError(
+                    getattr(abort_signal, "reason", None) or "user_interrupt"
+                ) from streaming_exc
+            raise
+        finally:
+            if abort_listener is not None and abort_signal is not None:
+                try:
+                    abort_signal.remove_listener(abort_listener)
+                except Exception:
+                    pass
+
+        # Stream completed normally but abort may have fired between
+        # ``__exit__`` and here. Surface it at the same boundary every
+        # other path uses.
+        if abort_signal is not None and getattr(abort_signal, "aborted", False):
+            raise AbortError(
+                getattr(abort_signal, "reason", None) or "user_interrupt"
+            )
 
         if final_message is not None:
             return self._build_chat_response(final_message)
diff --git a/src/providers/openai_compatible.py b/src/providers/openai_compatible.py
index 49b685e..208392a 100644
--- a/src/providers/openai_compatible.py
+++ b/src/providers/openai_compatible.py
@@ -328,14 +328,34 @@ def chat_stream_response(
         abort_signal: Any = None,
         **kwargs
     ) -> ChatResponse:
-        """Stream OpenAI-compatible chunks while rebuilding the final response."""
+        """Stream OpenAI-compatible chunks while rebuilding the final response.
+
+        ESC-cancellation: when ``abort_signal`` is provided, two defenses
+        cooperate so ESC unwinds the stream promptly regardless of the
+        provider's chunk cadence:
+
+        * **Response-close listener** registered on the abort signal —
+          calls ``stream.response.close()``. Closes the underlying HTTP
+          socket so the SDK's blocking next-chunk read raises
+          immediately, even when the model is in a long gap between
+          chunks (extended thinking, tool_use generation).
+        * **In-loop abort check** at the top of each ``for chunk in
+          stream`` iteration — catches the case where chunks arrive
+          back-to-back and the listener's close lands one iteration
+          late, so we stop iterating before the next read.
+
+        Mirrors the contract ``AnthropicProvider.chat_stream_response``
+        established for the Anthropic SDK; same correctness arguments
+        apply (signal state is authoritative for abort detection;
+        register-then-recheck closes the registration race; listener
+        is detached in a ``finally`` so long-lived controllers don't
+        accumulate dead listeners).
+        """
+        from src.utils.abort_controller import AbortError
+
         # Pre-call fast-path: matches AnthropicProvider. A signal that
         # tripped at a turn boundary skips the API round-trip entirely.
-        # Mid-stream cancellation isn't implemented yet — that needs a
-        # response-close listener around the OpenAI SDK's stream
-        # iterator — future PR.
         if abort_signal is not None and getattr(abort_signal, "aborted", False):
-            from src.utils.abort_controller import AbortError
             raise AbortError(getattr(abort_signal, "reason", None) or "user_interrupt")
 
         model = self._get_model(**kwargs)
         provider_messages = self._prepare_messages(messages)
@@ -372,51 +392,122 @@ def chat_stream_response(
         usage_obj: Any = None
         tool_calls_by_index: dict[int, dict[str, str]] = {}
 
-        for chunk in stream:
-            response_model = getattr(chunk, "model", response_model)
-            usage_candidate = getattr(chunk, "usage", None)
-            if usage_candidate is not None:
-                usage_obj = usage_candidate
+        # --- Abort-listener wiring ---
+        # Close the underlying HTTP response when the signal trips so a
+        # blocking next-chunk read raises immediately. The OpenAI Python
+        # SDK 1.x and 2.x both expose the underlying httpx Response as
+        # ``stream.response`` (see ``openai/_streaming.py``).
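+        # (If a future SDK release stops exposing it, the ``getattr``
+        # chain in the helper below degrades gracefully: the listener
+        # becomes a no-op and the in-loop check is the only defense.)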
+        # ``httpx.Response.close()`` is idempotent (guarded by
+        # ``if not self.is_closed``), so a double-close — e.g., the
+        # listener fires AND the post-loop path explicitly closes — is
+        # harmless.
+        def _close_stream_on_abort() -> None:
+            try:
+                response = getattr(stream, "response", None)
+                if response is not None:
+                    close = getattr(response, "close", None)
+                    if callable(close):
+                        close()
+            except Exception:
+                # Best-effort — never let close() propagate out of the
+                # listener thread.
+                pass
+
+        abort_listener: Any = None
+        if abort_signal is not None:
+            # Register-then-recheck: see the Anthropic provider for the
+            # full race analysis. The TL;DR is that ``_fire`` snapshots
+            # the listener list before iterating, so a listener appended
+            # after that snapshot is silently dropped; the post-add
+            # ``aborted`` read closes the gap (signal state is sticky).
+            abort_listener = abort_signal.add_listener(
+                _close_stream_on_abort, once=True,
+            )
+            if abort_signal.aborted:
+                _close_stream_on_abort()
+
+        try:
+            for chunk in stream:
+                # In-loop abort check: even when the listener fires
+                # mid-stream, chunks already buffered by the SDK can
+                # still get yielded before the closed-socket raise lands.
+                # The in-loop check makes the abort observable on the
+                # very next chunk boundary regardless of buffering.
+                if abort_signal is not None and abort_signal.aborted:
+                    break
+
+                response_model = getattr(chunk, "model", response_model)
+                usage_candidate = getattr(chunk, "usage", None)
+                if usage_candidate is not None:
+                    usage_obj = usage_candidate
+
+                choices = getattr(chunk, "choices", None) or []
+                if not choices:
+                    continue
+                choice = choices[0]
+                if getattr(choice, "finish_reason", None):
+                    finish_reason = choice.finish_reason
-            choices = getattr(chunk, "choices", None) or []
-            if not choices:
-                continue
-            choice = choices[0]
-            if getattr(choice, "finish_reason", None):
-                finish_reason = choice.finish_reason
+                delta = getattr(choice, "delta", None)
+                if delta is None:
+                    continue
-            delta = getattr(choice, "delta", None)
-            if delta is None:
-                continue
+                content_piece = getattr(delta, "content", None)
+                if content_piece:
+                    piece = str(content_piece)
+                    content_parts.append(piece)
+                    if on_text_chunk is not None:
+                        on_text_chunk(piece)
+
+                reasoning_piece = getattr(delta, "reasoning_content", None)
+                if reasoning_piece:
+                    reasoning_parts.append(str(reasoning_piece))
+
+                tool_call_deltas = getattr(delta, "tool_calls", None) or []
+                for tc in tool_call_deltas:
+                    idx = getattr(tc, "index", 0)
+                    entry = tool_calls_by_index.setdefault(idx, {"id": "", "name": "", "arguments": ""})
+
+                    tc_id = getattr(tc, "id", None)
+                    if tc_id:
+                        entry["id"] = str(tc_id)
+
+                    function = getattr(tc, "function", None)
+                    if function is not None:
+                        fn_name = getattr(function, "name", None)
+                        if fn_name:
+                            entry["name"] += str(fn_name)
+                        fn_args = getattr(function, "arguments", None)
+                        if fn_args:
+                            entry["arguments"] += str(fn_args)
+        except Exception as streaming_exc:
+            # Abort path: the listener closed the underlying HTTP
+            # response, which raised on the SDK's next read in the
+            # consumer thread. Detect via signal state (not exception
+            # class — the OpenAI/httpx layer can raise several different
+            # exception types depending on which syscall was in flight).
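+            # (For instance ``httpx.ReadError`` while a read is in
+            # flight, or the SDK's ``openai.APIConnectionError``
+            # wrapper — illustrative, not exhaustive.)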
+            if abort_signal is not None and getattr(abort_signal, "aborted", False):
+                raise AbortError(
+                    getattr(abort_signal, "reason", None) or "user_interrupt"
+                ) from streaming_exc
+            raise
+        finally:
+            if abort_listener is not None and abort_signal is not None:
+                try:
+                    abort_signal.remove_listener(abort_listener)
+                except Exception:
+                    pass
-            content_piece = getattr(delta, "content", None)
-            if content_piece:
-                piece = str(content_piece)
-                content_parts.append(piece)
-                if on_text_chunk is not None:
-                    on_text_chunk(piece)
-
-            reasoning_piece = getattr(delta, "reasoning_content", None)
-            if reasoning_piece:
-                reasoning_parts.append(str(reasoning_piece))
-
-            tool_call_deltas = getattr(delta, "tool_calls", None) or []
-            for tc in tool_call_deltas:
-                idx = getattr(tc, "index", 0)
-                entry = tool_calls_by_index.setdefault(idx, {"id": "", "name": "", "arguments": ""})
-
-                tc_id = getattr(tc, "id", None)
-                if tc_id:
-                    entry["id"] = str(tc_id)
-
-                function = getattr(tc, "function", None)
-                if function is not None:
-                    fn_name = getattr(function, "name", None)
-                    if fn_name:
-                        entry["name"] += str(fn_name)
-                    fn_args = getattr(function, "arguments", None)
-                    if fn_args:
-                        entry["arguments"] += str(fn_args)
+        # The stream may have completed naturally OR we broke out of
+        # the loop because the in-loop abort check fired. Surface the
+        # abort here so the caller bails at the same place every other
+        # path does. ``response.close()`` after a clean exit is a no-op
+        # on httpx (guarded by ``is_closed``), so the extra close here
+        # stays safe.
+        if abort_signal is not None and getattr(abort_signal, "aborted", False):
+            _close_stream_on_abort()
+            raise AbortError(
+                getattr(abort_signal, "reason", None) or "user_interrupt"
+            )
 
         tool_uses: list[dict[str, Any]] = []
         for idx in sorted(tool_calls_by_index.keys()):
diff --git a/tests/test_minimax_abort_signal.py b/tests/test_minimax_abort_signal.py
new file mode 100644
index 0000000..9498067
--- /dev/null
+++ b/tests/test_minimax_abort_signal.py
@@ -0,0 +1,148 @@
+"""Regression tests for abort-signal-aware streaming in MinimaxProvider.
+
+Minimax goes through the anthropic SDK against ``api.minimaxi.com/anthropic``,
+so the close-listener pattern is structurally identical to
+``AnthropicProvider.chat_stream_response``. These tests pin the
+contract against a synthetic stream fake — Minimax-specific network
+plumbing isn't exercised; the abort wiring is what we care about.
+"""
+from __future__ import annotations
+
+import threading
+import time
+from typing import Any
+from unittest.mock import MagicMock
+
+import pytest
+
+from src.providers.minimax_provider import MinimaxProvider
+from src.utils.abort_controller import AbortController, AbortError
+
+
+class _FakeMessageStream:
+    """Mimic the anthropic SDK's ``MessageStream`` context manager.
+
+    Same surface ``AnthropicProvider.chat_stream_response`` reads:
+    ``text_stream`` iterator, ``response.close()`` for forced
+    teardown, ``get_final_message()`` for the post-stream
+    structured response.
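+
+    ``response.close()`` flips a ``threading.Event`` the iterator
+    polls in 5 ms slices, standing in for the real socket teardown.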
+ """ + + def __init__(self, chunks: list[str], per_chunk_delay_s: float = 0.0): + self._chunks = list(chunks) + self._delay = per_chunk_delay_s + self._closed = threading.Event() + self.response = MagicMock() + self.response.close.side_effect = self._closed.set + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + @property + def text_stream(self): + for chunk in self._chunks: + if self._closed.is_set(): + raise ConnectionError("response closed mid-stream") + if self._delay > 0: + deadline = time.monotonic() + self._delay + while time.monotonic() < deadline: + if self._closed.is_set(): + raise ConnectionError("response closed mid-stream") + time.sleep(0.005) + yield chunk + + def get_final_message(self): + m = MagicMock() + m.content = [] + m.usage.input_tokens = 1 + m.usage.output_tokens = 1 + m.model = "MiniMax-M2.7" + m.stop_reason = "end_turn" + return m + + +def _provider_with_stream(stream) -> MinimaxProvider: + provider = MinimaxProvider(api_key="test", model="MiniMax-M2.7") + client = MagicMock() + client.messages.stream.return_value = stream + provider._ensure_client = lambda: client # type: ignore[method-assign] + return provider + + +def test_pre_aborted_signal_short_circuits_before_request() -> None: + """Already-tripped signal raises before the API round-trip.""" + controller = AbortController() + controller.abort("user_interrupt") + + provider = MinimaxProvider(api_key="test", model="MiniMax-M2.7") + provider._ensure_client = MagicMock( # type: ignore[method-assign] + side_effect=AssertionError( + "_ensure_client should not be called when abort is pre-tripped" + ) + ) + + with pytest.raises(AbortError): + provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + abort_signal=controller.signal, + ) + provider._ensure_client.assert_not_called() + + +def test_mid_stream_abort_closes_stream_and_raises_abort_error() -> None: + """ESC mid-stream → ``response.close()`` → iterator raises → ``AbortError``.""" + controller = AbortController() + stream = _FakeMessageStream(["a", "b", "c", "d", "e"], per_chunk_delay_s=0.10) + provider = _provider_with_stream(stream) + + def _trip_after_first_chunk() -> None: + time.sleep(0.15) + controller.abort("user_interrupt") + + threading.Thread(target=_trip_after_first_chunk, daemon=True).start() + + start = time.monotonic() + with pytest.raises(AbortError): + provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + abort_signal=controller.signal, + ) + elapsed = time.monotonic() - start + assert elapsed < 1.0, f"abort took {elapsed:.2f}s — expected <1s" + stream.response.close.assert_called() + + +def test_uncancelled_stream_returns_normally() -> None: + """Never-tripped signal preserves existing streaming semantics.""" + controller = AbortController() + stream = _FakeMessageStream(["hello ", "world"], per_chunk_delay_s=0.0) + provider = _provider_with_stream(stream) + + seen: list[str] = [] + response = provider.chat_stream_response( + messages=[{"role": "user", "content": "hi"}], + on_text_chunk=lambda c: seen.append(c), + abort_signal=controller.signal, + ) + + assert seen == ["hello ", "world"] + stream.response.close.assert_not_called() + assert response is not None + + +def test_listener_detached_after_normal_completion() -> None: + """The listener must be removed after a clean stream exit.""" + controller = AbortController() + stream = _FakeMessageStream(["ok"], per_chunk_delay_s=0.0) + provider = _provider_with_stream(stream) + + 
+    provider.chat_stream_response(
+        messages=[{"role": "user", "content": "hi"}],
+        abort_signal=controller.signal,
+    )
+
+    assert controller.signal._listeners == []
diff --git a/tests/test_openai_compat_abort_signal.py b/tests/test_openai_compat_abort_signal.py
new file mode 100644
index 0000000..4a5ac68
--- /dev/null
+++ b/tests/test_openai_compat_abort_signal.py
@@ -0,0 +1,261 @@
+"""Regression tests for abort-signal-aware streaming in OpenAI-compatible providers.
+
+Before this fix, ``OpenAICompatibleProvider.chat_stream_response`` only
+had the pre-call fast-path from PR #144 — no mid-stream cancellation.
+Users on LiteLLM / GLM / OpenAI / DeepSeek hitting ESC during a model
+turn waited the full model latency before the outer query loop's
+abort check fired — the same 20+ second symptom this whole effort
+started with on the Anthropic path.
+
+This module pins the new behavior. We don't exercise the real OpenAI
+SDK — a synthetic stream fake mimics the surface the provider reads
+(``response.close()`` plus a slow-yielding iterator).
+"""
+from __future__ import annotations
+
+import threading
+import time
+from typing import Any
+from unittest.mock import MagicMock
+
+import pytest
+
+from src.providers.openai_compatible import OpenAICompatibleProvider
+from src.utils.abort_controller import AbortController, AbortError
+
+
+class _FakeChoice:
+    def __init__(self, content: str = "", finish_reason: str | None = None):
+        self.delta = MagicMock()
+        self.delta.content = content
+        self.delta.reasoning_content = None
+        self.delta.tool_calls = []
+        self.finish_reason = finish_reason
+
+
+class _FakeChunk:
+    def __init__(self, content: str = "", finish_reason: str | None = None):
+        self.model = "test-model"
+        self.usage = None
+        self.choices = [_FakeChoice(content=content, finish_reason=finish_reason)]
+
+
+class _FakeStream:
+    """Mimic the OpenAI SDK's Stream object.
+
+    Yields chunks with a configurable delay between them so the test
+    can fire abort mid-iteration. ``response.close()`` sets an event
+    the iterator polls — same mechanism the real httpx stream uses
+    (closed socket raises on the next read).
+    """
+
+    def __init__(self, chunks: list[str], per_chunk_delay_s: float = 0.0):
+        self._chunks = list(chunks)
+        self._delay = per_chunk_delay_s
+        self._closed = threading.Event()
+        self.response = MagicMock()
+        self.response.close.side_effect = self._closed.set
+
+    def __iter__(self):
+        for content in self._chunks:
+            if self._closed.is_set():
+                raise ConnectionError("stream response closed")
+            if self._delay > 0:
+                # Poll the closed event in small slices so a close()
+                # landing from another thread interrupts within ~5ms.
+                deadline = time.monotonic() + self._delay
+                while time.monotonic() < deadline:
+                    if self._closed.is_set():
+                        raise ConnectionError("stream response closed")
+                    time.sleep(0.005)
+            yield _FakeChunk(content=content)
+        yield _FakeChunk(finish_reason="stop")
+
+
+class _ConcreteOpenAIProvider(OpenAICompatibleProvider):
+    """Concrete subclass for testing — the base class is abstract."""
+
+    def _create_client(self) -> Any:
+        return MagicMock()
+
+    def get_available_models(self) -> list[str]:
+        return ["test-model"]
+
+
+def _provider_with_stream(stream) -> _ConcreteOpenAIProvider:
+    provider = _ConcreteOpenAIProvider(api_key="test", model="test-model")
+    client = MagicMock()
+    client.chat.completions.create.return_value = stream
+    provider._client = client  # bypass the lazy create
+    return provider
+
+
+def test_pre_aborted_signal_short_circuits_before_request() -> None:
+    """Already-tripped signal raises before the API round-trip."""
+    controller = AbortController()
+    controller.abort("user_interrupt")
+
+    provider = _ConcreteOpenAIProvider(api_key="test", model="test-model")
+    # The leaf method is what the provider actually calls; setting
+    # ``side_effect`` on the root mock never fires (the root is accessed
+    # via an attribute chain, not invoked). Setting it on the leaf gives
+    # us a real failure sentinel if the fast-path regresses.
+    fake_client = MagicMock()
+    fake_client.chat.completions.create.side_effect = AssertionError(
+        "client.chat.completions.create should not be invoked when abort is pre-tripped"
+    )
+    provider._client = fake_client
+
+    with pytest.raises(AbortError):
+        provider.chat_stream_response(
+            messages=[{"role": "user", "content": "hi"}],
+            abort_signal=controller.signal,
+        )
+    # Belt and suspenders: explicit assertion that the leaf was never called.
+    fake_client.chat.completions.create.assert_not_called()
+
+
+def test_mid_stream_abort_closes_stream_and_raises_abort_error() -> None:
+    """ESC mid-stream → ``response.close()`` → iterator raises → ``AbortError``.
+
+    Same contract as the Anthropic test in
+    ``test_provider_abort_signal.py``: an abort that fires while the
+    SDK is blocked on the next chunk forces a close and raises
+    ``AbortError`` within the poll cadence — orders of magnitude
+    faster than the model's natural generation time.
+    """
+    controller = AbortController()
+    # 5 chunks with 100ms delay each → 500ms total without abort.
+    stream = _FakeStream(["a", "b", "c", "d", "e"], per_chunk_delay_s=0.10)
+    provider = _provider_with_stream(stream)
+
+    def _trip_after_first_chunk() -> None:
+        # Sleep long enough that the provider has entered the stream
+        # context and pulled at least one chunk, then trip the abort.
+        time.sleep(0.15)
+        controller.abort("user_interrupt")
+
+    threading.Thread(target=_trip_after_first_chunk, daemon=True).start()
+
+    start = time.monotonic()
+    with pytest.raises(AbortError):
+        provider.chat_stream_response(
+            messages=[{"role": "user", "content": "hi"}],
+            abort_signal=controller.signal,
+        )
+    elapsed = time.monotonic() - start
+    # Without the fix this would have run the full 500ms stream — there
+    # was no in-loop check and no listener close to cut it short. With
+    # the fix the listener fires ~150ms in and the iterator raises
+    # within one chunk-delay slice.
+    assert elapsed < 1.0, f"abort took {elapsed:.2f}s — expected <1s"
+    # The listener must have actually invoked response.close().
+    stream.response.close.assert_called()
+
+
+def test_in_loop_check_catches_abort_between_chunks() -> None:
+    """Even with no chunk delay, the in-loop check observes abort.
+
+    Models the case where chunks arrive back-to-back fast enough that
+    the listener's close lands one iteration late (or that the model
+    finished generating before the user pressed ESC, so the listener's
+    socket close has nothing to interrupt). The in-loop check at the
+    top of each ``for chunk in stream`` iteration must observe the
+    abort and break out — otherwise the consumer would burn through
+    whatever's left in the SDK's prefetch buffer.
+
+    Load-bearing assertion: ``on_text_chunk`` is called with
+    ``"first"`` but NOT with ``"second"``. Without this content check,
+    the test would still pass even if the in-loop check were deleted:
+    the second chunk would be consumed, the iterator would exit
+    naturally, and the post-loop ``if abort_signal.aborted`` check
+    would still raise ``AbortError``. The Critic pointed this out —
+    asserting on the chunk callback list is what actually pins the
+    in-loop defense.
+    """
+    controller = AbortController()
+
+    class _TrippingStream:
+        """Stream whose first yielded chunk trips the controller.
+
+        ``__iter__`` is defined at class scope because Python looks
+        up the iteration protocol on the type, not the instance — an
+        instance-level ``__iter__`` is silently ignored.
+        """
+
+        def __init__(self) -> None:
+            self.response = MagicMock()
+
+        def __iter__(self):
+            yield _FakeChunk(content="first")
+            controller.abort("user_interrupt")
+            yield _FakeChunk(content="second")
+            yield _FakeChunk(finish_reason="stop")
+
+    stream = _TrippingStream()
+    provider = _provider_with_stream(stream)
+
+    seen: list[str] = []
+    with pytest.raises(AbortError):
+        provider.chat_stream_response(
+            messages=[{"role": "user", "content": "hi"}],
+            on_text_chunk=lambda c: seen.append(c),
+            abort_signal=controller.signal,
+        )
+
+    # The in-loop check at the top of iteration 2 must observe the
+    # abort and stop before "second" ever reaches the callback (the
+    # chunk is pulled from the iterator, then dropped unprocessed).
+    # If this assertion fails after the in-loop check is removed, we
+    # have a regression back to the original "ESC waits for the model
+    # to finish generating" behaviour.
+    assert seen == ["first"], f"in-loop check leaked second chunk: {seen}"
+
+
+def test_uncancelled_stream_returns_normally() -> None:
+    """Never-tripped signal preserves existing streaming semantics."""
+    controller = AbortController()  # never aborted
+    stream = _FakeStream(["hello ", "world"], per_chunk_delay_s=0.0)
+    provider = _provider_with_stream(stream)
+
+    seen: list[str] = []
+    response = provider.chat_stream_response(
+        messages=[{"role": "user", "content": "hi"}],
+        on_text_chunk=lambda c: seen.append(c),
+        abort_signal=controller.signal,
+    )
+
+    assert seen == ["hello ", "world"]
+    stream.response.close.assert_not_called()
+    assert response.content == "hello world"
+
+
+def test_no_abort_signal_param_preserves_legacy_callers() -> None:
+    """``abort_signal=None`` default keeps the iterator working unchanged."""
+    stream = _FakeStream(["ok"], per_chunk_delay_s=0.0)
+    provider = _provider_with_stream(stream)
+
+    response = provider.chat_stream_response(
+        messages=[{"role": "user", "content": "hi"}],
+    )
+    assert response.content == "ok"
+    stream.response.close.assert_not_called()
+
+
+def test_listener_detached_after_normal_completion() -> None:
+    """The abort listener must not pin the provider alive across calls.
+
+    Long-lived AbortControllers (the REPL engine's, reused across many
+    turns) would otherwise accumulate one dead listener per streaming
+    call, and each abort would invoke N stream-close callbacks against
+    long-gone streams.
+    """
+    controller = AbortController()
+    stream = _FakeStream(["ok"], per_chunk_delay_s=0.0)
+    provider = _provider_with_stream(stream)
+
+    provider.chat_stream_response(
+        messages=[{"role": "user", "content": "hi"}],
+        abort_signal=controller.signal,
+    )
+
+    assert controller.signal._listeners == []
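
For reference: every hunk above imports from ``src/utils/abort_controller``, but the module itself never appears in the diff. Below is a minimal sketch of the contract the providers and tests assume — sticky ``aborted`` state plus ``reason``, ``add_listener(fn, once=True)`` returning a handle that ``remove_listener`` accepts, and a ``_fire`` that snapshots the listener list before iterating (the race the register-then-recheck comments close). It is inferred from the call sites, not the module's actual source; names and locking details may differ.

    # Minimal sketch of src/utils/abort_controller.py, inferred from the
    # call sites in the hunks above — illustrative, not the real module.
    from __future__ import annotations

    import threading
    from typing import Callable, Optional


    class AbortError(Exception):
        """Raised at the provider boundary when a turn is cancelled."""


    class AbortSignal:
        def __init__(self) -> None:
            self._lock = threading.Lock()
            self._listeners: list[Callable[[], None]] = []
            self.aborted = False               # sticky: never resets to False
            self.reason: Optional[str] = None

        def add_listener(
            self, fn: Callable[[], None], once: bool = False
        ) -> Callable[[], None]:
            # ``once`` is accepted for parity with the call sites; this
            # sketch fires each listener at most once regardless. The
            # return value is the handle ``remove_listener`` takes back.
            with self._lock:
                self._listeners.append(fn)
            return fn

        def remove_listener(self, fn: Callable[[], None]) -> None:
            with self._lock:
                if fn in self._listeners:
                    self._listeners.remove(fn)

        def _fire(self) -> None:
            # Snapshot before iterating: a listener registered after this
            # point is silently dropped — exactly the race the providers'
            # register-then-recheck pattern guards against.
            with self._lock:
                snapshot = list(self._listeners)
                self._listeners.clear()
            for fn in snapshot:
                try:
                    fn()
                except Exception:
                    pass                       # a listener must never break the abort


    class AbortController:
        def __init__(self) -> None:
            self.signal = AbortSignal()

        def abort(self, reason: str = "user_interrupt") -> None:
            if self.signal.aborted:            # idempotent re-abort
                return
            self.signal.aborted = True         # state first, so re-checks
            self.signal.reason = reason        # observe the trip...
            self.signal._fire()                # ...then listeners run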