fix: resolve asyncio.to_thread thread-leakage via non-blocking HTTP approval mechanism#101
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds human-in-the-loop safety approvals: SafetyEngine pending-approval tracking, an async HTTP approval handler, three API endpoints to request/list/resolve approvals, and unit/integration tests covering lifecycle and HTTP behavior. ChangesHuman-in-the-Loop Safety Approvals
Sequence DiagramsequenceDiagram
participant Client
participant APIServer as API Server
participant SafetyEngine
participant External as Approval Service
Client->>APIServer: POST /api/v1/safety/request (SafetyApprovalRequest)
APIServer->>SafetyEngine: submit_pending_approval(event_id)
SafetyEngine-->>APIServer: Future[bool] (pending)
External->>APIServer: POST /api/v1/safety/pending/{event_id}/resolve (resolution)
APIServer->>SafetyEngine: resolve_pending_approval(event_id, approved)
SafetyEngine-->>APIServer: bool (resolved)
APIServer->>SafetyEngine: await future (with timeout)
SafetyEngine-->>APIServer: bool result
APIServer-->>Client: { "approved": bool, "status": "approved"|"denied"|"timeout" }
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
d4c9a70 to
ef269b2
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
tests/test_server.py (1)
3-7:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winFix import ordering to unblock CI.
The pipeline fails on
ruff (I001)for this import block. The first-party import (agentwatch.api.server) must be separated from the third-party group by a blank line.🔧 Proposed fix
from __future__ import annotations import pytest from fastapi.testclient import TestClient + from agentwatch.api.server import app🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_server.py` around lines 3 - 7, The import block in tests/test_server.py violates ruff import grouping (I001); separate third-party imports from first-party imports by adding a single blank line between the third-party imports (pytest, TestClient from fastapi.testclient) and the first-party import of agentwatch.api.server (the app symbol), so the groupings are: standard libs, blank line, third-party libs, blank line, first-party/local imports.tests/test_safety.py (1)
12-17:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winClean up the new test imports to restore a green lint run.
Ruff is already flagging
RiskPatternand both localAgentEventimports as unused, and the two local import blocks need sorting.Also applies to: 347-352, 421-423
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_safety.py` around lines 12 - 17, Remove unused symbols and sort the import blocks: delete the unused RiskPattern import from the safety import tuple and remove any duplicate or unused local AgentEvent imports referenced elsewhere in the file, then reorder the remaining import groups and symbols alphabetically/PEP8-style so imports are grouped (standard, third-party, local) and lint (Ruff) no longer flags unused or unsorted imports; update the import lines that currently include RiskPattern, AgentEvent, and the two local import blocks (also present at the other mentioned locations) accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@agentwatch/api/server.py`:
- Around line 637-647: The pending-approval future registered via
_safety_engine.submit_pending_approval(event_id) must be unregistered on all
exit paths; wrap the await asyncio.wait_for(future, ...) in a try/except/finally
and in the finally always call _safety_engine.resolve_pending_approval(event_id,
approved=False) so cancelled tasks or other exceptions clear _pending_approvals
as well (use the same event_id and keep the existing except block for
TimeoutError to avoid changing behavior). Ensure you reference the same symbols:
event_id, future, _safety_engine.submit_pending_approval and
_safety_engine.resolve_pending_approval when making the change.
In `@agentwatch/core/safety.py`:
- Around line 615-616: The import block in agentwatch/core/safety.py should be
sorted into standard-library and third-party groups to satisfy Ruff/I001: put
"import os" (stdlib) first, add a blank line, then "import httpx" (third-party);
alternatively run isort on the file to apply the correct ordering automatically
so the import block (os, httpx) follows the expected grouping.
- Around line 530-542: The submit_pending_approval implementation currently
overwrites any existing future in self._pending_approvals for the same event_id;
change it to be idempotent by checking self._pending_approvals for an existing
Future and returning that existing, still-pending Future instead of creating a
new one (only create and store a new Future when there is no entry or the
existing Future is done/cancelled); reference the submit_pending_approval method
and the self._pending_approvals mapping when making this change so retries of
the same event_id reuse the original waiter rather than orphaning it.
In `@tests/test_server.py`:
- Around line 54-62: The background function resolve_later currently performs
asserts inside a worker thread so failures are swallowed; change resolve_later
to capture its result/exception (e.g., store response status/body or exception
info into a shared mutable container) instead of asserting, start the thread as
before, join the thread, then in the main test thread assert on the captured
outcome (validate client.post to f"/api/v1/safety/pending/{event_id}/resolve"
returned status_code 200 and returned body contains status == "resolved" and
approved == True) so any failures surface to pytest; reference the resolve_later
function, thread.join(), client.post(...) and event_id when updating the test.
- Around line 67-85: The issue is that resolve_pending_approval resolves an
anyio future from a different event loop thread by calling
future.set_result(approved) directly; update resolve_pending_approval in
agentwatch/core/safety.py to schedule the resolution on the future's originating
loop (e.g., use future.get_loop().call_soon_threadsafe(future.set_result,
approved) or store the originating loop when creating the future and use
loop.call_soon_threadsafe) so cross-portal thread resolution is thread-safe;
also update tests/test_server.py to avoid time.sleep by asserting the elapsed
time for the request completes well under the approval_timeout_seconds (replace
the implicit sleep-based wait with an elapsed-time assertion after joining the
resolver thread).
---
Outside diff comments:
In `@tests/test_safety.py`:
- Around line 12-17: Remove unused symbols and sort the import blocks: delete
the unused RiskPattern import from the safety import tuple and remove any
duplicate or unused local AgentEvent imports referenced elsewhere in the file,
then reorder the remaining import groups and symbols alphabetically/PEP8-style
so imports are grouped (standard, third-party, local) and lint (Ruff) no longer
flags unused or unsorted imports; update the import lines that currently include
RiskPattern, AgentEvent, and the two local import blocks (also present at the
other mentioned locations) accordingly.
In `@tests/test_server.py`:
- Around line 3-7: The import block in tests/test_server.py violates ruff import
grouping (I001); separate third-party imports from first-party imports by adding
a single blank line between the third-party imports (pytest, TestClient from
fastapi.testclient) and the first-party import of agentwatch.api.server (the app
symbol), so the groupings are: standard libs, blank line, third-party libs,
blank line, first-party/local imports.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: de4693eb-8278-49ce-882c-f4f36ce8740e
📒 Files selected for processing (4)
agentwatch/api/server.pyagentwatch/core/safety.pytests/test_safety.pytests/test_server.py
| import asyncio | ||
| event_id = request.event_id | ||
| future = _safety_engine.submit_pending_approval(event_id) | ||
|
|
||
| timeout = float(request.approval_timeout_seconds or 120) | ||
| try: | ||
| approved = await asyncio.wait_for(future, timeout=timeout) | ||
| return {"approved": approved, "status": "approved" if approved else "denied"} | ||
| except (asyncio.TimeoutError, TimeoutError): | ||
| _safety_engine.resolve_pending_approval(event_id, approved=False) | ||
| return {"approved": False, "status": "timeout"} |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and inspect the relevant section in agentwatch/api/server.py
FILE="agentwatch/api/server.py"
ls -la "$FILE"
python3 - <<'PY'
import itertools,sys,os
file_path="agentwatch/api/server.py"
start=580
end=720
with open(file_path,'r',encoding='utf-8') as f:
for i,line in enumerate(f, start=1):
if start <= i <= end:
print(f"{i:4d}: {line.rstrip()}")
if i > end:
break
PY
# Find the pending-approval structures and the related engine methods
rg -n "_pending_approvals|submit_pending_approval|resolve_pending_approval|safety/pending|request_safety_approval|SafetyApprovalRequest|SafetyApprovalResolution" agentwatch/api/server.py agentwatch -S || true
# Check for Ruff UP041 occurrence in this file (or nearby)
rg -n "UP041" -S agentwatch/api/server.py agentwatch || true
# Show the definitions of the relevant models (if in this file) for context
python3 - <<'PY'
import re
path="agentwatch/api/server.py"
targets=["class SafetyApprovalRequest","class SafetyApprovalResolution","def request_safety_approval","/api/v1/safety/pending","pending"]
with open(path,'r',encoding='utf-8') as f:
lines=list(f)
for i,l in enumerate(lines, start=1):
if any(t in l for t in targets if ' ' in t or t.startswith('class') or t.startswith('def') or '/' in t or t in l):
print("HIT",i,l.rstrip())
PYRepository: sreerevanth/AgentWatch
Length of output: 9806
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="agentwatch/core/safety.py"
ls -la "$FILE"
python3 - <<'PY'
path="agentwatch/core/safety.py"
start=280
end=620
with open(path,'r',encoding='utf-8') as f:
for i,line in enumerate(f, start=1):
if start <= i <= end:
print(f"{i:4d}: {line.rstrip()}")
if i > end:
break
PYRepository: sreerevanth/AgentWatch
Length of output: 14862
Always unregister the pending future when this request exits.
Cleanup only happens on the except TimeoutError: path in request_safety_approval; if the request task is cancelled before timeout, _safety_engine.resolve_pending_approval(...) won’t run, leaving _pending_approvals[event_id] (and thus /api/v1/safety/pending) to accumulate stale entries. Add a finally that calls resolve_pending_approval(event_id, approved=False) so cancellation paths also clear the pending entry.
🧰 Tools
🪛 GitHub Actions: PR Tests / 0_Test & lint.txt
[error] 645-645: ruff check . failed (UP041): Replace aliased errors with TimeoutError. Suggests replacing except (asyncio.TimeoutError, TimeoutError): with builtin TimeoutError.
🪛 GitHub Actions: PR Tests / Test & lint
[error] 645-645: ruff (UP041): Replace aliased errors with TimeoutError (use builtin TimeoutError).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@agentwatch/api/server.py` around lines 637 - 647, The pending-approval future
registered via _safety_engine.submit_pending_approval(event_id) must be
unregistered on all exit paths; wrap the await asyncio.wait_for(future, ...) in
a try/except/finally and in the finally always call
_safety_engine.resolve_pending_approval(event_id, approved=False) so cancelled
tasks or other exceptions clear _pending_approvals as well (use the same
event_id and keep the existing except block for TimeoutError to avoid changing
behavior). Ensure you reference the same symbols: event_id, future,
_safety_engine.submit_pending_approval and
_safety_engine.resolve_pending_approval when making the change.
| def submit_pending_approval(self, event_id: str) -> asyncio.Future[bool]: | ||
| """Submit a pending safety check for asynchronous approval. | ||
|
|
||
| Args: | ||
| event_id: The unique ID of the event requiring approval. | ||
|
|
||
| Returns: | ||
| An asyncio.Future that resolves to True if approved, False if denied. | ||
| """ | ||
| loop = asyncio.get_running_loop() | ||
| future = loop.create_future() | ||
| self._pending_approvals[event_id] = future | ||
| return future |
There was a problem hiding this comment.
Make pending approval registration idempotent per event_id.
This overwrites any in-flight future for the same event_id. A retried /api/v1/safety/request call can orphan the original waiter, so the first request will never observe the human decision and only finishes via timeout.
Proposed fix
def submit_pending_approval(self, event_id: str) -> asyncio.Future[bool]:
"""Submit a pending safety check for asynchronous approval.
@@
- loop = asyncio.get_running_loop()
- future = loop.create_future()
- self._pending_approvals[event_id] = future
- return future
+ existing = self._pending_approvals.get(event_id)
+ if existing is not None and not existing.done():
+ return existing
+
+ loop = asyncio.get_running_loop()
+ future = loop.create_future()
+ self._pending_approvals[event_id] = future
+ return future🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@agentwatch/core/safety.py` around lines 530 - 542, The
submit_pending_approval implementation currently overwrites any existing future
in self._pending_approvals for the same event_id; change it to be idempotent by
checking self._pending_approvals for an existing Future and returning that
existing, still-pending Future instead of creating a new one (only create and
store a new Future when there is no entry or the existing Future is
done/cancelled); reference the submit_pending_approval method and the
self._pending_approvals mapping when making this change so retries of the same
event_id reuse the original waiter rather than orphaning it.
| # 3. Call request approval - this will block until the thread resolves it | ||
| req_body = { | ||
| "event_id": event_id, | ||
| "session_id": "test-session", | ||
| "agent_id": "test-agent", | ||
| "safety": { | ||
| "risk_level": "high", | ||
| "risk_score": 0.8, | ||
| "reasons": ["risky push"], | ||
| }, | ||
| "approval_timeout_seconds": 5 | ||
| } | ||
|
|
||
| resp_req = client.post("/api/v1/safety/request", json=req_body) | ||
| thread.join() | ||
|
|
||
| assert resp_req.status_code == 200 | ||
| assert resp_req.json()["approved"] is True | ||
| assert resp_req.json()["status"] == "approved" |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
Does Starlette TestClient run each request in its own separate event loop / blocking portal?
💡 Result:
Yes—Starlette’s synchronous TestClient does not run each request on your test thread’s existing event loop. Instead, it uses an anyio BlockingPortal created via anyio.from_thread.start_blocking_portal, which (by anyio design) starts “a new event loop in a new thread” for that portal’s lifetime. [1][2] However, it is not “a new thread/loop per request” inside a single TestClient context. TestClient caches a single BlockingPortal for reuse when you use TestClient as a context manager; subsequent requests within that context reuse the same portal/event loop thread. [3] What that means operationally: - Outside a context manager, TestClient may create a new blocking portal for each interaction (so you should expect different event loops across requests). [3] - Within a single with TestClient(app) as client: block, all requests run in the same cached BlockingPortal/event loop thread. [3] Primary implementation points: - TestClient’s portal factory calls anyio.from_thread.start_blocking_portal when self.portal is None. [1][4] - anyio.start_blocking_portal’s docstring explicitly says it “Start[s] a new event loop in a new thread and run[s] a blocking portal in its main task.” [2] So the accurate characterization is: “separate event loop(s) from your calling thread,” with portal reuse inside the TestClient context, rather than necessarily “a fresh event loop per individual request.” [1][2][3]
Citations:
- 1: https://starlette.dev/testclient/
- 2: TestClient waits for background tasks Kludex/starlette#533
- 3: https://github.com/encode/starlette/blob/9f16bf5c25e126200701f6e04330864f4a91a898/starlette/testclient.py
- 4: Event loop is closed Kludex/starlette#2069
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate the relevant files
ls -la
fd -a "test_server.py" -t f
fd -a "safety.py" -t f
# Show the exact code around the referenced test lines
echo "---- tests/test_server.py (lines ~50-100) ----"
nl -ba tests/test_server.py | sed -n '50,110p'
# Locate agentwatch/core/safety.py and show resolve_pending_approval implementation
echo "---- agentwatch/core/safety.py (search for resolve_pending_approval) ----"
fd -a "agentwatch/core/safety.py" -t f -H . || true
# Fallback: find any safety.py under agentwatch/core
fd -a "safety.py" agentwatch/core -t f || true
# Print the function body if present
if [ -f agentwatch/core/safety.py ]; then
nl -ba agentwatch/core/safety.py | sed -n '1,220p'
echo "---- function snippet for resolve_pending_approval ----"
rg -n "def resolve_pending_approval|resolve_pending_approval" agentwatch/core/safety.py
# Print around the match
rg -n "def resolve_pending_approval" agentwatch/core/safety.py | head -n 5 | awk -F: '{print $1}' | head -n 1
fi
# Also search for the "/resolve" endpoint wiring
echo "---- Search for /api/v1/safety/resolve wiring ----"
rg -n "\"/api/v1/safety/(request|resolve)\"|safety.*resolve|resolve_pending_approval" . | head -n 80Repository: sreerevanth/AgentWatch
Length of output: 2181
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "---- tests/test_server.py (lines 55-95) ----"
sed -n '55,95p' tests/test_server.py | nl -ba | sed -n '1,80p' || true
# fallback if nl missing: use awk line numbering
if ! command -v nl >/dev/null 2>&1; then
echo "(nl not available; printing without line numbers)"
sed -n '55,95p' tests/test_server.py
fi
echo "---- agentwatch/core/safety.py: locate resolve_pending_approval ----"
rg -n "def resolve_pending_approval|resolve_pending_approval" agentwatch/core/safety.py
echo "---- agentwatch/core/safety.py: show around resolve_pending_approval ----"
# show 120 lines around first match
MATCH_LINE=$(rg -n "def resolve_pending_approval" -n agentwatch/core/safety.py | head -n1 | cut -d: -f1 || true)
if [ -n "${MATCH_LINE:-}" ]; then
START=$((MATCH_LINE-40))
END=$((MATCH_LINE+120))
if [ "$START" -lt 1 ]; then START=1; fi
sed -n "${START},${END}p" agentwatch/core/safety.py
fi
echo "---- agentwatch/core/safety.py: search for set_result / call_soon_threadsafe ----"
rg -n "set_result|call_soon_threadsafe" agentwatch/core/safety.py
echo "---- Search for /api/v1/safety/resolve endpoint wiring ----"
rg -n "/api/v1/safety/(request|resolve)" -S agentwatch | head -n 80
rg -n "safety.*resolve|resolve_pending_approval" -S agentwatch | head -n 80Repository: sreerevanth/AgentWatch
Length of output: 7761
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "---- tests/test_server.py: imports and client fixture/setup ----"
sed -n '1,140p' tests/test_server.py
echo "---- tests/test_server.py: find the test containing /api/v1/safety/request ----"
rg -n "api/v1/safety/request" tests/test_server.py
# show around the specific test lines near 67-85
echo "---- tests/test_server.py: around lines 40-120 (approx test) ----"
sed -n '35,130p' tests/test_server.py
echo "---- agentwatch/api/server.py: show request/resolve handlers around listed lines ----"
sed -n '600,720p' agentwatch/api/server.pyRepository: sreerevanth/AgentWatch
Length of output: 9430
🌐 Web query:
Starlette TestClient portal caching behavior when not used as context manager (with TestClient(app) as client): does it start a new event loop/thread per request or reuse the same BlockingPortal?
💡 Result:
When using the Starlette TestClient without a context manager, it does not cache the BlockingPortal [1][2]. Each request made with a standalone TestClient instance that is not wrapped in a with block will trigger the acquisition of a new BlockingPortal, which involves spawning a new thread and starting a new event loop for that specific interaction [1][2][3]. In contrast, when you use the TestClient as a context manager (e.g., with TestClient(app) as client:), the TestClient creates and caches a BlockingPortal for the duration of that block [1][2][3]. Subsequent requests made within that same context manager block will reuse the cached BlockingPortal, ensuring that they run in the same event loop and can share resources (such as state, database connections, or task groups) created during the lifespan of that specific context [2][3]. This behavior is implemented via the _portal_factory method in starlette/testclient.py, which checks if self.portal is already set [1]. If it is not set—which occurs when not inside the context manager—it creates a new one [1]. Using the context manager is the recommended approach to ensure consistent event loop behavior, especially when testing applications that rely on lifespan events or shared async resources [4][2].
Citations:
- 1: https://github.com/Kludex/starlette/blob/main/starlette/testclient.py
- 2: ensure TestClient requests run in the same EventLoop as lifespan Kludex/starlette#1213
- 3: ensure TestClient requests run in the same EventLoop as lifespan Kludex/starlette#1213
- 4: https://starlette.dev/testclient/
🌐 Web query:
anyio BlockingPortal thread-safety: can client.post be called concurrently from different Python threads using the same TestClient instance?
💡 Result:
Yes—concurrent calls to client.post from different Python threads using the same Starlette/FastAPI TestClient instance are generally expected to work because TestClient caches a single anyio BlockingPortal within its context, and that portal can be used to marshal work into the event loop from external threads. Starlette’s TestClient uses a cached portal for all interactions in the synchronous context manager, and anyio documents that a BlockingPortal can run async code from other threads via portal.call and even spawn tasks from that portal. [1][2][3] However, there are important caveats: 1) You must use the same TestClient instance within its context manager (with TestClient(app) as client:...). Starlette notes that TestClient session/caching is managed by the context manager; using it outside the context can lead to the portal/thread lifecycle issues. [4][1] 2) Portal.call concurrency is mediated by the portal’s event loop thread/task group; thread-safety depends on the portal staying alive. If you end/exit the TestClient context while worker threads are still issuing requests, calls can fail (e.g., “portal not running” style errors). [2][3] 3) anyio BlockingPortalProvider is specifically designed so that “no matter how many threads call … at the same time, the same blocking portal will be used” (i.e., sharing the portal across threads is the intended pattern). [3] Practical answer to your question: if you share one TestClient instance (and keep it inside a single with block so its cached portal remains running), then yes, different threads can call client.post concurrently; TestClient will route each call through the same BlockingPortal/event loop. [1][2][3] If you tell me which library/version you’re using (Starlette vs FastAPI, and TestClient backend), I can narrow to that exact TestClient implementation.
Citations:
- 1: ensure TestClient requests run in the same EventLoop as lifespan Kludex/starlette#1213
- 2: https://github.com/Kludex/starlette/blob/main/starlette/testclient.py
- 3: https://anyio.readthedocs.io/en/latest/threads.html
- 4: https://starlette.dev/testclient/
Fix pending-approval wakeup across TestClient portals (thread-safe future resolution).
tests/test_server.py creates TestClient(app) without a context manager, and the /request and background-thread /resolve calls therefore run in different anyio BlockingPortal/event-loop threads. In agentwatch/core/safety.py, resolve_pending_approval() resolves the waiter by calling future.set_result(approved) directly, which is not safe across event loops and can cause /api/v1/safety/request to only complete via the approval_timeout_seconds path. Resolve the result via the future’s originating loop (e.g., future.get_loop().call_soon_threadsafe(future.set_result, approved), or store the loop alongside the future), and add an elapsed-time assertion so the test doesn’t rely on time.sleep(0.1).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/test_server.py` around lines 67 - 85, The issue is that
resolve_pending_approval resolves an anyio future from a different event loop
thread by calling future.set_result(approved) directly; update
resolve_pending_approval in agentwatch/core/safety.py to schedule the resolution
on the future's originating loop (e.g., use
future.get_loop().call_soon_threadsafe(future.set_result, approved) or store the
originating loop when creating the future and use loop.call_soon_threadsafe) so
cross-portal thread resolution is thread-safe; also update tests/test_server.py
to avoid time.sleep by asserting the elapsed time for the request completes well
under the approval_timeout_seconds (replace the implicit sleep-based wait with
an elapsed-time assertion after joining the resolver thread).
82953ce to
88c3dec
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/test_server.py`:
- Line 50: Replace the deprecated httpx.AsyncClient usage that passes app=
directly with a transport using httpx.ASGITransport: instantiate AsyncClient
with transport=httpx.ASGITransport(app=app), keeping base_url the same; also
remove the redundant inner import "from agentwatch.api.server import app" inside
the async test since app is already imported at module scope. Ensure you import
httpx.ASGITransport if not already referenced.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 2a1e45a0-4a3a-4cbf-81d8-dfedb6fadf98
📒 Files selected for processing (4)
agentwatch/api/server.pyagentwatch/core/safety.pytests/test_safety.pytests/test_server.py
🚧 Files skipped from review as they are similar to previous changes (3)
- agentwatch/api/server.py
- agentwatch/core/safety.py
- tests/test_safety.py
88c3dec to
348ea63
Compare
348ea63 to
baab123
Compare
|
@sreerevanth |
Problem
Pre-action safety human approvals (such as
cli_approval_handler) relied on executing the synchronous standard libraryinput()function within a thread pool usingasyncio.to_thread.In asynchronous Python applications, when an approval prompt times out (e.g.
asyncio.wait_for), the outer task gets cancelled but the underlying worker thread cannot be aborted or killed and stays blocked indefinitely onstdin. Over multiple timeouts, this leaks and accumulates zombie worker threads, leading to thread/resource exhaustion. Worse, later stdin entries can get randomly consumed by old zombie threads instead of the active prompt.Solution
We implemented a production-grade, asynchronous, and stdin-free non-blocking approval loop perfect for production/headless server deployments:
SafetyEngine(submit_pending_approval/resolve_pending_approval).http_approval_handlerwhich sends a non-blocking POST request to a webhook/callback URL (configured viaAGENTWATCH_APPROVAL_URL) usinghttpx.AsyncClientwithout locking or spawning threads.POST /api/v1/safety/request(awaits the approval future non-blockingly)GET /api/v1/safety/pending(lists pending checks)POST /api/v1/safety/pending/{event_id}/resolve(resolves approval/denial)Testing & Verification
tests/test_safety.pycovering stateful pending approval registration and mocked HTTP handler behaviors.test_api_server_safety_approval_flowintests/test_server.pyusing a concurrent thread client to confirm end-to-end webhook safety routing.Summary by CodeRabbit
New Features
Tests