Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/uipath/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""UiPath Runtime Package."""

from uipath.runtime.base import UiPathBaseRuntime
from uipath.runtime.base import UiPathBaseRuntime, UiPathStreamNotSupportedError
from uipath.runtime.context import UiPathRuntimeContext
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.factory import UiPathRuntimeExecutor, UiPathRuntimeFactory
Expand All @@ -10,6 +10,7 @@
UiPathResumeTrigger,
UiPathResumeTriggerType,
UiPathRuntimeResult,
UiPathRuntimeStatus,
)

__all__ = [
Expand All @@ -18,9 +19,11 @@
"UiPathRuntimeFactory",
"UiPathRuntimeExecutor",
"UiPathRuntimeResult",
"UiPathRuntimeStatus",
"UiPathRuntimeEvent",
"UiPathBreakpointResult",
"UiPathApiTrigger",
"UiPathResumeTrigger",
"UiPathResumeTriggerType",
"UiPathStreamNotSupportedError",
]
6 changes: 3 additions & 3 deletions src/uipath/runtime/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
logger = logging.getLogger(__name__)


class UiPathRuntimeStreamNotSupportedError(NotImplementedError):
class UiPathStreamNotSupportedError(NotImplementedError):
"""Raised when a runtime does not support streaming."""

pass
Expand Down Expand Up @@ -130,7 +130,7 @@ async def stream(
Final yield: UiPathRuntimeResult (or its subclass UiPathBreakpointResult)

Raises:
UiPathRuntimeStreamNotSupportedError: If the runtime doesn't support streaming
UiPathStreamNotSupportedError: If the runtime doesn't support streaming
RuntimeError: If execution fails

Example:
Expand All @@ -146,7 +146,7 @@ async def stream(
# Handle state update
print(f"State updated by: {event.node_name}")
"""
raise UiPathRuntimeStreamNotSupportedError(
raise UiPathStreamNotSupportedError(
f"{self.__class__.__name__} does not implement streaming. "
"Use execute() instead."
)
Expand Down
13 changes: 13 additions & 0 deletions src/uipath/runtime/debug/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Initialization module for the debug package."""

from uipath.runtime.debug.bridge import UiPathDebugBridge
from uipath.runtime.debug.exception import (
UiPathDebugQuitError,
)
from uipath.runtime.debug.runtime import UiPathDebugRuntime

__all__ = [
"UiPathDebugQuitError",
"UiPathDebugBridge",
"UiPathDebugRuntime",
]
74 changes: 74 additions & 0 deletions src/uipath/runtime/debug/bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Abstract debug bridge interface."""

from abc import ABC, abstractmethod
from typing import Any, List, Literal

from uipath.runtime import (
UiPathBreakpointResult,
UiPathRuntimeResult,
)
from uipath.runtime.events import UiPathRuntimeStateEvent


class UiPathDebugBridge(ABC):
"""Abstract interface for debug communication.

Implementations: SignalR, Console, WebSocket, etc.
"""

@abstractmethod
async def connect(self) -> None:
"""Establish connection to debugger."""
pass

@abstractmethod
async def disconnect(self) -> None:
"""Close connection to debugger."""
pass

@abstractmethod
async def emit_execution_started(self, **kwargs) -> None:
"""Notify debugger that execution started."""
pass

@abstractmethod
async def emit_state_update(self, state_event: UiPathRuntimeStateEvent) -> None:
"""Notify debugger of runtime state update."""
pass

@abstractmethod
async def emit_breakpoint_hit(
self, breakpoint_result: UiPathBreakpointResult
) -> None:
"""Notify debugger that a breakpoint was hit."""
pass

@abstractmethod
async def emit_execution_completed(
self,
runtime_result: UiPathRuntimeResult,
) -> None:
"""Notify debugger that execution completed."""
pass

@abstractmethod
async def emit_execution_error(
self,
error: str,
) -> None:
"""Notify debugger that an error occurred."""
pass

@abstractmethod
async def wait_for_resume(self) -> Any:
"""Wait for resume command from debugger."""
pass

@abstractmethod
def get_breakpoints(self) -> List[str] | Literal["*"]:
"""Get nodes to suspend execution at.

Returns:
List of node names to suspend at, or ["*"] for all nodes (step mode)
"""
pass
7 changes: 7 additions & 0 deletions src/uipath/runtime/debug/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Debug exception definitions."""


class UiPathDebugQuitError(Exception):
"""Raised when user quits the debugger."""

pass
130 changes: 130 additions & 0 deletions src/uipath/runtime/debug/runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Debug runtime implementation."""

import logging
from typing import Generic, TypeVar

from uipath.runtime import (
UiPathBaseRuntime,
UiPathBreakpointResult,
UiPathRuntimeContext,
UiPathRuntimeResult,
UiPathRuntimeStatus,
UiPathStreamNotSupportedError,
)
from uipath.runtime.debug import UiPathDebugBridge, UiPathDebugQuitError
from uipath.runtime.events import (
UiPathRuntimeStateEvent,
)

logger = logging.getLogger(__name__)

T = TypeVar("T", bound=UiPathBaseRuntime)


class UiPathDebugRuntime(UiPathBaseRuntime, Generic[T]):
"""Specialized runtime for debug runs that streams events to a debug bridge."""

def __init__(
self,
context: UiPathRuntimeContext,
delegate: T,
debug_bridge: UiPathDebugBridge,
):
"""Initialize the UiPathDebugRuntime."""
super().__init__(context)
self.context: UiPathRuntimeContext = context
self.delegate: T = delegate
self.debug_bridge: UiPathDebugBridge = debug_bridge

async def execute(self) -> UiPathRuntimeResult:
"""Execute the workflow with debug support."""
try:
await self.debug_bridge.connect()

await self.debug_bridge.emit_execution_started()

result: UiPathRuntimeResult
# Try to stream events from inner runtime
try:
result = await self._stream_and_debug(self.delegate)
except UiPathStreamNotSupportedError:
# Fallback to regular execute if streaming not supported
logger.debug(
f"Runtime {self.delegate.__class__.__name__} does not support "
"streaming, falling back to execute()"
)
result = await self.delegate.execute()

await self.debug_bridge.emit_execution_completed(result)

self.context.result = result

return result

except Exception as e:
# Emit execution error
self.context.result = UiPathRuntimeResult(
status=UiPathRuntimeStatus.FAULTED,
)
await self.debug_bridge.emit_execution_error(
error=str(e),
)
raise

async def _stream_and_debug(self, inner_runtime: T) -> UiPathRuntimeResult:
"""Stream events from inner runtime and handle debug interactions."""
final_result: UiPathRuntimeResult
execution_completed = False

# Starting in paused state - wait for breakpoints and resume
await self.debug_bridge.wait_for_resume()

# Keep streaming until execution completes (not just paused at breakpoint)
while not execution_completed:
# Update breakpoints from debug bridge
inner_runtime.context.breakpoints = self.debug_bridge.get_breakpoints()
# Stream events from inner runtime
async for event in inner_runtime.stream():
# Handle final result
if isinstance(event, UiPathRuntimeResult):
final_result = event

# Check if it's a breakpoint result
if isinstance(event, UiPathBreakpointResult):
try:
# Hit a breakpoint - wait for resume and continue
await self.debug_bridge.emit_breakpoint_hit(event)
await self.debug_bridge.wait_for_resume()

self.delegate.context.resume = True

except UiPathDebugQuitError:
final_result = UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUCCESSFUL,
)
execution_completed = True
else:
# Normal completion or suspension with dynamic interrupt
execution_completed = True
# Handle dynamic interrupts if present
# In the future, poll for resume trigger completion here, using the debug bridge

# Handle state update events - send to debug bridge
elif isinstance(event, UiPathRuntimeStateEvent):
await self.debug_bridge.emit_state_update(event)

return final_result

async def validate(self) -> None:
"""Validate runtime configuration."""
await self.delegate.validate()

async def cleanup(self) -> None:
"""Cleanup runtime resources."""
try:
await self.delegate.cleanup()
finally:
try:
await self.debug_bridge.disconnect()
except Exception as e:
logger.warning(f"Error disconnecting debug bridge: {e}")
Loading