Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5915,6 +5915,43 @@ def _handle_personality_command(self, cmd: str):
print(" Usage: /personality <name>")
print()

def _handle_b00t_command(self, cmd: str):
"""Handle /b00t command — route to b00t-cli for execution."""
import shlex, subprocess, textwrap

# Strip command prefix, get args
parts = shlex.split(cmd)
# parts[0] is "/b00t" or "b00t"
b00t_args = parts[1:] if len(parts) > 1 else ["--help"]

# Check b00t-cli exists
b00t_bin = shutil.which("b00t-cli") or shutil.which("b00t")
if not b00t_bin:
print("🥾 b00t-cli not found in PATH")
print(" Install: cd ~/.b00t && just install")
print(" Or: cargo install --path ~/.b00t/b00t-cli --force")
return

try:
result = subprocess.run(
[b00t_bin] + b00t_args,
capture_output=True, text=True, timeout=30,
)
if result.stdout:
# Strip trailing whitespace, present cleanly
out = result.stdout.rstrip()
_cprint(_rich_text_from_ansi(out))
if result.stderr:
err = result.stderr.rstrip()
if err:
_cprint(f"[dim]{err}[/]")
if result.returncode != 0:
_cprint(f"[bold red]🥾 b00t exited {result.returncode}[/]")
except subprocess.TimeoutExpired:
_cprint("[bold red]🥾 b00t command timed out (30s)[/]")
except Exception as e:
_cprint(f"[bold red]🥾 b00t error: {e}[/]")

def _handle_cron_command(self, cmd: str):
"""Handle the /cron command to manage scheduled tasks."""
import shlex
Expand Down Expand Up @@ -6521,6 +6558,11 @@ def process_command(self, command: str) -> bool:
print(f" {status} {p['name']}{version}{detail}{error}")
except Exception as e:
print(f"Plugin system error: {e}")
elif canonical == "b00t":
self._handle_b00t_command(cmd_original)
elif canonical == "hive":
# /hive is an alias for /b00t hive
self._handle_b00t_command(f"/b00t hive {cmd_original.split(None, 1)[1] if len(cmd_original.split(None, 1)) > 1 else ''}".strip())
elif canonical == "rollback":
self._handle_rollback_command(cmd_original)
elif canonical == "snapshot":
Expand Down
6 changes: 6 additions & 0 deletions hermes_cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ class CommandDef:
CommandDef("plugins", "List installed plugins and their status",
"Tools & Skills", cli_only=True),

# b00t Integration (PromptExecution fork)
CommandDef("b00t", "Run a b00t command (hive, grok, learn, task, etc.)", "b00t",
cli_only=True, args_hint="<command> [args...]"),
CommandDef("hive", "Alias for b00t hive — system state CMDB", "b00t",
cli_only=True, args_hint="[status|list|plan|activate|peers]"),

# Info
CommandDef("commands", "Browse all commands and skills (paginated)", "Info",
gateway_only=True, args_hint="[page]"),
Expand Down
69 changes: 54 additions & 15 deletions hermes_cli/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -1169,23 +1169,22 @@ def invoke_hook(hook_name: str, **kwargs: Any) -> List[Any]:



def get_pre_tool_call_block_message(
def get_pre_tool_call_directives(
tool_name: str,
args: Optional[Dict[str, Any]],
task_id: str = "",
session_id: str = "",
tool_call_id: str = "",
) -> Optional[str]:
"""Check ``pre_tool_call`` hooks for a blocking directive.
) -> tuple[Optional[str], Optional[Dict[str, Any]]]:
"""Fire ``pre_tool_call`` hook ONCE, return (block_message, rewritten_args).

Plugins that need to enforce policy (rate limiting, security
restrictions, approval workflows) can return::
Plugins can return either a block or rewrite directive::

{"action": "block", "message": "Reason the tool was blocked"}
{"action": "block", "message": "Reason"}
{"action": "rewrite", "args": {<modified tool args>}}

from their ``pre_tool_call`` callback. The first valid block
directive wins. Invalid or irrelevant hook return values are
silently ignored so existing observer-only hooks are unaffected.
Observer-only hooks returning ``None`` are unaffected.
The first valid block or rewrite wins (block takes priority).
"""
hook_results = invoke_hook(
"pre_tool_call",
Expand All @@ -1196,16 +1195,56 @@ def get_pre_tool_call_block_message(
tool_call_id=tool_call_id,
)

block_message: Optional[str] = None
rewritten_args: Optional[Dict[str, Any]] = None

for result in hook_results:
if not isinstance(result, dict):
continue
if result.get("action") != "block":
continue
message = result.get("message")
if isinstance(message, str) and message:
return message
action = result.get("action")

if action == "block" and block_message is None:
message = result.get("message")
if isinstance(message, str) and message:
block_message = message

elif action == "rewrite" and rewritten_args is None:
new_args = result.get("args")
if isinstance(new_args, dict):
rewritten_args = new_args

return block_message, rewritten_args
Comment on lines 1172 to +1216


# Backward-compat aliases — keep these so existing callers don't break.
# New code should use get_pre_tool_call_directives() which fires the
# hook once instead of twice.
def get_pre_tool_call_block_message(
tool_name: str,
args: Optional[Dict[str, Any]],
task_id: str = "",
session_id: str = "",
tool_call_id: str = "",
) -> Optional[str]:
"""Backward-compat: returns block message only. Fires hook each call."""
block_msg, _ = get_pre_tool_call_directives(
tool_name, args, task_id, session_id, tool_call_id,
)
return block_msg


return None
def get_pre_tool_call_rewrite(
tool_name: str,
args: Optional[Dict[str, Any]],
task_id: str = "",
session_id: str = "",
tool_call_id: str = "",
) -> Optional[Dict[str, Any]]:
"""Backward-compat: returns rewritten args only. Fires hook each call."""
_, rewritten = get_pre_tool_call_directives(
tool_name, args, task_id, session_id, tool_call_id,
)
return rewritten


def _ensure_plugins_discovered(force: bool = False) -> PluginManager:
Expand Down
22 changes: 11 additions & 11 deletions model_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,33 +674,33 @@ def handle_function_call(
if function_name in _AGENT_LOOP_TOOLS:
return json.dumps({"error": f"{function_name} must be handled by the agent loop"})

# Check plugin hooks for a block directive (unless caller already
# Check plugin hooks for pre-tool directives (unless caller already
# checked — e.g. run_agent._invoke_tool passes skip=True to
# avoid double-firing the hook).
#
# Single-fire contract: pre_tool_call fires exactly once per tool
# execution. get_pre_tool_call_block_message() internally calls
# execution. get_pre_tool_call_directives() internally calls
# invoke_hook("pre_tool_call", ...) and returns the first block
# directive (if any), so observer plugins see the hook on that same
# pass. When skip=True, the caller already fired it — do nothing
# here.
# directive and/or first rewrite directive from that same pass, so
# observer plugins see one hook invocation. When skip=True, the
# caller already fired it — do nothing here.
if not skip_pre_tool_call_hook:
block_message: Optional[str] = None
try:
from hermes_cli.plugins import get_pre_tool_call_block_message
block_message = get_pre_tool_call_block_message(
from hermes_cli.plugins import get_pre_tool_call_directives
_block_msg, _rewritten = get_pre_tool_call_directives(
function_name,
function_args,
task_id=task_id or "",
session_id=session_id or "",
tool_call_id=tool_call_id or "",
)
if _block_msg is not None:
return json.dumps({"error": _block_msg}, ensure_ascii=False)
if _rewritten is not None:
function_args = _rewritten
except Exception:
pass

if block_message is not None:
return json.dumps({"error": block_message}, ensure_ascii=False)

# Notify the read-loop tracker when a non-read/search tool runs,
# so the *consecutive* counter resets (reads after other work are fine).
if function_name not in _READ_SEARCH_TOOLS:
Expand Down
153 changes: 153 additions & 0 deletions plugins/b00t/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""b00t guard interposition plugin.

Intercepts terminal tool calls and routes them through ``b00t hive run --dry-run``
for guard evaluation. Handles:

- **warn + redirect** (🦨): rewrites the command (e.g. pip → uv pip)
- **block** (🚫💩): returns an error message, command never executes
- **pass** (no emoji): lets the command through unchanged

Config:
Set ``terminal.backend: local`` in config.yaml — b00t intercepts at
the plugin level, not as a full environment backend replacement.
"""

from __future__ import annotations

import json
import logging
import sys
import subprocess
from typing import Any

logger = logging.getLogger(__name__)

# Emoji markers produced by b00t hive run guards
_BLOCK_EMOJIS = {"🚫", "💩"}
_WARN_EMOJI = "🦨"
_B00T_EMOJI = "🥾" # b00t identity marker for all interposition output


def _run_b00t_guard(cmd: str) -> dict[str, Any]:
"""Run ``b00t hive run --dry-run <cmd>`` and parse the result.

Returns:
{"action": "pass"} — no guard matched
{"action": "warn", "message": ...,
"redirect": "rewritten cmd"} — guard warned, may redirect
{"action": "block", "message": ...} — guard blocked
"""
try:
result = subprocess.run(
["b00t-cli", "hive", "run", "--dry-run", "--", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=10,
)
# Forward guard output to user's stderr + capture for parsing
if result.stderr:
sys.stderr.write(result.stderr)
sys.stderr.flush()
for line in result.stderr.splitlines():
if line.strip():
logger.info("%s", line.strip())
except FileNotFoundError:
logger.debug("b00t-cli not found — guard interposition disabled")
return {"action": "pass"}
except subprocess.TimeoutExpired:
logger.warning("%s b00t guard timed out for cmd: %.60s", _B00T_EMOJI, cmd)
return {"action": "pass"}

stdout = result.stdout or ""
stderr = result.stderr or ""

# Try JSONL parsing first (structured contract, v2)
for line in stdout.split("\n"):
stripped = line.strip()
if stripped.startswith("{"):
try:
j = json.loads(stripped)
action = j.get("action", "pass")
if action == "block":
msg = j.get("message", "") or j.get("error", "blocked by guard")
return {"action": "block", "message": msg}
if action == "warn":
return {"action": "warn", "message": j.get("message", ""), "redirect": j.get("redirect")}
if action == "pass":
return {"action": "pass"}
except (json.JSONDecodeError, KeyError):
continue # fall through to emoji scraping

# Fallback: emoji scraping from combined output (v1 compat)
combined = stdout + stderr

for emoji in _BLOCK_EMOJIS:
if emoji in combined:
msg = ""
for line in combined.split("\n"):
if emoji in line:
msg = line.strip()
break
return {"action": "block", "message": msg or "blocked by guard"}

if _WARN_EMOJI in combined:
redirect = None
msg = ""
for line in combined.split("\n"):
stripped = line.strip()
if _WARN_EMOJI in stripped:
msg = stripped
if "suggested:" in stripped.lower() or "redirect" in stripped.lower():
parts = stripped.split(":", 1)
if len(parts) > 1:
redirect = parts[1].strip()
return {"action": "warn", "message": msg, "redirect": redirect}

return {"action": "pass"}
Comment on lines +31 to +107


def _b00t_pre_tool_hook(
tool_name: str,
args: dict[str, Any] | None,
**kwargs: Any,
) -> dict[str, Any] | None:
"""Intercept terminal tool calls and route through b00t guards."""
if tool_name != "terminal":
return None # only intercept terminal commands

if not args:
return None

cmd = args.get("command", "")
if not cmd:
return None

guard_result = _run_b00t_guard(cmd)

if guard_result["action"] == "block":
return {
"action": "block",
"message": guard_result.get(
"message", "Command blocked by b00t guard"
),
}

if guard_result["action"] == "warn":
redirect = guard_result.get("redirect")
if redirect:
# Rewrite the command to the redirected version
new_args = dict(args)
new_args["command"] = redirect
return {"action": "rewrite", "args": new_args}
# Warn only — let it through, message is logged by guard output
return None

# Pass through
return None


def register(ctx) -> None:
"""Register the b00t guard interposition hook."""
ctx.register_hook("pre_tool_call", _b00t_pre_tool_hook)
logger.info("%s b00t guard interposition plugin registered", _B00T_EMOJI)
9 changes: 9 additions & 0 deletions plugins/b00t/plugin.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: b00t
version: 1.0.0
description: "b00t guard interposition plugin — routes terminal commands through b00t hive run guards before execution. Intercepts pip→uv, docker→podman, main-branch protection, conventional commit enforcement, and custom Rhai guard rules."
author: PromptExecution
kind: hook
platforms:
- linux
hooks:
- pre_tool_call
Loading