diff --git a/cli.py b/cli.py index da917ae19065..c9a02d2cece9 100644 --- a/cli.py +++ b/cli.py @@ -5915,6 +5915,43 @@ def _handle_personality_command(self, cmd: str): print(" Usage: /personality ") print() + def _handle_b00t_command(self, cmd: str): + """Handle /b00t command β€” route to b00t-cli for execution.""" + import shlex, subprocess, textwrap + + # Strip command prefix, get args + parts = shlex.split(cmd) + # parts[0] is "/b00t" or "b00t" + b00t_args = parts[1:] if len(parts) > 1 else ["--help"] + + # Check b00t-cli exists + b00t_bin = shutil.which("b00t-cli") or shutil.which("b00t") + if not b00t_bin: + print("πŸ₯Ύ b00t-cli not found in PATH") + print(" Install: cd ~/.b00t && just install") + print(" Or: cargo install --path ~/.b00t/b00t-cli --force") + return + + try: + result = subprocess.run( + [b00t_bin] + b00t_args, + capture_output=True, text=True, timeout=30, + ) + if result.stdout: + # Strip trailing whitespace, present cleanly + out = result.stdout.rstrip() + _cprint(_rich_text_from_ansi(out)) + if result.stderr: + err = result.stderr.rstrip() + if err: + _cprint(f"[dim]{err}[/]") + if result.returncode != 0: + _cprint(f"[bold red]πŸ₯Ύ b00t exited {result.returncode}[/]") + except subprocess.TimeoutExpired: + _cprint("[bold red]πŸ₯Ύ b00t command timed out (30s)[/]") + except Exception as e: + _cprint(f"[bold red]πŸ₯Ύ b00t error: {e}[/]") + def _handle_cron_command(self, cmd: str): """Handle the /cron command to manage scheduled tasks.""" import shlex @@ -6521,6 +6558,11 @@ def process_command(self, command: str) -> bool: print(f" {status} {p['name']}{version}{detail}{error}") except Exception as e: print(f"Plugin system error: {e}") + elif canonical == "b00t": + self._handle_b00t_command(cmd_original) + elif canonical == "hive": + # /hive is an alias for /b00t hive + self._handle_b00t_command(f"/b00t hive {cmd_original.split(None, 1)[1] if len(cmd_original.split(None, 1)) > 1 else ''}".strip()) elif canonical == "rollback": self._handle_rollback_command(cmd_original) elif canonical == "snapshot": diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index 1b4b85bd679a..43ec58d8175e 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -175,6 +175,12 @@ class CommandDef: CommandDef("plugins", "List installed plugins and their status", "Tools & Skills", cli_only=True), + # b00t Integration (PromptExecution fork) + CommandDef("b00t", "Run a b00t command (hive, grok, learn, task, etc.)", "b00t", + cli_only=True, args_hint=" [args...]"), + CommandDef("hive", "Alias for b00t hive β€” system state CMDB", "b00t", + cli_only=True, args_hint="[status|list|plan|activate|peers]"), + # Info CommandDef("commands", "Browse all commands and skills (paginated)", "Info", gateway_only=True, args_hint="[page]"), diff --git a/hermes_cli/plugins.py b/hermes_cli/plugins.py index e921034699f4..a882aa1c8e02 100644 --- a/hermes_cli/plugins.py +++ b/hermes_cli/plugins.py @@ -1169,23 +1169,22 @@ def invoke_hook(hook_name: str, **kwargs: Any) -> List[Any]: -def get_pre_tool_call_block_message( +def get_pre_tool_call_directives( tool_name: str, args: Optional[Dict[str, Any]], task_id: str = "", session_id: str = "", tool_call_id: str = "", -) -> Optional[str]: - """Check ``pre_tool_call`` hooks for a blocking directive. +) -> tuple[Optional[str], Optional[Dict[str, Any]]]: + """Fire ``pre_tool_call`` hook ONCE, return (block_message, rewritten_args). - Plugins that need to enforce policy (rate limiting, security - restrictions, approval workflows) can return:: + Plugins can return either a block or rewrite directive:: - {"action": "block", "message": "Reason the tool was blocked"} + {"action": "block", "message": "Reason"} + {"action": "rewrite", "args": {}} - from their ``pre_tool_call`` callback. The first valid block - directive wins. Invalid or irrelevant hook return values are - silently ignored so existing observer-only hooks are unaffected. + Observer-only hooks returning ``None`` are unaffected. + The first valid block or rewrite wins (block takes priority). """ hook_results = invoke_hook( "pre_tool_call", @@ -1196,16 +1195,56 @@ def get_pre_tool_call_block_message( tool_call_id=tool_call_id, ) + block_message: Optional[str] = None + rewritten_args: Optional[Dict[str, Any]] = None + for result in hook_results: if not isinstance(result, dict): continue - if result.get("action") != "block": - continue - message = result.get("message") - if isinstance(message, str) and message: - return message + action = result.get("action") + + if action == "block" and block_message is None: + message = result.get("message") + if isinstance(message, str) and message: + block_message = message + + elif action == "rewrite" and rewritten_args is None: + new_args = result.get("args") + if isinstance(new_args, dict): + rewritten_args = new_args + + return block_message, rewritten_args + + +# Backward-compat aliases β€” keep these so existing callers don't break. +# New code should use get_pre_tool_call_directives() which fires the +# hook once instead of twice. +def get_pre_tool_call_block_message( + tool_name: str, + args: Optional[Dict[str, Any]], + task_id: str = "", + session_id: str = "", + tool_call_id: str = "", +) -> Optional[str]: + """Backward-compat: returns block message only. Fires hook each call.""" + block_msg, _ = get_pre_tool_call_directives( + tool_name, args, task_id, session_id, tool_call_id, + ) + return block_msg + - return None +def get_pre_tool_call_rewrite( + tool_name: str, + args: Optional[Dict[str, Any]], + task_id: str = "", + session_id: str = "", + tool_call_id: str = "", +) -> Optional[Dict[str, Any]]: + """Backward-compat: returns rewritten args only. Fires hook each call.""" + _, rewritten = get_pre_tool_call_directives( + tool_name, args, task_id, session_id, tool_call_id, + ) + return rewritten def _ensure_plugins_discovered(force: bool = False) -> PluginManager: diff --git a/model_tools.py b/model_tools.py index 2eb31ab0df7f..11bc84c86d7d 100644 --- a/model_tools.py +++ b/model_tools.py @@ -674,33 +674,33 @@ def handle_function_call( if function_name in _AGENT_LOOP_TOOLS: return json.dumps({"error": f"{function_name} must be handled by the agent loop"}) - # Check plugin hooks for a block directive (unless caller already + # Check plugin hooks for pre-tool directives (unless caller already # checked β€” e.g. run_agent._invoke_tool passes skip=True to # avoid double-firing the hook). # # Single-fire contract: pre_tool_call fires exactly once per tool - # execution. get_pre_tool_call_block_message() internally calls + # execution. get_pre_tool_call_directives() internally calls # invoke_hook("pre_tool_call", ...) and returns the first block - # directive (if any), so observer plugins see the hook on that same - # pass. When skip=True, the caller already fired it β€” do nothing - # here. + # directive and/or first rewrite directive from that same pass, so + # observer plugins see one hook invocation. When skip=True, the + # caller already fired it β€” do nothing here. if not skip_pre_tool_call_hook: - block_message: Optional[str] = None try: - from hermes_cli.plugins import get_pre_tool_call_block_message - block_message = get_pre_tool_call_block_message( + from hermes_cli.plugins import get_pre_tool_call_directives + _block_msg, _rewritten = get_pre_tool_call_directives( function_name, function_args, task_id=task_id or "", session_id=session_id or "", tool_call_id=tool_call_id or "", ) + if _block_msg is not None: + return json.dumps({"error": _block_msg}, ensure_ascii=False) + if _rewritten is not None: + function_args = _rewritten except Exception: pass - if block_message is not None: - return json.dumps({"error": block_message}, ensure_ascii=False) - # Notify the read-loop tracker when a non-read/search tool runs, # so the *consecutive* counter resets (reads after other work are fine). if function_name not in _READ_SEARCH_TOOLS: diff --git a/plugins/b00t/__init__.py b/plugins/b00t/__init__.py new file mode 100644 index 000000000000..58f98a2e38d9 --- /dev/null +++ b/plugins/b00t/__init__.py @@ -0,0 +1,153 @@ +"""b00t guard interposition plugin. + +Intercepts terminal tool calls and routes them through ``b00t hive run --dry-run`` +for guard evaluation. Handles: + +- **warn + redirect** (🦨): rewrites the command (e.g. pip β†’ uv pip) +- **block** (πŸš«πŸ’©): returns an error message, command never executes +- **pass** (no emoji): lets the command through unchanged + +Config: + Set ``terminal.backend: local`` in config.yaml β€” b00t intercepts at + the plugin level, not as a full environment backend replacement. +""" + +from __future__ import annotations + +import json +import logging +import sys +import subprocess +from typing import Any + +logger = logging.getLogger(__name__) + +# Emoji markers produced by b00t hive run guards +_BLOCK_EMOJIS = {"🚫", "πŸ’©"} +_WARN_EMOJI = "🦨" +_B00T_EMOJI = "πŸ₯Ύ" # b00t identity marker for all interposition output + + +def _run_b00t_guard(cmd: str) -> dict[str, Any]: + """Run ``b00t hive run --dry-run `` and parse the result. + + Returns: + {"action": "pass"} β€” no guard matched + {"action": "warn", "message": ..., + "redirect": "rewritten cmd"} β€” guard warned, may redirect + {"action": "block", "message": ...} β€” guard blocked + """ + try: + result = subprocess.run( + ["b00t-cli", "hive", "run", "--dry-run", "--", cmd], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=10, + ) + # Forward guard output to user's stderr + capture for parsing + if result.stderr: + sys.stderr.write(result.stderr) + sys.stderr.flush() + for line in result.stderr.splitlines(): + if line.strip(): + logger.info("%s", line.strip()) + except FileNotFoundError: + logger.debug("b00t-cli not found β€” guard interposition disabled") + return {"action": "pass"} + except subprocess.TimeoutExpired: + logger.warning("%s b00t guard timed out for cmd: %.60s", _B00T_EMOJI, cmd) + return {"action": "pass"} + + stdout = result.stdout or "" + stderr = result.stderr or "" + + # Try JSONL parsing first (structured contract, v2) + for line in stdout.split("\n"): + stripped = line.strip() + if stripped.startswith("{"): + try: + j = json.loads(stripped) + action = j.get("action", "pass") + if action == "block": + msg = j.get("message", "") or j.get("error", "blocked by guard") + return {"action": "block", "message": msg} + if action == "warn": + return {"action": "warn", "message": j.get("message", ""), "redirect": j.get("redirect")} + if action == "pass": + return {"action": "pass"} + except (json.JSONDecodeError, KeyError): + continue # fall through to emoji scraping + + # Fallback: emoji scraping from combined output (v1 compat) + combined = stdout + stderr + + for emoji in _BLOCK_EMOJIS: + if emoji in combined: + msg = "" + for line in combined.split("\n"): + if emoji in line: + msg = line.strip() + break + return {"action": "block", "message": msg or "blocked by guard"} + + if _WARN_EMOJI in combined: + redirect = None + msg = "" + for line in combined.split("\n"): + stripped = line.strip() + if _WARN_EMOJI in stripped: + msg = stripped + if "suggested:" in stripped.lower() or "redirect" in stripped.lower(): + parts = stripped.split(":", 1) + if len(parts) > 1: + redirect = parts[1].strip() + return {"action": "warn", "message": msg, "redirect": redirect} + + return {"action": "pass"} + + +def _b00t_pre_tool_hook( + tool_name: str, + args: dict[str, Any] | None, + **kwargs: Any, +) -> dict[str, Any] | None: + """Intercept terminal tool calls and route through b00t guards.""" + if tool_name != "terminal": + return None # only intercept terminal commands + + if not args: + return None + + cmd = args.get("command", "") + if not cmd: + return None + + guard_result = _run_b00t_guard(cmd) + + if guard_result["action"] == "block": + return { + "action": "block", + "message": guard_result.get( + "message", "Command blocked by b00t guard" + ), + } + + if guard_result["action"] == "warn": + redirect = guard_result.get("redirect") + if redirect: + # Rewrite the command to the redirected version + new_args = dict(args) + new_args["command"] = redirect + return {"action": "rewrite", "args": new_args} + # Warn only β€” let it through, message is logged by guard output + return None + + # Pass through + return None + + +def register(ctx) -> None: + """Register the b00t guard interposition hook.""" + ctx.register_hook("pre_tool_call", _b00t_pre_tool_hook) + logger.info("%s b00t guard interposition plugin registered", _B00T_EMOJI) diff --git a/plugins/b00t/plugin.yaml b/plugins/b00t/plugin.yaml new file mode 100644 index 000000000000..22674bb1f69f --- /dev/null +++ b/plugins/b00t/plugin.yaml @@ -0,0 +1,9 @@ +name: b00t +version: 1.0.0 +description: "b00t guard interposition plugin β€” routes terminal commands through b00t hive run guards before execution. Intercepts pipβ†’uv, dockerβ†’podman, main-branch protection, conventional commit enforcement, and custom Rhai guard rules." +author: PromptExecution +kind: hook +platforms: + - linux +hooks: + - pre_tool_call diff --git a/plugins/h3rmes_capability/__init__.py b/plugins/h3rmes_capability/__init__.py new file mode 100644 index 000000000000..cb600314ce68 --- /dev/null +++ b/plugins/h3rmes_capability/__init__.py @@ -0,0 +1,260 @@ +""" +h3rmes Capability Monitor plugin. + +Detects missing subsystems (irontology-mcp, codebase-memory, b00t-mcp) +at session start and dispatches operator sub-agents to auto-remediate. + +Architecture: + Executive (this plugin) β†’ operator dispatch β†’ sub-agent fix β†’ re-check + +Usage: + On session start, performs a capability check. If any known capability + is missing, it logs the gap and (if auto-fix is enabled) dispatches a + sub-agent via delegate_task to remediate. + +Config (in ~/.hermes/config.yaml): + h3rmes: + auto_fix: true # auto-dispatch fixes on capability gap + check_interval: 3600 # min seconds between checks +""" + +from __future__ import annotations + +import json +import logging +import os +import shutil +import subprocess +import time +from typing import Any + +logger = logging.getLogger(__name__) + +# ── Known capability registry ───────────────────────────────────────────────── +# Each entry: (name, check_fn, fix_cmd, description) +# check_fn: returns True if capability is operational +# fix_cmd: shell command to install/remediate (None if no auto-fix available) + +_CAPABILITIES: list[dict[str, Any]] = [] + + +def _find_b00t_cli() -> str | None: + return shutil.which("b00t-cli") or shutil.which("b00t") + + +def _check_b00t_mcp() -> bool: + """Check if b00t-mcp binary exists and Hermes has it in config.""" + cli = _find_b00t_cli() + if not cli: + return False + # Check binary exists + bin_path = shutil.which("b00t-mcp") + if bin_path: + return True + # Check if it's registered in config + config_path = os.path.expanduser("~/.hermes/config.yaml") + if os.path.exists(config_path): + with open(config_path) as f: + return "b00t-mcp" in f.read() + return False + + +def _check_codebase_memory() -> bool: + """Check if codebase-memory-mcp binary exists.""" + paths = [ + os.path.expanduser("~/.b00t/vendor/codebase-memory-mcp-b00t-ir0n-ledg3rr/build/c/codebase-memory-mcp"), + shutil.which("codebase-memory-mcp") or "", + ] + for p in paths: + if p and os.path.isfile(p) and os.access(p, os.X_OK): + return True + return False + + +def _check_irontology_mcp() -> bool: + """Check if irontology-mcp binary exists.""" + paths = [ + os.path.expanduser("~/.b00t/target/release/irontology-mcp"), + shutil.which("irontology-mcp") or "", + ] + for p in paths: + if p and os.path.isfile(p) and os.access(p, os.X_OK): + return True + # Also check if it's registered in config + config_path = os.path.expanduser("~/.hermes/config.yaml") + if os.path.exists(config_path): + with open(config_path) as f: + if "irontology-mcp" in f.read(): + return True + return False + + +def _check_guard_plugin() -> bool: + """Check if b00t guard interposition plugin is installed.""" + plugin_dir = os.path.expanduser("~/.b00t/vendor/hermes-agent-b00t/plugins/b00t") + if os.path.isdir(plugin_dir): + return os.path.isfile(os.path.join(plugin_dir, "__init__.py")) + return False + + +def _check_b00t_cli() -> bool: + """Check if b00t-cli is installed.""" + return _find_b00t_cli() is not None + + +def _init_capabilities(): + """Populate the capability registry once.""" + global _CAPABILITIES + if _CAPABILITIES: + return + + b00t_root = os.path.expanduser("~/.b00t") + + _CAPABILITIES = [ + { + "name": "b00t-cli", + "description": "b00t CLI tool β€” hive, grok, task management", + "check": _check_b00t_cli, + "fix": f"cd {b00t_root} && cargo install --path b00t-cli --force", + "severity": "critical", + }, + { + "name": "b00t-mcp", + "description": "b00t MCP server β€” guard interposition, agent context", + "check": _check_b00t_mcp, + "fix": f"cd {b00t_root} && cargo install --path b00t-mcp --force && b00t-cli mcp install b00t-mcp hermes", + "severity": "high", + }, + { + "name": "guard-plugin", + "description": "b00t guard interposition plugin in Hermes", + "check": _check_guard_plugin, + "fix": None, # must update submodule + "severity": "medium", + }, + { + "name": "irontology-mcp", + "description": "Knowledge graph MCP server β€” 4-way fusion retrieval", + "check": _check_irontology_mcp, + "fix": f"cd {b00t_root} && cargo build --release -p mcp-server --manifest-path vendor/irontology-mcp/Cargo.toml", + "severity": "high", + }, + { + "name": "codebase-memory", + "description": "Code knowledge graph β€” search_graph, trace_path, architecture", + "check": _check_codebase_memory, + "fix": None, # must build from source + "severity": "medium", + }, + ] + + +def _run_capability_check() -> list[dict]: + """Run all capability checks, return list of gaps.""" + _init_capabilities() + gaps = [] + for cap in _CAPABILITIES: + try: + ok = cap["check"]() + status = "βœ“" if ok else "βœ—" + logger.info("[h3rmes-cap] %s %s", status, cap["name"]) + if not ok: + gaps.append(cap) + except Exception as e: + logger.warning("[h3rmes-cap] ? %s (check error: %s)", cap["name"], e) + gaps.append({**cap, "check_error": str(e)}) + return gaps + + +def _dispatch_fix(cap: dict) -> dict: + """Dispatch a fix for a missing capability. + + Returns a dict with action result. Since we're in a plugin hook + and can't call delegate_task directly, we log the fix command + for the agent to pick up. + """ + fix_cmd = cap.get("fix") + if not fix_cmd: + msg = f"no auto-fix available for {cap['name']}" + logger.info("[h3rmes-cap] %s", msg) + return {"action": "notify", "message": msg} + + logger.info("[h3rmes-cap] dispatching fix for %s: %s", cap["name"], fix_cmd[:80]) + try: + result = subprocess.run( + fix_cmd, shell=True, capture_output=True, text=True, timeout=120 + ) + if result.returncode == 0: + logger.info("[h3rmes-cap] fix applied for %s", cap["name"]) + return {"action": "fixed", "name": cap["name"]} + else: + logger.warning( + "[h3rmes-cap] fix failed for %s: %s", + cap["name"], result.stderr[:200], + ) + return {"action": "failed", "name": cap["name"], "error": result.stderr[:200]} + except subprocess.TimeoutExpired: + logger.warning("[h3rmes-cap] fix timed out for %s", cap["name"]) + return {"action": "timeout", "name": cap["name"]} + except Exception as e: + logger.warning("[h3rmes-cap] fix error for %s: %s", cap["name"], e) + return {"action": "error", "name": cap["name"], "error": str(e)} + + +def on_session_start(ctx) -> None: + """Hook: run capability check at session start, auto-fix if configured.""" + _init_capabilities() + + # Check if auto-fix is enabled + auto_fix = getattr(ctx, "config", {}).get("h3rmes", {}).get("auto_fix", False) + + last_check_file = os.path.expanduser("~/.hermes/h3rmes-last-check.json") + check_interval = getattr(ctx, "config", {}).get("h3rmes", {}).get("check_interval", 3600) + + # Throttle: don't check more often than interval + now = time.time() + if os.path.exists(last_check_file): + try: + with open(last_check_file) as f: + last_check = json.load(f).get("timestamp", 0) + if now - last_check < check_interval: + logger.debug("[h3rmes-cap] throttled (last check %ds ago)", int(now - last_check)) + return + except Exception: + pass + + # Run capability check + gaps = _run_capability_check() + + # Save last check timestamp + os.makedirs(os.path.dirname(last_check_file), exist_ok=True) + with open(last_check_file, "w") as f: + json.dump({"timestamp": now, "gaps": [g["name"] for g in gaps]}, f) + + if not gaps: + logger.info("[h3rmes-cap] all capabilities βœ“") + return + + # Log gaps + gap_names = ", ".join(g["name"] for g in gaps) + logger.info("[h3rmes-cap] capability gaps: %s", gap_names) + + # Auto-fix mode: dispatch fixes for critical/high gaps + if auto_fix: + for cap in gaps: + if cap.get("severity") in ("critical", "high"): + result = _dispatch_fix(cap) + if result["action"] != "fixed": + logger.warning("[h3rmes-cap] %s: %s", cap["name"], result.get("error", "unknown")) + else: + # Notify: log the fix commands so the agent can offer + logger.info("[h3rmes-cap] auto-fix disabled; %d gap(s) detected", len(gaps)) + for cap in gaps: + if cap.get("fix"): + logger.info("[h3rmes-cap] %s: %s", cap["name"], cap["fix"][:100]) + + +def register(ctx) -> None: + """Register the h3rmes capability monitor hook.""" + ctx.register_hook("on_session_start", on_session_start) + logger.info("πŸ₯Ύ h3rmes capability monitor plugin registered") diff --git a/plugins/h3rmes_capability/plugin.yaml b/plugins/h3rmes_capability/plugin.yaml new file mode 100644 index 000000000000..a747a3094002 --- /dev/null +++ b/plugins/h3rmes_capability/plugin.yaml @@ -0,0 +1,9 @@ +name: h3rmes-capability +version: 1.0.0 +description: "h3rmes capability monitor β€” detects missing subsystems (irontology-mcp, codebase-memory) on session start and dispatches operator sub-agents to auto-remediate. Self-healing CMDB for h3rmes." +author: PromptExecution +kind: hook +platforms: + - linux +hooks: + - on_session_start diff --git a/run_agent.py b/run_agent.py index aac067ed4e85..b5d4ca6bbd39 100644 --- a/run_agent.py +++ b/run_agent.py @@ -9270,18 +9270,20 @@ def _invoke_tool(self, function_name: str, function_args: dict, effective_task_i tools. Used by the concurrent execution path; the sequential path retains its own inline invocation for backward-compatible display handling. """ - # Check plugin hooks for a block directive before executing anything. - block_message: Optional[str] = None + # Check plugin hooks for directives before executing anything. + # Fires pre_tool_call hook ONCE, returns (block_message, rewritten_args). if not pre_tool_block_checked: try: - from hermes_cli.plugins import get_pre_tool_call_block_message - block_message = get_pre_tool_call_block_message( + from hermes_cli.plugins import get_pre_tool_call_directives + _block_msg, _rewritten = get_pre_tool_call_directives( function_name, function_args, task_id=effective_task_id or "", ) + if _block_msg is not None: + return json.dumps({"error": _block_msg}, ensure_ascii=False) + if _rewritten is not None: + function_args = _rewritten except Exception: pass - if block_message is not None: - return json.dumps({"error": block_message}, ensure_ascii=False) if function_name == "todo": from tools.todo_tool import todo_tool as _todo_tool @@ -9433,17 +9435,28 @@ def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effe block_result = None blocked_by_guardrail = False + guardrails_checked = False + _block_msg = None + _rewritten = None try: - from hermes_cli.plugins import get_pre_tool_call_block_message - block_message = get_pre_tool_call_block_message( + from hermes_cli.plugins import get_pre_tool_call_directives + _block_msg, _rewritten = get_pre_tool_call_directives( function_name, function_args, task_id=effective_task_id or "", ) + if _block_msg is not None: + block_result = json.dumps({"error": _block_msg}, ensure_ascii=False) + elif _rewritten is not None: + function_args = _rewritten + # Re-check guardrails with rewritten args + guardrail_decision = self._tool_guardrails.before_call(function_name, function_args) + guardrails_checked = True + if not guardrail_decision.allows_execution: + block_result = self._guardrail_block_result(guardrail_decision) + blocked_by_guardrail = True except Exception: - block_message = None + pass - if block_message is not None: - block_result = json.dumps({"error": block_message}, ensure_ascii=False) - else: + if block_result is None and _block_msg is None and not guardrails_checked: guardrail_decision = self._tool_guardrails.before_call(function_name, function_args) if not guardrail_decision.allows_execution: block_result = self._guardrail_block_result(guardrail_decision) @@ -9783,10 +9796,12 @@ def _execute_tool_calls_sequential(self, assistant_message, messages: list, effe # Check plugin hooks for a block directive before executing. _block_msg: Optional[str] = None try: - from hermes_cli.plugins import get_pre_tool_call_block_message - _block_msg = get_pre_tool_call_block_message( + from hermes_cli.plugins import get_pre_tool_call_directives + _block_msg, _rewritten = get_pre_tool_call_directives( function_name, function_args, task_id=effective_task_id or "", ) + if _block_msg is None and _rewritten is not None: + function_args = _rewritten except Exception: pass diff --git a/tests/hermes_cli/test_plugins.py b/tests/hermes_cli/test_plugins.py index 0c2a4a884259..6646d4196aad 100644 --- a/tests/hermes_cli/test_plugins.py +++ b/tests/hermes_cli/test_plugins.py @@ -20,6 +20,7 @@ get_plugin_manager, get_plugin_command_handler, get_plugin_commands, + get_pre_tool_call_directives, get_pre_tool_call_block_message, resolve_plugin_command_result, discover_plugins, @@ -536,6 +537,41 @@ def test_first_valid_block_wins(self, monkeypatch): ) assert get_pre_tool_call_block_message("terminal", {}) == "first blocker" + def test_rewrite_only_directive(self, monkeypatch): + monkeypatch.setattr( + "hermes_cli.plugins.invoke_hook", + lambda hook_name, **kwargs: [ + {"action": "rewrite", "args": {"command": "uv pip install pytest"}}, + ], + ) + block_message, rewritten = get_pre_tool_call_directives("terminal", {"command": "pip install pytest"}) + assert block_message is None + assert rewritten == {"command": "uv pip install pytest"} + + def test_rewrite_then_block_block_wins(self, monkeypatch): + monkeypatch.setattr( + "hermes_cli.plugins.invoke_hook", + lambda hook_name, **kwargs: [ + {"action": "rewrite", "args": {"command": "uv pip install pytest"}}, + {"action": "block", "message": "no installs allowed"}, + ], + ) + block_message, rewritten = get_pre_tool_call_directives("terminal", {"command": "pip install pytest"}) + assert block_message == "no installs allowed" + assert rewritten == {"command": "uv pip install pytest"} + + def test_malformed_rewrite_payload_ignored(self, monkeypatch): + monkeypatch.setattr( + "hermes_cli.plugins.invoke_hook", + lambda hook_name, **kwargs: [ + {"action": "rewrite", "args": "not a dict"}, + {"action": "rewrite", "args": None}, + ], + ) + block_message, rewritten = get_pre_tool_call_directives("terminal", {"command": "pip install pytest"}) + assert block_message is None + assert rewritten is None + # ── TestPluginContext ────────────────────────────────────────────────────── diff --git a/tests/plugins/test_b00t_plugin.py b/tests/plugins/test_b00t_plugin.py new file mode 100644 index 000000000000..ae33f327bf2f --- /dev/null +++ b/tests/plugins/test_b00t_plugin.py @@ -0,0 +1,64 @@ +"""Tests for the b00t guard interposition plugin.""" + +from __future__ import annotations + +import importlib +import io +import subprocess +import sys + + +def _fresh_b00t(): + sys.modules.pop("plugins.b00t", None) + return importlib.import_module("plugins.b00t") + + +def test_warn_jsonl_rewrites_terminal_command(monkeypatch): + mod = _fresh_b00t() + + def _fake_run(*args, **kwargs): + return subprocess.CompletedProcess( + args=args, + returncode=0, + stdout='{"action":"warn","message":"use uv","redirect":"uv pip install pytest"}\n', + stderr="", + ) + + monkeypatch.setattr(mod.subprocess, "run", _fake_run) + result = mod._b00t_pre_tool_hook("terminal", {"command": "pip install pytest"}) + assert result == {"action": "rewrite", "args": {"command": "uv pip install pytest"}} + + +def test_block_jsonl_returns_block_directive(monkeypatch): + mod = _fresh_b00t() + + def _fake_run(*args, **kwargs): + return subprocess.CompletedProcess( + args=args, + returncode=0, + stdout='{"action":"block","message":"blocked by policy"}\n', + stderr="", + ) + + monkeypatch.setattr(mod.subprocess, "run", _fake_run) + result = mod._b00t_pre_tool_hook("terminal", {"command": "rm -rf /"}) + assert result == {"action": "block", "message": "blocked by policy"} + + +def test_non_json_output_falls_back_to_emoji_scrape_and_forwards_stderr(monkeypatch): + mod = _fresh_b00t() + captured_stderr = io.StringIO() + monkeypatch.setattr(mod.sys, "stderr", captured_stderr) + + def _fake_run(*args, **kwargs): + return subprocess.CompletedProcess( + args=args, + returncode=0, + stdout="not-json\n", + stderr="🦨 guard warning\nsuggested: uv pip install pytest\n", + ) + + monkeypatch.setattr(mod.subprocess, "run", _fake_run) + result = mod._b00t_pre_tool_hook("terminal", {"command": "pip install pytest"}) + assert result == {"action": "rewrite", "args": {"command": "uv pip install pytest"}} + assert "🦨 guard warning" in captured_stderr.getvalue() diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index 55ce86e51afd..fc7c5c230c03 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -2007,6 +2007,32 @@ def test_concurrent_tool_callbacks_fire_for_each_tool(self, agent): assert {entry[0] for entry in completes} == {"c1", "c2"} assert {entry[3] for entry in completes} == {'{"id":1}', '{"id":2}'} + def test_concurrent_rewrite_runs_guardrail_once(self, agent, monkeypatch): + tc1 = _mock_tool_call(name="web_search", arguments='{"query":"one"}', call_id="c1") + mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1]) + messages = [] + + monkeypatch.setattr( + "hermes_cli.plugins.get_pre_tool_call_directives", + lambda *args, **kwargs: (None, {"query": "rewritten"}), + ) + + before_call_args = [] + agent._tool_guardrails = SimpleNamespace( + before_call=lambda name, args: ( + before_call_args.append((name, args.copy())), + SimpleNamespace(allows_execution=True), + )[1], + after_call=lambda *args, **kwargs: SimpleNamespace(action="allow", should_halt=False), + ) + + with patch("run_agent.handle_function_call", return_value='{"ok": true}') as mock_handle: + agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1") + + assert before_call_args == [("web_search", {"query": "rewritten"})] + mock_handle.assert_called_once() + assert mock_handle.call_args.kwargs["skip_pre_tool_call_hook"] is True + def test_invoke_tool_handles_agent_level_tools(self, agent): """_invoke_tool should handle todo tool directly.""" with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as mock_todo: