Skip to content

fix: resolve asyncio.to_thread thread-leakage via non-blocking HTTP approval mechanism#101

Open
MayurKharat0390 wants to merge 1 commit into
sreerevanth:mainfrom
MayurKharat0390:fix/http-approval-nonblocking
Open

fix: resolve asyncio.to_thread thread-leakage via non-blocking HTTP approval mechanism#101
MayurKharat0390 wants to merge 1 commit into
sreerevanth:mainfrom
MayurKharat0390:fix/http-approval-nonblocking

Conversation

@MayurKharat0390
Copy link
Copy Markdown
Contributor

@MayurKharat0390 MayurKharat0390 commented May 30, 2026

Problem

Pre-action safety human approvals (such as cli_approval_handler) relied on executing the synchronous standard library input() function within a thread pool using asyncio.to_thread.

In asynchronous Python applications, when an approval prompt times out (e.g. asyncio.wait_for), the outer task gets cancelled but the underlying worker thread cannot be aborted or killed and stays blocked indefinitely on stdin. Over multiple timeouts, this leaks and accumulates zombie worker threads, leading to thread/resource exhaustion. Worse, later stdin entries can get randomly consumed by old zombie threads instead of the active prompt.

Solution

We implemented a production-grade, asynchronous, and stdin-free non-blocking approval loop perfect for production/headless server deployments:

  1. Safety Engine Stateful Futures: Added asynchronous pending-approval future tracking to SafetyEngine (submit_pending_approval / resolve_pending_approval).
  2. Async HTTP Approval Handler: Implemented http_approval_handler which sends a non-blocking POST request to a webhook/callback URL (configured via AGENTWATCH_APPROVAL_URL) using httpx.AsyncClient without locking or spawning threads.
  3. FastAPI Endpoints: Exposed 3 REST endpoints to coordinate the approval loop asynchronously:
    • POST /api/v1/safety/request (awaits the approval future non-blockingly)
    • GET /api/v1/safety/pending (lists pending checks)
    • POST /api/v1/safety/pending/{event_id}/resolve (resolves approval/denial)

Testing & Verification

  • Added comprehensive unit tests in tests/test_safety.py covering stateful pending approval registration and mocked HTTP handler behaviors.
  • Added a full multi-process integration test test_api_server_safety_approval_flow in tests/test_server.py using a concurrent thread client to confirm end-to-end webhook safety routing.
  • All 237 tests pass perfectly!

Summary by CodeRabbit

  • New Features

    • Safety approval API to submit events for human review with a configurable timeout (default 120s; timeout resolves as denied)
    • Endpoints to list pending approvals and to resolve (approve/deny) specific pending requests (resolving missing/already-resolved IDs returns 404)
    • Optional non-blocking external HTTP approval callback for async approval workflows
  • Tests

    • Async integration and unit tests covering the approval flow, pending-state behavior, timeouts, and external callback responses

Review Change Stack

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 30, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds human-in-the-loop safety approvals: SafetyEngine pending-approval tracking, an async HTTP approval handler, three API endpoints to request/list/resolve approvals, and unit/integration tests covering lifecycle and HTTP behavior.

Changes

Human-in-the-Loop Safety Approvals

Layer / File(s) Summary
SafetyEngine pending-approval tracking
agentwatch/core/safety.py, tests/test_safety.py
SafetyEngine.submit_pending_approval registers an unresolved Future[bool] by event_id and returns it; SafetyEngine.resolve_pending_approval completes the future and removes the pending entry. Async unit test verifies lifecycle and completion.
HTTP approval handler callback
agentwatch/core/safety.py, tests/test_safety.py
Adds http_approval_handler that reads AGENTWATCH_APPROVAL_URL, POSTs event/session/agent/tool and safety metadata to an external service, parses JSON {"approved": bool}, and returns approval; unit tests cover missing-URL and mocked HTTP responses and adjust related test imports/formatting.
REST API models and safety approval endpoints
agentwatch/api/server.py
Pydantic models SafetyApprovalRequest and SafetyApprovalResolution. POST /api/v1/safety/request enqueues a pending approval and awaits resolution with timeout handling (resolves as denied on timeout); GET /api/v1/safety/pending lists pending IDs; POST /api/v1/safety/pending/{event_id}/resolve resolves an event or returns 404.
Integration test for end-to-end approval flow
tests/test_server.py
Async integration test confirms no pending approvals initially, concurrently posts a safety request and resolves it via the resolve endpoint, asserts request and resolve responses, and verifies pending-list cleanup.

Sequence Diagram

sequenceDiagram
  participant Client
  participant APIServer as API Server
  participant SafetyEngine
  participant External as Approval Service

  Client->>APIServer: POST /api/v1/safety/request (SafetyApprovalRequest)
  APIServer->>SafetyEngine: submit_pending_approval(event_id)
  SafetyEngine-->>APIServer: Future[bool] (pending)

  External->>APIServer: POST /api/v1/safety/pending/{event_id}/resolve (resolution)
  APIServer->>SafetyEngine: resolve_pending_approval(event_id, approved)
  SafetyEngine-->>APIServer: bool (resolved)

  APIServer->>SafetyEngine: await future (with timeout)
  SafetyEngine-->>APIServer: bool result
  APIServer-->>Client: { "approved": bool, "status": "approved"|"denied"|"timeout" }
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

NSoC 26', backend, level: intermediate

Poem

🐰 I sat atop a waiting thread,

A tiny future in my head,
A POST went out, a button pressed,
Approve or not — I took my rest,
Hops, tests, and logs: the workflow fed.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 36.36% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately captures the main objective of the PR: fixing thread-leakage issues by implementing a non-blocking HTTP approval mechanism instead of blocking stdin operations.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

@MayurKharat0390 MayurKharat0390 force-pushed the fix/http-approval-nonblocking branch 2 times, most recently from d4c9a70 to ef269b2 Compare May 30, 2026 04:16
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
tests/test_server.py (1)

3-7: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Fix import ordering to unblock CI.

The pipeline fails on ruff (I001) for this import block. The first-party import (agentwatch.api.server) must be separated from the third-party group by a blank line.

🔧 Proposed fix
 from __future__ import annotations
 
 import pytest
 from fastapi.testclient import TestClient
+
 from agentwatch.api.server import app
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_server.py` around lines 3 - 7, The import block in
tests/test_server.py violates ruff import grouping (I001); separate third-party
imports from first-party imports by adding a single blank line between the
third-party imports (pytest, TestClient from fastapi.testclient) and the
first-party import of agentwatch.api.server (the app symbol), so the groupings
are: standard libs, blank line, third-party libs, blank line, first-party/local
imports.
tests/test_safety.py (1)

12-17: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Clean up the new test imports to restore a green lint run.

Ruff is already flagging RiskPattern and both local AgentEvent imports as unused, and the two local import blocks need sorting.

Also applies to: 347-352, 421-423

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_safety.py` around lines 12 - 17, Remove unused symbols and sort
the import blocks: delete the unused RiskPattern import from the safety import
tuple and remove any duplicate or unused local AgentEvent imports referenced
elsewhere in the file, then reorder the remaining import groups and symbols
alphabetically/PEP8-style so imports are grouped (standard, third-party, local)
and lint (Ruff) no longer flags unused or unsorted imports; update the import
lines that currently include RiskPattern, AgentEvent, and the two local import
blocks (also present at the other mentioned locations) accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@agentwatch/api/server.py`:
- Around line 637-647: The pending-approval future registered via
_safety_engine.submit_pending_approval(event_id) must be unregistered on all
exit paths; wrap the await asyncio.wait_for(future, ...) in a try/except/finally
and in the finally always call _safety_engine.resolve_pending_approval(event_id,
approved=False) so cancelled tasks or other exceptions clear _pending_approvals
as well (use the same event_id and keep the existing except block for
TimeoutError to avoid changing behavior). Ensure you reference the same symbols:
event_id, future, _safety_engine.submit_pending_approval and
_safety_engine.resolve_pending_approval when making the change.

In `@agentwatch/core/safety.py`:
- Around line 615-616: The import block in agentwatch/core/safety.py should be
sorted into standard-library and third-party groups to satisfy Ruff/I001: put
"import os" (stdlib) first, add a blank line, then "import httpx" (third-party);
alternatively run isort on the file to apply the correct ordering automatically
so the import block (os, httpx) follows the expected grouping.
- Around line 530-542: The submit_pending_approval implementation currently
overwrites any existing future in self._pending_approvals for the same event_id;
change it to be idempotent by checking self._pending_approvals for an existing
Future and returning that existing, still-pending Future instead of creating a
new one (only create and store a new Future when there is no entry or the
existing Future is done/cancelled); reference the submit_pending_approval method
and the self._pending_approvals mapping when making this change so retries of
the same event_id reuse the original waiter rather than orphaning it.

In `@tests/test_server.py`:
- Around line 54-62: The background function resolve_later currently performs
asserts inside a worker thread so failures are swallowed; change resolve_later
to capture its result/exception (e.g., store response status/body or exception
info into a shared mutable container) instead of asserting, start the thread as
before, join the thread, then in the main test thread assert on the captured
outcome (validate client.post to f"/api/v1/safety/pending/{event_id}/resolve"
returned status_code 200 and returned body contains status == "resolved" and
approved == True) so any failures surface to pytest; reference the resolve_later
function, thread.join(), client.post(...) and event_id when updating the test.
- Around line 67-85: The issue is that resolve_pending_approval resolves an
anyio future from a different event loop thread by calling
future.set_result(approved) directly; update resolve_pending_approval in
agentwatch/core/safety.py to schedule the resolution on the future's originating
loop (e.g., use future.get_loop().call_soon_threadsafe(future.set_result,
approved) or store the originating loop when creating the future and use
loop.call_soon_threadsafe) so cross-portal thread resolution is thread-safe;
also update tests/test_server.py to avoid time.sleep by asserting the elapsed
time for the request completes well under the approval_timeout_seconds (replace
the implicit sleep-based wait with an elapsed-time assertion after joining the
resolver thread).

---

Outside diff comments:
In `@tests/test_safety.py`:
- Around line 12-17: Remove unused symbols and sort the import blocks: delete
the unused RiskPattern import from the safety import tuple and remove any
duplicate or unused local AgentEvent imports referenced elsewhere in the file,
then reorder the remaining import groups and symbols alphabetically/PEP8-style
so imports are grouped (standard, third-party, local) and lint (Ruff) no longer
flags unused or unsorted imports; update the import lines that currently include
RiskPattern, AgentEvent, and the two local import blocks (also present at the
other mentioned locations) accordingly.

In `@tests/test_server.py`:
- Around line 3-7: The import block in tests/test_server.py violates ruff import
grouping (I001); separate third-party imports from first-party imports by adding
a single blank line between the third-party imports (pytest, TestClient from
fastapi.testclient) and the first-party import of agentwatch.api.server (the app
symbol), so the groupings are: standard libs, blank line, third-party libs,
blank line, first-party/local imports.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: de4693eb-8278-49ce-882c-f4f36ce8740e

📥 Commits

Reviewing files that changed from the base of the PR and between 55545f0 and 24d421c.

📒 Files selected for processing (4)
  • agentwatch/api/server.py
  • agentwatch/core/safety.py
  • tests/test_safety.py
  • tests/test_server.py

Comment thread agentwatch/api/server.py
Comment on lines +637 to +647
import asyncio
event_id = request.event_id
future = _safety_engine.submit_pending_approval(event_id)

timeout = float(request.approval_timeout_seconds or 120)
try:
approved = await asyncio.wait_for(future, timeout=timeout)
return {"approved": approved, "status": "approved" if approved else "denied"}
except (asyncio.TimeoutError, TimeoutError):
_safety_engine.resolve_pending_approval(event_id, approved=False)
return {"approved": False, "status": "timeout"}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate and inspect the relevant section in agentwatch/api/server.py
FILE="agentwatch/api/server.py"
ls -la "$FILE"
python3 - <<'PY'
import itertools,sys,os
file_path="agentwatch/api/server.py"
start=580
end=720
with open(file_path,'r',encoding='utf-8') as f:
    for i,line in enumerate(f, start=1):
        if start <= i <= end:
            print(f"{i:4d}: {line.rstrip()}")
        if i > end:
            break
PY

# Find the pending-approval structures and the related engine methods
rg -n "_pending_approvals|submit_pending_approval|resolve_pending_approval|safety/pending|request_safety_approval|SafetyApprovalRequest|SafetyApprovalResolution" agentwatch/api/server.py agentwatch -S || true

# Check for Ruff UP041 occurrence in this file (or nearby)
rg -n "UP041" -S agentwatch/api/server.py agentwatch || true

# Show the definitions of the relevant models (if in this file) for context
python3 - <<'PY'
import re
path="agentwatch/api/server.py"
targets=["class SafetyApprovalRequest","class SafetyApprovalResolution","def request_safety_approval","/api/v1/safety/pending","pending"]
with open(path,'r',encoding='utf-8') as f:
    lines=list(f)
for i,l in enumerate(lines, start=1):
    if any(t in l for t in targets if ' ' in t or t.startswith('class') or t.startswith('def') or '/' in t or t in l):
        print("HIT",i,l.rstrip())
PY

Repository: sreerevanth/AgentWatch

Length of output: 9806


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="agentwatch/core/safety.py"
ls -la "$FILE"

python3 - <<'PY'
path="agentwatch/core/safety.py"
start=280
end=620
with open(path,'r',encoding='utf-8') as f:
    for i,line in enumerate(f, start=1):
        if start <= i <= end:
            print(f"{i:4d}: {line.rstrip()}")
        if i > end:
            break
PY

Repository: sreerevanth/AgentWatch

Length of output: 14862


Always unregister the pending future when this request exits.

Cleanup only happens on the except TimeoutError: path in request_safety_approval; if the request task is cancelled before timeout, _safety_engine.resolve_pending_approval(...) won’t run, leaving _pending_approvals[event_id] (and thus /api/v1/safety/pending) to accumulate stale entries. Add a finally that calls resolve_pending_approval(event_id, approved=False) so cancellation paths also clear the pending entry.

🧰 Tools
🪛 GitHub Actions: PR Tests / 0_Test & lint.txt

[error] 645-645: ruff check . failed (UP041): Replace aliased errors with TimeoutError. Suggests replacing except (asyncio.TimeoutError, TimeoutError): with builtin TimeoutError.

🪛 GitHub Actions: PR Tests / Test & lint

[error] 645-645: ruff (UP041): Replace aliased errors with TimeoutError (use builtin TimeoutError).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agentwatch/api/server.py` around lines 637 - 647, The pending-approval future
registered via _safety_engine.submit_pending_approval(event_id) must be
unregistered on all exit paths; wrap the await asyncio.wait_for(future, ...) in
a try/except/finally and in the finally always call
_safety_engine.resolve_pending_approval(event_id, approved=False) so cancelled
tasks or other exceptions clear _pending_approvals as well (use the same
event_id and keep the existing except block for TimeoutError to avoid changing
behavior). Ensure you reference the same symbols: event_id, future,
_safety_engine.submit_pending_approval and
_safety_engine.resolve_pending_approval when making the change.

Comment thread agentwatch/core/safety.py
Comment on lines +530 to +542
def submit_pending_approval(self, event_id: str) -> asyncio.Future[bool]:
"""Submit a pending safety check for asynchronous approval.

Args:
event_id: The unique ID of the event requiring approval.

Returns:
An asyncio.Future that resolves to True if approved, False if denied.
"""
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_approvals[event_id] = future
return future
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make pending approval registration idempotent per event_id.

This overwrites any in-flight future for the same event_id. A retried /api/v1/safety/request call can orphan the original waiter, so the first request will never observe the human decision and only finishes via timeout.

Proposed fix
 def submit_pending_approval(self, event_id: str) -> asyncio.Future[bool]:
     """Submit a pending safety check for asynchronous approval.
@@
-    loop = asyncio.get_running_loop()
-    future = loop.create_future()
-    self._pending_approvals[event_id] = future
-    return future
+    existing = self._pending_approvals.get(event_id)
+    if existing is not None and not existing.done():
+        return existing
+
+    loop = asyncio.get_running_loop()
+    future = loop.create_future()
+    self._pending_approvals[event_id] = future
+    return future
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@agentwatch/core/safety.py` around lines 530 - 542, The
submit_pending_approval implementation currently overwrites any existing future
in self._pending_approvals for the same event_id; change it to be idempotent by
checking self._pending_approvals for an existing Future and returning that
existing, still-pending Future instead of creating a new one (only create and
store a new Future when there is no entry or the existing Future is
done/cancelled); reference the submit_pending_approval method and the
self._pending_approvals mapping when making this change so retries of the same
event_id reuse the original waiter rather than orphaning it.

Comment thread agentwatch/core/safety.py
Comment thread tests/test_server.py Outdated
Comment thread tests/test_server.py Outdated
Comment on lines +67 to +85
# 3. Call request approval - this will block until the thread resolves it
req_body = {
"event_id": event_id,
"session_id": "test-session",
"agent_id": "test-agent",
"safety": {
"risk_level": "high",
"risk_score": 0.8,
"reasons": ["risky push"],
},
"approval_timeout_seconds": 5
}

resp_req = client.post("/api/v1/safety/request", json=req_body)
thread.join()

assert resp_req.status_code == 200
assert resp_req.json()["approved"] is True
assert resp_req.json()["status"] == "approved"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🌐 Web query:

Does Starlette TestClient run each request in its own separate event loop / blocking portal?

💡 Result:

Yes—Starlette’s synchronous TestClient does not run each request on your test thread’s existing event loop. Instead, it uses an anyio BlockingPortal created via anyio.from_thread.start_blocking_portal, which (by anyio design) starts “a new event loop in a new thread” for that portal’s lifetime. [1][2] However, it is not “a new thread/loop per request” inside a single TestClient context. TestClient caches a single BlockingPortal for reuse when you use TestClient as a context manager; subsequent requests within that context reuse the same portal/event loop thread. [3] What that means operationally: - Outside a context manager, TestClient may create a new blocking portal for each interaction (so you should expect different event loops across requests). [3] - Within a single with TestClient(app) as client: block, all requests run in the same cached BlockingPortal/event loop thread. [3] Primary implementation points: - TestClient’s portal factory calls anyio.from_thread.start_blocking_portal when self.portal is None. [1][4] - anyio.start_blocking_portal’s docstring explicitly says it “Start[s] a new event loop in a new thread and run[s] a blocking portal in its main task.” [2] So the accurate characterization is: “separate event loop(s) from your calling thread,” with portal reuse inside the TestClient context, rather than necessarily “a fresh event loop per individual request.” [1][2][3]

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate the relevant files
ls -la
fd -a "test_server.py" -t f
fd -a "safety.py" -t f

# Show the exact code around the referenced test lines
echo "---- tests/test_server.py (lines ~50-100) ----"
nl -ba tests/test_server.py | sed -n '50,110p'

# Locate agentwatch/core/safety.py and show resolve_pending_approval implementation
echo "---- agentwatch/core/safety.py (search for resolve_pending_approval) ----"
fd -a "agentwatch/core/safety.py" -t f -H . || true
# Fallback: find any safety.py under agentwatch/core
fd -a "safety.py" agentwatch/core -t f || true

# Print the function body if present
if [ -f agentwatch/core/safety.py ]; then
  nl -ba agentwatch/core/safety.py | sed -n '1,220p'
  echo "---- function snippet for resolve_pending_approval ----"
  rg -n "def resolve_pending_approval|resolve_pending_approval" agentwatch/core/safety.py
  # Print around the match
  rg -n "def resolve_pending_approval" agentwatch/core/safety.py | head -n 5 | awk -F: '{print $1}' | head -n 1
fi

# Also search for the "/resolve" endpoint wiring
echo "---- Search for /api/v1/safety/resolve wiring ----"
rg -n "\"/api/v1/safety/(request|resolve)\"|safety.*resolve|resolve_pending_approval" . | head -n 80

Repository: sreerevanth/AgentWatch

Length of output: 2181


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "---- tests/test_server.py (lines 55-95) ----"
sed -n '55,95p' tests/test_server.py | nl -ba | sed -n '1,80p' || true
# fallback if nl missing: use awk line numbering
if ! command -v nl >/dev/null 2>&1; then
  echo "(nl not available; printing without line numbers)"
  sed -n '55,95p' tests/test_server.py
fi

echo "---- agentwatch/core/safety.py: locate resolve_pending_approval ----"
rg -n "def resolve_pending_approval|resolve_pending_approval" agentwatch/core/safety.py

echo "---- agentwatch/core/safety.py: show around resolve_pending_approval ----"
# show 120 lines around first match
MATCH_LINE=$(rg -n "def resolve_pending_approval" -n agentwatch/core/safety.py | head -n1 | cut -d: -f1 || true)
if [ -n "${MATCH_LINE:-}" ]; then
  START=$((MATCH_LINE-40))
  END=$((MATCH_LINE+120))
  if [ "$START" -lt 1 ]; then START=1; fi
  sed -n "${START},${END}p" agentwatch/core/safety.py
fi

echo "---- agentwatch/core/safety.py: search for set_result / call_soon_threadsafe ----"
rg -n "set_result|call_soon_threadsafe" agentwatch/core/safety.py

echo "---- Search for /api/v1/safety/resolve endpoint wiring ----"
rg -n "/api/v1/safety/(request|resolve)" -S agentwatch | head -n 80
rg -n "safety.*resolve|resolve_pending_approval" -S agentwatch | head -n 80

Repository: sreerevanth/AgentWatch

Length of output: 7761


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "---- tests/test_server.py: imports and client fixture/setup ----"
sed -n '1,140p' tests/test_server.py

echo "---- tests/test_server.py: find the test containing /api/v1/safety/request ----"
rg -n "api/v1/safety/request" tests/test_server.py

# show around the specific test lines near 67-85
echo "---- tests/test_server.py: around lines 40-120 (approx test) ----"
sed -n '35,130p' tests/test_server.py

echo "---- agentwatch/api/server.py: show request/resolve handlers around listed lines ----"
sed -n '600,720p' agentwatch/api/server.py

Repository: sreerevanth/AgentWatch

Length of output: 9430


🌐 Web query:

Starlette TestClient portal caching behavior when not used as context manager (with TestClient(app) as client): does it start a new event loop/thread per request or reuse the same BlockingPortal?

💡 Result:

When using the Starlette TestClient without a context manager, it does not cache the BlockingPortal [1][2]. Each request made with a standalone TestClient instance that is not wrapped in a with block will trigger the acquisition of a new BlockingPortal, which involves spawning a new thread and starting a new event loop for that specific interaction [1][2][3]. In contrast, when you use the TestClient as a context manager (e.g., with TestClient(app) as client:), the TestClient creates and caches a BlockingPortal for the duration of that block [1][2][3]. Subsequent requests made within that same context manager block will reuse the cached BlockingPortal, ensuring that they run in the same event loop and can share resources (such as state, database connections, or task groups) created during the lifespan of that specific context [2][3]. This behavior is implemented via the _portal_factory method in starlette/testclient.py, which checks if self.portal is already set [1]. If it is not set—which occurs when not inside the context manager—it creates a new one [1]. Using the context manager is the recommended approach to ensure consistent event loop behavior, especially when testing applications that rely on lifespan events or shared async resources [4][2].

Citations:


🌐 Web query:

anyio BlockingPortal thread-safety: can client.post be called concurrently from different Python threads using the same TestClient instance?

💡 Result:

Yes—concurrent calls to client.post from different Python threads using the same Starlette/FastAPI TestClient instance are generally expected to work because TestClient caches a single anyio BlockingPortal within its context, and that portal can be used to marshal work into the event loop from external threads. Starlette’s TestClient uses a cached portal for all interactions in the synchronous context manager, and anyio documents that a BlockingPortal can run async code from other threads via portal.call and even spawn tasks from that portal. [1][2][3] However, there are important caveats: 1) You must use the same TestClient instance within its context manager (with TestClient(app) as client:...). Starlette notes that TestClient session/caching is managed by the context manager; using it outside the context can lead to the portal/thread lifecycle issues. [4][1] 2) Portal.call concurrency is mediated by the portal’s event loop thread/task group; thread-safety depends on the portal staying alive. If you end/exit the TestClient context while worker threads are still issuing requests, calls can fail (e.g., “portal not running” style errors). [2][3] 3) anyio BlockingPortalProvider is specifically designed so that “no matter how many threads call … at the same time, the same blocking portal will be used” (i.e., sharing the portal across threads is the intended pattern). [3] Practical answer to your question: if you share one TestClient instance (and keep it inside a single with block so its cached portal remains running), then yes, different threads can call client.post concurrently; TestClient will route each call through the same BlockingPortal/event loop. [1][2][3] If you tell me which library/version you’re using (Starlette vs FastAPI, and TestClient backend), I can narrow to that exact TestClient implementation.

Citations:


Fix pending-approval wakeup across TestClient portals (thread-safe future resolution).

tests/test_server.py creates TestClient(app) without a context manager, and the /request and background-thread /resolve calls therefore run in different anyio BlockingPortal/event-loop threads. In agentwatch/core/safety.py, resolve_pending_approval() resolves the waiter by calling future.set_result(approved) directly, which is not safe across event loops and can cause /api/v1/safety/request to only complete via the approval_timeout_seconds path. Resolve the result via the future’s originating loop (e.g., future.get_loop().call_soon_threadsafe(future.set_result, approved), or store the loop alongside the future), and add an elapsed-time assertion so the test doesn’t rely on time.sleep(0.1).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_server.py` around lines 67 - 85, The issue is that
resolve_pending_approval resolves an anyio future from a different event loop
thread by calling future.set_result(approved) directly; update
resolve_pending_approval in agentwatch/core/safety.py to schedule the resolution
on the future's originating loop (e.g., use
future.get_loop().call_soon_threadsafe(future.set_result, approved) or store the
originating loop when creating the future and use loop.call_soon_threadsafe) so
cross-portal thread resolution is thread-safe; also update tests/test_server.py
to avoid time.sleep by asserting the elapsed time for the request completes well
under the approval_timeout_seconds (replace the implicit sleep-based wait with
an elapsed-time assertion after joining the resolver thread).

@MayurKharat0390 MayurKharat0390 force-pushed the fix/http-approval-nonblocking branch 2 times, most recently from 82953ce to 88c3dec Compare May 30, 2026 04:28
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/test_server.py`:
- Line 50: Replace the deprecated httpx.AsyncClient usage that passes app=
directly with a transport using httpx.ASGITransport: instantiate AsyncClient
with transport=httpx.ASGITransport(app=app), keeping base_url the same; also
remove the redundant inner import "from agentwatch.api.server import app" inside
the async test since app is already imported at module scope. Ensure you import
httpx.ASGITransport if not already referenced.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 2a1e45a0-4a3a-4cbf-81d8-dfedb6fadf98

📥 Commits

Reviewing files that changed from the base of the PR and between 82953ce and 88c3dec.

📒 Files selected for processing (4)
  • agentwatch/api/server.py
  • agentwatch/core/safety.py
  • tests/test_safety.py
  • tests/test_server.py
🚧 Files skipped from review as they are similar to previous changes (3)
  • agentwatch/api/server.py
  • agentwatch/core/safety.py
  • tests/test_safety.py

Comment thread tests/test_server.py Outdated
@MayurKharat0390 MayurKharat0390 force-pushed the fix/http-approval-nonblocking branch from 88c3dec to 348ea63 Compare May 30, 2026 04:31
@MayurKharat0390 MayurKharat0390 force-pushed the fix/http-approval-nonblocking branch from 348ea63 to baab123 Compare May 30, 2026 04:36
@MayurKharat0390
Copy link
Copy Markdown
Contributor Author

MayurKharat0390 commented May 30, 2026

@sreerevanth
please review this i tried to solve the above error
still there is a error in the PR test
but there is no Conflict merge

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant