Support for Redis-based session store to run across multiple nodes #374
📝 Walkthrough

The pull request introduces a distributed session registry with pluggable in-memory and Redis stores, moves session HTTP endpoints under call-scoped routes (`/calls/{call_id}/...`), and integrates registry-driven session lifecycle and periodic maintenance into AgentLauncher.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Client
    participant API as HTTP API
    participant Launcher as AgentLauncher
    participant Registry as SessionRegistry
    participant Storage as SessionKVStore
    Client->>API: POST /calls/{call_id}/sessions
    API->>Launcher: start_session(call_id)
    Launcher->>Launcher: create AgentSession
    Launcher->>Registry: register(session_id, call_id)
    Registry->>Storage: set(sessions/{session_id}, data, ttl)
    Registry->>Storage: set(call_sessions/{call_id}/{session_id}, data, ttl)
    Storage-->>Registry: OK
    Registry-->>Launcher: registered
    Launcher-->>API: session started
    API-->>Client: 201 Created
    par Maintenance Loop
        Launcher->>Launcher: _maintenance_loop() periodic task
        Launcher->>Registry: get_close_requests(session_ids)
        Registry->>Storage: get(close_requests/{session_id})
        Storage-->>Registry: close flags
        Registry-->>Launcher: session_ids_to_close
        Launcher->>Launcher: close matched sessions
    and
        Launcher->>Registry: refresh(active_sessions)
        Registry->>Storage: expire(*keys, ttl)
        Storage-->>Registry: TTLs extended
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
agents-core/vision_agents/core/runner/http/options.py (1)

7-8: ⚠️ Potential issue | 🟡 Minor — Add explicit type parameters to `Callable` annotations.

Lines 44–47 use bare `Callable` without type parameters, which violates the coding guideline requiring full `Callable` signatures. Change to `Callable[..., Any]`:

Typing fix

```diff
-    can_start_session: Callable = allow_all
-    can_close_session: Callable = allow_all
-    can_view_session: Callable = allow_all
-    can_view_metrics: Callable = allow_all
+    can_start_session: Callable[..., Any] = allow_all
+    can_close_session: Callable[..., Any] = allow_all
+    can_view_session: Callable[..., Any] = allow_all
+    can_view_metrics: Callable[..., Any] = allow_all
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/runner/http/options.py` around lines 7-8: Update any bare Callable annotations in this module to include explicit type parameters (use `Callable[..., Any]`); locate functions/types that currently use plain `Callable` (e.g., annotations referenced around `allow_all` and the `Callable` usages on lines ~44–47) and change them to `Callable[..., Any]`, ensuring `Any` is imported from `typing` if not already present so the annotations are fully typed.

agents-core/vision_agents/core/runner/http/api.py (1)
75-86: ⚠️ Potential issue | 🟠 Major — Remove broad exception catch and avoid exposing exception details in HTTP responses.

The `except Exception as e:` clause violates the Python coding guideline (catch specific exceptions only) and leaks internal error details to clients. Since `start_session()` documents that it raises only `SessionLimitExceeded` and its subclasses (which are already caught on line 80), the broad catch is unnecessary. Remove it and let unexpected errors bubble up as 500 responses without exposing exception text.

🛠️ Proposed fix

```diff
-    except Exception as e:
-        logger.exception("Failed to start agent")
-        raise HTTPException(
-            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            detail=f"Failed to start agent: {str(e)}",
-        ) from e
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/runner/http/api.py` around lines 75-86: Remove the broad "except Exception as e" block so unexpected errors from launcher.start_session bubble up instead of being caught and returning internal exception text to clients; keep the specific "except SessionLimitExceeded as e" handler. In practice, delete the general except block that calls logger.exception and raises HTTPException with f"Failed to start agent: {str(e)}", leaving only the try + SessionLimitExceeded except so Python/ASGI will produce a 500 without exposing the exception details.
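The pattern this comment recommends can be sketched with plain stand-in classes (these are not the actual api.py handlers or FastAPI types): catch only the documented exception and translate it deliberately; anything unexpected propagates for the framework to turn into a generic 500.

```python
class SessionLimitExceeded(Exception):
    """Illustrative stand-in for the launcher's documented failure mode."""

class HTTPError(Exception):
    """Illustrative stand-in for an HTTP-layer exception."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail

def start_session(call_id: str, *, at_capacity: bool = False) -> str:
    # Stand-in for launcher.start_session: raises only SessionLimitExceeded
    if at_capacity:
        raise SessionLimitExceeded(f"no capacity for {call_id}")
    return f"session-for-{call_id}"

def start_session_handler(call_id: str, *, at_capacity: bool = False) -> str:
    try:
        return start_session(call_id, at_capacity=at_capacity)
    except SessionLimitExceeded as e:
        # The only documented failure mode gets a deliberate, safe response
        raise HTTPError(429, "Session limit exceeded") from e
    # No broad `except Exception`: anything unexpected propagates, and the
    # framework reports a generic 500 without leaking internals to clients.

print(start_session_handler("call-a"))  # → session-for-call-a
```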
🧹 Nitpick comments (14)
examples/08_agent_server_example/README.md (1)
181-184: Use `call_id` in the auth example to avoid overly broad authorization.

On line 181, `call_id` is accepted but never enforced. In a multi-call setup, this example can imply global permission is enough. Please show call-level authorization in the sample.

Suggested doc example adjustment

```diff
 async def can_start_session(call_id: str, token_payload=Depends(verify_token)):
     """Check if the caller has permission to start agent sessions."""
-    if "agents:start" not in token_payload.get("permissions", []):
+    allowed_calls = token_payload.get("call_ids", [])
+    if "agents:start" not in token_payload.get("permissions", []) or call_id not in allowed_calls:
         raise HTTPException(status_code=403, detail="Insufficient permissions")
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@examples/08_agent_server_example/README.md` around lines 181-184: The can_start_session function accepts call_id but doesn't enforce call-level authorization; update can_start_session to check that the token_payload grants permission for this specific call_id (e.g., look for a scoped permission like f"agents:start:{call_id}" or verify call_id is present in token_payload.get("allowed_call_ids", [])) instead of only checking the global "agents:start" permission; use the existing token_payload and call_id parameters and raise HTTPException(403) if the scoped check fails so the example demonstrates per-call authorization.

agents-core/vision_agents/core/agents/session_registry/redis_storage.py (2)
48-52: Move private helpers after public feature methods.

`_prefixed` and `_strip_prefix` are private helpers but are placed before the public lifecycle methods `start`/`close`. Per the project's method-ordering convention (`__init__` → public lifecycle → properties → public feature methods → private helpers → dunders), they should move after `delete`/`publish`/`subscribe`.

As per coding guidelines: "Order class methods as: `__init__`, public lifecycle methods, properties, public feature methods, private helpers, dunder methods."

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py` around lines 48-52: Move the private helper methods _prefixed and _strip_prefix so they appear after the public feature methods (delete, publish, subscribe) and after any properties, but before any dunder methods; ensure the class method order follows the project's convention: __init__, public lifecycle methods (start/close), properties, public feature methods (delete/publish/subscribe), then private helpers (_prefixed/_strip_prefix).
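The ordering convention quoted above reads as follows in skeleton form (the class, URL, and methods are made up purely for illustration, not the real `RedisSessionKVStore`):

```python
class ExampleStore:
    """Skeleton showing the project's method-ordering convention."""

    def __init__(self, url: str) -> None:            # 1. constructor
        self._url = url
        self._started = False

    async def start(self) -> None:                   # 2. public lifecycle
        self._started = True

    async def close(self) -> None:
        self._started = False

    @property
    def started(self) -> bool:                       # 3. properties
        return self._started

    async def delete(self, key: str) -> None:        # 4. public feature methods
        self._require_started()

    def _require_started(self) -> None:              # 5. private helpers
        if not self._started:
            raise RuntimeError("store not started")

    def __repr__(self) -> str:                       # 6. dunder methods
        return f"ExampleStore({self._url!r})"
```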
54-64: Replace the `inspect.iscoroutine` guard with a plain `await`.

`redis.asyncio.Redis.ping()` always returns a coroutine in the asyncio context; the guard is redundant and obscures intent. Remove the now-unused `import inspect` as well.

♻️ Proposed simplification

```diff
 async def start(self) -> None:
     """Open the Redis connection and verify it with a PING."""
-    # Handle non-specific Union return type here
-    ping = self._redis.ping()
-    if inspect.iscoroutine(ping):
-        await ping
+    await self._redis.ping()
     connection_kwargs = self._redis.connection_pool.connection_kwargs
```

Remove the now-unused `import inspect` from line 1.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py` around lines 54-64: In the start method of RedisSessionKVStore, replace the inspect.iscoroutine guard around self._redis.ping() with a direct await (i.e., await self._redis.ping()) and remove the now-unused import inspect; update the start function to always await the ping coroutine and delete the inspect import from the module top.

agents-core/vision_agents/core/agents/session_registry/storage.py (1)
27-46: Move `__aenter__`/`__aexit__` to the end of the class body.

Per the project's ordering convention, dunder methods come last. Currently `__aenter__`/`__aexit__` appear between the public lifecycle methods and the abstract feature methods, which violates the guideline.

As per coding guidelines: "Order class methods as: `__init__`, public lifecycle methods, properties, public feature methods, private helpers, dunder methods."

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/storage.py` around lines 27-46: Move the async context-manager dunders to the end of the class: relocate __aenter__ and __aexit__ so they appear after the public lifecycle methods (start, close), properties, public feature methods and private helpers — i.e., place __aenter__ and __aexit__ at the end of the class body to follow the project's ordering convention; keep their implementations unchanged and ensure they still call await self.start() and await self.close() respectively.

agents-core/vision_agents/core/runner/http/dependencies.py (1)
7-16: Add return-type annotations and docstrings to the permission stubs.

All four stubs are public API hooks that users override; they currently have no return type annotation (implicitly `None`) and no docstrings. Per Google-style guidelines, public callables should carry at minimum a one-liner docstring and an explicit return type.

✏️ Suggested addition

```diff
-def can_start_session(call_id: str): ...
+def can_start_session(call_id: str) -> None:
+    """Permission hook called before a session is started for the given call."""
+    ...

-def can_close_session(call_id: str): ...
+def can_close_session(call_id: str) -> None:
+    """Permission hook called before a session is closed for the given call."""
+    ...

-def can_view_session(call_id: str): ...
+def can_view_session(call_id: str) -> None:
+    """Permission hook called before session info is retrieved for the given call."""
+    ...

-def can_view_metrics(call_id: str): ...
+def can_view_metrics(call_id: str) -> None:
+    """Permission hook called before metrics are retrieved for the given call."""
+    ...
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/runner/http/dependencies.py` around lines 7-16: The four permission stub functions (can_start_session, can_close_session, can_view_session, can_view_metrics) lack return-type annotations and docstrings; update each signature to return bool (e.g. def can_start_session(call_id: str) -> bool:) and add a one-line Google-style docstring describing the purpose, the call_id parameter, and that it returns True if the action is permitted (False otherwise), so users overriding these public hooks have explicit types and minimal documentation.

tests/test_agents/test_session_registry/test_in_memory_store.py (1)
9-16: Add a return type annotation to the async fixture.

✏️ Suggested annotation

```diff
+from collections.abc import AsyncIterator
+
 @pytest.fixture()
-async def store():
+async def store() -> AsyncIterator[InMemorySessionKVStore]:
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_session_registry/test_in_memory_store.py` around lines 9-16: The async pytest fixture "store" should declare its return type to make typing clear: change "async def store()" to "async def store() -> AsyncIterator[InMemorySessionKVStore]"; add the needed import at the top of the test file; keep the body using InMemorySessionKVStore, await s.start(), yield s, and await s.close() as-is.

agents-core/vision_agents/core/agents/agent_launcher.py (3)
306-317: Silent `RuntimeError` swallow in task finalizer is acceptable but worth a debug log.

When the event loop is gone, `registry.remove()` will never fire; the session lingers until TTL expiry. Consider adding a `logger.debug` so this isn't completely invisible.

♻️ Suggested improvement

```diff
 try:
     asyncio.get_running_loop().create_task(
         self._registry.remove(session_id_)
     )
 except RuntimeError:
-    pass
+    logger.debug(
+        "No running loop; registry removal of %s deferred to TTL",
+        session_id_,
+    )
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 306-317: The finalizer currently swallows RuntimeError silently when asyncio.get_running_loop() fails; update the except block in _finalizer to log a debug message including session_id_ and call_id_ (and optionally the exception message) to indicate that self._registry.remove(session_id_) couldn't be scheduled and the session will expire by TTL; use the module/class logger (e.g., logger.debug) so this situation is visible during debugging while preserving the existing behavior.
462-478: Broad `except Exception` in maintenance path.

Lines 476 and 498 catch all exceptions. The guideline calls for specific exception handling. For a background resilience loop, this is a common tradeoff, but consider catching a narrower set (e.g., `OSError`, `ConnectionError`, `redis.RedisError`) to avoid swallowing unexpected programming errors like `TypeError` or `KeyError`.

As per coding guidelines, "Never write `except Exception as e` - use specific exception handling".

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 462-478: Replace the broad "except Exception" in the maintenance loop around the call to self._registry.get_close_requests and the processing of self._sessions with a narrow exception handler that only catches expected runtime/IO errors (for example: OSError, ConnectionError and your Redis client error type such as redis.RedisError) rather than all exceptions; update the except clause to "except (OSError, ConnectionError, redis.RedisError) as e:" (add the redis import if needed) and keep the existing logger.exception call to record the error, so programming errors like TypeError/KeyError still surface while transient registry/IO errors are handled gracefully.
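A minimal sketch of the suggested narrowing, using stand-in names rather than the real `_maintenance_loop`: transient I/O errors are logged and skipped so the loop survives, while programming errors still surface.

```python
import logging

logger = logging.getLogger("maintenance")

def run_maintenance_once(fetch_close_requests):
    """One maintenance tick. `fetch_close_requests` stands in for the
    registry call; the real loop, names, and error set may differ."""
    try:
        return fetch_close_requests()
    # ConnectionError is already a subclass of OSError; in a real deployment
    # redis.RedisError would be added to this tuple as well.
    except (OSError, ConnectionError) as e:
        logger.warning("maintenance tick failed, will retry next tick: %s", e)
        return None

def healthy():
    return ["sess-1"]

def redis_down():
    raise ConnectionError("redis unavailable")

print(run_maintenance_once(healthy))     # → ['sess-1']
print(run_maintenance_once(redis_down))  # → None
```

A `TypeError` raised inside the callback is not caught here, which is the point of the narrow handler: bugs fail loudly instead of being logged away as transient errors.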
493-499: Same broad catch pattern in `_refresh_active_sessions`.

Same guideline concern as above; consider narrowing the exception type to connection/IO errors.

As per coding guidelines, "Never write `except Exception as e` - use specific exception handling".

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 493-499: Narrow the broad except in _refresh_active_sessions: catch only connection/IO-related errors that can occur when calling self._registry.refresh (e.g., ConnectionError, asyncio.TimeoutError, and any HTTP client errors used in this project such as aiohttp.ClientError), log the caught exception with logger.exception including the exception variable (e) and context, and let unexpected exceptions propagate (or re-raise) instead of swallowing all Exception types; keep sessions_map and the call to self._registry.refresh as-is.

tests/test_agents/test_session_registry/test_redis_store.py (2)
7-9: Prefer importing from the public re-export.

Same as noted in `test_agent_launcher.py`: `RedisSessionKVStore` is re-exported from `vision_agents.core.agents.session_registry`.

♻️ Suggested fix

```diff
-from vision_agents.core.agents.session_registry.redis_storage import (
-    RedisSessionKVStore,
-)
+from vision_agents.core.agents.session_registry import RedisSessionKVStore
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_session_registry/test_redis_store.py` around lines 7-9: The test currently imports RedisSessionKVStore from its internal module; update the import to use the public re-export instead by importing RedisSessionKVStore from vision_agents.core.agents.session_registry so the test relies on the public API rather than the internal module.
188-194: Accessing private `registry._store` couples the test to internal implementation.

Consider creating a standalone `SessionRegistry` with `ttl=1.0` using the `redis_store` fixture directly, rather than reaching into the private `_store` attribute.

♻️ Suggested refactor

```diff
 async def test_session_expires_without_refresh(
-    self, registry: SessionRegistry
+    self, redis_store: RedisSessionKVStore
 ) -> None:
-    short_registry = SessionRegistry(store=registry._store, ttl=1.0)
+    short_registry = SessionRegistry(store=redis_store, ttl=1.0)
     await short_registry.register("sess-expire", "call-e")
     await asyncio.sleep(1.5)
     assert await short_registry.get("sess-expire") is None
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_session_registry/test_redis_store.py` around lines 188-194: The test test_session_expires_without_refresh currently accesses the private attribute registry._store; instead, instantiate a standalone SessionRegistry using the provided redis_store fixture (e.g., short_registry = SessionRegistry(store=redis_store, ttl=1.0)) so the test depends only on the public API; update the test to register `"sess-expire"` on short_registry, await asyncio.sleep(1.5), and assert short_registry.get("sess-expire") is None, removing any access to registry._store.

tests/test_agents/test_agent_launcher.py (2)
15-21: Prefer importing from the public re-export rather than internal submodules.

`InMemorySessionKVStore`, `RedisSessionKVStore`, and `SessionKVStore` are all re-exported from `vision_agents.core.agents.session_registry` (and even from `vision_agents.core`). Using the public path keeps tests resilient to internal restructuring.

♻️ Suggested imports

```diff
-from vision_agents.core.agents.session_registry.in_memory_storage import (
-    InMemorySessionKVStore,
-)
-from vision_agents.core.agents.session_registry.redis_storage import (
-    RedisSessionKVStore,
-)
-from vision_agents.core.agents.session_registry.storage import SessionKVStore
+from vision_agents.core.agents.session_registry import (
+    InMemorySessionKVStore,
+    RedisSessionKVStore,
+    SessionKVStore,
+)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_agent_launcher.py` around lines 15-21: The test imports use internal submodule paths for InMemorySessionKVStore, RedisSessionKVStore, and SessionKVStore; update the import statements to use the public re-export (e.g., import these three symbols from vision_agents.core.agents.session_registry or the top-level vision_agents.core re-export) so the test relies on the public API rather than internal modules (replace the three imports referencing session_registry.in_memory_storage, session_registry.redis_storage, and session_registry.storage with a single public import of InMemorySessionKVStore, RedisSessionKVStore, and SessionKVStore).
63-68: Consider marking Redis-dependent fixtures/tests with `@pytest.mark.integration`.

The `redis_url` fixture spins up a real Redis container. Per coding guidelines, integration tests should be decorated with `@pytest.mark.integration`. Since the `storage` fixture unconditionally depends on `redis_url`, even the `"memory"` param triggers a container startup. Consider either:

- Adding `@pytest.mark.integration` to `TestAgentLauncherWithStorage`, or
- Splitting the fixture so the memory variant doesn't require `redis_url`.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_agent_launcher.py` around lines 63-68: The redis_url fixture launches a real Redis container and causes Redis to start even for memory-backed runs; update tests so Redis-dependent runs are marked or the fixture is split: either add `@pytest.mark.integration` to TestAgentLauncherWithStorage (so the whole class is treated as integration) or refactor the storage fixture to conditionally require redis_url only when storage param != "memory" (ensure the storage fixture checks the param and only references redis_url when needed, leaving memory cases free of container startup).
81-95: `update_metrics` uses a non-atomic read-modify-write.

In a multi-node Redis deployment, concurrent `update_metrics` calls for the same session could cause a lost update. This is likely safe if only the owning node updates metrics (as the docstring implies), but worth noting for future awareness if the contract changes.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/registry.py` around lines 81-95: The current update_metrics function performs a non-atomic read-modify-write using self._store.get and self._store.set which can cause lost updates under concurrent calls; change update_metrics (the method named update_metrics that reads the key f"sessions/{session_id}" and writes metrics and metrics_updated_at) to perform the update atomically — either use your store client's optimistic locking (WATCH/MULTI/EXEC) around the key, or run the update in a single server-side operation (e.g., a Redis EVAL/Lua script or an atomic JSON/SET command) so that writing "metrics" and "metrics_updated_at" to the sessions/{session_id} value cannot be interleaved by other updaters.
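The atomic alternative this note alludes to can be sketched with a stdlib-only stand-in for optimistic locking: a version counter plays the role Redis WATCH/MULTI/EXEC plays on the server. `VersionedStore` and its methods are invented for the sketch and are not the project's `SessionKVStore` API.

```python
class VersionedStore:
    """In-memory stand-in for a KV store with optimistic locking. Redis
    WATCH/MULTI/EXEC gives the same guarantee: the write only lands if the
    key was untouched since it was read."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[int, dict]] = {}  # key -> (version, value)

    def get_versioned(self, key: str) -> tuple[int, dict]:
        return self._data.get(key, (0, {}))

    def set_if_unchanged(self, key: str, expected_version: int, value: dict) -> bool:
        current_version, _ = self._data.get(key, (0, {}))
        if current_version != expected_version:
            return False  # someone else wrote in between; caller must retry
        self._data[key] = (current_version + 1, value)
        return True

def update_metrics(store: VersionedStore, session_id: str, metrics: dict) -> None:
    key = f"sessions/{session_id}"
    while True:  # retry loop, like re-WATCHing after a failed EXEC
        version, record = store.get_versioned(key)
        merged = {**record, "metrics": metrics}
        if store.set_if_unchanged(key, version, merged):
            return

store = VersionedStore()
update_metrics(store, "sess-1", {"latency_ms": 12})
print(store.get_versioned("sessions/sess-1")[1])  # → {'metrics': {'latency_ms': 12}}
```

The retry loop makes concurrent updates serialize cleanly: a writer that loses the race simply re-reads the fresh record and merges again, so no update is lost.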
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@agents-core/vision_agents/core/agents/session_registry/in_memory_storage.py`:
- Around line 115-124: In subscribe(), the generator finalizer can raise
KeyError if close() cleared self._subscribers; modify the finally block in
subscribe(self, channel: str) so it checks for the channel and queue presence
before removing (e.g., use self._subscribers.get(channel) and remove if present,
or guard with "if channel in self._subscribers and queue in
self._subscribers[channel]" and only then remove and possibly delete the empty
list) to avoid accessing a missing key on cleanup.
In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py`:
- Around line 75-80: In mset, remove the erroneous await on pipeline command
calls so commands are queued instead of awaited: inside the async def mset(self,
items...) loop call pipe.set(self._prefixed(key), value, px=int(ttl * 1000)) (no
await) for each item and leave only a single await pipe.execute() after the
loop; this fixes improper use of await with redis-py asyncio pipeline methods.
In `@agents-core/vision_agents/core/agents/session_registry/storage.py`:
- Around line 45-46: Change the async context-manager exit method to use the
standard three-argument signature rather than a catch-all tuple: update the
method definition for __aexit__ to async def __aexit__(self, exc_type, exc_val,
exc_tb) -> None so static analyzers and editors can recognize the expected
parameters; keep calling await self.close() inside the method and preserve the
existing return behavior (None/False) so exceptions continue to propagate as
before.
- Around line 154-171: The abstract method subscribe is declared as a sync def
but all implementations (InMemorySessionKVStore.subscribe and
RedisSessionKVStore.subscribe) are async generator functions; change the
abstract signature to "async def subscribe(self, channel: str) ->
AsyncIterator[bytes]:" so the abstract method matches the implementations and
type checkers (mypy/pyright) recognize it as an async generator returning
AsyncIterator[bytes]; update the docstring if needed but keep the same semantics
and return type.
In `@agents-core/vision_agents/core/runner/http/api.py`:
- Around line 110-117: Handlers call launcher.get_session_info(session_id) then
operate on the session without verifying the session actually belongs to the
provided call_id; update each handler (the ones that call
launcher.get_session_info and then call launcher.request_close_session or
launcher.request_read_session) to check that the returned session info has the
same call_id as the path param (e.g., compare info.call_id or
info.session_call_id to the call_id arg) and if it doesn’t match raise an
HTTPException (404 or 403) before calling
launcher.request_close_session/request_read_session; apply this check to the
close and read handlers that use
get_session_info/request_close_session/request_read_session so callers cannot
close/read sessions from other calls.
In `@tests/test_agents/test_session_registry/test_in_memory_store.py`:
- Around line 32-35: Tests like test_ttl_expiry (and the related
test_expire_refreshes_ttl and test_expire_multiple_keys) are timing-sensitive
and can flake on busy CI; update these tests in
tests/test_agents/test_session_registry/test_in_memory_store.py to widen the
expiry margin by reducing the TTL and increasing the post-set sleep (for example
change ttl from 0.5 to 0.1 and sleep from 0.7 to 0.3) or otherwise increase the
sleep-to-ttl ratio (ensure sleep >= ttl * 2–3) so the assertion checking
InMemorySessionKVStore.get("...") being None is robust on loaded runners.
In `@tests/test_agents/test_session_registry/test_redis_store.py`:
- Line 44: The two test classes that require Redis, TestRedisSessionKVStore and
TestSessionRegistryWithRedis, must be marked as integration tests; add the
`@pytest.mark.integration` decorator above each class definition and ensure pytest
is imported (add "import pytest" at the top if missing) so the decorator is
available.
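The `subscribe` signature change described above can be illustrated with a stdlib-only sketch (`PubSub` and `MemoryPubSub` are placeholder names, not the real `SessionKVStore`): the abstract method is declared as an async generator so implementations that `yield` match its declared shape.

```python
import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator

class PubSub(ABC):
    @abstractmethod
    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
        """Yield messages published on `channel`."""
        # An unreachable `yield` makes this an async generator function, so
        # type checkers see the same shape as the concrete implementations.
        if False:
            yield b""

class MemoryPubSub(PubSub):
    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
        for message in (b"hello", b"world"):  # canned messages for the sketch
            yield message

async def drain() -> list[bytes]:
    return [m async for m in MemoryPubSub().subscribe("events")]

print(asyncio.run(drain()))  # → [b'hello', b'world']
```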
---
Outside diff comments:
In `@agents-core/vision_agents/core/runner/http/api.py`:
- Around line 75-86: Remove the broad "except Exception as e" block so
unexpected errors from launcher.start_session bubble up instead of being caught
and returning internal exception text to clients; keep the specific "except
SessionLimitExceeded as e" handler. In practice, delete the general except block
that calls logger.exception and raises HTTPException with f"Failed to start
agent: {str(e)}", leaving only the try + SessionLimitExceeded except so
Python/ASGI will produce a 500 without exposing the exception details.
In `@agents-core/vision_agents/core/runner/http/options.py`:
- Around line 7-8: Update any bare Callable annotations in this module to
include explicit type parameters (use Callable[..., Any]); locate
functions/types that currently use plain Callable (e.g., annotations referenced
around allow_all and the Callable usages on lines ~44–47) and change them to
Callable[..., Any], ensuring Any is imported from typing if not already present
so the annotations are fully typed.
---
Nitpick comments:
In `@agents-core/vision_agents/core/agents/agent_launcher.py`:
- Around line 306-317: The finalizer currently swallows RuntimeError silently
when asyncio.get_running_loop() fails; update the except block in _finalizer to
log a debug message including session_id_ and call_id_ (and optionally the
exception message) to indicate that self._registry.remove(session_id_) couldn't
be scheduled and the session will expire by TTL; use the module/class logger
(e.g., logger.debug) so this situation is visible during debugging while
preserving the existing behavior.
- Around line 462-478: Replace the broad "except Exception" in the maintenance
loop around the call to self._registry.get_close_requests and the processing of
self._sessions with a narrow exception handler that only catches expected
runtime/IO errors (for example: OSError, ConnectionError and your Redis client
error type such as redis.RedisError) rather than all exceptions; update the
except clause to "except (OSError, ConnectionError, redis.RedisError) as e:"
(add the redis import if needed) and keep the existing logger.exception call to
record the error, so programming errors like TypeError/KeyError still surface
while transient registry/IO errors are handled gracefully.
- Around line 493-499: Narrow the broad except in _refresh_active_sessions:
catch only connection/IO-related errors that can occur when calling
self._registry.refresh (e.g., ConnectionError, asyncio.TimeoutError, and any
HTTP client errors used in this project such as aiohttp.ClientError), log the
caught exception with logger.exception including the exception variable (e) and
context, and let unexpected exceptions propagate (or re-raise) instead of
swallowing all Exception types; keep sessions_map and the call to
self._registry.refresh as-is.
In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py`:
- Around line 48-52: Move the private helper methods _prefixed and _strip_prefix
so they appear after the public feature methods (delete, publish, subscribe) and
after any properties, but before any dunder methods; ensure the class method
order follows the project's convention: __init__, public lifecycle methods
(start/close), properties, public feature methods (delete/publish/subscribe),
then private helpers (_prefixed/_strip_prefix).
- Around line 54-64: In the start method of RedisSessionKVStore, replace the
inspect.iscoroutine guard around self._redis.ping() with a direct await (i.e.,
await self._redis.ping()) and remove the now-unused import inspect; update the
start function to always await the ping coroutine and delete the inspect import
from the module top.
In `@agents-core/vision_agents/core/agents/session_registry/registry.py`:
- Around line 81-95: The current update_metrics function performs a non-atomic
read-modify-write using self._store.get and self._store.set which can cause lost
updates under concurrent calls; change update_metrics (the method named
update_metrics that reads the key f"sessions/{session_id}" and writes metrics
and metrics_updated_at) to perform the update atomically — either use your store
client's optimistic locking (WATCH/MULTI/EXEC) around the key, or run the update
in a single server-side operation (e.g., a Redis EVAL/Lua script or an atomic
JSON/SET command) so that writing "metrics" and "metrics_updated_at" to the
sessions/{session_id} value cannot be interleaved by other updaters.
In `@agents-core/vision_agents/core/agents/session_registry/storage.py`:
- Around line 27-46: Move the async context-manager dunders to the end of the
class: relocate __aenter__ and __aexit__ so they appear after the public
lifecycle methods (start, close), properties, public feature methods and private
helpers — i.e., place __aenter__ and __aexit__ at the end of the class body to
follow the project's ordering convention; keep their implementations unchanged
and ensure they still call await self.start() and await self.close()
respectively.
In `@agents-core/vision_agents/core/runner/http/dependencies.py`:
- Around line 7-16: The four permission stub functions (can_start_session,
can_close_session, can_view_session, can_view_metrics) lack return-type
annotations and docstrings; update each signature to return bool (e.g. def
can_start_session(call_id: str) -> bool:) and add a one-line Google-style
docstring describing the purpose, the call_id parameter, and that it returns
True if the action is permitted (False otherwise), so users overriding these
public hooks have explicit types and minimal documentation.
In `@examples/08_agent_server_example/README.md`:
- Around line 181-184: The can_start_session function accepts call_id but
doesn't enforce call-level authorization; update can_start_session to check that
the token_payload grants permission for this specific call_id (e.g., look for a
scoped permission like f"agents:start:{call_id}" or verify call_id is present in
token_payload.get("allowed_call_ids", [])) instead of only checking the global
"agents:start" permission; use the existing token_payload and call_id parameters
and raise HTTPException(403) if the scoped check fails so the example
demonstrates per-call authorization.
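A minimal sketch of the scoped check; the token shape (`permissions`, `allowed_call_ids`) is assumed for illustration, and the README handler would raise `HTTPException(403)` when this returns False:

```python
def is_start_permitted(token_payload: dict, call_id: str) -> bool:
    """True when the token grants agents:start for this specific call."""
    permissions = token_payload.get("permissions", [])
    if f"agents:start:{call_id}" in permissions:
        return True
    # Fallback: an explicit per-call allow-list on the token.
    return call_id in token_payload.get("allowed_call_ids", [])
```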
In `@tests/test_agents/test_agent_launcher.py`:
- Around line 15-21: The test imports use internal submodule paths for
InMemorySessionKVStore, RedisSessionKVStore, and SessionKVStore; update the
import statements to use the public re-export (e.g., import these three symbols
from vision_agents.core.agents.session_registry or the top-level
vision_agents.core re-export) so the test relies on the public API rather than
internal modules (replace the three imports referencing
session_registry.in_memory_storage, session_registry.redis_storage, and
session_registry.storage with a single public import of InMemorySessionKVStore,
RedisSessionKVStore, and SessionKVStore).
- Around line 63-68: The redis_url fixture launches a real Redis container and
causes Redis to start even for memory-backed runs; update tests so
Redis-dependent runs are marked or the fixture is split: either add
`@pytest.mark.integration` to TestAgentLauncherWithStorage (so the whole class is
treated as integration) or refactor the storage fixture to conditionally require
redis_url only when storage param != "memory" (ensure the storage fixture checks
the param and only references redis_url when needed, leaving memory cases free
of container startup).
In `@tests/test_agents/test_session_registry/test_in_memory_store.py`:
- Around line 9-16: The async pytest fixture "store" should declare its return
type to make typing clear: change "async def store()" to "async def store() ->
AsyncIterator[InMemorySessionKVStore]"; add the needed import "from typing
import AsyncIterator" at the top of the test file; keep the body using
InMemorySessionKVStore, await s.start(), yield s, and await s.close() as-is.
In `@tests/test_agents/test_session_registry/test_redis_store.py`:
- Around line 7-9: The test currently imports RedisSessionKVStore from its
internal module; update the import to use the public re-export instead by
importing RedisSessionKVStore from vision_agents.core.agents.session_registry so
the test relies on the public API rather than the internal module.
- Around line 188-194: The test test_session_expires_without_refresh currently
accesses the private attribute registry._store; instead, instantiate a
standalone SessionRegistry using the provided redis_store fixture (e.g.,
short_registry = SessionRegistry(store=redis_store, ttl=1.0)) so the test
depends only on the public API; update the test to register `"sess-expire"` on
short_registry, await asyncio.sleep(1.5), and assert
short_registry.get("sess-expire") is None, removing any access to
registry._store.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (22)
- agents-core/pyproject.toml
- agents-core/vision_agents/core/__init__.py
- agents-core/vision_agents/core/agents/__init__.py
- agents-core/vision_agents/core/agents/agent_launcher.py
- agents-core/vision_agents/core/agents/session_registry/__init__.py
- agents-core/vision_agents/core/agents/session_registry/in_memory_storage.py
- agents-core/vision_agents/core/agents/session_registry/redis_storage.py
- agents-core/vision_agents/core/agents/session_registry/registry.py
- agents-core/vision_agents/core/agents/session_registry/storage.py
- agents-core/vision_agents/core/agents/session_registry/types.py
- agents-core/vision_agents/core/runner/http/api.py
- agents-core/vision_agents/core/runner/http/dependencies.py
- agents-core/vision_agents/core/runner/http/models.py
- agents-core/vision_agents/core/runner/http/options.py
- agents-core/vision_agents/core/runner/runner.py
- examples/08_agent_server_example/README.md
- pyproject.toml
- tests/test_agents/test_agent_launcher.py
- tests/test_agents/test_runner.py
- tests/test_agents/test_session_registry/__init__.py
- tests/test_agents/test_session_registry/test_in_memory_store.py
- tests/test_agents/test_session_registry/test_redis_store.py
💤 Files with no reviewable changes (2)
- agents-core/vision_agents/core/runner/http/models.py
- agents-core/vision_agents/core/runner/runner.py
```python
async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
    queue: asyncio.Queue[bytes] = asyncio.Queue()
    self._subscribers.setdefault(channel, []).append(queue)
    try:
        while True:
            yield await queue.get()
    finally:
        self._subscribers[channel].remove(queue)
        if not self._subscribers[channel]:
            del self._subscribers[channel]
```
Potential KeyError in subscribe() finalizer if close() has already cleared _subscribers.
If close() runs self._subscribers.clear() while an async generator from subscribe() is still active, the subsequent generator finalization will attempt self._subscribers[channel].remove(queue), raising KeyError.
🐛 Proposed fix

```diff
     finally:
-        self._subscribers[channel].remove(queue)
-        if not self._subscribers[channel]:
-            del self._subscribers[channel]
+        subs = self._subscribers.get(channel)
+        if subs is not None:
+            subs.remove(queue)
+            if not subs:
+                del self._subscribers[channel]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/session_registry/in_memory_storage.py`
around lines 115 - 124, In subscribe(), the generator finalizer can raise
KeyError if close() cleared self._subscribers; modify the finally block in
subscribe(self, channel: str) so it checks for the channel and queue presence
before removing (e.g., use self._subscribers.get(channel) and remove if present,
or guard with "if channel in self._subscribers and queue in
self._subscribers[channel]" and only then remove and possibly delete the empty
list) to avoid accessing a missing key on cleanup.
```python
async def mset(self, items: list[tuple[str, bytes, float]]) -> None:
    """Atomically store multiple values via a MULTI/EXEC pipeline."""
    async with self._redis.pipeline() as pipe:
        for key, value, ttl in items:
            await pipe.set(self._prefixed(key), value, px=int(ttl * 1000))
        await pipe.execute()
```
🧩 Analysis chain
🌐 Web query:
redis-py asyncio pipeline await commands usage redis 5.0
💡 Result:
In redis-py 5.0’s asyncio API (import redis.asyncio as redis), pipeline command methods (e.g., get, set) queue/buffer commands and generally should not be awaited; you await only execute() (or use an async with pipeline context manager and still await execute()). (redis-py-uglide.readthedocs.io)
Basic asyncio pipeline (batch / no atomicity)

```python
import redis.asyncio as redis

r = redis.Redis(host="localhost", port=6379)
pipe = r.pipeline(transaction=False)  # plain pipelining (no MULTI/EXEC)
pipe.set("k1", "v1")
pipe.get("k1")
results = await pipe.execute()  # await ONLY execute()
# results == [True, b"v1"]
await r.close()
```

Pipelines buffer commands and run them when execute() is called, returning results in order. (redis.io)
Transaction pipeline (atomic MULTI/EXEC)
By default, pipelines are wrapped in MULTI/EXEC for atomic execution unless you set transaction=False. (github.com)
```python
import redis.asyncio as redis

r = await redis.from_url("redis://localhost")
async with r.pipeline(transaction=True) as pipe:
    ok1, ok2 = await (
        pipe.set("key1", "value1")
        .set("key2", "value2")
        .execute()
    )
await r.close()
```

`async with` is supported; commands still take effect only after `await pipe.execute()`. (redis-py-uglide.readthedocs.io)
Citations:
- 1: https://redis-py-uglide.readthedocs.io/en/latest/examples/asyncio_examples.html?utm_source=openai
- 2: https://redis.io/docs/latest/develop/clients/redis-py/transpipe/?utm_source=openai
- 3: https://github.com/redis/redis-py?utm_source=openai
- 4: https://redis-py-uglide.readthedocs.io/en/latest/examples/asyncio_examples.html?utm_source=openai
Remove await from individual pipeline commands; only execute() should be awaited.
In redis-py 5.0's asyncio API, pipeline command methods (`set`, `get`, etc.) queue commands and should not be awaited. Only `pipe.execute()` should be awaited. Change line 77 to:

```python
pipe.set(self._prefixed(key), value, px=int(ttl * 1000))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py`
around lines 75 - 80, In mset, remove the erroneous await on pipeline command
calls so commands are queued instead of awaited: inside the async def mset(self,
items...) loop call pipe.set(self._prefixed(key), value, px=int(ttl * 1000)) (no
await) for each item and leave only a single await pipe.execute() after the
loop; this fixes improper use of await with redis-py asyncio pipeline methods.
```python
async def __aexit__(self, *exc) -> None:
    await self.close()
```
Use the standard three-argument __aexit__ signature.
*exc captures all three exception arguments into a single tuple, which is non-idiomatic and breaks static analysis tools and editors that expect the standard (exc_type, exc_val, exc_tb) signature.
✏️ Proposed fix

```diff
-    async def __aexit__(self, *exc) -> None:
+    async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/session_registry/storage.py` around
lines 45 - 46, Change the async context-manager exit method to use the standard
three-argument signature rather than a catch-all tuple: update the method
definition for __aexit__ to async def __aexit__(self, exc_type, exc_val, exc_tb)
-> None so static analyzers and editors can recognize the expected parameters;
keep calling await self.close() inside the method and preserve the existing
return behavior (None/False) so exceptions continue to propagate as before.
```python
@abc.abstractmethod
def subscribe(self, channel: str) -> AsyncIterator[bytes]:
    """
    Subscribe to a channel.

    Returns an async iterator that yields messages as they arrive.
    The iterator should be used with ``async for``::

        async for message in storage.subscribe("events/session-123"):
            handle(message)

    Args:
        channel: The channel name to subscribe to.

    Returns:
        An async iterator yielding message payloads as bytes.
    """
    ...
```
subscribe abstract method should be declared async def to match all implementations.
The abstract method is declared as a plain sync def, but both InMemorySessionKVStore.subscribe and RedisSessionKVStore.subscribe are async def generators (async def + yield). This creates a type mismatch: the abstract contract says "returns an AsyncIterator", but static analysis tools (mypy/pyright) see a sync callable vs async generator. Declare it async def to match the actual protocol.
🛠️ Proposed fix

```diff
-    @abc.abstractmethod
-    def subscribe(self, channel: str) -> AsyncIterator[bytes]:
+    @abc.abstractmethod
+    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-    @abc.abstractmethod
-    def subscribe(self, channel: str) -> AsyncIterator[bytes]:
+    @abc.abstractmethod
+    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
         """
         Subscribe to a channel.

         Returns an async iterator that yields messages as they arrive.
         The iterator should be used with ``async for``::

             async for message in storage.subscribe("events/session-123"):
                 handle(message)

         Args:
             channel: The channel name to subscribe to.

         Returns:
             An async iterator yielding message payloads as bytes.
         """
         ...
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/session_registry/storage.py` around
lines 154 - 171, The abstract method subscribe is declared as a sync def but all
implementations (InMemorySessionKVStore.subscribe and
RedisSessionKVStore.subscribe) are async generator functions; change the
abstract signature to "async def subscribe(self, channel: str) ->
AsyncIterator[bytes]:" so the abstract method matches the implementations and
type checkers (mypy/pyright) recognize it as an async generator returning
AsyncIterator[bytes]; update the docstring if needed but keep the same semantics
and return type.
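A runnable illustration of why the `async def` declaration matters: a bare `yield` in the abstract body marks it as an async generator function, matching the async-generator implementations (the class names here are invented for the demo, not the project's):

```python
import abc
import asyncio
from typing import AsyncIterator


class Store(abc.ABC):
    @abc.abstractmethod
    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
        """Yield messages published to `channel`."""
        # Unreached; its presence makes this an async *generator* function,
        # so the abstract contract matches async-generator implementations.
        if False:
            yield b""


class Memory(Store):
    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
        for msg in (b"a", b"b"):
            yield msg


async def collect() -> list[bytes]:
    out: list[bytes] = []
    async for m in Memory().subscribe("events"):
        out.append(m)
    return out
```

Calling `Memory().subscribe(...)` returns an async generator directly (no awaiting the call itself), which is exactly the usage shown in the docstring above.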
```python
    info = await launcher.get_session_info(session_id)
    if info is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Session with id '{session_id}' not found",
        )

    return Response(status_code=204)
```

```python
    await launcher.request_close_session(session_id)
    return Response(status_code=202)
```
Enforce call_id ↔ session_id ownership before close/read operations.
These handlers only verify that the session exists; they do not verify it belongs to the path call_id. That allows cross-call access/closure when a caller has permission for one call but knows another call’s session_id.
🔒 Proposed fix

```diff
 async def close_session(
     call_id: str,
     session_id: str,
     launcher: AgentLauncher = Depends(get_launcher),
 ) -> Response:
@@
-    info = await launcher.get_session_info(session_id)
-    if info is None:
+    info = await launcher.get_session_info(session_id)
+    if info is None or info.call_id != call_id:
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Session with id '{session_id}' not found",
         )
@@
 async def close_session_beacon(
     call_id: str,
     session_id: str,
     launcher: AgentLauncher = Depends(get_launcher),
 ) -> Response:
@@
-    info = await launcher.get_session_info(session_id)
-    if info is None:
+    info = await launcher.get_session_info(session_id)
+    if info is None or info.call_id != call_id:
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Session with id '{session_id}' not found",
         )
@@
 async def get_session_info(
     call_id: str,
     session_id: str,
     launcher: AgentLauncher = Depends(get_launcher),
 ) -> GetAgentSessionResponse:
@@
-    if info is None:
+    if info is None or info.call_id != call_id:
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Session with id '{session_id}' not found",
         )
@@
 async def get_session_metrics(
     call_id: str,
     session_id: str,
     launcher: AgentLauncher = Depends(get_launcher),
 ) -> GetAgentSessionMetricsResponse:
@@
-    if info is None:
+    if info is None or info.call_id != call_id:
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Session with id '{session_id}' not found",
         )
```

Also applies to: 137-144, 160-170, 187-201
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/runner/http/api.py` around lines 110 - 117,
Handlers call launcher.get_session_info(session_id) then operate on the session
without verifying the session actually belongs to the provided call_id; update
each handler (the ones that call launcher.get_session_info and then call
launcher.request_close_session or launcher.request_read_session) to check that
the returned session info has the same call_id as the path param (e.g., compare
info.call_id or info.session_call_id to the call_id arg) and if it doesn’t match
raise an HTTPException (404 or 403) before calling
launcher.request_close_session/request_read_session; apply this check to the
close and read handlers that use
get_session_info/request_close_session/request_read_session so callers cannot
close/read sessions from other calls.
```python
async def test_ttl_expiry(self, store: InMemorySessionKVStore) -> None:
    await store.set("ephemeral", b"bye", ttl=0.5)
    await asyncio.sleep(0.7)
    assert await store.get("ephemeral") is None
```
Timing-sensitive TTL tests may be flaky on loaded CI runners.
test_ttl_expiry (and similarly test_expire_refreshes_ttl at lines 56-61, test_expire_multiple_keys at lines 66-72) uses a 200 ms margin between TTL expiry and the assertion sleep. On a heavily loaded CI runner, scheduling jitter can push the actual elapsed time under the threshold. Consider widening the sleep/TTL margin ratio (e.g., ttl=0.1, sleep=0.3) or using pytest-timeout to cap total test time without tightening the relative margin.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_agents/test_session_registry/test_in_memory_store.py` around lines
32 - 35, Tests like test_ttl_expiry (and the related test_expire_refreshes_ttl
and test_expire_multiple_keys) are timing-sensitive and can flake on busy CI;
update these tests in
tests/test_agents/test_session_registry/test_in_memory_store.py to widen the
expiry margin by reducing the TTL and increasing the post-set sleep (for example
change ttl from 0.5 to 0.1 and sleep from 0.7 to 0.3) or otherwise increase the
sleep-to-ttl ratio (ensure sleep >= ttl * 2–3) so the assertion checking
InMemorySessionKVStore.get("...") being None is robust on loaded runners.
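The margin rule can be seen with a toy TTL map; this `TinyTTLStore` is a stand-in for illustration, not the project's InMemorySessionKVStore:

```python
from __future__ import annotations

import time


class TinyTTLStore:
    """Minimal monotonic-clock TTL map, just to illustrate sleep/TTL margins."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[bytes, float]] = {}

    def set(self, key: str, value: bytes, ttl: float) -> None:
        self._data[key] = (value, time.monotonic() + ttl)

    def get(self, key: str) -> bytes | None:
        item = self._data.get(key)
        if item is None:
            return None
        value, deadline = item
        if time.monotonic() >= deadline:
            del self._data[key]  # lazily expire on read
            return None
        return value
```

With `ttl=0.1` and a 0.3 s sleep the assertion has a 3x margin, so ordinary scheduler jitter cannot push the elapsed time back under the TTL.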
- Added SessionRegistry to store agent sessions - Added in-memory and Redis-based backends for SessionRegistry - [breaking] updated HTTP API urls to always include call_id for easy access in permission callbacks - [breaking] removed "get_current_user" permission callback as redundant
Force-pushed from b387971 to 221c226
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
agents-core/vision_agents/core/agents/agent_launcher.py (2)
306-328: ⚠️ Potential issue | 🟡 Minor — Fire-and-forget `registry.remove` in `_finalizer` has no exception handler.

The `create_task(self._registry.remove(session_id_))` at lines 313-315 fires and forgets the coroutine. If `registry.remove` raises (e.g., Redis connection error during shutdown), the unhandled task exception will surface as an asyncio warning. Consider adding a done callback or using `asyncio.shield` with error logging.

This is a minor reliability concern — the session will still be removed by TTL expiry, but operational noise is worth avoiding.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 306 - 328, The _finalizer currently fire-and-forgets the coroutine created by asyncio.get_running_loop().create_task(self._registry.remove(session_id_)) which can produce unhandled-task exceptions; wrap that call so exceptions are observed—either create a small async wrapper that awaits self._registry.remove(session_id_) inside try/except and logs errors, or attach a done callback to the created Task to catch and log exceptions; update the code in _finalizer (referencing _finalizer, self._registry.remove, and the create_task call) to ensure any exception from registry.remove is caught and logged rather than left unhandled.
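One way to satisfy the comment above — a done callback that observes the task — can be sketched like this; the failing coroutine is a stand-in for `self._registry.remove`, not the real method:

```python
import asyncio
import logging

logger = logging.getLogger("agent_launcher")


def _observe_task(task: asyncio.Task) -> None:
    """Done callback: retrieve the result so a failed cleanup is logged
    instead of surfacing as a 'Task exception was never retrieved' warning."""
    if task.cancelled():
        return
    exc = task.exception()  # marks the exception as retrieved
    if exc is not None:
        logger.error("session registry cleanup failed: %r", exc)


async def demo() -> bool:
    async def failing_remove() -> None:
        raise ConnectionError("redis unavailable")

    task = asyncio.create_task(failing_remove())
    task.add_done_callback(_observe_task)
    await asyncio.sleep(0.01)  # let the task finish and the callback run
    return task.done()
```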
186-190: ⚠️ Potential issue | 🟠 Major — Broad `except Exception` violates the "catch specific exceptions" guideline.

`cancel_and_wait` can only realistically raise task-related errors. Prefer catching `asyncio.CancelledError` (or `BaseException` if truly needed) and use `logger.exception()` instead of `logger.error()` so the traceback is preserved.

🐛 Proposed fix

```diff
 for result in asyncio.as_completed(coros):
     try:
         await result
-    except Exception as exc:
-        logger.error(f"Failed to cancel the agent task: {exc}")
+    except BaseException:
+        logger.exception("Failed to cancel the agent task")
```

As per coding guidelines, "Never write `except Exception as e`; catch specific exceptions instead" and "Prefer `logger.exception()` when logging an error with a traceback instead of `logger.error('Error: {exc}')`".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 186 - 190, Replace the broad "except Exception as exc" around "await result" with a targeted handler for cancellation-related errors (e.g., "except asyncio.CancelledError") and use logger.exception() to preserve the traceback; if you truly need to catch everything (rare), use "except BaseException" intentionally and document why. Update the block that currently calls logger.error(...) (referencing "result" and the surrounding cancel_and_wait logic) to call logger.exception(...) in the cancellation-specific except handler so tracebacks are retained.
♻️ Duplicate comments (7)
agents-core/vision_agents/core/agents/session_registry/storage.py (2)
45-46: Non-standard `__aexit__` signature — unchanged from prior review.

The three exception arguments are still collapsed into `*exc` rather than named `(exc_type, exc_val, exc_tb)`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/storage.py` around lines 45 - 46, The asynchronous context manager method __aexit__ currently collapses exception args into *exc; change its signature to the standard async def __aexit__(self, exc_type, exc_val, exc_tb) -> None in the SessionRegistry storage class so the interpreter provides the three exception parameters correctly, and keep the body to await self.close() (returning None implicitly).
154-155: `subscribe` abstract method is still declared `def` instead of `async def` — unchanged from prior review.

Both `InMemorySessionKVStore.subscribe` and `RedisSessionKVStore.subscribe` are async generators; the ABC contract still doesn't match.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/storage.py` around lines 154 - 155, The abstract method subscribe in the SessionKVStore ABC is declared with a regular def but the concrete implementations InMemorySessionKVStore.subscribe and RedisSessionKVStore.subscribe are async generators; change the ABC declaration to "async def subscribe(self, channel: str) -> AsyncIterator[bytes]" and keep the `@abc.abstractmethod` decorator so the signature matches the concrete async implementations; update any type imports if needed to use typing.AsyncIterator.agents-core/vision_agents/core/agents/session_registry/redis_storage.py (1)
75-80: `await` on pipeline commands in `mset` — unchanged from prior review.

Individual pipeline command calls should not be awaited; only `pipe.execute()` should be awaited.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py` around lines 75 - 80, In mset, the pipeline command calls are being awaited incorrectly; inside async def mset (function mset) remove the await before pipe.set within the for loop so pipeline commands are queued (use pipe.set(self._prefixed(key), value, px=int(ttl * 1000)) without await), and keep only a single await on pipe.execute() to run the pipeline; reference the async with self._redis.pipeline() as pipe block and ensure _prefixed(key) is still used as the key.agents-core/vision_agents/core/agents/session_registry/in_memory_storage.py (1)
122-124: `KeyError` in `subscribe` finalizer — unchanged from prior review.

`self._subscribers[channel].remove(queue)` will raise `KeyError` if `close()` already called `self._subscribers.clear()`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/in_memory_storage.py` around lines 122 - 124, The finalizer in the subscribe flow can raise KeyError when self._subscribers has been cleared; update the removal to first fetch the subscriber collection with self._subscribers.get(channel) (or check "if channel in self._subscribers") and only remove the queue if present, then delete the channel key if the collection is empty, or wrap the removal in a try/except KeyError to safely ignore races; target the finalizer inside the subscribe logic that references self._subscribers[channel].remove(queue) and the subsequent deletion so it no longer assumes the channel or queue always exists.tests/test_agents/test_session_registry/test_in_memory_store.py (1)
32-35: ⚠️ Potential issue | 🟡 Minor — Harden TTL assertions against scheduler jitter.

These expiry checks still use tight sleep margins and can intermittently fail on loaded runners. Increase the sleep-to-TTL ratio (≥2–3x) to make assertions deterministic.

♻️ Proposed stabilization

```diff
-    await store.set("ephemeral", b"bye", ttl=0.5)
-    await asyncio.sleep(0.7)
+    await store.set("ephemeral", b"bye", ttl=0.2)
+    await asyncio.sleep(0.6)
     assert await store.get("ephemeral") is None
@@
-    await store.set("refresh_me", b"val", ttl=1.0)
-    await asyncio.sleep(0.5)
-    await store.expire("refresh_me", ttl=2.0)
-    await asyncio.sleep(1.0)
+    await store.set("refresh_me", b"val", ttl=0.3)
+    await asyncio.sleep(0.2)
+    await store.expire("refresh_me", ttl=0.8)
+    await asyncio.sleep(0.4)
     assert await store.get("refresh_me") == b"val"
@@
-    await store.mset([("m1", b"a", 1.0), ("m2", b"b", 1.0)])
-    await asyncio.sleep(0.5)
-    await store.expire("m1", "m2", ttl=2.0)
-    await asyncio.sleep(1.0)
+    await store.mset([("m1", b"a", 0.3), ("m2", b"b", 0.3)])
+    await asyncio.sleep(0.2)
+    await store.expire("m1", "m2", ttl=0.8)
+    await asyncio.sleep(0.4)
     assert await store.get("m1") == b"a"
     assert await store.get("m2") == b"b"
```

Also applies to: 56-61, 66-72
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_session_registry/test_in_memory_store.py` around lines 32 - 35, Increase the sleep margins in the TTL-related tests to avoid flakiness: for tests that call InMemorySessionKVStore.set(..., ttl=...), e.g., test_ttl_expiry (and the other cases around lines 56-61 and 66-72), multiply the sleep duration to at least 2–3× the TTL (for example change asyncio.sleep(0.7) after ttl=0.5 to asyncio.sleep(1.1–1.5) or equivalent) so assertions using store.get("...") reliably observe expiry; update the sleeps wherever ttl-based expiry is asserted to use the higher ratio.tests/test_agents/test_session_registry/test_redis_store.py (1)
44-44: ⚠️ Potential issue | 🟠 Major — Mark Redis-dependent suites as integration tests.

These classes depend on Testcontainers/Redis and should be explicitly marked integration so they don't run in unit-only jobs.

♻️ Proposed fix

```diff
+@pytest.mark.integration
 class TestRedisSessionKVStore:
@@
+@pytest.mark.integration
 class TestSessionRegistryWithRedis:
```

As per coding guidelines, "Mark integration tests with `@pytest.mark.integration` decorator".

Also applies to: 144-144
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_session_registry/test_redis_store.py` at line 44, The Redis-dependent test class TestRedisSessionKVStore should be marked as an integration test by adding the pytest.mark.integration decorator above the class definition; locate class TestRedisSessionKVStore and prepend `@pytest.mark.integration` (and do the same for the other Redis-dependent test class in this file) so these suites are excluded from unit-only jobs.agents-core/vision_agents/core/runner/http/api.py (1)
110-117: ⚠️ Potential issue | 🔴 Critical — Enforce `call_id` ownership before close/read/metrics operations.

These handlers check existence only, not that the session belongs to the path `call_id`. That enables cross-call operations with a known `session_id`.

🔒 Proposed fix

```diff
-    info = await launcher.get_session_info(session_id)
-    if info is None:
+    info = await launcher.get_session_info(session_id)
+    if info is None or info.call_id != call_id:
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Session with id '{session_id}' not found",
         )
```

The same guard applies in each of the four handlers.

Also applies to: 137-144, 160-170, 187-202
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/runner/http/api.py` around lines 110 - 117, Handler code calls launcher.get_session_info(session_id) and only checks for existence; change each handler (the close/read/metrics HTTP handlers that call launcher.get_session_info and then launcher.request_close_session / launcher.request_read_session / launcher.request_session_metrics) to also verify that the returned session info belongs to the provided path call_id (e.g., compare info.call_id or info["call_id"] to the request path call_id) and raise HTTPException(status_code=404, detail="Session with id '...' not found") when it does not match; do this check before calling launcher.request_close_session/request_read_session/request_session_metrics so cross-call session access is prevented.
🧹 Nitpick comments (3)
agents-core/vision_agents/core/agents/session_registry/redis_storage.py (1)
54-64: Simplify ping — the `inspect.iscoroutine` branch is always true in `redis.asyncio`.

In the `redis.asyncio` namespace, `Redis.ping()` always returns a coroutine. The conditional dance exists to work around a Union return type in redis-py's stubs, but the sync branch is dead code. A direct `await self._redis.ping()` is cleaner and equally correct.

♻️ Proposed simplification

```diff
 async def start(self) -> None:
     """Open the Redis connection and verify it with a PING."""
-    # Handle non-specific Union return type here
-    ping = self._redis.ping()
-    if inspect.iscoroutine(ping):
-        await ping
+    await self._redis.ping()
     connection_kwargs = self._redis.connection_pool.connection_kwargs
```

If `inspect` is not used elsewhere, remove it from the imports too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py` around lines 54 - 64, In RedisSessionKVStore.start(), remove the unnecessary inspect.iscoroutine check and simply await the coroutine returned by self._redis.ping() (i.e., replace the ping = ... / inspect.iscoroutine branch with await self._redis.ping()); also remove the unused inspect import if it isn’t used elsewhere so the module no longer contains dead code.agents-core/vision_agents/core/__init__.py (1)
25-29: Silent ImportError swallowing may hide development-time bugs.

Any ImportError inside redis_storage.py itself (e.g., a missing sub-import) will be silently swallowed. A logger.debug or logger.warning call in the except branch would surface such issues during development without affecting production.

♻️ Proposed refinement

 try:
     from vision_agents.core.agents.session_registry import RedisSessionKVStore

     __all__ += ["RedisSessionKVStore"]
 except ImportError:
-    pass
+    import logging as _logging
+    _logging.getLogger(__name__).debug(
+        "RedisSessionKVStore not available (install redis[hiredis] to enable)"
+    )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/__init__.py` around lines 25 - 29, The except block currently swallows ImportError when importing RedisSessionKVStore; update the import in __init__.py to catch ImportError as e and call the module logger (e.g., logger.debug or logger.warning) to emit the exception details (including e or exc_info) while still allowing fallback behavior, referencing the symbol RedisSessionKVStore and the import line from vision_agents.core.agents.session_registry to locate the change.

tests/test_agents/test_runner.py (1)
405-406: Add response-status assertions in call_id propagation tests.

These tests currently verify callback invocation only. Add status assertions so they also fail on endpoint behavior regressions.

♻️ Proposed fix

-    await client.delete(f"/calls/my-call-456/sessions/{session_id}")
+    resp = await client.delete(f"/calls/my-call-456/sessions/{session_id}")
+    assert resp.status_code == 202
     assert received_call_ids == ["my-call-456"]
@@
-    await client.get(f"/calls/my-call-789/sessions/{session_id}")
+    resp = await client.get(f"/calls/my-call-789/sessions/{session_id}")
+    assert resp.status_code == 200
     assert received_call_ids == ["my-call-789"]
@@
-    await client.get(f"/calls/my-call-abc/sessions/{session_id}/metrics")
+    resp = await client.get(f"/calls/my-call-abc/sessions/{session_id}/metrics")
+    assert resp.status_code == 200
     assert received_call_ids == ["my-call-abc"]

Also applies to: 428-429, 451-452
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_agents/test_runner.py` around lines 405 - 406, Add assertions that check the HTTP response status when calling the delete endpoint so the tests fail on endpoint regressions; specifically, capture the result of the call to client.delete(f"/calls/my-call-456/sessions/{session_id}") (and the two other similar calls at the other locations) into a variable like response and assert response.status_code == 200 before asserting on received_call_ids to ensure both callback invocation and endpoint status are validated.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@agents-core/vision_agents/core/agents/agent_launcher.py`:
- Around line 493-499: Replace the broad "except Exception" in
_refresh_active_sessions with a specific registry/storage-backend exception (the
same specific exception used in _close_expired_sessions), e.g., catch
RegistryError or StorageBackendError thrown by self._registry.refresh; import
that exception symbol and handle only it (log with logger.exception to record
the exception), and let unexpected exceptions propagate (or re-raise) so they
aren't silently swallowed; reference symbols: _refresh_active_sessions,
self._registry.refresh, sessions_map, logger.exception, and the specific
exception type used in _close_expired_sessions.
- Around line 462-477: The broad except in _close_expired_sessions should be
replaced with a specific storage-related exception handler: change the catch
around the call to registry.get_close_requests to only catch your storage error
(e.g., redis.exceptions.RedisError) or a new domain-level StorageError that all
session-store implementations (like RedisSessionKVStore) raise; log the
exception with logger.exception when that storage error occurs and allow other
exceptions to propagate (or re-raise) so logical bugs aren’t suppressed. Ensure
registry.get_close_requests and the storage backend implementations raise the
chosen StorageError type (or the specific backend exception) so the except
targets a concrete error class instead of Exception.
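As an illustration only, such a domain-level error could wrap backend failures like this (StorageError and the store class here are hypothetical names, not existing symbols in the codebase):

```python
class StorageError(Exception):
    """Hypothetical domain-level error raised by all session KV stores."""


class WrappingKVStoreSketch:
    """Illustrative store that translates backend errors into StorageError."""

    def __init__(self, backend_errors: tuple) -> None:
        # In a real Redis-backed store this would be (redis.exceptions.RedisError,).
        self._backend_errors = backend_errors

    async def get_close_requests(self, session_ids: list) -> list:
        try:
            return await self._backend_fetch(session_ids)
        except self._backend_errors as exc:
            raise StorageError(
                f"backend failure fetching {len(session_ids)} keys"
            ) from exc

    async def _backend_fetch(self, session_ids: list) -> list:
        # Stand-in for a real backend call; always fails for this demo.
        raise ConnectionError("redis down")
```

A maintenance loop can then catch StorageError alone and let genuine logic bugs propagate.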
- Around line 486-491: The loop over to_close and result currently logs
exceptions using logger.error(..., exc_info=r); replace that call with
logger.exception so the traceback is logged idiomatically — e.g., in the block
that checks isinstance(r, Exception) for each agent (refer to the
to_close/result loop and agent.agent_user.id), call logger.exception(f"Failed to
close agent with user_id {agent.agent_user.id}", exc_info=r) (or simply
logger.exception(f"Failed to close agent with user_id {agent.agent_user.id}") if
you ensure r is the active exception) to conform to logging guidelines.
In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py`:
- Around line 82-89: The expire method is awaiting non-awaitable pipeline
queueing calls which causes a TypeError and prevents commands from being
buffered; in the expire function remove the spurious await when queuing pexpire
on the redis pipeline (i.e. change await pipe.pexpire(...) to pipe.pexpire(...))
so that the commands are enqueued and only executed when await pipe.execute() is
called; ensure you still call self._prefixed(key) for each key and keep the
final await pipe.execute() intact.
In `@agents-core/vision_agents/core/agents/session_registry/registry.py`:
- Around line 123-128: The current get method uses
SessionInfo(**json.loads(raw)) which will raise TypeError if stored JSON
contains extra fields; change both get and get_for_call to parse raw with
json.loads(raw), then filter the resulting dict to only keys present in
SessionInfo's defined fields (e.g., using SessionInfo.__dataclass_fields__ or
dataclasses.fields to build the allowed key set) before instantiating
SessionInfo, so unknown/newer fields are ignored and construction succeeds
during rolling deployments.
- Around line 40-46: Add async context manager support to SessionRegistry by
implementing __aenter__ and __aexit__: have __aenter__ await self.start() and
return self, and have __aexit__ await self.stop(); ensure __aexit__ accepts
(exc_type, exc, tb) and always attempts to stop (so resources are cleaned up
even on errors) and re-raise or return False per current semantics. Update the
class where start and stop are defined (SessionRegistry, methods start/stop) to
include these two new async dunder methods.
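Sketched on a stub, assuming only the start/stop contract described above (SessionRegistrySketch is a stand-in, not the real class):

```python
class SessionRegistrySketch:
    """Stand-in showing start/stop plus async context manager support."""

    def __init__(self) -> None:
        self.started = False

    async def start(self) -> None:
        self.started = True

    async def stop(self) -> None:
        self.started = False

    async def __aenter__(self) -> "SessionRegistrySketch":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Always stop; returning None (falsy) lets any in-body exception propagate.
        await self.stop()


async def demo() -> bool:
    async with SessionRegistrySketch() as registry:
        assert registry.started
    return registry.started  # stopped again after the block exits
```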
In `@tests/test_agents/test_agent_launcher.py`:
- Around line 71-72: The parameterized fixture storage currently runs both
"memory" and "redis" for every test; update the storage fixture to mark the
"redis" variant as an integration-only case so it only runs when integration
tests are requested. Concretely, change the parametrization in the storage
fixture to use pytest.param("redis", marks=pytest.mark.integration) for the
Redis variant (keeping "memory" unmarked), ensuring the fixture signature
(storage(request, redis_url) -> AsyncIterator[SessionKVStore]) and existing use
of request/redis_url remain intact.
---
Outside diff comments:
In `@agents-core/vision_agents/core/agents/agent_launcher.py`:
- Around line 306-328: The _finalizer currently fire-and-forgets the coroutine
created by
asyncio.get_running_loop().create_task(self._registry.remove(session_id_)) which
can produce unhandled-task exceptions; wrap that call so exceptions are
observed—either create a small async wrapper that awaits
self._registry.remove(session_id_) inside try/except and logs errors, or attach
a done callback to the created Task to catch and log exceptions; update the code
in _finalizer (referencing _finalizer, self._registry.remove, and the
create_task call) to ensure any exception from registry.remove is caught and
logged rather than left unhandled.
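The done-callback option can be sketched as follows; failing_remove stands in for the registry.remove coroutine and is not real project code:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


def log_task_failure(task: asyncio.Task) -> None:
    # Retrieving the exception here both logs it and prevents the
    # "Task exception was never retrieved" warning at shutdown.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("Background registry cleanup failed", exc_info=exc)


async def main() -> bool:
    async def failing_remove() -> None:
        raise RuntimeError("boom")  # stand-in for a failing registry.remove()

    task = asyncio.get_running_loop().create_task(failing_remove())
    task.add_done_callback(log_task_failure)
    await asyncio.wait({task})  # wait without re-raising the task's error
    return task.done()
```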
- Around line 186-190: Replace the broad "except Exception as exc" around "await
result" with a targeted handler for cancellation-related errors (e.g., "except
asyncio.CancelledError") and use logger.exception() to preserve the traceback;
if you truly need to catch everything (rare), use "except BaseException"
intentionally and document why. Update the block that currently calls
logger.error(...) (referencing "result" and the surrounding cancel_and_wait
logic) to call logger.exception(...) in the cancellation-specific except handler
so tracebacks are retained.
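A minimal, self-contained illustration of that handler shape (the cancel_and_wait helper below is an assumption, not the project's actual implementation):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def cancel_and_wait(task: asyncio.Task) -> bool:
    """Cancel a task and await it, catching only cancellation."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected during shutdown; logger.exception preserves the traceback.
        logger.exception("Session task cancelled during shutdown")
        return True
    return False


async def main() -> bool:
    task = asyncio.create_task(asyncio.sleep(10))
    return await cancel_and_wait(task)
```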
---
Duplicate comments:
In `@agents-core/vision_agents/core/agents/session_registry/in_memory_storage.py`:
- Around line 122-124: The finalizer in the subscribe flow can raise KeyError
when self._subscribers has been cleared; update the removal to first fetch the
subscriber collection with self._subscribers.get(channel) (or check "if channel
in self._subscribers") and only remove the queue if present, then delete the
channel key if the collection is empty, or wrap the removal in a try/except
KeyError to safely ignore races; target the finalizer inside the subscribe logic
that references self._subscribers[channel].remove(queue) and the subsequent
deletion so it no longer assumes the channel or queue always exists.
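The race-tolerant removal can be shown on a small stand-in (SubscriberTableSketch is illustrative, not the in-memory store itself):

```python
class SubscriberTableSketch:
    """Finalizer-style removal that tolerates a concurrently cleared table."""

    def __init__(self) -> None:
        self._subscribers: dict = {}

    def add(self, channel: str, queue: object) -> None:
        self._subscribers.setdefault(channel, []).append(queue)

    def remove(self, channel: str, queue: object) -> None:
        queues = self._subscribers.get(channel)
        if queues is None:
            return  # channel already gone; nothing to clean up
        if queue in queues:
            queues.remove(queue)
        if not queues:
            self._subscribers.pop(channel, None)
```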
In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py`:
- Around line 75-80: In mset, the pipeline command calls are being awaited
incorrectly; inside async def mset (function mset) remove the await before
pipe.set within the for loop so pipeline commands are queued (use
pipe.set(self._prefixed(key), value, px=int(ttl * 1000)) without await), and
keep only a single await on pipe.execute() to run the pipeline; reference the
async with self._redis.pipeline() as pipe block and ensure _prefixed(key) is
still used as the key.
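The queue-then-execute contract that fix relies on can be demonstrated without Redis; FakePipeline below only mimics redis-py's buffering behavior and is not the real client:

```python
class FakePipeline:
    """Toy pipeline: commands buffer in a stack until execute() runs them."""

    def __init__(self) -> None:
        self.command_stack: list = []

    def set(self, key: str, value: bytes, px: int) -> "FakePipeline":
        # Like redis-py's queuing mode, this returns the pipeline itself,
        # not a coroutine, which is why `await pipe.set(...)` fails.
        self.command_stack.append(("SET", key, value, px))
        return self

    async def execute(self) -> list:
        results = [True] * len(self.command_stack)
        self.command_stack.clear()
        return results


async def mset_sketch(pipe: FakePipeline, items: dict, ttl: float) -> list:
    for key, value in items.items():
        pipe.set(key, value, px=int(ttl * 1000))  # enqueue only, no await
    return await pipe.execute()  # one awaited round trip runs everything
```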
In `@agents-core/vision_agents/core/agents/session_registry/storage.py`:
- Around line 45-46: The asynchronous context manager method __aexit__ currently
collapses exception args into *exc; change its signature to the standard async
def __aexit__(self, exc_type, exc_val, exc_tb) -> None in the SessionRegistry
storage class so the interpreter provides the three exception parameters
correctly, and keep the body to await self.close() (returning None implicitly).
- Around line 154-155: The abstract method subscribe in the SessionKVStore ABC
is declared with a regular def but the concrete implementations
InMemorySessionKVStore.subscribe and RedisSessionKVStore.subscribe are async
generators; change the ABC declaration to "async def subscribe(self, channel:
str) -> AsyncIterator[bytes]" and keep the `@abc.abstractmethod` decorator so the
signature matches the concrete async implementations; update any type imports if
needed to use typing.AsyncIterator.
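A self-contained sketch of that ABC shape (the class names are stand-ins for the real SessionKVStore hierarchy):

```python
import abc
from typing import AsyncIterator


class KVStoreSketch(abc.ABC):
    @abc.abstractmethod
    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
        """Yield messages published on `channel`."""
        # The yield makes this an async generator function, matching
        # the shape of the concrete implementations.
        yield b""


class InMemorySketch(KVStoreSketch):
    def __init__(self, messages: list) -> None:
        self._messages = messages

    async def subscribe(self, channel: str) -> AsyncIterator[bytes]:
        for message in self._messages:
            yield message


async def collect(store: KVStoreSketch, channel: str) -> list:
    return [m async for m in store.subscribe(channel)]
```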
In `@agents-core/vision_agents/core/runner/http/api.py`:
- Around line 110-117: Handler code calls launcher.get_session_info(session_id)
and only checks for existence; change each handler (the close/read/metrics HTTP
handlers that call launcher.get_session_info and then
launcher.request_close_session / launcher.request_read_session /
launcher.request_session_metrics) to also verify that the returned session info
belongs to the provided path call_id (e.g., compare info.call_id or
info["call_id"] to the request path call_id) and raise
HTTPException(status_code=404, detail="Session with id '...' not found") when it
does not match; do this check before calling
launcher.request_close_session/request_read_session/request_session_metrics so
cross-call session access is prevented.
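A framework-free sketch of that ownership check; the HTTPException class here is a stand-in for fastapi.HTTPException, and SessionInfo is a simplified model:

```python
from dataclasses import dataclass


class HTTPException(Exception):
    """Stand-in for fastapi.HTTPException so the sketch runs without FastAPI."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


@dataclass
class SessionInfo:
    session_id: str
    call_id: str


def require_session(info, session_id: str, call_id: str) -> SessionInfo:
    # Treat "exists but belongs to another call" exactly like "not found"
    # so responses do not leak cross-call session existence.
    if info is None or info.call_id != call_id:
        raise HTTPException(
            status_code=404, detail=f"Session with id '{session_id}' not found"
        )
    return info
```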
In `@tests/test_agents/test_session_registry/test_in_memory_store.py`:
- Around line 32-35: Increase the sleep margins in the TTL-related tests to
avoid flakiness: for tests that call InMemorySessionKVStore.set(..., ttl=...),
e.g., test_ttl_expiry (and the other cases around lines 56-61 and 66-72),
multiply the sleep duration to at least 2–3× the TTL (for example change
asyncio.sleep(0.7) after ttl=0.5 to asyncio.sleep(1.1–1.5) or equivalent) so
assertions using store.get("...") reliably observe expiry; update the sleeps
wherever ttl-based expiry is asserted to use the higher ratio.
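The margin guidance can be demonstrated with a tiny TTL store (TinyTTLStore is illustrative and uses much shorter durations than the tests themselves):

```python
import asyncio
import time


class TinyTTLStore:
    """Minimal in-memory KV with per-key expiry checked on read."""

    def __init__(self) -> None:
        self._data: dict = {}

    async def set(self, key: str, value: bytes, ttl: float) -> None:
        self._data[key] = (value, time.monotonic() + ttl)

    async def get(self, key: str):
        entry = self._data.get(key)
        if entry is None or time.monotonic() >= entry[1]:
            self._data.pop(key, None)
            return None
        return entry[0]


async def demo() -> tuple:
    store = TinyTTLStore()
    await store.set("k", b"v", ttl=0.05)
    before = await store.get("k")
    await asyncio.sleep(0.15)  # 3x the TTL, so expiry is observed reliably
    after = await store.get("k")
    return before, after
```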
In `@tests/test_agents/test_session_registry/test_redis_store.py`:
- Line 44: The Redis-dependent test class TestRedisSessionKVStore should be
marked as an integration test by adding the pytest.mark.integration decorator
above the class definition; locate class TestRedisSessionKVStore and prepend
`@pytest.mark.integration` (and do the same for the other Redis-dependent test
class in this file) so these suites are excluded from unit-only jobs.
---
Nitpick comments:
In `@agents-core/vision_agents/core/__init__.py`:
- Around line 25-29: The except block currently swallows ImportError when
importing RedisSessionKVStore; update the import in __init__.py to catch
ImportError as e and call the module logger (e.g., logger.debug or
logger.warning) to emit the exception details (including e or exc_info) while
still allowing fallback behavior, referencing the symbol RedisSessionKVStore and
the import line from vision_agents.core.agents.session_registry to locate the
change.
In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py`:
- Around line 54-64: In RedisSessionKVStore.start(), remove the unnecessary
inspect.iscoroutine check and simply await the coroutine returned by
self._redis.ping() (i.e., replace the ping = ... / inspect.iscoroutine branch
with await self._redis.ping()); also remove the unused inspect import if it
isn’t used elsewhere so the module no longer contains dead code.
In `@tests/test_agents/test_runner.py`:
- Around line 405-406: Add assertions that check the HTTP response status when
calling the delete endpoint so the tests fail on endpoint regressions;
specifically, capture the result of the call to
client.delete(f"/calls/my-call-456/sessions/{session_id}") (and the two other
similar calls at the other locations) into a variable like response and assert
response.status_code == 200 before asserting on received_call_ids to ensure both
callback invocation and endpoint status are validated.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
uv.lock is excluded by !**/*.lock
📒 Files selected for processing (22)
- agents-core/pyproject.toml
- agents-core/vision_agents/core/__init__.py
- agents-core/vision_agents/core/agents/__init__.py
- agents-core/vision_agents/core/agents/agent_launcher.py
- agents-core/vision_agents/core/agents/session_registry/__init__.py
- agents-core/vision_agents/core/agents/session_registry/in_memory_storage.py
- agents-core/vision_agents/core/agents/session_registry/redis_storage.py
- agents-core/vision_agents/core/agents/session_registry/registry.py
- agents-core/vision_agents/core/agents/session_registry/storage.py
- agents-core/vision_agents/core/agents/session_registry/types.py
- agents-core/vision_agents/core/runner/http/api.py
- agents-core/vision_agents/core/runner/http/dependencies.py
- agents-core/vision_agents/core/runner/http/models.py
- agents-core/vision_agents/core/runner/http/options.py
- agents-core/vision_agents/core/runner/runner.py
- examples/08_agent_server_example/README.md
- pyproject.toml
- tests/test_agents/test_agent_launcher.py
- tests/test_agents/test_runner.py
- tests/test_agents/test_session_registry/__init__.py
- tests/test_agents/test_session_registry/test_in_memory_store.py
- tests/test_agents/test_session_registry/test_redis_store.py
💤 Files with no reviewable changes (2)
- agents-core/vision_agents/core/runner/runner.py
- agents-core/vision_agents/core/runner/http/models.py
🚧 Files skipped from review as they are similar to previous changes (5)
- agents-core/vision_agents/core/agents/session_registry/__init__.py
- agents-core/vision_agents/core/runner/http/options.py
- agents-core/vision_agents/core/agents/session_registry/types.py
- agents-core/pyproject.toml
- agents-core/vision_agents/core/agents/__init__.py
try:
    flagged = [
        session_
        for session_id in await self._registry.get_close_requests(
            list(self._sessions.keys())
        )
        if (session_ := self._sessions.get(session_id)) is not None
        and session_.agent not in to_close
    ]
    for session in flagged:
        logger.info(
            "Closing session %s due to registry close request", session.id
        )
        to_close.append(session.agent)
except Exception:
    logger.exception("Failed to check registry close requests")
Broad except Exception in _close_expired_sessions violates guideline.
This suppresses any error from get_close_requests, including logical bugs. While some catch-all is reasonable for a maintenance loop, the guideline requires specific exception types.
🐛 Proposed fix
- except Exception:
+ except (OSError, RuntimeError):
             logger.exception("Failed to check registry close requests")

Use the appropriate storage-backend error type (e.g., redis.exceptions.RedisError when using RedisSessionKVStore) or define a domain-level StorageError base class that all backends raise.
As per coding guidelines, "Never write except Exception as e; catch specific exceptions instead".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 462 -
477, The broad except in _close_expired_sessions should be replaced with a
specific storage-related exception handler: change the catch around the call to
registry.get_close_requests to only catch your storage error (e.g.,
redis.exceptions.RedisError) or a new domain-level StorageError that all
session-store implementations (like RedisSessionKVStore) raise; log the
exception with logger.exception when that storage error occurs and allow other
exceptions to propagate (or re-raise) so logical bugs aren’t suppressed. Ensure
registry.get_close_requests and the storage backend implementations raise the
chosen StorageError type (or the specific backend exception) so the except
targets a concrete error class instead of Exception.
for agent, r in zip(to_close, result):
    if isinstance(r, Exception):
        logger.error(
            f"Failed to close agent with user_id {agent.agent_user.id}",
            exc_info=r,
        )
Use logger.exception() instead of logger.error(..., exc_info=r).
When logging with a traceback, logger.exception() is the idiomatic and guideline-mandated form.
🐛 Proposed fix
for agent, r in zip(to_close, result):
if isinstance(r, Exception):
- logger.error(
- f"Failed to close agent with user_id {agent.agent_user.id}",
- exc_info=r,
- )
+ logger.exception(
+ "Failed to close agent with user_id %s",
+ agent.agent_user.id,
+ exc_info=r,
+            )

As per coding guidelines, "Prefer logger.exception() when logging an error with a traceback instead of logger.error('Error: {exc}')".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
for agent, r in zip(to_close, result):
    if isinstance(r, Exception):
        logger.exception(
            "Failed to close agent with user_id %s",
            agent.agent_user.id,
            exc_info=r,
        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 486 -
491, The loop over to_close and result currently logs exceptions using
logger.error(..., exc_info=r); replace that call with logger.exception so the
traceback is logged idiomatically — e.g., in the block that checks isinstance(r,
Exception) for each agent (refer to the to_close/result loop and
agent.agent_user.id), call logger.exception(f"Failed to close agent with user_id
{agent.agent_user.id}", exc_info=r) (or simply logger.exception(f"Failed to
close agent with user_id {agent.agent_user.id}") if you ensure r is the active
exception) to conform to logging guidelines.
async def _refresh_active_sessions(self) -> None:
    """Send heartbeats for all active sessions to the registry."""
    try:
        sessions_map = {sid: s.call_id for sid, s in self._sessions.items()}
        await self._registry.refresh(sessions_map)
    except Exception:
        logger.exception("Registry heartbeat failed")
Broad except Exception in _refresh_active_sessions violates guideline.
Same concern as _close_expired_sessions. Use a specific storage-backend exception type.
🐛 Proposed fix
- except Exception:
+ except (OSError, RuntimeError):
             logger.exception("Registry heartbeat failed")

As per coding guidelines, "Never write except Exception as e; catch specific exceptions instead".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def _refresh_active_sessions(self) -> None:
    """Send heartbeats for all active sessions to the registry."""
    try:
        sessions_map = {sid: s.call_id for sid, s in self._sessions.items()}
        await self._registry.refresh(sessions_map)
    except (OSError, RuntimeError):
        logger.exception("Registry heartbeat failed")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/agent_launcher.py` around lines 493 -
499, Replace the broad "except Exception" in _refresh_active_sessions with a
specific registry/storage-backend exception (the same specific exception used in
_close_expired_sessions), e.g., catch RegistryError or StorageBackendError
thrown by self._registry.refresh; import that exception symbol and handle only
it (log with logger.exception to record the exception), and let unexpected
exceptions propagate (or re-raise) so they aren't silently swallowed; reference
symbols: _refresh_active_sessions, self._registry.refresh, sessions_map,
logger.exception, and the specific exception type used in
_close_expired_sessions.
async def expire(self, *keys: str, ttl: float) -> None:
    """Refresh TTL on one or more keys via a transactional PEXPIRE pipeline."""
    if not keys:
        return
    async with self._redis.pipeline() as pipe:
        for key in keys:
            await pipe.pexpire(self._prefixed(key), int(ttl * 1000))
        await pipe.execute()
🧩 Analysis chain
🏁 Script executed:
cat -n agents-core/vision_agents/core/agents/session_registry/redis_storage.py | sed -n '82,89p'

Repository: GetStream/Vision-Agents
Length of output: 492
🏁 Script executed:
# Check the entire file context to understand the class and import patterns
wc -l agents-core/vision_agents/core/agents/session_registry/redis_storage.py

Repository: GetStream/Vision-Agents
Length of output: 142
🏁 Script executed:
# Look for similar pipeline patterns in the file to understand the existing code
rg "async with.*pipeline\(\)" agents-core/vision_agents/core/agents/session_registry/redis_storage.py -A 5

Repository: GetStream/Vision-Agents
Length of output: 573
🌐 Web query:
redis-py asyncio Pipeline execute_command behavior documentation
💡 Result:
In redis-py asyncio, Pipeline.execute_command() does not always “queue” the command; it conditionally executes immediately during the WATCH phase.
- Default behavior (queue/stage):
Pipeline.execute_command()typically callspipeline_execute_command(), which appends(args, options)toself.command_stackand returnsselfso you can chain calls; the queued commands only run when you laterawait pipe.execute(). [1] - WATCH behavior (immediate I/O): If the pipeline is currently watching (or the command is
WATCH) and you have not started an explicit transaction viapipe.multi(), thenexecute_command()routes toimmediate_execute_command()and sends the command to Redis immediately (this is how redis-py supports reading values afterWATCHbut beforeMULTI/EXEC). [1] - Execution results:
execute()runs the buffered commands and returns results in order (a list), and in pipeline mode errors are typically captured and placed into the response list rather than stopping subsequent commands. [2] - Asyncio docs/examples confirm: commands in an asyncio pipeline “will not be reflected in Redis until
execute()is called & awaited” (outside the WATCH/immediate-execute exception above). [3]
Sources:
[1] redis-py asyncio Pipeline.execute_command implementation (redis.readthedocs.io module source). (redis.readthedocs.io)
[2] Redis/redis-py pipelines & transactions docs (execute() returns ordered list; errors captured). (redis.readthedocs.io)
[3] redis-py asyncio examples (pipeline buffers until awaited execute()). (redis.readthedocs.io)
Citations:
- 1: https://redis.readthedocs.io/en/v5.0.0/_modules/redis/asyncio/client.html
- 2: https://redis.readthedocs.io/en/latest/_modules/redis/asyncio/client.html
- 3: https://redis.readthedocs.io/en/v4.4.1/examples/asyncio_examples.html?utm_source=openai
Remove await from pipeline command in expire method — awaiting non-awaitable object will raise TypeError.
At line 88, await pipe.pexpire(...) is incorrect. In redis-py asyncio's Pipeline queuing mode, pexpire() returns the Pipeline instance (for chaining), not a coroutine. Awaiting it raises TypeError at runtime, preventing the command from being queued and causing the method to fail immediately. Remove the await keyword — commands are buffered by the pipeline and only executed when await pipe.execute() is called.
Proposed fix
async def expire(self, *keys: str, ttl: float) -> None:
"""Refresh TTL on one or more keys via a transactional PEXPIRE pipeline."""
if not keys:
return
async with self._redis.pipeline() as pipe:
for key in keys:
- await pipe.pexpire(self._prefixed(key), int(ttl * 1000))
+            pipe.pexpire(self._prefixed(key), int(ttl * 1000))
         await pipe.execute()

🤖 Prompt for AI Agents
await pipe.execute()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/session_registry/redis_storage.py`
around lines 82 - 89, The expire method is awaiting non-awaitable pipeline
queueing calls which causes a TypeError and prevents commands from being
buffered; in the expire function remove the spurious await when queuing pexpire
on the redis pipeline (i.e. change await pipe.pexpire(...) to pipe.pexpire(...))
so that the commands are enqueued and only executed when await pipe.execute() is
called; ensure you still call self._prefixed(key) for each key and keep the
final await pipe.execute() intact.
async def start(self) -> None:
    """Initialize the storage backend."""
    await self._store.start()

async def stop(self) -> None:
    """Close the storage backend."""
    await self._store.close()
🛠️ Refactor suggestion | 🟠 Major
Missing __aenter__/__aexit__ — guideline requires context manager support alongside start/stop.
SessionRegistry exposes start() and stop() but no async context manager protocol, preventing ergonomic async with SessionRegistry() as reg: usage.
♻️ Proposed addition
async def stop(self) -> None:
"""Close the storage backend."""
await self._store.close()
+
+ async def __aenter__(self) -> "SessionRegistry":
+ await self.start()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: object,
+ ) -> None:
+        await self.stop()

As per coding guidelines, "Implement async-first lifecycle methods (start/stop) and support __aenter__/__aexit__ for context manager usage".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/session_registry/registry.py` around
lines 40 - 46, Add async context manager support to SessionRegistry by
implementing __aenter__ and __aexit__: have __aenter__ await self.start() and
return self, and have __aexit__ await self.stop(); ensure __aexit__ accepts
(exc_type, exc, tb) and always attempts to stop (so resources are cleaned up
even on errors) and re-raise or return False per current semantics. Update the
class where start and stop are defined (SessionRegistry, methods start/stop) to
include these two new async dunder methods.
async def get(self, session_id: str) -> SessionInfo | None:
    """Look up a session by ID from shared storage."""
    raw = await self._store.get(f"sessions/{session_id}")
    if raw is None:
        return None
    return SessionInfo(**json.loads(raw))
SessionInfo(**json.loads(raw)) will raise TypeError on unexpected fields during rolling deployments.
If the stored JSON contains fields added by a newer node version that the current SessionInfo dataclass does not yet define, the ** unpack will fail. Consider filtering to known fields:
🛡️ Proposed fix
- return SessionInfo(**json.loads(raw))
+ from dataclasses import fields as dc_fields
+ data = json.loads(raw)
+ known = {f.name for f in dc_fields(SessionInfo)}
+ return SessionInfo(**{k: v for k, v in data.items() if k in known})Apply the same pattern to get_for_call at line 138.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def get(self, session_id: str) -> SessionInfo | None:
    """Look up a session by ID from shared storage."""
    raw = await self._store.get(f"sessions/{session_id}")
    if raw is None:
        return None
    from dataclasses import fields as dc_fields
    data = json.loads(raw)
    known = {f.name for f in dc_fields(SessionInfo)}
    return SessionInfo(**{k: v for k, v in data.items() if k in known})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@agents-core/vision_agents/core/agents/session_registry/registry.py` around lines 123-128: the current `get` method uses `SessionInfo(**json.loads(raw))`, which raises `TypeError` if the stored JSON contains extra fields. Change both `get` and `get_for_call` to parse `raw` with `json.loads(raw)`, then filter the resulting dict to only keys present in `SessionInfo`'s defined fields (e.g., using `SessionInfo.__dataclass_fields__` or `dataclasses.fields` to build the allowed key set) before instantiating `SessionInfo`, so unknown/newer fields are ignored and construction succeeds during rolling deployments.
```python
@pytest.fixture(params=["memory", "redis"])
async def storage(request, redis_url) -> AsyncIterator[SessionKVStore]:
```
🧩 Analysis chain

🏁 Script executed:

```shell
# First, let's check if the file exists and read the fixture implementation
cat -n tests/test_agents/test_agent_launcher.py | head -100
```

Repository: GetStream/Vision-Agents

🏁 Script executed:

```shell
# Search for the redis_url fixture definition
rg "redis_url" tests/ -A 5 -B 2
```

🏁 Script executed:

```shell
# Check for existing pytest.param usage with marks in the codebase
rg "pytest.param.*marks" --type py
```

🏁 Script executed:

```shell
# Look for other integration-marked tests in the codebase
rg "@pytest.mark.integration" --type py -B 2 -A 2
```
Mark the Redis fixture variant as integration.
The parameterized fixture currently runs both "memory" (in-process) and "redis" (Docker-backed) for every test. The Redis variant requires external infrastructure via RedisContainer() and should only run in integration test mode.
♻️ Proposed fix

```diff
-@pytest.fixture(params=["memory", "redis"])
+@pytest.fixture(
+    params=[
+        "memory",
+        pytest.param("redis", marks=pytest.mark.integration),
+    ]
+)
 async def storage(request, redis_url) -> AsyncIterator[SessionKVStore]:
```

🤖 Prompt for AI Agents
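Per-parameter marking works because `pytest.param` wraps a single value together with its marks. A minimal sketch of the idea (the `STORE_PARAMS` name and the trivial fixture body are illustrative, not the project's actual code):

```python
import pytest

# Plain "memory" always runs, while the "redis" case is wrapped in
# pytest.param so the integration mark attaches to just that variant.
# Running `pytest -m "not integration"` then deselects only the
# Redis-backed cases.
STORE_PARAMS = [
    "memory",
    pytest.param("redis", marks=pytest.mark.integration),
]

@pytest.fixture(params=STORE_PARAMS)
def storage(request):
    # request.param is "memory" or "redis"; a real fixture would build
    # the matching SessionKVStore here.
    yield request.param
```

Every test that depends on `storage` is generated once per parameter, and only the Redis-generated variants inherit the `integration` mark.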
Verify each finding against the current code and only fix it if needed.
In `@tests/test_agents/test_agent_launcher.py` around lines 71-72: the parameterized fixture `storage` currently runs both "memory" and "redis" for every test; update it to mark the "redis" variant as integration-only so it runs only when integration tests are requested. Concretely, change the parametrization to use `pytest.param("redis", marks=pytest.mark.integration)` for the Redis variant (keeping "memory" unmarked), ensuring the fixture signature (`storage(request, redis_url) -> AsyncIterator[SessionKVStore]`) and the existing use of `request`/`redis_url` remain intact.
Summary by CodeRabbit
New Features
API Changes
Tests