Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ updates:
open-pull-requests-limit: 5
commit-message:
prefix: "deps"
# Hold aiohttp below 3.14: it added a required ClientResponse.stream_writer
# kwarg that breaks aioresponses 0.7.8 (latest), failing the external/fetch
# test suites. Drop this ignore (and lift the pyproject cap) once
# aioresponses ships a 3.14-compatible release.
ignore:
- dependency-name: "aiohttp"
versions: [">=3.14"]
# Roll all non-breaking bumps into a single weekly PR to keep noise down;
# major bumps still open as individual PRs so they get reviewed on their own.
groups:
Expand Down
14 changes: 6 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ http = ["falcon>=3.0", "httpx>=0.24", "waitress>=2.0", "zstandard>=0.20", "pynac
s3 = ["boto3>=1.28"]
gcs = ["google-cloud-storage>=2.10"]
cli = ["typer>=0.9", "httpx>=0.24", "filelock>=3.13"]
# aiohttp <3.14: 3.14 added a required ClientResponse.stream_writer kwarg that
# breaks aioresponses 0.7.8 (latest); lift the cap once aioresponses catches up.
external = ["aiohttp>=3.9,<3.14", "tenacity>=8.0", "zstandard>=0.20"]
# aiohttp >=3.14.1 picks up a batch of client-side security fixes
# (CVE-2026-34993, -47265, -502xx, -542xx). The old <3.14 cap existed only
# because aioresponses broke on 3.14's ClientResponse.stream_writer kwarg; the
# test suite has since migrated to aiointercept, so the cap is lifted.
external = ["aiohttp>=3.14.1", "tenacity>=8.0", "zstandard>=0.20"]
otel = ["opentelemetry-api>=1.20", "opentelemetry-sdk>=1.20"]
oauth = ["joserfc>=1.0"]
mtls = ["cryptography>=41.0"]
Expand Down Expand Up @@ -102,10 +104,6 @@ ignore_missing_imports = true
module = "moto.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "aioresponses.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "google.cloud.*"
ignore_missing_imports = true
Expand Down Expand Up @@ -207,7 +205,7 @@ docs = [
"pymdown-extensions>=10.21.2",
]
dev = [
"aioresponses>=0.7",
"aiointercept>=0.1.7",
"joserfc>=1.0",
"boto3>=1.28",
"falcon>=3.0",
Expand Down
88 changes: 88 additions & 0 deletions tests/_aiomock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# © Copyright 2025-2026, Query.Farm LLC - https://query.farm
# SPDX-License-Identifier: Apache-2.0

"""Synchronous adapter around :mod:`aiointercept` for the aiohttp test suites.

The fetch/external tests are synchronous (``def test_...``) and drive
:func:`vgi_rpc.external_fetch.fetch_url`, which runs aiohttp on its own daemon
thread and event loop. :mod:`aiointercept` is an *async* context manager that
boots a real localhost test server and patches aiohttp's DNS resolver at the
class level so requests to any registered host are transparently intercepted
(``mock_external_urls=True``) — exactly what is needed to mock the presigned
URLs the fetcher downloads.

:func:`mock_aiohttp` bridges the two: it drives ``aiointercept``'s async
``start``/``stop`` from synchronous test code on a transient event loop. The
class-level patches it installs are global, so they intercept the fetcher's
daemon-thread requests too. Registered callbacks are synchronous here, so the
transient loop used for startup never needs to outlive ``start()``.

This replaces ``aioresponses`` (which broke on aiohttp >=3.14); the registration
surface (``get``/``head``/``post``/...; ``CallbackResult``; ``callback(url, *,
headers, query, json, data)``) is API-compatible, so test bodies are unchanged.
"""

from __future__ import annotations

import asyncio
import threading
from collections.abc import Coroutine, Iterator
from contextlib import contextmanager
from typing import Any

from aiointercept import CallbackResult, aiointercept

__all__ = ["CallbackResult", "aiointercept", "mock_aiohttp"]


def _run_sync(coro: Coroutine[Any, Any, None]) -> None:
    """Drive *coro* to completion from synchronous code, blocking until done.

    The coroutine is executed on a brand-new event loop owned by a helper
    thread rather than on the caller's thread. That makes the call safe even
    when the caller is already inside a running event loop (some tests invoke
    ``fetch_url`` from within their own loop), a situation in which a plain
    :func:`asyncio.run` would raise ``RuntimeError``. The loop is closed as
    soon as the coroutine finishes: ``aiointercept`` serves requests from its
    own daemon thread, and the callbacks registered by the tests are
    synchronous, so nothing needs this loop after ``start``/``stop`` returns.

    Args:
        coro: The ``start()`` or ``stop()`` coroutine to drive.

    Raises:
        BaseException: Whatever *coro* raised, re-raised on the calling thread.

    """
    failures: list[BaseException] = []

    def _drive() -> None:
        # Created outside the ``try`` so ``close()`` is only attempted on a
        # loop that actually exists.
        event_loop = asyncio.new_event_loop()
        try:
            event_loop.run_until_complete(coro)
        except BaseException as caught:
            # An exception escaping a thread is not seen by the caller, so
            # stash it and let the calling thread re-raise it after join().
            failures.append(caught)
        finally:
            event_loop.close()

    worker = threading.Thread(target=_drive, name="aiomock-lifecycle")
    worker.start()
    worker.join()

    if not failures:
        return
    raise failures[0]


@contextmanager
def mock_aiohttp() -> Iterator[aiointercept]:
    """Run an :class:`aiointercept.aiointercept` mock for a ``with`` block.

    The mock is created with ``mock_external_urls=True`` so that requests to
    any registered host are intercepted, which is what lets the daemon-thread
    fetcher's HTTPS requests to presigned URLs be mocked. Its asynchronous
    ``start()`` and ``stop()`` are each driven synchronously via
    :func:`_run_sync`.

    Yields:
        The started ``aiointercept`` instance; register handlers on it with
        ``.get()`` / ``.head()`` / ``.post()`` and friends.

    """
    interceptor = aiointercept(mock_external_urls=True)
    _run_sync(interceptor.start())
    try:
        yield interceptor
    finally:
        # Reached on normal exit and when the with-body raises, so a mock
        # that was started is always stopped.
        _run_sync(interceptor.stop())
50 changes: 25 additions & 25 deletions tests/test_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import pyarrow.compute as pc
import pytest
import zstandard
from aioresponses import CallbackResult
from aioresponses import aioresponses as aioresponses_ctx
from pyarrow import ipc

from tests._aiomock import CallbackResult, aiointercept
from tests._aiomock import mock_aiohttp as aiointercept_ctx
from vgi_rpc.external import (
Compression,
ExternalLocationConfig,
Expand Down Expand Up @@ -66,7 +66,7 @@
class MockStorage(ExternalStorage):
"""In-memory ExternalStorage for testing.

Uses ``https://`` URLs so aioresponses can intercept them.
Uses ``https://`` URLs so aiointercept can intercept them.
"""

def __init__(self) -> None:
Expand Down Expand Up @@ -95,10 +95,10 @@ def generate_upload_url(self, schema: pa.Schema) -> UploadUrl:


@contextmanager
def _mock_aio(storage: MockStorage, *, content_encoding: str | None = None) -> Iterator[aioresponses_ctx]:
"""Context manager that registers all MockStorage URLs in aioresponses."""
def _mock_aio(storage: MockStorage, *, content_encoding: str | None = None) -> Iterator[aiointercept]:
"""Context manager that registers all MockStorage URLs in aiointercept."""
enc = content_encoding or storage.last_content_encoding
with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
for url, body in storage.data.items():
head_headers: dict[str, str] = {"Content-Length": str(len(body))}
get_headers: dict[str, str] = {"Content-Length": str(len(body))}
Expand Down Expand Up @@ -352,7 +352,7 @@ def test_retry_success(self) -> None:

pointer, cm = make_external_location_batch(_SCHEMA, url)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
# First attempt: HEAD ok, GET fails; second attempt: HEAD ok, GET succeeds
mock.head(url, headers={"Content-Length": str(len(ipc_bytes))})
mock.get(url, exception=aiohttp.ClientConnectionError("transient failure"))
Expand All @@ -371,7 +371,7 @@ def test_all_retries_fail(self) -> None:
pointer, cm = make_external_location_batch(_SCHEMA, url)

with (
aioresponses_ctx() as mock,
aiointercept_ctx() as mock,
pytest.raises(RuntimeError, match="Failed to resolve"),
):
# Both attempts: HEAD fails with connection error
Expand Down Expand Up @@ -1039,7 +1039,7 @@ def bidi_large(self, factor: float) -> Stream[_LargeBidiState]:
)


def _mock_aio_dynamic(storage: MockStorage, mock: aioresponses_ctx, *, content_encoding: str | None = None) -> None:
def _mock_aio_dynamic(storage: MockStorage, mock: aiointercept, *, content_encoding: str | None = None) -> None:
"""Register pattern-based HEAD + GET callbacks that serve from MockStorage dynamically."""
pattern = re.compile(r"^https://mock\.storage/.*$")
enc = content_encoding or storage.last_content_encoding
Expand Down Expand Up @@ -1087,7 +1087,7 @@ def test_unary_large_externalized(self) -> None:
storage = MockStorage()
config = self._make_config(storage, threshold=10)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock)
with serve_pipe(
_ExternalService,
Expand Down Expand Up @@ -1121,7 +1121,7 @@ def test_server_stream_large_externalized(self) -> None:

received_logs: list[Message] = []

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock)
with serve_pipe(
_ExternalService,
Expand All @@ -1145,7 +1145,7 @@ def test_server_stream_logs_from_external(self) -> None:

received_logs: list[Message] = []

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock)
with serve_pipe(
_ExternalService,
Expand All @@ -1165,7 +1165,7 @@ def test_bidi_large_output_externalized(self) -> None:
storage = MockStorage()
config = self._make_config(storage, threshold=100)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock)
with serve_pipe(
_ExternalService,
Expand All @@ -1189,7 +1189,7 @@ def test_bidi_large_input_externalized(self) -> None:
storage = MockStorage()
config = self._make_config(storage, threshold=100)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock)
with serve_pipe(
_ExternalService,
Expand Down Expand Up @@ -1328,7 +1328,7 @@ def test_bidi_storage_none(self) -> None:
class TestS3Storage:
"""Tests for S3Storage with moto mock.

Uses moto's mock_aws for S3 operations and aioresponses to intercept
Uses moto's mock_aws for S3 operations and aiointercept to intercept
fetches, reading back from moto's mock S3.
"""

Expand Down Expand Up @@ -1373,7 +1373,7 @@ def test_full_roundtrip(self, _s3_env: tuple[Any, Any]) -> None:
externalize_threshold_bytes=10,
)

# Helper to read objects from moto's mock S3 for aioresponses
# Helper to read objects from moto's mock S3 for aiointercept
def _s3_callback(url_: Any, **kwargs: Any) -> CallbackResult:
parsed = urlparse(str(url_))
key = parsed.path.lstrip("/")
Expand All @@ -1388,7 +1388,7 @@ def _s3_head_callback(url_: Any, **kwargs: Any) -> CallbackResult:
body: bytes = resp["Body"].read()
return CallbackResult(status=200, headers={"Content-Length": str(len(body))})

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
pattern = re.compile(r"^https://.*test-bucket.*$")
for _ in range(10):
mock.head(pattern, callback=_s3_head_callback)
Expand Down Expand Up @@ -1485,7 +1485,7 @@ def test_upload_and_signed_url(self, _gcs_mocks: tuple[MagicMock, MagicMock, Mag
assert call_kwargs.kwargs["method"] == "GET"

def test_full_roundtrip(self, _gcs_mocks: tuple[MagicMock, MagicMock, MagicMock, MagicMock]) -> None:
"""Full round-trip with mocked GCS + aioresponses."""
"""Full round-trip with mocked GCS + aiointercept."""
from vgi_rpc.gcs import GCSStorage

_, _, _, mock_blob = _gcs_mocks
Expand All @@ -1507,7 +1507,7 @@ def capture_upload(data: bytes, content_type: str = "") -> None:
assert ext_batch.num_rows == 0
assert ext_cm is not None

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
body = uploaded["data"]
mock.head(
"https://storage.googleapis.com/test",
Expand Down Expand Up @@ -1620,7 +1620,7 @@ def test_basic_resolution_with_fetch_config(self) -> None:

pointer, cm = make_external_location_batch(_SCHEMA, url)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
mock.head(url, headers={"Content-Length": str(len(ipc_bytes))})
mock.get(url, body=ipc_bytes, headers={"Content-Length": str(len(ipc_bytes))})

Expand Down Expand Up @@ -1669,7 +1669,7 @@ def _range_callback(url_: Any, **kwargs: Any) -> CallbackResult:
)
return CallbackResult(status=200, body=ipc_bytes)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
mock.head(
url,
headers={"Content-Length": str(len(ipc_bytes)), "Accept-Ranges": "bytes"},
Expand Down Expand Up @@ -1858,7 +1858,7 @@ def test_unary_large_compressed(self) -> None:
storage = MockStorage()
config = self._make_config(storage, threshold=10)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock, content_encoding="zstd")
with serve_pipe(
_ExternalService,
Expand All @@ -1880,7 +1880,7 @@ def test_server_stream_compressed(self) -> None:

received_logs: list[Message] = []

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock, content_encoding="zstd")
with serve_pipe(
_ExternalService,
Expand All @@ -1900,7 +1900,7 @@ def test_bidi_compressed(self) -> None:
storage = MockStorage()
config = self._make_config(storage, threshold=100)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock, content_encoding="zstd")
with serve_pipe(
_ExternalService,
Expand Down Expand Up @@ -1984,7 +1984,7 @@ def test_large_header_externalized(self) -> None:
storage = MockStorage()
config = self._make_config(storage, threshold=10)

with aioresponses_ctx() as mock:
with aiointercept_ctx() as mock:
_mock_aio_dynamic(storage, mock)
with serve_pipe(
_HeaderExternalService,
Expand Down
Loading