From b04d4617978fe1954f1b83084b2e072f8dc8ac5c Mon Sep 17 00:00:00 2001 From: Jack Brown Date: Wed, 15 Apr 2026 12:25:00 -0700 Subject: [PATCH 1/2] Send cancel message immediately on abort instead of deferring to next received message Previously, the cancel message was only sent to the server when the next incoming message arrived in the async for loop. If the agent was busy and not sending messages, the cancel sat unsent indefinitely. Now abort() puts the cancel message directly on the send queue so the transport sends it over WebSocket right away. --- .../src/runtimeuse_client/client.py | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/packages/runtimeuse-client-python/src/runtimeuse_client/client.py b/packages/runtimeuse-client-python/src/runtimeuse_client/client.py index 032129e..3934119 100644 --- a/packages/runtimeuse-client-python/src/runtimeuse_client/client.py +++ b/packages/runtimeuse-client-python/src/runtimeuse_client/client.py @@ -52,6 +52,7 @@ def __init__( raise ValueError("Either ws_url or transport must be provided") self._abort_event = asyncio.Event() + self._send_queue: asyncio.Queue[dict] | None = None def abort(self) -> None: """Signal the current query to cancel. @@ -61,6 +62,13 @@ def abort(self) -> None: coroutine on the same event loop. """ self._abort_event.set() + send_queue = self._send_queue + if send_queue is not None: + send_queue.put_nowait( + CancelMessage(message_type="cancel_message").model_dump( + mode="json" + ) + ) async def query( self, @@ -107,6 +115,7 @@ async def query( ) send_queue: asyncio.Queue[dict] = asyncio.Queue() + self._send_queue = send_queue await send_queue.put(invocation.model_dump(mode="json")) wire_result: ResultMessageInterface | None = None @@ -114,13 +123,6 @@ async def query( async with asyncio.timeout(options.timeout): async for message in self._transport(send_queue=send_queue): if self._abort_event.is_set(): - logger.info("Query cancelled by caller") - await send_queue.put( - CancelMessage(message_type="cancel_message").model_dump( - mode="json" - ) - ) - await send_queue.join() raise CancelledException("Query was cancelled") try: @@ -200,6 +202,11 @@ async def query( f"Received non-result message from agent runtime: {message}" ) + self._send_queue = None + + if self._abort_event.is_set(): + raise CancelledException("Query was cancelled") + if wire_result is None: raise AgentRuntimeError("No result message received") @@ -240,6 +247,7 @@ async def execute_commands( ) send_queue: asyncio.Queue[dict] = asyncio.Queue() + self._send_queue = send_queue await send_queue.put(message.model_dump(mode="json")) wire_result: CommandExecutionResultMessageInterface | None = None @@ -247,13 +255,6 @@ async def execute_commands( async with asyncio.timeout(options.timeout): async for msg in self._transport(send_queue=send_queue): if self._abort_event.is_set(): - logger.info("Command execution cancelled by caller") - await send_queue.put( - CancelMessage(message_type="cancel_message").model_dump( - mode="json" - ) - ) - await send_queue.join() raise CancelledException("Command execution was cancelled") try: @@ -331,6 +332,11 @@ async def execute_commands( f"Received non-result message from agent runtime: {msg}" ) + self._send_queue = None + + if self._abort_event.is_set(): + raise CancelledException("Command execution was cancelled") + if wire_result is None: raise AgentRuntimeError("No result message received") From 93107d4058707adad7445c1fd5e554f6f68d77d3 Mon Sep 17 00:00:00 2001 From: Jack Brown Date: Wed, 15 Apr 2026 13:22:33 -0700 Subject: [PATCH 2/2] Wrap transport loop in try/finally to always clean up _send_queue Ensures self._send_queue is set to None on all exit paths (CancelledException, AgentRuntimeError, TimeoutError), not just the normal exit path. --- .../src/runtimeuse_client/client.py | 280 +++++++++--------- 1 file changed, 141 insertions(+), 139 deletions(-) diff --git a/packages/runtimeuse-client-python/src/runtimeuse_client/client.py b/packages/runtimeuse-client-python/src/runtimeuse_client/client.py index 3934119..5e6e9e3 100644 --- a/packages/runtimeuse-client-python/src/runtimeuse_client/client.py +++ b/packages/runtimeuse-client-python/src/runtimeuse_client/client.py @@ -120,89 +120,90 @@ async def query( wire_result: ResultMessageInterface | None = None - async with asyncio.timeout(options.timeout): - async for message in self._transport(send_queue=send_queue): - if self._abort_event.is_set(): - raise CancelledException("Query was cancelled") - - try: - message_interface = AgentRuntimeMessageInterface.model_validate( - message - ) - except pydantic.ValidationError: - logger.error( - f"Received unknown message type from agent runtime: {message}" - ) - continue - - if message_interface.message_type == "result_message": - wire_result = ResultMessageInterface.model_validate(message) - logger.info( - f"Received result message from agent runtime: {message}" - ) - continue - - elif message_interface.message_type == "assistant_message": - if options.on_assistant_message is not None: - assistant_message_interface = ( - AssistantMessageInterface.model_validate(message) - ) - await options.on_assistant_message(assistant_message_interface) - continue + try: + async with asyncio.timeout(options.timeout): + async for message in self._transport(send_queue=send_queue): + if self._abort_event.is_set(): + raise CancelledException("Query was cancelled") - elif message_interface.message_type == "error_message": try: - error_message_interface = ErrorMessageInterface.model_validate( + message_interface = AgentRuntimeMessageInterface.model_validate( message ) except pydantic.ValidationError: logger.error( - f"Received malformed error message from agent runtime: {message}", + f"Received unknown message type from agent runtime: {message}" + ) + continue + + if message_interface.message_type == "result_message": + wire_result = ResultMessageInterface.model_validate(message) + logger.info( + f"Received result message from agent runtime: {message}" ) - raise AgentRuntimeError(str(message)) - logger.error( - f"Error from agent runtime: {error_message_interface}", - ) - raise AgentRuntimeError( - error_message_interface.error, - metadata=error_message_interface.metadata, - ) - - elif ( - message_interface.message_type == "artifact_upload_request_message" - ): - logger.info( - f"Received artifact upload request message from agent runtime: {message}" - ) - if options.on_artifact_upload_request is not None: - artifact_upload_request_message_interface = ( - ArtifactUploadRequestMessageInterface.model_validate( + continue + + elif message_interface.message_type == "assistant_message": + if options.on_assistant_message is not None: + assistant_message_interface = ( + AssistantMessageInterface.model_validate(message) + ) + await options.on_assistant_message(assistant_message_interface) + continue + + elif message_interface.message_type == "error_message": + try: + error_message_interface = ErrorMessageInterface.model_validate( message ) + except pydantic.ValidationError: + logger.error( + f"Received malformed error message from agent runtime: {message}", + ) + raise AgentRuntimeError(str(message)) + logger.error( + f"Error from agent runtime: {error_message_interface}", ) - upload_result = await options.on_artifact_upload_request( - artifact_upload_request_message_interface + raise AgentRuntimeError( + error_message_interface.error, + metadata=error_message_interface.metadata, ) - artifact_upload_response_message_interface = ArtifactUploadResponseMessageInterface( - message_type="artifact_upload_response_message", - filename=artifact_upload_request_message_interface.filename, - filepath=artifact_upload_request_message_interface.filepath, - presigned_url=upload_result.presigned_url, - content_type=upload_result.content_type, + + elif ( + message_interface.message_type == "artifact_upload_request_message" + ): + logger.info( + f"Received artifact upload request message from agent runtime: {message}" ) - await send_queue.put( - artifact_upload_response_message_interface.model_dump( - mode="json" + if options.on_artifact_upload_request is not None: + artifact_upload_request_message_interface = ( + ArtifactUploadRequestMessageInterface.model_validate( + message + ) ) - ) - continue - - else: - logger.info( - f"Received non-result message from agent runtime: {message}" - ) + upload_result = await options.on_artifact_upload_request( + artifact_upload_request_message_interface + ) + artifact_upload_response_message_interface = ArtifactUploadResponseMessageInterface( + message_type="artifact_upload_response_message", + filename=artifact_upload_request_message_interface.filename, + filepath=artifact_upload_request_message_interface.filepath, + presigned_url=upload_result.presigned_url, + content_type=upload_result.content_type, + ) + await send_queue.put( + artifact_upload_response_message_interface.model_dump( + mode="json" + ) + ) + continue - self._send_queue = None + else: + logger.info( + f"Received non-result message from agent runtime: {message}" + ) + finally: + self._send_queue = None if self._abort_event.is_set(): raise CancelledException("Query was cancelled") @@ -252,87 +253,88 @@ async def execute_commands( wire_result: CommandExecutionResultMessageInterface | None = None - async with asyncio.timeout(options.timeout): - async for msg in self._transport(send_queue=send_queue): - if self._abort_event.is_set(): - raise CancelledException("Command execution was cancelled") - - try: - message_interface = AgentRuntimeMessageInterface.model_validate(msg) - except pydantic.ValidationError: - logger.error( - f"Received unknown message type from agent runtime: {msg}" - ) - continue - - if message_interface.message_type == "command_execution_result_message": - wire_result = CommandExecutionResultMessageInterface.model_validate( - msg - ) - logger.info( - f"Received command execution result from agent runtime: {msg}" - ) - continue - - elif message_interface.message_type == "assistant_message": - if options.on_assistant_message is not None: - assistant_message_interface = ( - AssistantMessageInterface.model_validate(msg) - ) - await options.on_assistant_message(assistant_message_interface) - continue + try: + async with asyncio.timeout(options.timeout): + async for msg in self._transport(send_queue=send_queue): + if self._abort_event.is_set(): + raise CancelledException("Command execution was cancelled") - elif message_interface.message_type == "error_message": try: - error_message_interface = ErrorMessageInterface.model_validate( - msg - ) + message_interface = AgentRuntimeMessageInterface.model_validate(msg) except pydantic.ValidationError: logger.error( - f"Received malformed error message from agent runtime: {msg}", - ) - raise AgentRuntimeError(str(msg)) - logger.error( - f"Error from agent runtime: {error_message_interface}", - ) - raise AgentRuntimeError( - error_message_interface.error, - metadata=error_message_interface.metadata, - ) - - elif ( - message_interface.message_type == "artifact_upload_request_message" - ): - logger.info( - f"Received artifact upload request message from agent runtime: {msg}" - ) - if options.on_artifact_upload_request is not None: - artifact_upload_request_message_interface = ( - ArtifactUploadRequestMessageInterface.model_validate(msg) + f"Received unknown message type from agent runtime: {msg}" ) - upload_result = await options.on_artifact_upload_request( - artifact_upload_request_message_interface + continue + + if message_interface.message_type == "command_execution_result_message": + wire_result = CommandExecutionResultMessageInterface.model_validate( + msg ) - artifact_upload_response_message_interface = ArtifactUploadResponseMessageInterface( - message_type="artifact_upload_response_message", - filename=artifact_upload_request_message_interface.filename, - filepath=artifact_upload_request_message_interface.filepath, - presigned_url=upload_result.presigned_url, - content_type=upload_result.content_type, + logger.info( + f"Received command execution result from agent runtime: {msg}" ) - await send_queue.put( - artifact_upload_response_message_interface.model_dump( - mode="json" + continue + + elif message_interface.message_type == "assistant_message": + if options.on_assistant_message is not None: + assistant_message_interface = ( + AssistantMessageInterface.model_validate(msg) + ) + await options.on_assistant_message(assistant_message_interface) + continue + + elif message_interface.message_type == "error_message": + try: + error_message_interface = ErrorMessageInterface.model_validate( + msg + ) + except pydantic.ValidationError: + logger.error( + f"Received malformed error message from agent runtime: {msg}", ) + raise AgentRuntimeError(str(msg)) + logger.error( + f"Error from agent runtime: {error_message_interface}", + ) + raise AgentRuntimeError( + error_message_interface.error, + metadata=error_message_interface.metadata, ) - continue - else: - logger.info( - f"Received non-result message from agent runtime: {msg}" - ) + elif ( + message_interface.message_type == "artifact_upload_request_message" + ): + logger.info( + f"Received artifact upload request message from agent runtime: {msg}" + ) + if options.on_artifact_upload_request is not None: + artifact_upload_request_message_interface = ( + ArtifactUploadRequestMessageInterface.model_validate(msg) + ) + upload_result = await options.on_artifact_upload_request( + artifact_upload_request_message_interface + ) + artifact_upload_response_message_interface = ArtifactUploadResponseMessageInterface( + message_type="artifact_upload_response_message", + filename=artifact_upload_request_message_interface.filename, + filepath=artifact_upload_request_message_interface.filepath, + presigned_url=upload_result.presigned_url, + content_type=upload_result.content_type, + ) + await send_queue.put( + artifact_upload_response_message_interface.model_dump( + mode="json" + ) + ) + continue - self._send_queue = None + else: + logger.info( + f"Received non-result message from agent runtime: {msg}" + ) + finally: + self._send_queue = None if self._abort_event.is_set(): raise CancelledException("Command execution was cancelled")