diff --git a/docs/adr/0001-prompt-session-unifies-rule-not-signals.md b/docs/adr/0001-prompt-session-unifies-rule-not-signals.md new file mode 100644 index 0000000..8fbd503 --- /dev/null +++ b/docs/adr/0001-prompt-session-unifies-rule-not-signals.md @@ -0,0 +1,9 @@ +# Prompt Session unifies the scan-first rule, not the scan-state signals + +The scan-first gate was duplicated across the CLI and MCP adapters, and the two had drifted: the CLI gate is config-gated and honors a `FORERUNNER_SCAN_DONE` escape hatch, while MCP gates unconditionally with no escape; the CLI validates task existence against the Task Registry while MCP validated only that a prompt file existed on disk. We introduced a run-scoped Prompt Session that owns the single scan-first *rule* (exempt task → allow; scan satisfied → allow; else deny) and task lookup (via the Task Registry for both adapters), but it does **not** unify the scan-state *signals*: each adapter still computes its own `scan_satisfied` boolean (CLI from `.forerunner/scan.md` ∨ env ∨ absent-config; MCP from `.forerunner/scan.md`, plus an in-process flag set when the `scan` tool runs) and injects it. + +We chose this deliberately because the issue (#49) scoped out changing the scan-first invariant and required preserving `FORERUNNER_SCAN_DONE` for the CLI; fully unifying the signals would change MCP's observable behavior (it would gain config-gating and an env escape) and is deferred to a separate migration. So a future reader who sees CLI and MCP still differing on env/config signals — despite the glossary saying adapters share one policy — should know the divergence in *signal sourcing* is intentional; only the *rule and deny path* were unified here. + +## Status + +accepted diff --git a/src/codeforerunner/cli.py b/src/codeforerunner/cli.py index 9388c19..a7404ab 100644 --- a/src/codeforerunner/cli.py +++ b/src/codeforerunner/cli.py @@ -8,13 +8,21 @@ from pathlib import Path from typing import Sequence -from codeforerunner.bundle import find_prompts_root, resolve_bundle as _resolve_bundle -from codeforerunner.tasks import get as _get_task +from codeforerunner.bundle import find_prompts_root +from codeforerunner.prompt_session import Denial, PromptSession from codeforerunner.tasks import refresh_tasks as _refresh_tasks -from codeforerunner.tasks import scan_exempt_names as _scan_exempt_names SCAN_DONE_ENV = "FORERUNNER_SCAN_DONE" +def _scan_satisfied(repo_root: Path) -> bool: + """CLI scan-first signal: scan artifact present, env override set, or no config to gate.""" + return ( + (repo_root / ".forerunner" / "scan.md").is_file() + or bool(os.environ.get(SCAN_DONE_ENV)) + or not (repo_root / "forerunner.config.yaml").is_file() + ) + + def _get_bundle(args: argparse.Namespace) -> tuple[str, int]: """Resolve bundle for args.task. Returns (bundle_text, exit_code). exit_code != 0 on error.""" try: @@ -23,19 +31,13 @@ def _get_bundle(args: argparse.Namespace) -> tuple[str, int]: print(f"error: {e}", file=sys.stderr) return "", 2 - try: - _get_task(args.task) - except KeyError: - print(f"error: unknown task '{args.task}'", file=sys.stderr) - return "", 2 - repo_root = Path(args.repo).resolve() if args.repo else Path.cwd() - if ( - args.task not in _scan_exempt_names() - and (repo_root / "forerunner.config.yaml").is_file() - and not (repo_root / ".forerunner" / "scan.md").is_file() - and not os.environ.get(SCAN_DONE_ENV) - ): + session = PromptSession(prompts_root, _scan_satisfied(repo_root)) + decision = session.can_run(args.task) + if not decision.allowed: + if decision.reason is Denial.UNKNOWN_TASK: + print(f"error: unknown task '{args.task}'", file=sys.stderr) + return "", 2 print( f"error: scan-first required — run `forerunner scan` first " f"(writes .forerunner/scan.md). Set {SCAN_DONE_ENV}=1 to skip.", @@ -44,7 +46,7 @@ def _get_bundle(args: argparse.Namespace) -> tuple[str, int]: return "", 1 try: - return _resolve_bundle(prompts_root, args.task), 0 + return session.bundle_for(args.task), 0 except FileNotFoundError as e: print(f"error: {e}", file=sys.stderr) return "", 2 diff --git a/src/codeforerunner/mcp_server.py b/src/codeforerunner/mcp_server.py index 1258115..70bb68d 100644 --- a/src/codeforerunner/mcp_server.py +++ b/src/codeforerunner/mcp_server.py @@ -11,9 +11,9 @@ from typing import Any, Iterable from codeforerunner import __version__ as _pkg_version -from codeforerunner.bundle import find_prompts_root, resolve_bundle +from codeforerunner.bundle import find_prompts_root +from codeforerunner.prompt_session import Denial, PromptSession from codeforerunner.tasks import all_tasks as _all_tasks -from codeforerunner.tasks import scan_exempt_names as _scan_exempt_names PROTOCOL_VERSION = "2025-03-26" SERVER_NAME = "codeforerunner" @@ -85,11 +85,11 @@ def _handle(prompts_root: Path, msg: dict[str, Any], state: dict[str, Any]) -> d name = params.get("name") if not isinstance(name, str) or "/" in name or "\\" in name or ".." in name: return _err(req_id, -32602, f"invalid tool name: {name!r}") - task_path = prompts_root / "tasks" / f"{name}.md" - tasks_root = (prompts_root / "tasks").resolve() - if not task_path.resolve().is_relative_to(tasks_root) or not task_path.is_file(): - return _err(req_id, -32602, f"unknown tool: {name!r}") - if name not in _scan_exempt_names() and not state.get("scan_called"): + session = PromptSession(prompts_root, scan_satisfied=bool(state.get("scan_called"))) + decision = session.can_run(name) + if not decision.allowed: + if decision.reason is Denial.UNKNOWN_TASK: + return _err(req_id, -32602, f"unknown tool: {name!r}") return _err( req_id, -32000, @@ -98,7 +98,7 @@ def _handle(prompts_root: Path, msg: dict[str, Any], state: dict[str, Any]) -> d if name == "scan": state["scan_called"] = True try: - text = resolve_bundle(prompts_root, name) + text = session.bundle_for(name) except Exception as e: # pragma: no cover - defensive return _err(req_id, -32603, f"internal error: {e}") return _ok( diff --git a/src/codeforerunner/prompt_session.py b/src/codeforerunner/prompt_session.py new file mode 100644 index 0000000..4f797c5 --- /dev/null +++ b/src/codeforerunner/prompt_session.py @@ -0,0 +1,53 @@ +"""Prompt Session — run-scoped owner of task lookup and scan-first enforcement. + +CLI and MCP are thin adapters: they construct a session, ask whether a task +can run, and translate the structured Decision into their own surface +(stdout/exit codes for CLI, JSON-RPC for MCP). The session unifies the +scan-first *rule*; each adapter still computes its own scan-satisfied signal +and injects it (see docs/adr/0001). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum, auto +from pathlib import Path + +from codeforerunner import tasks +from codeforerunner.bundle import resolve_bundle + + +class Denial(Enum): + UNKNOWN_TASK = auto() + SCAN_REQUIRED = auto() + + +@dataclass(frozen=True) +class Decision: + allowed: bool + task: tasks.Task | None = None + reason: Denial | None = None + message: str | None = None + + +class PromptSession: + def __init__(self, prompts_root: Path, scan_satisfied: bool) -> None: + self._prompts_root = prompts_root + self._scan_done = scan_satisfied + + def mark_scan_done(self) -> None: + """Record that scan ran in this session (MCP adapter, after scan tool).""" + self._scan_done = True + + def can_run(self, name: str) -> Decision: + try: + task = tasks.get(name) + except KeyError: + return Decision(False, reason=Denial.UNKNOWN_TASK, message=f"unknown task: {name!r}") + if task.scan_exempt or self._scan_done: + return Decision(True, task=task) + return Decision(False, task=task, reason=Denial.SCAN_REQUIRED, message="scan-first required") + + def bundle_for(self, name: str) -> str: + """Resolve the prompt bundle text for *name*. Call only after can_run allows.""" + return resolve_bundle(self._prompts_root, name) diff --git a/tests/test_cli.py b/tests/test_cli.py index e53c413..f2f86ff 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -273,7 +273,7 @@ def test_get_bundle_catches_resolve_bundle_error(tmp_path, capsys, monkeypatch): monkeypatch.delenv("FORERUNNER_SCAN_DONE", raising=False) (tmp_path / ".forerunner").mkdir() (tmp_path / ".forerunner" / "scan.md").write_text("scan: {}\n", encoding="utf-8") - with patch("codeforerunner.cli._resolve_bundle", side_effect=FileNotFoundError("gone")): + with patch("codeforerunner.prompt_session.resolve_bundle", side_effect=FileNotFoundError("gone")): rc = main(["--repo", str(tmp_path), "doc", "readme"]) cap = capsys.readouterr() assert rc == 2 diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 6f4c007..73fd360 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -453,7 +453,7 @@ def _seed_prompts(path: Path) -> Path: (prompts / "tasks").mkdir() (prompts / "tasks" / "scan.md").write_text("# Scan\n", encoding="utf-8") (prompts / "tasks" / "init-agent-onboarding.md").write_text("# Onboarding\n", encoding="utf-8") - (prompts / "tasks" / "check-task.md").write_text("# Check Task\n", encoding="utf-8") + (prompts / "tasks" / "check.md").write_text("# Check Task\n", encoding="utf-8") return prompts @@ -466,7 +466,7 @@ def test_serve_allows_non_exempt_when_scan_artifact_present(tmp_path): out = io.StringIO() msgs = _rpc( {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}, - {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check-task"}}, + {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check"}}, ) serve(prompts_root, repo_root=repo_root, stdin=msgs, stdout=out, stderr=io.StringIO()) out.seek(0) @@ -483,7 +483,7 @@ def test_serve_blocks_non_exempt_when_scan_artifact_absent(tmp_path): out = io.StringIO() msgs = _rpc( {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}, - {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check-task"}}, + {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check"}}, ) serve(prompts_root, repo_root=repo_root, stdin=msgs, stdout=out, stderr=io.StringIO()) out.seek(0) diff --git a/tests/test_prompt_session.py b/tests/test_prompt_session.py new file mode 100644 index 0000000..1a17b39 --- /dev/null +++ b/tests/test_prompt_session.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from pathlib import Path + +import codeforerunner +from codeforerunner.prompt_session import Denial, PromptSession + +PROMPTS_ROOT = Path(codeforerunner.__file__).parent / "prompts" + + +def test_non_exempt_task_denied_when_scan_not_satisfied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=False) + result = session.can_run("readme") + assert result.allowed is False + assert result.reason is Denial.SCAN_REQUIRED + + +def test_exempt_task_allowed_when_scan_not_satisfied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=False) + for name in ("scan", "init-agent-onboarding"): + result = session.can_run(name) + assert result.allowed is True, name + assert result.task.name == name + + +def test_non_exempt_task_allowed_when_scan_satisfied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=True) + result = session.can_run("readme") + assert result.allowed is True + assert result.task.name == "readme" + + +def test_unknown_task_denied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=True) + result = session.can_run("not-a-real-task") + assert result.allowed is False + assert result.reason is Denial.UNKNOWN_TASK + + +def test_mark_scan_done_unblocks_non_exempt_task(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=False) + assert session.can_run("readme").allowed is False + session.mark_scan_done() + assert session.can_run("readme").allowed is True + + +def test_bundle_for_resolves_prompt_text(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=True) + text = session.bundle_for("readme") + assert "" in text