diff --git a/src/uipath/runtime/__init__.py b/src/uipath/runtime/__init__.py index fb5b01d..0b076b1 100644 --- a/src/uipath/runtime/__init__.py +++ b/src/uipath/runtime/__init__.py @@ -1,6 +1,6 @@ """UiPath Runtime Package.""" -from uipath.runtime.base import UiPathBaseRuntime +from uipath.runtime.base import UiPathBaseRuntime, UiPathStreamNotSupportedError from uipath.runtime.context import UiPathRuntimeContext from uipath.runtime.events import UiPathRuntimeEvent from uipath.runtime.factory import UiPathRuntimeExecutor, UiPathRuntimeFactory @@ -10,6 +10,7 @@ UiPathResumeTrigger, UiPathResumeTriggerType, UiPathRuntimeResult, + UiPathRuntimeStatus, ) __all__ = [ @@ -18,9 +19,11 @@ "UiPathRuntimeFactory", "UiPathRuntimeExecutor", "UiPathRuntimeResult", + "UiPathRuntimeStatus", "UiPathRuntimeEvent", "UiPathBreakpointResult", "UiPathApiTrigger", "UiPathResumeTrigger", "UiPathResumeTriggerType", + "UiPathStreamNotSupportedError", ] diff --git a/src/uipath/runtime/base.py b/src/uipath/runtime/base.py index 3653f3d..b6d5b22 100644 --- a/src/uipath/runtime/base.py +++ b/src/uipath/runtime/base.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) -class UiPathRuntimeStreamNotSupportedError(NotImplementedError): +class UiPathStreamNotSupportedError(NotImplementedError): """Raised when a runtime does not support streaming.""" pass @@ -130,7 +130,7 @@ async def stream( Final yield: UiPathRuntimeResult (or its subclass UiPathBreakpointResult) Raises: - UiPathRuntimeStreamNotSupportedError: If the runtime doesn't support streaming + UiPathStreamNotSupportedError: If the runtime doesn't support streaming RuntimeError: If execution fails Example: @@ -146,7 +146,7 @@ async def stream( # Handle state update print(f"State updated by: {event.node_name}") """ - raise UiPathRuntimeStreamNotSupportedError( + raise UiPathStreamNotSupportedError( f"{self.__class__.__name__} does not implement streaming. " "Use execute() instead." ) diff --git a/src/uipath/runtime/debug/__init__.py b/src/uipath/runtime/debug/__init__.py new file mode 100644 index 0000000..98bf555 --- /dev/null +++ b/src/uipath/runtime/debug/__init__.py @@ -0,0 +1,13 @@ +"""Initialization module for the debug package.""" + +from uipath.runtime.debug.bridge import UiPathDebugBridge +from uipath.runtime.debug.exception import ( + UiPathDebugQuitError, +) +from uipath.runtime.debug.runtime import UiPathDebugRuntime + +__all__ = [ + "UiPathDebugQuitError", + "UiPathDebugBridge", + "UiPathDebugRuntime", +] diff --git a/src/uipath/runtime/debug/bridge.py b/src/uipath/runtime/debug/bridge.py new file mode 100644 index 0000000..79e52b5 --- /dev/null +++ b/src/uipath/runtime/debug/bridge.py @@ -0,0 +1,74 @@ +"""Abstract debug bridge interface.""" + +from abc import ABC, abstractmethod +from typing import Any, List, Literal + +from uipath.runtime import ( + UiPathBreakpointResult, + UiPathRuntimeResult, +) +from uipath.runtime.events import UiPathRuntimeStateEvent + + +class UiPathDebugBridge(ABC): + """Abstract interface for debug communication. + + Implementations: SignalR, Console, WebSocket, etc. + """ + + @abstractmethod + async def connect(self) -> None: + """Establish connection to debugger.""" + pass + + @abstractmethod + async def disconnect(self) -> None: + """Close connection to debugger.""" + pass + + @abstractmethod + async def emit_execution_started(self, **kwargs) -> None: + """Notify debugger that execution started.""" + pass + + @abstractmethod + async def emit_state_update(self, state_event: UiPathRuntimeStateEvent) -> None: + """Notify debugger of runtime state update.""" + pass + + @abstractmethod + async def emit_breakpoint_hit( + self, breakpoint_result: UiPathBreakpointResult + ) -> None: + """Notify debugger that a breakpoint was hit.""" + pass + + @abstractmethod + async def emit_execution_completed( + self, + runtime_result: UiPathRuntimeResult, + ) -> None: + """Notify debugger that execution completed.""" + pass + + @abstractmethod + async def emit_execution_error( + self, + error: str, + ) -> None: + """Notify debugger that an error occurred.""" + pass + + @abstractmethod + async def wait_for_resume(self) -> Any: + """Wait for resume command from debugger.""" + pass + + @abstractmethod + def get_breakpoints(self) -> List[str] | Literal["*"]: + """Get nodes to suspend execution at. + + Returns: + List of node names to suspend at, or ["*"] for all nodes (step mode) + """ + pass diff --git a/src/uipath/runtime/debug/exception.py b/src/uipath/runtime/debug/exception.py new file mode 100644 index 0000000..3183725 --- /dev/null +++ b/src/uipath/runtime/debug/exception.py @@ -0,0 +1,7 @@ +"""Debug exception definitions.""" + + +class UiPathDebugQuitError(Exception): + """Raised when user quits the debugger.""" + + pass diff --git a/src/uipath/runtime/debug/runtime.py b/src/uipath/runtime/debug/runtime.py new file mode 100644 index 0000000..ba4747d --- /dev/null +++ b/src/uipath/runtime/debug/runtime.py @@ -0,0 +1,130 @@ +"""Debug runtime implementation.""" + +import logging +from typing import Generic, TypeVar + +from uipath.runtime import ( + UiPathBaseRuntime, + UiPathBreakpointResult, + UiPathRuntimeContext, + UiPathRuntimeResult, + UiPathRuntimeStatus, + UiPathStreamNotSupportedError, +) +from uipath.runtime.debug import UiPathDebugBridge, UiPathDebugQuitError +from uipath.runtime.events import ( + UiPathRuntimeStateEvent, +) + +logger = logging.getLogger(__name__) + +T = TypeVar("T", bound=UiPathBaseRuntime) + + +class UiPathDebugRuntime(UiPathBaseRuntime, Generic[T]): + """Specialized runtime for debug runs that streams events to a debug bridge.""" + + def __init__( + self, + context: UiPathRuntimeContext, + delegate: T, + debug_bridge: UiPathDebugBridge, + ): + """Initialize the UiPathDebugRuntime.""" + super().__init__(context) + self.context: UiPathRuntimeContext = context + self.delegate: T = delegate + self.debug_bridge: UiPathDebugBridge = debug_bridge + + async def execute(self) -> UiPathRuntimeResult: + """Execute the workflow with debug support.""" + try: + await self.debug_bridge.connect() + + await self.debug_bridge.emit_execution_started() + + result: UiPathRuntimeResult + # Try to stream events from inner runtime + try: + result = await self._stream_and_debug(self.delegate) + except UiPathStreamNotSupportedError: + # Fallback to regular execute if streaming not supported + logger.debug( + f"Runtime {self.delegate.__class__.__name__} does not support " + "streaming, falling back to execute()" + ) + result = await self.delegate.execute() + + await self.debug_bridge.emit_execution_completed(result) + + self.context.result = result + + return result + + except Exception as e: + # Emit execution error + self.context.result = UiPathRuntimeResult( + status=UiPathRuntimeStatus.FAULTED, + ) + await self.debug_bridge.emit_execution_error( + error=str(e), + ) + raise + + async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult: + """Stream events from inner runtime and handle debug interactions.""" + final_result: UiPathRuntimeResult + execution_completed = False + + # Starting in paused state - wait for breakpoints and resume + await self.debug_bridge.wait_for_resume() + + # Keep streaming until execution completes (not just paused at breakpoint) + while not execution_completed: + # Update breakpoints from debug bridge + inner_runtime.context.breakpoints = self.debug_bridge.get_breakpoints() + # Stream events from inner runtime + async for event in inner_runtime.stream(): + # Handle final result + if isinstance(event, UiPathRuntimeResult): + final_result = event + + # Check if it's a breakpoint result + if isinstance(event, UiPathBreakpointResult): + try: + # Hit a breakpoint - wait for resume and continue + await self.debug_bridge.emit_breakpoint_hit(event) + await self.debug_bridge.wait_for_resume() + + self.delegate.context.resume = True + + except UiPathDebugQuitError: + final_result = UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + execution_completed = True + else: + # Normal completion or suspension with dynamic interrupt + execution_completed = True + # Handle dynamic interrupts if present + # In the future, poll for resume trigger completion here, using the debug bridge + + # Handle state update events - send to debug bridge + elif isinstance(event, UiPathRuntimeStateEvent): + await self.debug_bridge.emit_state_update(event) + + return final_result + + async def validate(self) -> None: + """Validate runtime configuration.""" + await self.delegate.validate() + + async def cleanup(self) -> None: + """Cleanup runtime resources.""" + try: + await self.delegate.cleanup() + finally: + try: + await self.debug_bridge.disconnect() + except Exception as e: + logger.warning(f"Error disconnecting debug bridge: {e}") diff --git a/tests/test_debugger.py b/tests/test_debugger.py new file mode 100644 index 0000000..b3d1840 --- /dev/null +++ b/tests/test_debugger.py @@ -0,0 +1,352 @@ +"""Tests for UiPathDebugRuntime with mocked runtime and debug bridge.""" + +from __future__ import annotations + +from typing import List, Sequence, cast +from unittest.mock import AsyncMock, Mock + +import pytest + +from uipath.runtime import ( + UiPathBaseRuntime, + UiPathBreakpointResult, + UiPathRuntimeContext, + UiPathRuntimeResult, + UiPathRuntimeStatus, + UiPathStreamNotSupportedError, +) +from uipath.runtime.debug import ( + UiPathDebugBridge, + UiPathDebugQuitError, + UiPathDebugRuntime, +) +from uipath.runtime.events import UiPathRuntimeStateEvent + + +def make_debug_bridge_mock() -> UiPathDebugBridge: + """Create a debug bridge mock with all methods that UiPathDebugRuntime uses. + + We use `spec=UiPathDebugBridge` so invalid attributes raise at runtime, + but still operate as a unittest.mock.Mock with AsyncMock methods. + """ + bridge_mock: Mock = Mock(spec=UiPathDebugBridge) + + bridge_mock.connect = AsyncMock() + bridge_mock.disconnect = AsyncMock() + bridge_mock.emit_execution_started = AsyncMock() + bridge_mock.emit_execution_completed = AsyncMock() + bridge_mock.emit_execution_error = AsyncMock() + bridge_mock.emit_breakpoint_hit = AsyncMock() + bridge_mock.emit_state_update = AsyncMock() + bridge_mock.wait_for_resume = AsyncMock() + + bridge_mock.get_breakpoints = Mock(return_value=["node-1"]) + + return cast(UiPathDebugBridge, bridge_mock) + + +class StreamingMockRuntime(UiPathBaseRuntime): + """Mock runtime that streams state events, breakpoint hits and a final result.""" + + def __init__( + self, + context: UiPathRuntimeContext, + node_sequence: Sequence[str], + *, + stream_unsupported: bool = False, + error_in_stream: bool = False, + ) -> None: + super().__init__(context) + self.node_sequence: List[str] = list(node_sequence) + self.stream_unsupported: bool = stream_unsupported + self.error_in_stream: bool = error_in_stream + + self.execute_called: bool = False + self.validate_called: bool = False + self.cleanup_called: bool = False + + async def validate(self) -> None: + self.validate_called = True + + async def cleanup(self) -> None: + self.cleanup_called = True + + async def execute(self) -> UiPathRuntimeResult: + """Fallback execute path (used when streaming is not supported).""" + self.execute_called = True + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + output={"mode": "execute"}, + ) + + async def stream(self): + """Async generator yielding state events, breakpoint events, and final result.""" + if self.stream_unsupported: + raise UiPathStreamNotSupportedError("Streaming not supported") + + if self.error_in_stream: + raise RuntimeError("Stream blew up") + + for idx, node in enumerate(self.node_sequence): + # 1) Always emit a state update event + yield UiPathRuntimeStateEvent( + execution_id=self.context.execution_id, + node_name=node, + payload={"index": idx, "node": node}, + ) + + # 2) Check for breakpoints on this node + breakpoints = self.context.breakpoints + hit_breakpoint = False + + if breakpoints == "*": + hit_breakpoint = True + elif isinstance(breakpoints, list) and node in breakpoints: + hit_breakpoint = True + + if hit_breakpoint: + next_nodes = self.node_sequence[idx + 1 : idx + 2] # at most one + yield UiPathBreakpointResult( + breakpoint_node=node, + breakpoint_type="before", + next_nodes=next_nodes, + current_state={"node": node, "index": idx}, + ) + + # 3) Final result at the end of streaming + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + output={"visited_nodes": self.node_sequence}, + ) + + +@pytest.mark.asyncio +async def test_debug_runtime_streams_and_handles_breakpoints_and_state(): + """UiPathDebugRuntime should stream events, handle breakpoints and state updates.""" + + context = UiPathRuntimeContext(execution_id="exec-stream") + + runtime_impl = StreamingMockRuntime( + context, + node_sequence=["node-1", "node-2", "node-3"], + ) + bridge = make_debug_bridge_mock() + + # Initial resume (before streaming) + resume after breakpoint hit + cast(AsyncMock, bridge.wait_for_resume).side_effect = [None, None] + cast(Mock, bridge.get_breakpoints).return_value = ["node-2"] + + debug_runtime = UiPathDebugRuntime( + context=context, + delegate=runtime_impl, + debug_bridge=bridge, + ) + + result = await debug_runtime.execute() + + # Result propagation + assert isinstance(result, UiPathRuntimeResult) + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"visited_nodes": ["node-1", "node-2", "node-3"]} + assert context.result is result + + # Bridge lifecycle + cast(AsyncMock, bridge.connect).assert_awaited_once() + cast(AsyncMock, bridge.emit_execution_started).assert_awaited_once() + cast(AsyncMock, bridge.emit_execution_completed).assert_awaited_once_with(result) + + # Streaming interactions + assert cast(AsyncMock, bridge.emit_state_update).await_count >= 1 + cast(AsyncMock, bridge.emit_breakpoint_hit).assert_awaited() + assert ( + cast(AsyncMock, bridge.wait_for_resume).await_count == 2 + ) # initial + after breakpoint + + # Breakpoints applied to inner runtime context + assert runtime_impl.context.breakpoints == ["node-2"] + # After resume, debug runtime should set resume flag + assert runtime_impl.context.resume is True + + +@pytest.mark.asyncio +async def test_debug_runtime_falls_back_when_stream_not_supported(): + """If runtime raises UiPathStreamNotSupportedError, we fall back to execute().""" + + context = UiPathRuntimeContext(execution_id="exec-fallback") + + runtime_impl = StreamingMockRuntime( + context, + node_sequence=["node-1"], + stream_unsupported=True, + ) + bridge = make_debug_bridge_mock() + + # Initial resume (even if streaming fails, debug runtime will still call it once) + cast(AsyncMock, bridge.wait_for_resume).return_value = None + + debug_runtime = UiPathDebugRuntime( + context=context, + delegate=runtime_impl, + debug_bridge=bridge, + ) + + result = await debug_runtime.execute() + + # Fallback to execute() path + assert runtime_impl.execute_called is True + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"mode": "execute"} + + # Bridge interactions + cast(AsyncMock, bridge.connect).assert_awaited_once() + cast(AsyncMock, bridge.emit_execution_started).assert_awaited_once() + cast(AsyncMock, bridge.emit_execution_completed).assert_awaited_once_with(result) + + # No streaming-specific events + cast(AsyncMock, bridge.emit_state_update).assert_not_awaited() + cast(AsyncMock, bridge.emit_breakpoint_hit).assert_not_awaited() + + +@pytest.mark.asyncio +async def test_debug_runtime_quit_creates_successful_result(): + """UiPathDebugRuntime should handle UiPathDebugQuitError and return SUCCESSFUL.""" + + context = UiPathRuntimeContext(execution_id="exec-quit") + + runtime_impl = StreamingMockRuntime( + context, + node_sequence=["node-quit"], + ) + bridge = make_debug_bridge_mock() + + # First resume: initial start; second resume: at breakpoint -> raises quit + cast(AsyncMock, bridge.wait_for_resume).side_effect = [ + None, + UiPathDebugQuitError("quit"), + ] + cast(Mock, bridge.get_breakpoints).return_value = ["node-quit"] + + debug_runtime = UiPathDebugRuntime( + context=context, + delegate=runtime_impl, + debug_bridge=bridge, + ) + + result = await debug_runtime.execute() + + # Quit result is synthesized as SUCCESSFUL (no specific output required) + assert isinstance(result, UiPathRuntimeResult) + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + + # emit_breakpoint_hit should have been called once + cast(AsyncMock, bridge.emit_breakpoint_hit).assert_awaited() + assert cast(AsyncMock, bridge.wait_for_resume).await_count == 2 + + # Completion event emitted with synthesized result + cast(AsyncMock, bridge.emit_execution_completed).assert_awaited_once_with(result) + + +@pytest.mark.asyncio +async def test_debug_runtime_execute_reports_errors_and_marks_faulted(): + """On unexpected errors, UiPathDebugRuntime should emit error and mark result FAULTED.""" + + context = UiPathRuntimeContext(execution_id="exec-error") + + # This runtime will raise an error as soon as stream() is used + runtime_impl = StreamingMockRuntime( + context, + node_sequence=["node-1"], + error_in_stream=True, + ) + bridge = make_debug_bridge_mock() + cast(AsyncMock, bridge.wait_for_resume).return_value = None + + debug_runtime = UiPathDebugRuntime( + context=context, + delegate=runtime_impl, + debug_bridge=bridge, + ) + + with pytest.raises(RuntimeError, match="Stream blew up"): + await debug_runtime.execute() + + # Context should be marked FAULTED + assert context.result is not None + assert context.result.status == UiPathRuntimeStatus.FAULTED + + # Error should be emitted to debug bridge + cast(AsyncMock, bridge.emit_execution_error).assert_awaited_once() + # Completion should not be emitted in error path + cast(AsyncMock, bridge.emit_execution_completed).assert_not_awaited() + + +@pytest.mark.asyncio +async def test_debug_runtime_cleanup_calls_inner_cleanup_and_disconnect(): + """cleanup() should call inner runtime cleanup and debug bridge disconnect.""" + + context = UiPathRuntimeContext(execution_id="exec-cleanup") + + runtime_impl = StreamingMockRuntime(context, node_sequence=["node-1"]) + bridge = make_debug_bridge_mock() + + debug_runtime = UiPathDebugRuntime( + context=context, + delegate=runtime_impl, + debug_bridge=bridge, + ) + + await debug_runtime.cleanup() + + assert runtime_impl.cleanup_called is True + cast(AsyncMock, bridge.disconnect).assert_awaited_once() + + +@pytest.mark.asyncio +async def test_debug_runtime_cleanup_suppresses_disconnect_errors(): + """Errors from debug_bridge.disconnect should be suppressed, inner cleanup still runs.""" + + context = UiPathRuntimeContext(execution_id="exec-cleanup-disconnect-error") + + runtime_impl = StreamingMockRuntime(context, node_sequence=["node-1"]) + bridge = make_debug_bridge_mock() + cast(AsyncMock, bridge.disconnect).side_effect = RuntimeError("disconnect failed") + + debug_runtime = UiPathDebugRuntime( + context=context, + delegate=runtime_impl, + debug_bridge=bridge, + ) + + # No exception should bubble up from cleanup() + await debug_runtime.cleanup() + + assert runtime_impl.cleanup_called is True + cast(AsyncMock, bridge.disconnect).assert_awaited_once() + + +@pytest.mark.asyncio +async def test_debug_runtime_cleanup_propagates_inner_cleanup_error_but_still_disconnects(): + """If inner runtime cleanup fails, the error bubbles up but disconnect is still attempted.""" + + context = UiPathRuntimeContext(execution_id="exec-cleanup-inner-error") + + runtime_impl = StreamingMockRuntime(context, node_sequence=["node-1"]) + bridge = make_debug_bridge_mock() + + async def failing_cleanup() -> None: + runtime_impl.cleanup_called = True + raise RuntimeError("inner cleanup failed") + + runtime_impl.cleanup = failing_cleanup # type: ignore[method-assign] + + debug_runtime = UiPathDebugRuntime( + context=context, + delegate=runtime_impl, + debug_bridge=bridge, + ) + + with pytest.raises(RuntimeError, match="inner cleanup failed"): + await debug_runtime.cleanup() + + assert runtime_impl.cleanup_called is True + cast(AsyncMock, bridge.disconnect).assert_awaited_once()