From 2bb353511558f8b178f99a24b1bdeae989dd1590 Mon Sep 17 00:00:00 2001 From: Srimon Date: Wed, 25 Mar 2026 02:40:45 +0530 Subject: [PATCH] Add config-driven agent governance --- .config.example.json | 83 +++++-- operator_use/agent/service.py | 78 +++++-- .../agent/tools/builtin/filesystem.py | 72 ++++-- .../agent/tools/builtin/local_agents.py | 27 ++- operator_use/agent/tools/builtin/patch.py | 55 ++--- operator_use/agent/tools/builtin/subagents.py | 26 ++- operator_use/agent/tools/builtin/terminal.py | 33 ++- operator_use/agent/tools/governance.py | 82 +++++++ operator_use/agent/tools/path_guard.py | 148 ++++++++++++ operator_use/agent/tools/registry.py | 25 +- operator_use/cli/setup.py | 64 ++++++ operator_use/cli/start.py | 42 +++- operator_use/config/__init__.py | 4 + operator_use/config/service.py | 17 ++ operator_use/subagent/manager.py | 29 ++- operator_use/subagent/service.py | 25 +- tests/test_agent.py | 96 +++++++- tests/test_config.py | 73 +++++- tests/test_governance_e2e.py | 217 ++++++++++++++++++ tests/test_local_agents.py | 27 +++ tests/test_tool_paths.py | 105 +++++++++ tests/test_tool_registry.py | 24 ++ 22 files changed, 1221 insertions(+), 131 deletions(-) create mode 100644 operator_use/agent/tools/governance.py create mode 100644 operator_use/agent/tools/path_guard.py create mode 100644 tests/test_governance_e2e.py create mode 100644 tests/test_tool_paths.py diff --git a/.config.example.json b/.config.example.json index a9af541..c926457 100644 --- a/.config.example.json +++ b/.config.example.json @@ -1,22 +1,72 @@ { - "agent": { - "streaming": false, - "llm": { - "provider": "nvidia", - "model": "nemotron-nano-12b-instruct-v1:1" + "agents": { + "defaults": { + "streaming": false, + "maxToolIterations": 40 }, - "stt": { - "enabled": true, - "provider": "sarvam", - "model": "saaras:v3" + "list": [ + { + "id": "operator", + "description": "Default operator agent", + "policy": "operator", + "llmConfig": { + "provider": "nvidia", + "model": 
"nemotron-nano-12b-instruct-v1:1" + }, + "browserUse": true, + "computerUse": false + }, + { + "id": "web_agent", + "description": "Web-focused helper agent", + "policy": "web_agent", + "llmConfig": { + "provider": "nvidia", + "model": "nemotron-nano-12b-instruct-v1:1" + }, + "browserUse": true, + "computerUse": false + }, + { + "id": "code_agent", + "description": "Code and terminal helper agent", + "policy": "code_agent", + "llmConfig": { + "provider": "nvidia", + "model": "nemotron-nano-12b-instruct-v1:1" + }, + "browserUse": false, + "computerUse": false + } + ] + }, + "stt": { + "enabled": true, + "provider": "sarvam", + "model": "saaras:v3" + }, + "tts": { + "enabled": true, + "provider": "sarvam", + "model": "bulbul:v3", + "voice": "kavya" + }, + "policies": { + "operator": { + "allowedTools": ["agents.*", "message.*", "channel.send"] }, - "tts": { - "enabled": true, - "provider": "sarvam", - "model": "bulbul:v3", - "voice": "kavya" + "web_agent": { + "allowedTools": ["web.*", "filesystem.read", "filesystem.list", "message.*"] + }, + "code_agent": { + "allowedTools": ["filesystem.*", "terminal.exec", "process.*", "message.*"] } }, + "governance": { + "protectCodebase": true, + "protectRuntimeConfig": true, + "protectedPaths": [] + }, "providers": { "nvidia": { "api_key": "API_KEY" @@ -26,6 +76,9 @@ }, "sarvam": { "api_key": "API_KEY" + }, + "deepseek": { + "apiKey": "API_KEY" } } -} \ No newline at end of file +} diff --git a/operator_use/agent/service.py b/operator_use/agent/service.py index 5a81fca..f9bec66 100644 --- a/operator_use/agent/service.py +++ b/operator_use/agent/service.py @@ -13,6 +13,7 @@ from operator_use.messages import AIMessage, HumanMessage, ImageMessage, ToolMessage from operator_use.agent.context import Context from operator_use.agent.tools import ToolRegistry, BUILTIN_TOOLS +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.bus import IncomingMessage from operator_use.providers.events import 
LLMEventType, LLMStreamEventType from operator_use.session import SessionStore, Session @@ -61,6 +62,8 @@ def __init__( exclude_tools: list | None = None, acp_registry: dict | None = None, plugins: "list[Plugin] | None" = None, + governance_profile: GovernanceProfile | None = None, + protected_paths: list[Path] | None = None, ): self.agent_id = agent_id self.description = description @@ -76,9 +79,16 @@ def __init__( self.cron = cron self.gateway = gateway self.bus = bus - self.subagent_store = SubagentManager(llm=llm, bus=bus) + self.protected_paths = [Path(path).expanduser().resolve() for path in (protected_paths or [])] + self.subagent_store = SubagentManager( + llm=llm, + bus=bus, + workspace=self.workspace, + protected_paths=self.protected_paths, + ) self.process_store = ProcessStore() self.hooks = Hooks() + self.governance_profile = governance_profile self.tool_register.register_tools(BUILTIN_TOOLS) if exclude_tools: @@ -95,6 +105,10 @@ def __init__( self.tool_register.set_extension("_llm", self.llm) self.tool_register.set_extension("_agent", self) self.tool_register.set_extension("_agent_id", self.agent_id) + if self.protected_paths: + self.tool_register.set_extension("_protected_paths", self.protected_paths) + if self.governance_profile is not None: + self.tool_register.set_extension("_governance_profile", self.governance_profile) # Wire plugins self.plugins: "list[Plugin]" = plugins or [] @@ -160,45 +174,59 @@ async def run( pending_replies: Shared dict for tools that wait for a user reply. 
""" # Set per-message tool extensions + delegated_profile = None if incoming: self.tool_register.set_extension("_channel", incoming.channel) self.tool_register.set_extension("_chat_id", incoming.chat_id) self.tool_register.set_extension("_account_id", incoming.account_id) self.tool_register.set_extension("_metadata", incoming.metadata or {}) self.tool_register.set_extension("_session_id", session_id) + delegated_profile = GovernanceProfile.from_dict( + (incoming.metadata or {}).get("_governance_profile") + ) + if delegated_profile is not None: + delegated_profile = delegated_profile.with_parent(self.governance_profile) + self.tool_register.set_extension("_governance_profile", delegated_profile) if pending_replies is not None: self.tool_register.set_extension("_pending_replies", pending_replies) - session = self.sessions.get_or_create(session_id=session_id) - session.add_message(message) - - await self.hooks.emit( - HookEvent.BEFORE_AGENT_START, - BeforeAgentStartContext(message=incoming, session=session), - ) + try: + session = self.sessions.get_or_create(session_id=session_id) + session.add_message(message) - if publish_stream is not None: - response_message = await self._loop_stream( - session=session, publish_stream=publish_stream, message=incoming + await self.hooks.emit( + HookEvent.BEFORE_AGENT_START, + BeforeAgentStartContext(message=incoming, session=session), ) - else: - response_message = await self._loop(session=session, message=incoming) - # Allow hooks to modify the final response before saving - end_ctx = await self.hooks.emit( - HookEvent.BEFORE_AGENT_END, - BeforeAgentEndContext(message=incoming, session=session, response=response_message), - ) - response_message = end_ctx.response + if publish_stream is not None: + response_message = await self._loop_stream( + session=session, publish_stream=publish_stream, message=incoming + ) + else: + response_message = await self._loop(session=session, message=incoming) - self.sessions.save(session) + # Allow 
hooks to modify the final response before saving + end_ctx = await self.hooks.emit( + HookEvent.BEFORE_AGENT_END, + BeforeAgentEndContext(message=incoming, session=session, response=response_message), + ) + response_message = end_ctx.response - await self.hooks.emit( - HookEvent.AFTER_AGENT_END, - AfterAgentEndContext(message=incoming, session=session, response=response_message), - ) + self.sessions.save(session) + + await self.hooks.emit( + HookEvent.AFTER_AGENT_END, + AfterAgentEndContext(message=incoming, session=session, response=response_message), + ) - return response_message + return response_message + finally: + if delegated_profile is not None: + if self.governance_profile is not None: + self.tool_register.set_extension("_governance_profile", self.governance_profile) + else: + self.tool_register.unset_extension("_governance_profile") # ------------------------------------------------------------------ # Reaction handling (no LLM call — update metadata only) diff --git a/operator_use/agent/tools/builtin/filesystem.py b/operator_use/agent/tools/builtin/filesystem.py index 5020a73..10e6f46 100644 --- a/operator_use/agent/tools/builtin/filesystem.py +++ b/operator_use/agent/tools/builtin/filesystem.py @@ -1,22 +1,39 @@ -import json -from operator_use.utils.helper import resolve,is_binary_file,ensure_directory -from operator_use.tools.service import Tool,ToolResult,MAX_TOOL_OUTPUT_LENGTH -from operator_use.paths import get_named_workspace_dir -from pydantic import BaseModel, Field, field_validator +import json from pathlib import Path +from pydantic import BaseModel, Field, field_validator + +from operator_use.agent.tools.path_guard import ( + PathAccessError, + ensure_allowed_directory, + ensure_allowed_path, + get_workspace_root, +) +from operator_use.tools.service import Tool,ToolResult,MAX_TOOL_OUTPUT_LENGTH +from operator_use.utils.helper import is_binary_file,ensure_directory + + def _get_workspace(**kwargs) -> Path: - return kwargs.get("_workspace") or 
get_named_workspace_dir("operator") + return get_workspace_root(**kwargs) + + +def _get_protected_paths(**kwargs) -> list[Path] | None: + return kwargs.get("_protected_paths") + class ReadFile(BaseModel): - path: str = Field(...,description="Absolute path or path relative to the codebase root. Use list_dir first if you're unsure where the file is.") + path: str = Field(...,description="Absolute path or path relative to the workspace root. Use list_dir first if you're unsure where the file is.") start_line: int | None = Field(default=None,description="1-based line to start reading from (inclusive). Use with end_line to read a specific section of a large file.",examples=[1]) end_line: int | None = Field(default=None,description="1-based line to stop reading at (inclusive). Use with start_line to avoid reading the whole file when you only need a section.",examples=[10]) @Tool(name="read_file",description="Read a text file and return its contents with line numbers (format: N | content). Use start_line/end_line to read a slice of a large file. Cannot read binary files — use terminal for those.",model=ReadFile) async def read_file(path: str, start_line: int | None = None, end_line: int | None = None, **kwargs) -> ToolResult: workspace = _get_workspace(**kwargs) - resolved_path=resolve(base=workspace,path=path) + protected_paths = _get_protected_paths(**kwargs) + try: + resolved_path = ensure_allowed_path(path, workspace=workspace, protected_paths=protected_paths) + except PathAccessError as e: + return ToolResult.error_result(str(e)) if not resolved_path.exists(): return ToolResult.error_result(f"File not found: {resolved_path}") @@ -42,7 +59,7 @@ async def read_file(path: str, start_line: int | None = None, end_line: int | No return ToolResult.success_result(content) class WriteFile(BaseModel): - path: str = Field(...,description="Absolute path or path relative to the codebase root. 
Parent directories are created automatically.") + path: str = Field(...,description="Absolute path or path relative to the workspace root. Parent directories are created automatically.") content: str = Field(...,description="Full content to write. This replaces the entire file — use edit_file to change only part of an existing file.") overwrite: bool = Field(default=True,description="Set to False to prevent accidentally overwriting an existing file. Raises an error if the file already exists.") empty: bool = Field(default=False,description="Set to True only when intentionally writing an empty file (e.g. a placeholder). Prevents accidental empty writes by default.") @@ -50,7 +67,11 @@ class WriteFile(BaseModel): @Tool(name="write_file",description="Create a new file or fully overwrite an existing one. Use for new files or complete rewrites. To change only part of a file, use edit_file instead. Parent directories are created automatically.",model=WriteFile) async def write_file(path: str, content: str, overwrite: bool = True, empty: bool = False, **kwargs) -> ToolResult: workspace = _get_workspace(**kwargs) - resolved_path=resolve(base=workspace,path=path) + protected_paths = _get_protected_paths(**kwargs) + try: + resolved_path = ensure_allowed_path(path, workspace=workspace, protected_paths=protected_paths) + except PathAccessError as e: + return ToolResult.error_result(str(e)) file_exists=resolved_path.exists() if file_exists and not overwrite: return ToolResult.error_result(f"File exists and overwrite=False: {resolved_path}") @@ -58,7 +79,9 @@ async def write_file(path: str, content: str, overwrite: bool = True, empty: boo return ToolResult.error_result("Content is empty. 
Set empty=True to allow writing empty files.") try: ensure_directory(resolved_path.parent) - resolved_path.write_text(content,encoding='utf-8') + tmp_path = resolved_path.with_suffix(resolved_path.suffix + ".tmp") + tmp_path.write_text(content,encoding='utf-8') + tmp_path.replace(resolved_path) except (OSError,IOError) as e: return ToolResult.error_result(f"Failed to write file: {resolved_path}. {e}") return ToolResult.success_result(f"{'Overwrote' if file_exists else 'Created'} file: {resolved_path}") @@ -69,7 +92,7 @@ class Edit(BaseModel): new_content: str = Field(...,description="The replacement text. Set to empty string to delete the chunk.") class EditFile(BaseModel): - path: str = Field(...,description="Absolute path or path relative to the codebase root.") + path: str = Field(...,description="Absolute path or path relative to the workspace root.") edits: list[Edit] = Field(...,description="One or more edits to apply in order. Each entry finds old_content and replaces it with new_content. Applied sequentially — one read, one write.") @field_validator("edits", mode="before") @@ -84,14 +107,17 @@ def coerce_edits(cls, v): @Tool(name="edit_file",description="Edit a file by replacing exact chunks of text. Pass one or more {old_content, new_content} pairs — applied in order on a single read/write. Set new_content to empty string to delete a chunk. Always read the file first to get the exact text. 
Use write_file for full rewrites.",model=EditFile) async def edit_file(path: str, edits: list[dict], **kwargs) -> ToolResult: - # Coerce edits if the LLM passed a JSON-encoded string instead of a list if isinstance(edits, str): try: edits = json.loads(edits) except json.JSONDecodeError as e: return ToolResult.error_result(f"edits must be a list, got invalid JSON string: {e}") workspace = _get_workspace(**kwargs) - resolved_path=resolve(base=workspace,path=path) + protected_paths = _get_protected_paths(**kwargs) + try: + resolved_path = ensure_allowed_path(path, workspace=workspace, protected_paths=protected_paths) + except PathAccessError as e: + return ToolResult.error_result(str(e)) if not resolved_path.exists(): return ToolResult.error_result(f"File not found: {resolved_path}") if not resolved_path.is_file(): @@ -116,7 +142,9 @@ async def edit_file(path: str, edits: list[dict], **kwargs) -> ToolResult: content = content.replace(old, new, 1) try: - resolved_path.write_text(content,encoding='utf-8') + tmp_path = resolved_path.with_suffix(resolved_path.suffix + ".tmp") + tmp_path.write_text(content,encoding='utf-8') + tmp_path.replace(resolved_path) except (OSError, IOError) as e: return ToolResult.error_result(f"Failed to write file: {resolved_path}. {e}") @@ -129,13 +157,11 @@ class ListDir(BaseModel): @Tool(name="list_dir",description="List files and subdirectories inside a directory. Directories are shown first, then files, both sorted alphabetically. 
Use this to explore the filesystem before reading or editing files.",model=ListDir) async def list_dir(path: str = '.', **kwargs) -> ToolResult: workspace = _get_workspace(**kwargs) - resolved_path=resolve(base=workspace,path=path) - - if not resolved_path.exists(): - return ToolResult.error_result(f"Directory not found: {resolved_path}") - - if not resolved_path.is_dir(): - return ToolResult.error_result(f"Path is not a directory: {resolved_path}") + protected_paths = _get_protected_paths(**kwargs) + try: + resolved_path = ensure_allowed_directory(path, workspace=workspace, protected_paths=protected_paths) + except PathAccessError as e: + return ToolResult.error_result(str(e)) try: items=sorted(resolved_path.iterdir(),key=lambda x: (not x.is_dir(),x.name.lower())) @@ -155,5 +181,3 @@ async def list_dir(path: str = '.', **kwargs) -> ToolResult: output="\n".join(lines) return ToolResult.success_result(output) - - diff --git a/operator_use/agent/tools/builtin/local_agents.py b/operator_use/agent/tools/builtin/local_agents.py index 4368e61..31ce22e 100644 --- a/operator_use/agent/tools/builtin/local_agents.py +++ b/operator_use/agent/tools/builtin/local_agents.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, Field +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.bus.views import IncomingMessage, TextPart from operator_use.messages import HumanMessage from operator_use.tools import Tool, ToolResult @@ -26,22 +27,20 @@ class LocalAgents(BaseModel): default=None, description="Delegated task for the target local agent (required for action='run').", ) - - -def _agent_capabilities(agent) -> str: - caps: list[str] = [] - if agent.get_plugin("browser_use") is not None: - caps.append("browser") - if agent.get_plugin("computer_use") is not None: - caps.append("computer") - return ", ".join(caps) if caps else "general" + allowed_tools: list[str] | None = Field( + default=None, + description=( + "Optional tool patterns to enforce on the 
delegated run, such as ['web.*'] or " + "['filesystem.read']." + ), + ) @Tool( name="localagents", description=( "Call other configured local Operator agents in-process. " - "Useful for a manager agent coordinating specialized agents on one request." + "Useful when one configured agent needs to coordinate another on one request." ), model=LocalAgents, ) @@ -49,6 +48,7 @@ async def localagents( action: str, name: str | None = None, task: str | None = None, + allowed_tools: list[str] | None = None, **kwargs, ) -> ToolResult: registry: dict = kwargs.get("_agent_registry") or {} @@ -64,10 +64,7 @@ async def localagents( for agent_id, agent in registry.items(): marker = " (current)" if agent_id == current_agent_id else "" description = getattr(agent, "description", "") or "No description provided." - lines.append( - f" • {agent_id}{marker} — {description} " - f"[capabilities: {_agent_capabilities(agent)}]" - ) + lines.append(f" • {agent_id}{marker} — {description}") return ToolResult.success_result("\n".join(lines)) if action != "run": @@ -87,6 +84,7 @@ async def localagents( return ToolResult.error_result("Refusing to delegate to the current agent. 
Choose a different local agent.") delegated_session_id = f"{parent_session_id}__delegate__{current_agent_id or 'agent'}-to-{name}" + delegated_profile = GovernanceProfile(allowed_tools=list(allowed_tools)) if allowed_tools else None incoming = IncomingMessage( channel="direct", chat_id=delegated_session_id, @@ -97,6 +95,7 @@ async def localagents( "_delegated_local_agent_call": True, "from_agent": current_agent_id, "to_agent": name, + "_governance_profile": delegated_profile.to_dict() if delegated_profile else None, }, ) diff --git a/operator_use/agent/tools/builtin/patch.py b/operator_use/agent/tools/builtin/patch.py index ef57787..ab98509 100644 --- a/operator_use/agent/tools/builtin/patch.py +++ b/operator_use/agent/tools/builtin/patch.py @@ -1,12 +1,18 @@ -"""Patch tool: apply unified diffs to files using difflib for fuzzy matching.""" +"""Patch tool: apply unified diffs to files using difflib for fuzzy matching.""" import difflib import re -from operator_use.utils.helper import resolve, is_binary_file -from operator_use.tools.service import Tool, ToolResult from pydantic import BaseModel, Field +from operator_use.agent.tools.path_guard import ( + PathAccessError, + ensure_allowed_path, + get_workspace_root, +) +from operator_use.tools.service import Tool, ToolResult +from operator_use.utils.helper import is_binary_file + def _parse_unified_diff(patch_content: str) -> list[tuple[int, int, list[str]]]: """ @@ -17,7 +23,6 @@ def _parse_unified_diff(patch_content: str) -> list[tuple[int, int, list[str]]]: lines = patch_content.splitlines() i = 0 - # Skip header until we find a hunk while i < len(lines): line = lines[i] if line.startswith("@@"): @@ -30,7 +35,6 @@ def _parse_unified_diff(patch_content: str) -> list[tuple[int, int, list[str]]]: i += 1 continue - # Parse hunk header: @@ -old_start,old_count +new_start,new_count @@ match = re.match(r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? 
@@", line) if not match: i += 1 @@ -38,7 +42,6 @@ def _parse_unified_diff(patch_content: str) -> list[tuple[int, int, list[str]]]: old_start = int(match.group(1)) old_count = int(match.group(2) or 1) - # new_start, new_count unused for application i += 1 hunk_lines: list[str] = [] @@ -58,29 +61,22 @@ def _parse_unified_diff(patch_content: str) -> list[tuple[int, int, list[str]]]: def _extract_old_context(hunk_lines: list[str]) -> list[str]: """Extract the 'old' lines (context and removals) from a hunk for matching.""" return [ - line[1:] # strip leading ' ' or '-' + line[1:] for line in hunk_lines if line.startswith((" ", "-")) ] def _apply_hunk(old_lines: list[str], hunk_lines: list[str], old_start: int, old_count: int) -> list[str]: - """ - Apply a single hunk to old_lines. Returns the new lines. - old_start is 1-based, old_count is number of context/removal lines. - """ - # Convert to 0-based index + """Apply a single hunk to old_lines. Returns the new lines.""" start_idx = old_start - 1 end_idx = start_idx + old_count - # Build the replacement from the hunk (context and additions only) replacement: list[str] = [] for line in hunk_lines: - content = line[1:] + "\n" # splitlines() strips newlines + content = line[1:] + "\n" if line.startswith(" "): replacement.append(content) - elif line.startswith("-"): - pass # removal elif line.startswith("+"): replacement.append(content) @@ -88,16 +84,11 @@ def _apply_hunk(old_lines: list[str], hunk_lines: list[str], old_start: int, old def _find_hunk_position(lines: list[str], hunk_lines: list[str], old_start: int, old_count: int) -> int | None: - """ - Find the best position to apply the hunk. Uses difflib.SequenceMatcher for fuzzy matching. - Returns 0-based start index or None if no suitable match. 
- """ + """Find the best position to apply the hunk using fuzzy matching.""" old_context = _extract_old_context(hunk_lines) if not old_context: - # Pure addition hunk - use old_start return max(0, old_start - 1) - # Try exact match first start_idx = old_start - 1 end_idx = start_idx + len(old_context) if end_idx <= len(lines): @@ -105,14 +96,12 @@ def _find_hunk_position(lines: list[str], hunk_lines: list[str], old_start: int, if candidate == old_context: return start_idx - # Fuzzy match using SequenceMatcher text = "\n".join(line.rstrip("\n\r") for line in lines) pattern = "\n".join(old_context) matcher = difflib.SequenceMatcher(None, text, pattern) match = matcher.find_longest_match(0, len(text), 0, len(pattern)) if match.size > 0: - # Find which line this corresponds to before_match = text[: match.a] line_num = before_match.count("\n") return line_num @@ -121,10 +110,7 @@ def _find_hunk_position(lines: list[str], hunk_lines: list[str], old_start: int, def apply_patch_to_text(original: str, patch_content: str) -> tuple[str, list[str]]: - """ - Apply a unified diff to original text. - Returns (patched_text, list of error/warning messages). - """ + """Apply a unified diff to original text.""" work_lines = original.splitlines(keepends=True) if not work_lines: work_lines = [""] @@ -161,7 +147,7 @@ def create_unified_diff(old_lines: list[str], new_lines: list[str], fromfile: st class PatchFile(BaseModel): - path: str = Field(..., description="Absolute path or path relative to the codebase root.") + path: str = Field(..., description="Absolute path or path relative to the workspace root.") patch: str = Field(..., description="Unified diff in standard format (--- a/file, +++ b/file, @@ hunks). Generate with 'diff -u old new' or difflib.unified_diff(). 
Context lines (unchanged lines around edits) are required for fuzzy matching to work.") @@ -171,7 +157,12 @@ class PatchFile(BaseModel): model=PatchFile, ) async def patch_file(path: str, patch: str, **kwargs) -> ToolResult: - resolved_path = resolve(base="./workspace", path=path) + workspace = get_workspace_root(**kwargs) + protected_paths = kwargs.get("_protected_paths") + try: + resolved_path = ensure_allowed_path(path, workspace=workspace, protected_paths=protected_paths) + except PathAccessError as e: + return ToolResult.error_result(str(e)) if not resolved_path.exists(): return ToolResult.error_result(f"File not found: {resolved_path}") @@ -196,7 +187,9 @@ async def patch_file(path: str, patch: str, **kwargs) -> ToolResult: return ToolResult.error_result("Patch application failed:\n" + "\n".join(errors)) try: - resolved_path.write_text(patched, encoding="utf-8") + tmp_path = resolved_path.with_suffix(resolved_path.suffix + ".tmp") + tmp_path.write_text(patched, encoding="utf-8") + tmp_path.replace(resolved_path) except (OSError, IOError) as e: return ToolResult.error_result(f"Failed to write file: {resolved_path}. {e}") diff --git a/operator_use/agent/tools/builtin/subagents.py b/operator_use/agent/tools/builtin/subagents.py index ec40195..7be543f 100644 --- a/operator_use/agent/tools/builtin/subagents.py +++ b/operator_use/agent/tools/builtin/subagents.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, Field +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.tools import Tool, ToolResult @@ -24,6 +25,13 @@ class Subagents(BaseModel): default=None, description="Short label for this subagent, e.g. 'research', 'file scan' (create action).", ) + allowed_tools: list[str] | None = Field( + default=None, + description=( + "Optional tool patterns for the subagent, such as ['web.*'] or ['filesystem.read']. " + "If omitted, it inherits the current agent's governance profile." 
+ ), + ) task_id: Optional[str] = Field( default=None, description="Subagent task_id to cancel (cancel action).", @@ -56,6 +64,7 @@ async def subagents( action: str, task: str | None = None, label: str | None = None, + allowed_tools: list[str] | None = None, task_id: str | None = None, **kwargs, ) -> ToolResult: @@ -63,6 +72,7 @@ async def subagents( channel = kwargs.get("_channel") chat_id = kwargs.get("_chat_id") account_id = kwargs.get("_account_id", "") + parent_profile = kwargs.get("_governance_profile") if not subagent_store: return ToolResult.error_result("Subagent store not available (internal error)") @@ -74,7 +84,21 @@ async def subagents( return ToolResult.error_result("Provide task description to create a subagent") if channel is None or chat_id is None: return ToolResult.error_result("Channel context not available (internal error)") - tid = await subagent_store.ainvoke(task, label, channel, chat_id, account_id) + governance_profile = ( + parent_profile.scoped(allowed_tools) + if isinstance(parent_profile, GovernanceProfile) + else GovernanceProfile(allowed_tools=allowed_tools or []) + if allowed_tools + else None + ) + tid = await subagent_store.ainvoke( + task, + label, + channel, + chat_id, + account_id, + governance_profile=governance_profile, + ) display = label or task[:60] return ToolResult.success_result( f"Subagent created (task_id={tid} label='{display}')\n" diff --git a/operator_use/agent/tools/builtin/terminal.py b/operator_use/agent/tools/builtin/terminal.py index 3d67cb6..de4936a 100644 --- a/operator_use/agent/tools/builtin/terminal.py +++ b/operator_use/agent/tools/builtin/terminal.py @@ -1,10 +1,13 @@ -from operator_use.tools import Tool,ToolResult,MAX_TOOL_OUTPUT_LENGTH -from pydantic import BaseModel, Field -from pathlib import Path import asyncio -import sys import os import signal +import sys +from pathlib import Path + +from pydantic import BaseModel, Field + +from operator_use.agent.tools.path_guard import PathAccessError, 
validate_terminal_command +from operator_use.tools import Tool,ToolResult,MAX_TOOL_OUTPUT_LENGTH BLOCKED_COMMANDS={ "rm -rf /", @@ -23,7 +26,13 @@ "halt", "poweroff", "init 0", - "init 6" + "init 6", + ">>", + "<<", + "|&", + "$(", + "`", + "nohup", } class Terminal(BaseModel): @@ -39,12 +48,19 @@ def _is_command_blocked(cmd: str) -> str | None: return None -@Tool(name="terminal",description="Run a shell command and return stdout, stderr, and exit code. Use for git, package installs, running scripts, checking processes, or any CLI task. Commands run from the codebase root. Destructive commands (rm -rf /, format, shutdown, etc.) are blocked. For long outputs, results are truncated — pipe through head/tail if needed.", model=Terminal) +@Tool(name="terminal",description="Run a shell command and return stdout, stderr, and exit code. Use for git, package installs, running scripts, checking processes, or any CLI task. Commands run from the agent workspace. Destructive commands and workspace escapes are blocked. 
For long outputs, results are truncated — pipe through head/tail if needed.", model=Terminal) async def terminal(cmd: str, timeout: int = 10, **kwargs) -> str: blocked = _is_command_blocked(cmd) if blocked: return ToolResult.error_result(f"Command blocked: contains forbidden pattern '{blocked}'") + workspace = Path(kwargs.get("_workspace") or Path.cwd()).expanduser().resolve() + protected_paths = kwargs.get("_protected_paths") + try: + cwd = validate_terminal_command(cmd, workspace=workspace, protected_paths=protected_paths) + except PathAccessError as e: + return ToolResult.error_result(str(e)) + env=os.environ.copy() if sys.platform == "win32": @@ -52,11 +68,9 @@ async def terminal(cmd: str, timeout: int = 10, **kwargs) -> str: else: shell_cmd = ["/bin/bash", "-c", cmd] - workspace = Path.cwd() - cwd = str(workspace) if workspace.exists() else str(Path.cwd()) process = await asyncio.create_subprocess_exec( *shell_cmd, - cwd=cwd, + cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, @@ -97,5 +111,6 @@ async def terminal(cmd: str, timeout: int = 10, **kwargs) -> str: "exit_code": exit_code, "stdout": stdout, "stderr": stderr, + "cwd": str(cwd), } ) diff --git a/operator_use/agent/tools/governance.py b/operator_use/agent/tools/governance.py new file mode 100644 index 0000000..3390fec --- /dev/null +++ b/operator_use/agent/tools/governance.py @@ -0,0 +1,82 @@ +"""Minimal governance helpers for tool execution.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from fnmatch import fnmatch +from typing import Any + + +TOOL_SCOPE_ALIASES = { + "read_file": "filesystem.read", + "write_file": "filesystem.write", + "edit_file": "filesystem.edit", + "list_dir": "filesystem.list", + "patch_file": "filesystem.patch", + "web_search": "web.search", + "web_fetch": "web.fetch", + "terminal": "terminal.exec", + "cron": "cron.manage", + "process": "process.manage", + "control_center": "control.manage", + 
"subagents": "agents.subagents", + "localagents": "agents.local", + "acpagents": "agents.acp", + "channel": "channel.send", + "intermediate_message": "message.intermediate", + "react_message": "message.react", + "send_file": "message.file", +} + + +@dataclass(slots=True) +class GovernanceProfile: + """Defines which tools an agent is allowed to execute.""" + + allowed_tools: list[str] = field(default_factory=list) + parent: "GovernanceProfile | None" = None + + def allows(self, tool_name: str) -> bool: + if self.allowed_tools and not any( + fnmatch(identifier, pattern) + for pattern in self.allowed_tools + for identifier in self.identifiers_for(tool_name) + ): + return False + if self.parent is not None: + return self.parent.allows(tool_name) + return True + + def scoped(self, allowed_tools: list[str] | None = None) -> "GovernanceProfile": + if not allowed_tools: + return self + return GovernanceProfile(allowed_tools=list(allowed_tools), parent=self) + + def with_parent(self, parent: "GovernanceProfile | None") -> "GovernanceProfile": + if parent is None: + return self + return GovernanceProfile(allowed_tools=list(self.allowed_tools), parent=parent) + + def to_dict(self) -> dict[str, Any]: + data: dict[str, Any] = {"allowed_tools": list(self.allowed_tools)} + if self.parent is not None: + data["parent"] = self.parent.to_dict() + return data + + @classmethod + def from_dict(cls, data: dict[str, Any] | None) -> "GovernanceProfile | None": + if not data: + return None + parent = cls.from_dict(data.get("parent")) + return cls( + allowed_tools=list(data.get("allowed_tools") or []), + parent=parent, + ) + + @staticmethod + def identifiers_for(tool_name: str) -> set[str]: + identifiers = {tool_name} + alias = TOOL_SCOPE_ALIASES.get(tool_name) + if alias: + identifiers.add(alias) + return identifiers diff --git a/operator_use/agent/tools/path_guard.py b/operator_use/agent/tools/path_guard.py new file mode 100644 index 0000000..3b9e223 --- /dev/null +++ 
b/operator_use/agent/tools/path_guard.py @@ -0,0 +1,148 @@ +"""Shared path restrictions for filesystem, patch, and terminal tools.""" + +from __future__ import annotations + +import re +from pathlib import Path +from typing import Iterable + +import operator_use + +from operator_use.paths import get_named_workspace_dir, get_userdata_dir + + +class PathAccessError(ValueError): + """Raised when a path or command violates path restrictions.""" + + +_WINDOWS_DRIVE_RE = re.compile(r"^[A-Za-z]:[\\/]") +_TRAVERSAL_RE = re.compile(r"(^|[\\/\s])\.\.([\\/]|$)") +_PATH_TOKEN_RE = re.compile(r'"[^"]+"|\'[^\']+\'|\S+') +_DISALLOWED_EXECUTORS = { + "python", + "python3", + "py", + "node", + "perl", + "ruby", + "powershell", + "pwsh", + "bash", + "sh", +} + + +def get_workspace_root(**kwargs) -> Path: + workspace = kwargs.get("_workspace") or get_named_workspace_dir("operator") + return Path(workspace).expanduser().resolve() + + +def _normalize_paths(paths: Iterable[str | Path]) -> tuple[Path, ...]: + normalized: list[Path] = [] + seen: set[str] = set() + for path in paths: + resolved = Path(path).expanduser().resolve() + key = str(resolved).lower() + if key in seen: + continue + normalized.append(resolved) + seen.add(key) + return tuple(normalized) + + +def default_protected_roots() -> tuple[Path, ...]: + codebase = Path(operator_use.__file__).resolve().parent.parent + userdata = get_userdata_dir().resolve() + return _normalize_paths(( + codebase, + userdata / "config.json", + )) + + +def get_protected_roots(*, protected_paths: Iterable[str | Path] | None = None) -> tuple[Path, ...]: + if protected_paths is None: + return default_protected_roots() + return _normalize_paths(protected_paths) + + +def _is_within(path: Path, root: Path) -> bool: + try: + path.relative_to(root) + return True + except ValueError: + return False + + +def _candidate_real_path(candidate: Path) -> Path: + real_parent = candidate.parent.resolve() + return (real_parent / 
candidate.name).resolve(strict=False) + + +def ensure_allowed_path( + path: str | Path, + *, + workspace: Path, + protected_paths: Iterable[str | Path] | None = None, +) -> Path: + workspace = workspace.expanduser().resolve() + candidate = Path(path).expanduser() + if not candidate.is_absolute(): + candidate = workspace / candidate + candidate = candidate.resolve(strict=False) + real_candidate = _candidate_real_path(candidate) + + if not _is_within(real_candidate, workspace): + raise PathAccessError(f"Access denied - path outside workspace: {real_candidate}") + + for protected in get_protected_roots(protected_paths=protected_paths): + if real_candidate == protected or _is_within(real_candidate, protected): + raise PathAccessError(f"Access denied - protected path: {real_candidate}") + + return real_candidate + + +def ensure_allowed_directory( + path: str | Path, + *, + workspace: Path, + protected_paths: Iterable[str | Path] | None = None, +) -> Path: + resolved = ensure_allowed_path(path, workspace=workspace, protected_paths=protected_paths) + if not resolved.exists(): + raise PathAccessError(f"Directory not found: {resolved}") + if not resolved.is_dir(): + raise PathAccessError(f"Path is not a directory: {resolved}") + return resolved + + +def validate_terminal_command( + command: str, + *, + workspace: Path, + protected_paths: Iterable[str | Path] | None = None, +) -> Path: + workspace = workspace.expanduser().resolve() + if _TRAVERSAL_RE.search(command): + raise PathAccessError("Command contains path traversal outside the workspace") + + tokens = [token.strip("\"'") for token in _PATH_TOKEN_RE.findall(command)] + if tokens: + executable = tokens[0].lower() + if executable in _DISALLOWED_EXECUTORS: + raise PathAccessError(f"Command uses disallowed executor '{tokens[0]}'") + + for token in tokens[1:]: + if not token or token.startswith("-"): + continue + if token in {".", ".\\"}: + continue + if ( + token.startswith(("~", "/", "\\")) or + 
_WINDOWS_DRIVE_RE.match(token) or + "/" in token or + "\\" in token or + token.startswith(".") + ): + ensure_allowed_path(token, workspace=workspace, protected_paths=protected_paths) + + return workspace diff --git a/operator_use/agent/tools/registry.py b/operator_use/agent/tools/registry.py index 567e7c9..9844760 100644 --- a/operator_use/agent/tools/registry.py +++ b/operator_use/agent/tools/registry.py @@ -1,8 +1,10 @@ """Tool registry for the agent module.""" -from operator_use.tools.service import Tool,ToolResult -from typing import Any import logging +from typing import Any + +from operator_use.agent.tools.governance import GovernanceProfile +from operator_use.tools.service import Tool, ToolResult logger = logging.getLogger(__name__) @@ -68,11 +70,27 @@ def _coerce_params(self, tool: Tool, params: dict) -> tuple[list[str], dict]: except Exception as e: return [str(e)], params + def _governance_profile(self) -> GovernanceProfile | None: + profile = self._extensions.get("_governance_profile") + if isinstance(profile, GovernanceProfile): + return profile + return None + + def _authorize(self, name: str) -> ToolResult | None: + profile = self._governance_profile() + if profile and not profile.allows(name): + return ToolResult.error_result( + f"Tool '{name}' is not allowed by the current governance profile" + ) + return None + def execute(self, name: str, params: dict) -> ToolResult: """Validate params, execute the tool by name, and return the result.""" tool = self._tools.get(name) if not tool: return ToolResult.error_result(f"Tool '{name}' not found") + if blocked := self._authorize(name): + return blocked if tool.model is not None: errors, coerced = self._coerce_params(tool, params) if errors: @@ -92,6 +110,9 @@ async def aexecute(self, name: str, params: dict) -> ToolResult: if not tool: logger.debug(f"Tool '{name}' not found. 
Available: {list(self._tools.keys())}") return ToolResult.error_result(f"Tool '{name}' not found") + if blocked := self._authorize(name): + logger.debug("Governance blocked tool '%s'", name) + return blocked if tool.model is not None: errors, coerced = self._coerce_params(tool, params) if errors: diff --git a/operator_use/cli/setup.py b/operator_use/cli/setup.py index 5d41279..c826aba 100644 --- a/operator_use/cli/setup.py +++ b/operator_use/cli/setup.py @@ -234,6 +234,13 @@ def _select_model(label: str, options: list[tuple[str, str]]) -> str: AgentDefaults, AgentsConfig, ProvidersConfig, ProviderConfig, ChannelsConfig, TelegramConfig, DiscordConfig, SlackConfig, AgentDefinition, ACPServerSettings, ACPAgentEntry, HeartbeatConfig, + PolicyDefinition, GovernanceConfig, +) + + +POLICY_TOOL_HINT = ( + "Examples: agents.*, message.*, channel.send, web.*, filesystem.read, " + "filesystem.list, filesystem.*, terminal.exec, process.*" ) @@ -286,10 +293,16 @@ def _save_config( setattr(providers, prov, ProviderConfig(api_key=key)) agent_list = [] + policies: dict[str, PolicyDefinition] = {} for a in agent_defs: defn_kwargs: dict = {"id": a["id"]} if a["llm_provider_key"] and a["llm_model"]: defn_kwargs["llm_config"] = LLMConfig(provider=a["llm_provider_key"], model=a["llm_model"]) + policy_name = (a.get("policy_name") or "").strip() + allowed_tools = [tool for tool in a.get("allowed_tools", []) if tool] + if policy_name and allowed_tools: + defn_kwargs["policy"] = policy_name + policies[policy_name] = PolicyDefinition(allowed_tools=allowed_tools) ch = a.get("channels", {}) if ch.get("telegram") or ch.get("discord") or ch.get("slack_bot"): agent_channels = ChannelsConfig() @@ -311,6 +324,8 @@ def _save_config( defaults=AgentDefaults(), list=agent_list, ), + policies=policies, + governance=GovernanceConfig(), stt=STTConfig(enabled=stt_enabled, provider=stt_provider_key or None, model=stt_model or None), tts=TTSConfig(enabled=tts_enabled, provider=tts_provider_key or None, 
model=tts_model or None, voice=tts_voice), providers=providers, @@ -492,6 +507,7 @@ def _render_configure_screen() -> None: acp_agents: dict[str, ACPAgentEntry] = { k: ACPAgentEntry(**v) for k, v in _acp_agents_raw.items() } if _acp_agents_raw else {} + _policies_raw = existing_data.get("policies", {}) or {} # Agent definitions: list of dicts with per-agent overrides. # None values mean "use global default". @@ -503,6 +519,8 @@ def _render_configure_screen() -> None: "id": a.get("id", ""), "llm_provider_key": _a_llm.get("provider") or None, "llm_model": _a_llm.get("model") or None, + "policy_name": a.get("policy", "") or "", + "allowed_tools": list((_policies_raw.get(a.get("policy", "") or "", {}) or {}).get("allowedTools", [])), "channels": { "telegram": _a_ch.get("telegram", {}).get("token", "") or "", "discord": _a_ch.get("discord", {}).get("token", "") or "", @@ -516,6 +534,7 @@ def _render_configure_screen() -> None: # Ensure at least one agent entry exists (edge case: corrupted config) if not agent_defs: agent_defs.append({"id": "operator", "llm_provider_key": None, "llm_model": None, + "policy_name": "operator", "allowed_tools": [], "channels": {"telegram": "", "discord": "", "slack_bot": "", "slack_app": ""}, "browser_use": True, "computer_use": False}) @@ -534,6 +553,36 @@ def _configure_llm(cur_prov: str, cur_model: str) -> tuple[str, str]: api_keys_dict[prov_key] = text_input(f"Enter API Key for {prov_name}:", is_password=True) return prov_key, model + def _configure_policy(idx: int) -> None: + a = agent_defs[idx] + current_name = a.get("policy_name") or a["id"] + current_tools = list(a.get("allowed_tools", [])) + + console.print("│") + console.print(f"│ [dim]{POLICY_TOOL_HINT}[/dim]") + + policy_name = text_input("Policy name:", default=current_name).strip() + if not policy_name: + policy_name = a["id"] + + if any( + other.get("policy_name") == policy_name + for i2, other in enumerate(agent_defs) + if i2 != idx + ): + console.print("│") + 
console.print(f"│ [red]Policy name '{policy_name}' is already used by another agent.[/red]") + return + + tools_raw = text_input( + "Allowed tools (comma-separated, blank to clear):", + default=", ".join(current_tools), + ).strip() + allowed_tools = [tool.strip() for tool in tools_raw.split(",") if tool.strip()] + + agent_defs[idx]["policy_name"] = policy_name if allowed_tools else "" + agent_defs[idx]["allowed_tools"] = allowed_tools + # ── Per-agent submenu ───────────────────────────────────────────────────── def _agent_submenu(idx: int) -> None: while True: @@ -554,10 +603,17 @@ def _agent_submenu(idx: int) -> None: computer_use = a.get("computer_use", False) bu_label = "enabled" if browser_use else "disabled" cu_label = "enabled" if computer_use else "disabled" + policy_name = a.get("policy_name", "") + allowed_tools = a.get("allowed_tools", []) + if policy_name and allowed_tools: + policy_label = f"{policy_name} ({len(allowed_tools)} tools)" + else: + policy_label = "unrestricted" choice = select(f"Configure agent: {a['id']}", [ f"Rename {a['id']}", f"LLM {a_llm_label}", + f"Policy {policy_label}", f"Channels {ch_label}", f"Browser Use {bu_label}", f"Computer Use {cu_label}", @@ -575,7 +631,10 @@ def _agent_submenu(idx: int) -> None: console.print("│") console.print(f"│ [red]Name '{new_id}' is already taken.[/red]") else: + old_id = agent_defs[idx]["id"] agent_defs[idx]["id"] = new_id + if agent_defs[idx].get("policy_name") == old_id: + agent_defs[idx]["policy_name"] = new_id elif choice.startswith("LLM"): prov_choice = select("Pick LLM provider for this agent:", list(LLM_PROVIDERS.keys())) @@ -589,6 +648,9 @@ def _agent_submenu(idx: int) -> None: agent_defs[idx]["llm_provider_key"] = prov_key agent_defs[idx]["llm_model"] = model + elif choice.startswith("Policy"): + _configure_policy(idx) + elif choice.startswith("Channels"): ch = agent_defs[idx].setdefault("channels", {"telegram": "", "discord": "", "slack_bot": "", "slack_app": ""}) while True: @@ -687,6 
+749,8 @@ def _agents_menu() -> None: "id": new_id, "llm_provider_key": None, "llm_model": None, + "policy_name": new_id, + "allowed_tools": [], "channels": {"telegram": "", "discord": "", "slack_bot": "", "slack_app": ""}, "browser_use": True, "computer_use": False, diff --git a/operator_use/cli/start.py b/operator_use/cli/start.py index 72b4d06..ce5cacc 100644 --- a/operator_use/cli/start.py +++ b/operator_use/cli/start.py @@ -54,6 +54,7 @@ def setup_logging(userdata_dir: Path, verbose: bool = False) -> None: import operator_use from operator_use.agent import Agent +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.orchestrator import Orchestrator from operator_use.bus import Bus from operator_use.gateway import Gateway @@ -72,7 +73,6 @@ def setup_logging(userdata_dir: Path, verbose: bool = False) -> None: from operator_use.config import Config, load_config, AgentDefinition from operator_use.paths import get_named_workspace_dir from typing import Optional -from pathlib import Path LLM_CLASS_MAP = { "openai": "ChatOpenAI", @@ -172,6 +172,42 @@ def _resolve_agent_workspace(defn: AgentDefinition) -> Path: return get_named_workspace_dir(defn.id) +def _build_governance_profile(config: Config, defn: AgentDefinition) -> GovernanceProfile | None: + if not defn.policy: + return None + policy = config.policies.get(defn.policy) + if policy is None: + raise ValueError( + f"Agent '{defn.id}' references unknown policy '{defn.policy}'. " + "Add it under policies in config.json." 
+ ) + return GovernanceProfile(allowed_tools=list(policy.allowed_tools)) + + +def _build_protected_paths(config: Config) -> list[Path]: + governance = config.governance + protected_paths: list[Path] = [] + + if governance.protect_codebase: + protected_paths.append(Path(operator_use.__file__).resolve().parent.parent) + if governance.protect_runtime_config: + from operator_use.paths import get_userdata_dir + + protected_paths.append(get_userdata_dir().resolve() / "config.json") + + protected_paths.extend(Path(path).expanduser().resolve() for path in governance.protected_paths) + + deduped: list[Path] = [] + seen: set[str] = set() + for path in protected_paths: + key = str(path).lower() + if key in seen: + continue + deduped.append(path) + seen.add(key) + return deduped + + def _build_agents(config: Config, cron, gateway, bus) -> dict[str, Agent]: """Instantiate one Agent per agent definition in config.""" @@ -180,6 +216,7 @@ def _build_agents(config: Config, cron, gateway, bus) -> dict[str, Agent]: defaults = config.agents.defaults agent_defs = config.agents.list + protected_paths = _build_protected_paths(config) if not agent_defs: raise ValueError("No agents defined in config. Run 'operator onboard' to set up an agent.") @@ -193,6 +230,7 @@ def _build_agents(config: Config, cron, gateway, bus) -> dict[str, Agent]: if llm is None: raise ValueError(f"Agent '{defn.id}': failed to initialize LLM provider '{llm_conf.provider}'. 
Check the provider name and API key.") workspace = _resolve_agent_workspace(defn) + governance_profile = _build_governance_profile(config, defn) plugins = [ ComputerPlugin(enabled=bool(defn.computer_use)), @@ -210,6 +248,8 @@ def _build_agents(config: Config, cron, gateway, bus) -> dict[str, Agent]: bus=bus, acp_registry=config.acp_agents, plugins=plugins, + governance_profile=governance_profile, + protected_paths=protected_paths, ) for agent in agents.values(): diff --git a/operator_use/config/__init__.py b/operator_use/config/__init__.py index 960e713..a0d83c8 100644 --- a/operator_use/config/__init__.py +++ b/operator_use/config/__init__.py @@ -5,6 +5,8 @@ STTConfig, TTSConfig, AgentDefaults, + PolicyDefinition, + GovernanceConfig, AgentDefinition, AgentsConfig, AgentRouteBinding, @@ -28,6 +30,8 @@ "STTConfig", "TTSConfig", "AgentDefaults", + "PolicyDefinition", + "GovernanceConfig", "AgentDefinition", "AgentsConfig", "AgentRouteBinding", diff --git a/operator_use/config/service.py b/operator_use/config/service.py index 55f70ed..768a6db 100644 --- a/operator_use/config/service.py +++ b/operator_use/config/service.py @@ -139,12 +139,27 @@ class AgentDefaults(Base): streaming: bool = True +class PolicyDefinition(Base): + """Named governance policy for an agent.""" + + allowed_tools: List[str] = Field(default_factory=list) + + +class GovernanceConfig(Base): + """Global governance settings shared across agents.""" + + protect_codebase: bool = True + protect_runtime_config: bool = True + protected_paths: List[str] = Field(default_factory=list) + + class AgentDefinition(Base): """Individual agent definition.""" id: str description: str = "" # Short role/capability summary used for delegation and routing hints workspace: Optional[str] = None # Defaults to ~/.operator-use/workspaces/ + policy: Optional[str] = None # Named governance policy from config.policies llm_config: Optional[LLMConfig] = None # Overrides agents.defaults.llm_config max_tool_iterations: 
Optional[int] = None # Overrides agents.defaults channels: Optional["ChannelsConfig"] = None # Per-agent dedicated channel bots @@ -230,6 +245,8 @@ class Config(BaseSettings): """Root configuration for Operator.""" agents: AgentsConfig = Field(default_factory=AgentsConfig) + policies: Dict[str, PolicyDefinition] = Field(default_factory=dict) + governance: GovernanceConfig = Field(default_factory=GovernanceConfig) bindings: List[AgentRouteBinding] = Field(default_factory=list) stt: STTConfig = Field(default_factory=STTConfig) tts: TTSConfig = Field(default_factory=TTSConfig) diff --git a/operator_use/subagent/manager.py b/operator_use/subagent/manager.py index a675e67..a6420c5 100644 --- a/operator_use/subagent/manager.py +++ b/operator_use/subagent/manager.py @@ -3,8 +3,10 @@ import asyncio import uuid from datetime import datetime +from pathlib import Path from typing import TYPE_CHECKING +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.bus import Bus from operator_use.subagent.views import SubagentRecord @@ -15,15 +17,34 @@ class SubagentManager: """Registry that spawns isolated subagent tasks and tracks their lifecycle.""" - def __init__(self, llm: "BaseChatLLM", bus: Bus) -> None: + def __init__( + self, + llm: "BaseChatLLM", + bus: Bus, + workspace: Path, + protected_paths: list[Path] | None = None, + ) -> None: from operator_use.subagent.service import Subagent - self._runner = Subagent(llm=llm, bus=bus) + self._runner = Subagent( + llm=llm, + bus=bus, + workspace=workspace, + protected_paths=protected_paths, + ) self._tasks: dict[str, asyncio.Task] = {} self._session_tasks: dict[str, set[str]] = {} self._records: dict[str, SubagentRecord] = {} - async def ainvoke(self, task: str, label: str | None, channel: str, chat_id: str, account_id: str = "") -> str: + async def ainvoke( + self, + task: str, + label: str | None, + channel: str, + chat_id: str, + account_id: str = "", + governance_profile: GovernanceProfile | None = 
None, + ) -> str: """Spawn a background subagent. Returns task_id immediately.""" task_id = f"sub_{uuid.uuid4().hex[:8]}" session_key = f"{channel}:{chat_id}" @@ -42,7 +63,7 @@ async def ainvoke(self, task: str, label: str | None, channel: str, chat_id: str self._records[task_id] = record t = asyncio.create_task( - self._runner.run(record), + self._runner.run(record, governance_profile=governance_profile), name=f"subagent-{task_id}", ) self._tasks[task_id] = t diff --git a/operator_use/subagent/service.py b/operator_use/subagent/service.py index 431f54f..181d57f 100644 --- a/operator_use/subagent/service.py +++ b/operator_use/subagent/service.py @@ -3,8 +3,10 @@ import asyncio import logging from datetime import datetime +from pathlib import Path from typing import TYPE_CHECKING +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.agent.tools import ToolRegistry from operator_use.agent.tools.builtin import FILESYSTEM_TOOLS, WEB_TOOLS, TERMINAL_TOOLS from operator_use.bus import Bus, IncomingMessage @@ -30,17 +32,34 @@ class Subagent: """Runs an isolated agent loop for a single delegated task.""" - def __init__(self, llm: "BaseChatLLM", bus: Bus) -> None: + def __init__( + self, + llm: "BaseChatLLM", + bus: Bus, + workspace: Path, + protected_paths: list[Path] | None = None, + ) -> None: self.llm = llm self.bus = bus - - async def run(self, record: SubagentRecord) -> None: + self.workspace = workspace + self.protected_paths = [Path(path).expanduser().resolve() for path in (protected_paths or [])] + + async def run( + self, + record: SubagentRecord, + governance_profile: GovernanceProfile | None = None, + ) -> None: """Execute the task and update the record with status/result when done.""" logger.info(f"[{record.task_id}] subagent '{record.label}' started") registry = ToolRegistry() registry.register_tools(FILESYSTEM_TOOLS + WEB_TOOLS + TERMINAL_TOOLS) registry.set_extension("_llm", self.llm) + registry.set_extension("_workspace", 
self.workspace) + if self.protected_paths: + registry.set_extension("_protected_paths", self.protected_paths) + if governance_profile is not None: + registry.set_extension("_governance_profile", governance_profile) tools = registry.list_tools() diff --git a/tests/test_agent.py b/tests/test_agent.py index 5814f7d..08a4782 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -4,8 +4,9 @@ from unittest.mock import AsyncMock, MagicMock from operator_use.agent.service import Agent +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.bus.views import IncomingMessage, TextPart -from operator_use.messages.service import AIMessage, HumanMessage +from operator_use.messages.service import AIMessage, HumanMessage, ToolMessage from operator_use.providers.events import LLMEvent, LLMEventType, ToolCall @@ -239,3 +240,96 @@ async def on_end(ctx): fired.append("end") await agent.run(message=HumanMessage(content="hi"), session_id="hook:session") assert "start" in fired assert "end" in fired + + +@pytest.mark.asyncio +async def test_agent_governance_blocks_builtin_terminal_tool(tmp_path): + llm = MagicMock() + llm.model_name = "mock-llm" + llm.astream = None + llm.ainvoke = AsyncMock(side_effect=[ + LLMEvent( + type=LLMEventType.TOOL_CALL, + tool_call=ToolCall(id="t1", name="terminal", params={"cmd": "echo hello", "timeout": 5}), + ), + LLMEvent(type=LLMEventType.TEXT, content="blocked as expected"), + ]) + + agent = Agent( + llm=llm, + workspace=tmp_path, + governance_profile=GovernanceProfile(allowed_tools=["web.*"]), + ) + + response = await agent.run(message=HumanMessage(content="use terminal"), session_id="gov:block") + + assert response.content == "blocked as expected" + session = agent.sessions.get_or_create("gov:block") + tool_messages = [m for m in session.messages if isinstance(m, ToolMessage)] + assert tool_messages + assert "not allowed" in tool_messages[-1].content + + +@pytest.mark.asyncio +async def 
test_agent_local_delegation_scoped_profile_allows_target_read(tmp_path): + shared_file = tmp_path / "note.txt" + shared_file.write_text("delegated content", encoding="utf-8") + + boss_llm = MagicMock() + boss_llm.model_name = "boss-llm" + boss_llm.astream = None + boss_llm.ainvoke = AsyncMock(side_effect=[ + LLMEvent( + type=LLMEventType.TOOL_CALL, + tool_call=ToolCall( + id="boss-1", + name="localagents", + params={ + "action": "run", + "name": "reader", + "task": "Read note.txt and answer with its contents.", + "allowed_tools": ["filesystem.read"], + }, + ), + ), + LLMEvent(type=LLMEventType.TEXT, content="delegation complete"), + ]) + + reader_llm = MagicMock() + reader_llm.model_name = "reader-llm" + reader_llm.astream = None + reader_llm.ainvoke = AsyncMock(side_effect=[ + LLMEvent( + type=LLMEventType.TOOL_CALL, + tool_call=ToolCall( + id="reader-1", + name="read_file", + params={"path": "note.txt"}, + ), + ), + LLMEvent(type=LLMEventType.TEXT, content="done"), + ]) + + boss = Agent( + llm=boss_llm, + agent_id="boss", + workspace=tmp_path, + governance_profile=GovernanceProfile(allowed_tools=["agents.*"]), + ) + reader = Agent( + llm=reader_llm, + agent_id="reader", + workspace=tmp_path, + ) + + registry = {"boss": boss, "reader": reader} + boss.tool_register.set_extension("_agent_registry", registry) + reader.tool_register.set_extension("_agent_registry", registry) + + response = await boss.run(message=HumanMessage(content="delegate"), session_id="gov:delegate") + + assert response.content == "delegation complete" + reader_session = reader.sessions.get_or_create("delegation__delegate__boss-to-reader") + tool_messages = [m for m in reader_session.messages if isinstance(m, ToolMessage)] + assert tool_messages + assert "delegated content" in tool_messages[-1].content diff --git a/tests/test_config.py b/tests/test_config.py index fbca659..1a50711 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,6 +1,7 @@ """Tests for configuration models and 
load_config.""" import json +from pathlib import Path import pytest from pydantic import ValidationError @@ -16,11 +17,14 @@ AgentDefinition, AgentDefaults, AgentsConfig, + PolicyDefinition, + GovernanceConfig, BindingMatch, AgentRouteBinding, Config, load_config, ) +from operator_use.cli.start import _build_governance_profile, _build_protected_paths # --- Channel configs --- @@ -121,6 +125,11 @@ def test_agent_definition_default_workspace_none(): assert a.workspace is None +def test_agent_definition_policy(): + a = AgentDefinition(id="boss", policy="manager") + assert a.policy == "manager" + + # --- AgentsConfig --- def test_agents_config_defaults(): @@ -160,6 +169,10 @@ def test_config_defaults(): c = Config() assert c.bindings == [] assert c.agents.list == [] + assert c.policies == {} + assert c.governance.protect_codebase is True + assert c.governance.protect_runtime_config is True + assert c.governance.protected_paths == [] def test_config_default_agent_none_when_empty(): @@ -181,8 +194,14 @@ def test_load_config_no_file(tmp_path): def test_load_config_from_json(tmp_path): data = { + "policies": { + "researcher": {"allowed_tools": ["web.*", "filesystem.read"]} + }, + "governance": { + "protectedPaths": ["~/secret-area"] + }, "agents": { - "list": [{"id": "json-agent", "browser_use": False}] + "list": [{"id": "json-agent", "browser_use": False, "policy": "researcher"}] } } (tmp_path / "config.json").write_text(json.dumps(data), encoding="utf-8") @@ -194,3 +213,55 @@ def test_load_config_invalid_json_uses_defaults(tmp_path): (tmp_path / "config.json").write_text("{ invalid json }", encoding="utf-8") cfg = load_config(tmp_path) assert isinstance(cfg, Config) + + +def test_build_governance_profile_from_named_policy(): + cfg = Config( + policies={ + "manager": PolicyDefinition(allowed_tools=["agents.*", "message.*"]) + }, + agents=AgentsConfig(list=[AgentDefinition(id="boss", policy="manager")]), + ) + + profile = _build_governance_profile(cfg, cfg.agents.list[0]) 
+ + assert profile is not None + assert profile.allowed_tools == ["agents.*", "message.*"] + + +def test_build_governance_profile_missing_policy_raises(): + cfg = Config(agents=AgentsConfig(list=[AgentDefinition(id="boss", policy="missing")])) + + with pytest.raises(ValueError, match="unknown policy"): + _build_governance_profile(cfg, cfg.agents.list[0]) + + +def test_build_protected_paths_includes_defaults_and_custom(): + custom_path = Path("~/custom-guard").expanduser().resolve() + cfg = Config( + governance=GovernanceConfig( + protected_paths=[str(custom_path)], + ) + ) + + protected_paths = _build_protected_paths(cfg) + protected_strings = {str(path) for path in protected_paths} + + assert any(path.endswith("config.json") for path in protected_strings) + assert any(path.lower().endswith("operator-use") for path in protected_strings) + assert str(custom_path) in protected_strings + + +def test_build_protected_paths_can_disable_defaults(): + custom_path = Path("D:/custom/protected").resolve() + cfg = Config( + governance=GovernanceConfig( + protect_codebase=False, + protect_runtime_config=False, + protected_paths=[str(custom_path)], + ) + ) + + protected_paths = _build_protected_paths(cfg) + + assert protected_paths == [custom_path] diff --git a/tests/test_governance_e2e.py b/tests/test_governance_e2e.py new file mode 100644 index 0000000..c32d645 --- /dev/null +++ b/tests/test_governance_e2e.py @@ -0,0 +1,217 @@ +"""Online E2E coverage for governance using the configured DeepSeek provider.""" + +import asyncio +import os + +import pytest + +from operator_use.agent.tools.governance import GovernanceProfile +from operator_use.bus import Bus +from operator_use.cli.start import _build_agents +from operator_use.config.service import ( + AgentDefinition, + AgentsConfig, + Config, + GovernanceConfig, + LLMConfig, + PolicyDefinition, + ProviderConfig, + ProvidersConfig, + load_config, +) +from operator_use.messages import HumanMessage, ToolMessage +from 
operator_use.paths import get_userdata_dir + + +def _deepseek_api_key() -> str: + env_api_key = os.environ.get("DEEPSEEK_API_KEY", "").strip() + if env_api_key: + return env_api_key + config = load_config(get_userdata_dir()) + api_key = config.providers.deepseek.api_key.strip() + if not api_key or api_key == "API_KEY": + return "" + return api_key + + +def _tool_messages(agent, session_id: str) -> list[ToolMessage]: + session = agent.sessions.get_or_create(session_id) + return [message for message in session.messages if isinstance(message, ToolMessage)] + + +def _any_tool_message_contains(agent, text: str) -> bool: + lowered = text.lower() + for session in agent.sessions._sessions.values(): + for message in session.messages: + if isinstance(message, ToolMessage) and lowered in (message.content or "").lower(): + return True + return False + + +def _build_live_agents(tmp_path): + api_key = _deepseek_api_key() + if not api_key: + pytest.skip("DeepSeek API key is not configured in ~/.operator-use/config.json") + + boss_workspace = tmp_path / "boss" + researcher_workspace = tmp_path / "researcher" + coder_workspace = tmp_path / "coder" + locked_dir = coder_workspace / "locked" + + for workspace in (boss_workspace, researcher_workspace, coder_workspace, locked_dir): + workspace.mkdir(parents=True, exist_ok=True) + + (researcher_workspace / "note.txt").write_text("RESEARCH_E2E_OK", encoding="utf-8") + (locked_dir / "secret.txt").write_text("TOP_SECRET_E2E", encoding="utf-8") + + config = Config( + agents=AgentsConfig( + defaults={"maxToolIterations": 8, "streaming": False}, + list=[ + AgentDefinition( + id="boss", + description="Coordinator", + workspace=str(boss_workspace), + policy="manager", + llm_config=LLMConfig(provider="deepseek", model="deepseek-chat"), + browser_use=False, + computer_use=False, + ), + AgentDefinition( + id="researcher", + description="Reader", + workspace=str(researcher_workspace), + policy="researcher", + 
llm_config=LLMConfig(provider="deepseek", model="deepseek-chat"), + browser_use=False, + computer_use=False, + ), + AgentDefinition( + id="coder", + description="Terminal specialist", + workspace=str(coder_workspace), + policy="coder", + llm_config=LLMConfig(provider="deepseek", model="deepseek-chat"), + browser_use=False, + computer_use=False, + ), + ], + ), + policies={ + "manager": PolicyDefinition(allowed_tools=["agents.*", "message.*", "channel.send"]), + "researcher": PolicyDefinition(allowed_tools=["filesystem.read", "filesystem.list", "message.*"]), + "coder": PolicyDefinition(allowed_tools=["filesystem.*", "terminal.exec", "process.*", "message.*"]), + }, + governance=GovernanceConfig( + protect_codebase=True, + protect_runtime_config=True, + protected_paths=[str(locked_dir)], + ), + providers=ProvidersConfig( + deepseek=ProviderConfig(api_key=api_key), + ), + ) + + agents = _build_agents(config, cron=None, gateway=None, bus=Bus()) + return agents + + +async def _wait_for_subagent(subagent_store, task_id: str, timeout: float = 60.0): + deadline = asyncio.get_running_loop().time() + timeout + while asyncio.get_running_loop().time() < deadline: + record = subagent_store.get_record(task_id) + if record is not None and record.finished_at is not None: + return record + await asyncio.sleep(1) + raise AssertionError(f"Timed out waiting for subagent {task_id}") + + +@pytest.mark.asyncio +async def test_governance_e2e_deepseek_direct_delegate_and_subagent(tmp_path): + agents = _build_live_agents(tmp_path) + boss = agents["boss"] + researcher = agents["researcher"] + coder = agents["coder"] + + boss_session_id = "e2e:boss-direct-block" + boss_response = await boss.run( + message=HumanMessage( + content=( + "Use the terminal tool to run `echo BOSS_DIRECT_GOV`. " + "Do not delegate. After the tool result, reply with `boss-direct-complete`." 
+ ) + ), + session_id=boss_session_id, + ) + assert "boss-direct" in boss_response.content.lower() + assert any("not allowed" in message.content.lower() for message in _tool_messages(boss, boss_session_id)) + + coder_terminal_session = "e2e:coder-terminal" + coder_terminal_response = await coder.run( + message=HumanMessage( + content=( + "Use only the terminal tool. Run " + "`echo CODER_DIRECT_E2E_OK > direct_ok.txt && type direct_ok.txt`. " + "Then reply with exactly what the command printed." + ) + ), + session_id=coder_terminal_session, + ) + assert "CODER_DIRECT_E2E_OK" in coder_terminal_response.content + assert any("CODER_DIRECT_E2E_OK" in message.content for message in _tool_messages(coder, coder_terminal_session)) + + coder_protected_session = "e2e:coder-protected" + coder_protected_response = await coder.run( + message=HumanMessage( + content=( + "Use only the read_file tool on `locked/secret.txt`. " + "Do not use terminal. After the tool result, reply with `protected-path-complete`." + ) + ), + session_id=coder_protected_session, + ) + assert "protected-path" in coder_protected_response.content.lower() + assert any("protected path" in message.content.lower() for message in _tool_messages(coder, coder_protected_session)) + + boss_delegate_coder_session = "e2e:boss-delegate-coder" + boss_delegate_coder_response = await boss.run( + message=HumanMessage( + content=( + "Use localagents to ask `coder` to use only the terminal tool and run " + "`echo DELEGATED_CODER_E2E_OK > delegated_ok.txt && type delegated_ok.txt`. " + "Ask it to return only the command output, then return that output to me." 
+ ) + ), + session_id=boss_delegate_coder_session, + ) + assert "DELEGATED_CODER_E2E_OK" in boss_delegate_coder_response.content + assert _any_tool_message_contains(coder, "DELEGATED_CODER_E2E_OK") + + boss_delegate_researcher_session = "e2e:boss-delegate-researcher" + boss_delegate_researcher_response = await boss.run( + message=HumanMessage( + content=( + "Use localagents to ask `researcher` to read `note.txt` using only `filesystem.read`. " + "Ask it to return only the file contents, then return only those contents to me." + ) + ), + session_id=boss_delegate_researcher_session, + ) + assert "RESEARCH_E2E_OK" in boss_delegate_researcher_response.content + assert _any_tool_message_contains(researcher, "RESEARCH_E2E_OK") + + subagent_task_id = await coder.subagent_store.ainvoke( + task=( + "Use only the read_file tool on `locked/secret.txt`. " + "Do not use terminal. If blocked, summarize the error in one short sentence." + ), + label="locked-read", + channel="test", + chat_id="governance-e2e", + governance_profile=GovernanceProfile(allowed_tools=["filesystem.read"]), + ) + subagent_record = await _wait_for_subagent(coder.subagent_store, subagent_task_id) + + assert subagent_record.status == "completed" + assert subagent_record.result is not None + assert "protected path" in subagent_record.result.lower() diff --git a/tests/test_local_agents.py b/tests/test_local_agents.py index be0c475..27587c4 100644 --- a/tests/test_local_agents.py +++ b/tests/test_local_agents.py @@ -2,6 +2,7 @@ import pytest +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.agent.tools.builtin.local_agents import localagents from operator_use.messages.service import AIMessage @@ -27,8 +28,10 @@ async def test_localagents_lists_available_agents(): ) assert result.success is True + assert result.output is not None assert "manager (current)" in result.output assert "Browser specialist" in result.output + assert "[capabilities:" not in result.output 
@pytest.mark.asyncio @@ -65,4 +68,28 @@ async def test_localagents_refuses_self_delegation(): ) assert result.success is False + assert result.error is not None assert "Refusing to delegate" in result.error + + +@pytest.mark.asyncio +async def test_localagents_passes_governance_profile_to_target(): + current = make_target_agent(description="Manager") + target = make_target_agent(response_text="done", description="Writer") + + result = await localagents.ainvoke( + action="run", + name="writer", + task="Draft a summary", + allowed_tools=["web.*"], + _agent=current, + _agent_id="manager", + _session_id="chat:42", + _agent_registry={"manager": current, "writer": target}, + ) + + assert result.success is True + incoming = target.run.await_args.kwargs["incoming"] + profile = incoming.metadata["_governance_profile"] + assert profile["allowed_tools"] == ["web.*"] + assert "parent" not in profile diff --git a/tests/test_tool_paths.py b/tests/test_tool_paths.py new file mode 100644 index 0000000..4701f07 --- /dev/null +++ b/tests/test_tool_paths.py @@ -0,0 +1,105 @@ +import pytest + +from operator_use.agent.tools.builtin.filesystem import edit_file, list_dir, read_file, write_file +from operator_use.agent.tools.builtin.patch import patch_file +from operator_use.agent.tools.builtin.terminal import terminal +from operator_use.paths import get_userdata_dir + + +@pytest.mark.asyncio +async def test_read_file_denies_runtime_config(tmp_path): + config_path = get_userdata_dir() / "config.json" + + result = await read_file.ainvoke(path=str(config_path), _workspace=tmp_path) + + assert result.success is False + assert "outside workspace" in result.error or "protected path" in result.error + + +@pytest.mark.asyncio +async def test_write_file_denies_outside_workspace(tmp_path): + result = await write_file.ainvoke( + path=str(get_userdata_dir() / "config.json"), + content="x", + _workspace=tmp_path, + ) + + assert result.success is False + assert "outside workspace" in result.error or 
"protected path" in result.error + + +@pytest.mark.asyncio +async def test_edit_file_denies_outside_workspace(tmp_path): + result = await edit_file.ainvoke( + path=str(get_userdata_dir() / "config.json"), + edits=[{"old_content": "a", "new_content": "b"}], + _workspace=tmp_path, + ) + + assert result.success is False + assert "outside workspace" in result.error or "protected path" in result.error + + +@pytest.mark.asyncio +async def test_patch_file_denies_outside_workspace(tmp_path): + result = await patch_file.ainvoke( + path=str(get_userdata_dir() / "config.json"), + patch="@@ -1 +1 @@\n-a\n+b", + _workspace=tmp_path, + ) + + assert result.success is False + assert "outside workspace" in result.error or "protected path" in result.error + + +@pytest.mark.asyncio +async def test_list_dir_denies_parent_escape(tmp_path): + result = await list_dir.ainvoke(path="..", _workspace=tmp_path) + + assert result.success is False + assert "outside workspace" in result.error + + +@pytest.mark.asyncio +async def test_terminal_runs_from_workspace(tmp_path): + result = await terminal.ainvoke(cmd="echo %CD%", timeout=5, _workspace=tmp_path) + + assert result.success is True + assert str(tmp_path) in result.output + + +@pytest.mark.asyncio +async def test_terminal_denies_path_escape(tmp_path): + result = await terminal.ainvoke(cmd="type ..\\secret.txt", timeout=5, _workspace=tmp_path) + + assert result.success is False + assert "path traversal" in result.error or "outside workspace" in result.error + + +@pytest.mark.asyncio +async def test_terminal_denies_python_executor(tmp_path): + result = await terminal.ainvoke( + cmd='python -c "print(123)"', + timeout=5, + _workspace=tmp_path, + ) + + assert result.success is False + assert "disallowed executor" in result.error + + +@pytest.mark.asyncio +async def test_read_file_denies_configured_protected_path_inside_workspace(tmp_path): + protected_dir = tmp_path / "protected" + protected_dir.mkdir() + secret_file = protected_dir / 
"secret.txt" + secret_file.write_text("nope", encoding="utf-8") + + result = await read_file.ainvoke( + path="protected/secret.txt", + _workspace=tmp_path, + _protected_paths=[protected_dir], + ) + + assert result.success is False + assert "protected path" in result.error diff --git a/tests/test_tool_registry.py b/tests/test_tool_registry.py index 92ff64f..4a67dd3 100644 --- a/tests/test_tool_registry.py +++ b/tests/test_tool_registry.py @@ -3,6 +3,7 @@ import pytest from pydantic import BaseModel +from operator_use.agent.tools.governance import GovernanceProfile from operator_use.agent.tools.registry import ToolRegistry from operator_use.tools.service import Tool @@ -208,6 +209,29 @@ async def _async_echo(message: str, repeat: int = 1, **kwargs): assert "async:hi" in result.output +@pytest.mark.asyncio +async def test_aexecute_governance_blocks_tool(): + reg = ToolRegistry() + reg.register(echo_tool) + reg.set_extension("_governance_profile", GovernanceProfile(allowed_tools=["web.*"])) + + result = await reg.aexecute("echo", {"message": "blocked"}) + + assert result.success is False + assert "not allowed" in result.error + + +@pytest.mark.asyncio +async def test_aexecute_governance_accepts_alias_pattern(): + reg = ToolRegistry() + reg.register(echo_tool) + reg.set_extension("_governance_profile", GovernanceProfile(allowed_tools=["echo"])) + + result = await reg.aexecute("echo", {"message": "allowed"}) + + assert result.success is True + + # --- register_tools / unregister_tools --- def test_register_tools_list():