From 3a5a2a65348d1a0b0e7dc44c5586de17cf3e0754 Mon Sep 17 00:00:00 2001 From: Abhimanyu Siwach Date: Sat, 21 Feb 2026 02:03:48 -0800 Subject: [PATCH] feat: isolate async handler execution on dedicated worker event loop Async handlers that contain blocking calls (e.g. time.sleep, synchronous HTTP requests) previously ran on the main uvicorn event loop, freezing /ping health checks and causing container termination. This introduces a dedicated persistent worker event loop in a background thread that isolates all handler execution from the main loop. Three-way handler dispatch: - Async generators: bridged to sync generators via queue.Queue on worker loop - Regular async: run_coroutine_threadsafe + wrap_future on worker loop - Sync: run_in_threadpool (starlette thread pool) Context propagation uses contextvars.copy_context() with the Django asgiref _restore_context pattern for Python 3.10+ compatibility. --- src/bedrock_agentcore/runtime/app.py | 122 +++++- tests/bedrock_agentcore/runtime/test_app.py | 414 +++++++++++++++++++- 2 files changed, 523 insertions(+), 13 deletions(-) diff --git a/src/bedrock_agentcore/runtime/app.py b/src/bedrock_agentcore/runtime/app.py index d575426..1aa6ca5 100644 --- a/src/bedrock_agentcore/runtime/app.py +++ b/src/bedrock_agentcore/runtime/app.py @@ -5,9 +5,11 @@ import asyncio import contextvars +import functools import inspect import json import logging +import queue import threading import time import uuid @@ -15,6 +17,7 @@ from typing import Any, Callable, Dict, Optional from starlette.applications import Starlette +from starlette.concurrency import run_in_threadpool from starlette.middleware import Middleware from starlette.responses import JSONResponse, Response, StreamingResponse from starlette.routing import Route, WebSocketRoute @@ -39,6 +42,30 @@ from .utils import convert_complex_objects +def _is_async_callable(obj: Any) -> bool: + """Check if obj is async-callable, unwrapping functools.partial.""" + while isinstance(obj, 
def _restore_context(ctx: contextvars.Context) -> None:
    """Copy every variable captured in ``ctx`` into the currently active context.

    Call this at the start of work that runs in a different context than the
    one ``ctx`` was snapshotted from, so the work observes the same
    ``ContextVar`` values as the code that took the snapshot.

    Values are compared by identity rather than with ``!=``. An equality test
    would invoke the stored object's own comparison, which may raise or return
    a non-boolean (array-like values), and it would leave an equal-but-distinct
    object in place instead of the snapshotted one.

    Args:
        ctx: Snapshot to restore from, e.g. ``contextvars.copy_context()``.
    """
    for var, value in ctx.items():
        try:
            current = var.get()
        except LookupError:
            # Unset in this context and no default: nothing to compare against.
            var.set(value)
            continue
        # Skip the write when the exact same object is already in place.
        if current is not value:
            var.set(value)
def _async_gen_to_sync_gen(self, async_gen: Any, ctx: contextvars.Context) -> Any:
    """Bridge an async generator through the worker loop as a sync generator.

    The async generator is iterated by a task on the worker loop. Chunks are
    handed over through a bounded thread-safe queue (100 entries) and yielded
    synchronously to the caller. Starlette's StreamingResponse iterates this
    sync generator via iterate_in_threadpool, so the main event loop is never
    blocked.

    The producer never makes a blocking queue call on the worker loop: when the
    buffer is full it retries after a short ``asyncio.sleep`` so other handlers
    on the same loop keep running. When the consumer stops early (generator
    closed or garbage collected) the producer task is cancelled and the async
    generator is closed, so an abandoned stream cannot stall the worker loop.

    Args:
        async_gen: The async generator (or async iterator) to drain.
        ctx: Context snapshot whose variables the producer task should see.

    Yields:
        Each chunk produced by ``async_gen``, in order.

    Raises:
        BaseException: Whatever ``async_gen`` raised, re-raised in the consumer.
    """
    worker_loop = self._ensure_worker_loop()
    chunks: queue.Queue = queue.Queue(maxsize=100)
    end_of_stream = object()
    consumer_gone = threading.Event()
    producer_tasks: list = []
    retry_delay = 0.005  # seconds between attempts while the buffer is full

    async def _offer(item: Any) -> bool:
        """Enqueue without blocking the loop; return False once the consumer has left."""
        while not consumer_gone.is_set():
            try:
                chunks.put_nowait(item)
            except queue.Full:
                # Yield to the loop instead of blocking its thread on the queue.
                await asyncio.sleep(retry_delay)
            else:
                return True
        return False

    async def _produce() -> None:
        try:
            async for chunk in async_gen:
                if not await _offer((True, chunk)):
                    return
            await _offer((True, end_of_stream))
        except asyncio.CancelledError as exc:
            # Cancelled tasks must not wait: hand the error over only if there
            # is room (best effort), then let the cancellation proceed.
            try:
                chunks.put_nowait((False, exc))
            except queue.Full:
                pass
            raise
        except BaseException as exc:
            await _offer((False, exc))
        finally:
            # Run the generator's own cleanup even when iteration stopped early.
            aclose = getattr(async_gen, "aclose", None)
            if aclose is not None:
                await aclose()

    def _start() -> None:
        producer_tasks.append(worker_loop.create_task(_produce()))

    def _cancel() -> None:
        for task in producer_tasks:
            task.cancel()  # no-op when the task already finished

    # The callback runs inside a private copy of ``ctx``; the task created there
    # copies that context, so the producer sees the snapshotted variables.
    worker_loop.call_soon_threadsafe(_start, context=ctx.copy())

    try:
        while True:
            ok, value = chunks.get()
            if not ok:
                raise value
            if value is end_of_stream:
                break
            yield value
    finally:
        consumer_gone.set()
        try:
            worker_loop.call_soon_threadsafe(_cancel)
        except RuntimeError:
            # Loop already closed: there is no producer left to cancel.
            pass
+ """ try: args = (payload, request_context) if takes_context else (payload,) - - if asyncio.iscoroutinefunction(handler): - return await handler(*args) + ctx = contextvars.copy_context() + + if _is_async_gen_callable(handler): + return self._async_gen_to_sync_gen(handler(*args), ctx) + elif _is_async_callable(handler): + worker_loop = self._ensure_worker_loop() + future = asyncio.run_coroutine_threadsafe(self._run_with_context(handler(*args), ctx), worker_loop) + result = await asyncio.wrap_future(future) + if inspect.isasyncgen(result): + return self._async_gen_to_sync_gen(result, ctx) + return result else: - loop = asyncio.get_event_loop() - ctx = contextvars.copy_context() - return await loop.run_in_executor(None, ctx.run, handler, *args) + return await run_in_threadpool(ctx.run, handler, *args) except Exception: handler_name = getattr(handler, "__name__", "unknown") self.logger.debug("Handler '%s' execution failed", handler_name) diff --git a/tests/bedrock_agentcore/runtime/test_app.py b/tests/bedrock_agentcore/runtime/test_app.py index 9266776..45ca6c3 100644 --- a/tests/bedrock_agentcore/runtime/test_app.py +++ b/tests/bedrock_agentcore/runtime/test_app.py @@ -465,8 +465,8 @@ def handler(payload): # Removed: No more 503 responses since we removed concurrency limits @pytest.mark.asyncio - async def test_async_handler_runs_in_event_loop(self): - """Test async handlers run in main event loop, not thread pool.""" + async def test_async_handler_runs_on_worker_loop(self): + """Test async handlers run on the dedicated worker loop, not main event loop.""" app = BedrockAgentCoreApp() # Track which thread the handler runs in @@ -488,9 +488,8 @@ async def handler(payload): result = await app._invoke_handler(handler, context, False, {}) assert result == {"async": True} - # Async handler should run in main thread - assert handler_thread_id == threading.current_thread().ident - # No executor needed for async handlers + # Async handler should run on worker thread, NOT main 
@pytest.mark.asyncio
async def test_async_handler_runs_on_worker_thread(self):
    """An async entrypoint runs on a different thread and event loop than its caller."""
    from bedrock_agentcore.runtime.context import RequestContext

    app = BedrockAgentCoreApp()
    # Filled in by the handler with the thread/loop it actually ran on.
    observed = {"thread": None, "loop": None}

    @app.entrypoint
    async def handler(payload):
        observed["thread"] = threading.current_thread().ident
        observed["loop"] = id(asyncio.get_running_loop())
        return {"ok": True}

    outcome = await app._invoke_handler(handler, RequestContext(session_id=None), False, {})

    caller_thread = threading.current_thread().ident
    caller_loop = id(asyncio.get_running_loop())
    assert outcome == {"ok": True}
    assert observed["thread"] != caller_thread
    assert observed["loop"] != caller_loop
@pytest.mark.asyncio
async def test_blocking_async_handler_does_not_block_ping(self):
    """Test that a blocking async handler does not block the main event loop.

    The handler blocks its thread until released. While it is blocked, the
    main loop must still be able to complete its own ``asyncio.sleep`` and
    report a ping status. Timing a synchronous ``get_current_ping_status()``
    call alone would pass even if the loop had been frozen, so the check is
    made on the loop's own progress instead.
    """
    from bedrock_agentcore.runtime.context import RequestContext

    app = BedrockAgentCoreApp()
    release = threading.Event()

    @app.entrypoint
    async def handler(payload):
        release.wait(timeout=5)  # Blocking call inside async handler
        return {"done": True}

    context = RequestContext(session_id=None)

    # Start the blocking invocation
    invocation_task = asyncio.create_task(app._invoke_handler(handler, context, False, {}))
    try:
        started = time.monotonic()
        await asyncio.sleep(0.2)  # Only returns promptly if this loop is not blocked
        loop_delay = time.monotonic() - started

        assert loop_delay < 1.0
        # The handler is still blocked, so the loop made progress concurrently.
        assert not invocation_task.done()

        status = app.get_current_ping_status()
        assert status.value in ("Healthy", "HealthyBusy")
    finally:
        release.set()  # Always unblock the handler so the task can finish

    assert await invocation_task == {"done": True}
@pytest.mark.asyncio
async def test_context_propagation_to_sync_handler(self):
    """A sync entrypoint sees the request id that was set before the invocation."""
    from bedrock_agentcore.runtime.context import BedrockAgentCoreContext, RequestContext

    app = BedrockAgentCoreApp()
    seen_request_ids = []

    @app.entrypoint
    def handler(payload):
        seen_request_ids.append(BedrockAgentCoreContext.get_request_id())
        return {"ok": True}

    BedrockAgentCoreContext.set_request_context("sync-req-001", None)
    await app._invoke_handler(handler, RequestContext(session_id=None), False, {})

    assert seen_request_ids[-1] == "sync-req-001"
@pytest.mark.asyncio
async def test_async_handler_exception_propagates(self):
    """An exception raised inside an async entrypoint reaches the awaiting caller."""
    from bedrock_agentcore.runtime.context import RequestContext

    app = BedrockAgentCoreApp()

    @app.entrypoint
    async def handler(payload):
        raise ValueError("worker loop error")

    request_context = RequestContext(session_id=None)
    invocation = app._invoke_handler(handler, request_context, False, {})
    with pytest.raises(ValueError, match="worker loop error"):
        await invocation
@pytest.mark.asyncio
async def test_concurrent_async_invocations(self):
    """Five overlapping async invocations finish in roughly one handler duration."""
    from bedrock_agentcore.runtime.context import RequestContext

    app = BedrockAgentCoreApp()

    @app.entrypoint
    async def handler(payload):
        await asyncio.sleep(0.3)
        return {"id": payload["id"]}

    context = RequestContext(session_id=None)

    began = time.time()
    pending = []
    for index in range(5):
        invocation = app._invoke_handler(handler, context, False, {"id": index})
        pending.append(asyncio.create_task(invocation))
    results = await asyncio.gather(*pending)
    elapsed = time.time() - began

    # gather() preserves submission order, so position must match the payload id.
    for index, result in enumerate(results):
        assert result["id"] == index
    # Should be ~0.3s (concurrent), not ~1.5s (serial)
    assert elapsed < 1.0
@pytest.mark.asyncio
async def test_callable_class_async_dispatches_to_worker(self):
    """An instance whose ``__call__`` is async is invoked off the caller's thread."""
    from bedrock_agentcore.runtime.context import RequestContext

    app = BedrockAgentCoreApp()
    thread_ids = []

    class AsyncHandler:
        async def __call__(self, payload):
            thread_ids.append(threading.current_thread().ident)
            return {"class": True}

    handler = AsyncHandler()
    app.handlers["main"] = handler

    outcome = await app._invoke_handler(handler, RequestContext(session_id=None), False, {})

    assert outcome == {"class": True}
    assert thread_ids[-1] != threading.current_thread().ident
def test_async_gen_streaming_works_via_http(self):
    """Chunks from an async generator entrypoint appear in the HTTP response body."""
    app = BedrockAgentCoreApp()

    @app.entrypoint
    async def handler(payload):
        yield {"chunk": "a"}
        await asyncio.sleep(0.01)
        yield {"chunk": "b"}

    response = TestClient(app).post("/invocations", json={"test": True})
    assert response.status_code == 200

    body = response.content.decode("utf-8")
    for expected_frame in ('data: {"chunk": "a"}', 'data: {"chunk": "b"}'):
        assert expected_frame in body