From d05c298d15a0c62269c83fa8ab15e4d76eb5e194 Mon Sep 17 00:00:00 2001 From: "Derek Palmer (Creative)" Date: Thu, 28 May 2026 23:24:20 -0400 Subject: [PATCH] feat(prompt-session): own scan-first rule across CLI and MCP Scan-first enforcement and task lookup were duplicated at the CLI and MCP edges and had drifted: CLI gated on config + FORERUNNER_SCAN_DONE and validated tasks against the registry, while MCP gated unconditionally and validated by prompt-file existence. Introduce a run-scoped PromptSession that owns the single scan-first rule, task lookup (via Task Registry), and bundle resolution, returning a structured Decision. Adapters translate Decisions into exit codes / JSON-RPC and still inject their own scan-satisfied signal, so observable CLI behavior is unchanged. MCP task validation now goes through the registry, so unregistered stray prompt files are rejected. Rationale for unifying the rule but not the signals recorded in docs/adr/0001. Closes #49 --- ...prompt-session-unifies-rule-not-signals.md | 9 ++++ src/codeforerunner/cli.py | 34 ++++++------ src/codeforerunner/mcp_server.py | 16 +++--- src/codeforerunner/prompt_session.py | 53 +++++++++++++++++++ tests/test_cli.py | 2 +- tests/test_mcp_server.py | 6 +-- tests/test_prompt_session.py | 50 +++++++++++++++++ 7 files changed, 142 insertions(+), 28 deletions(-) create mode 100644 docs/adr/0001-prompt-session-unifies-rule-not-signals.md create mode 100644 src/codeforerunner/prompt_session.py create mode 100644 tests/test_prompt_session.py diff --git a/docs/adr/0001-prompt-session-unifies-rule-not-signals.md b/docs/adr/0001-prompt-session-unifies-rule-not-signals.md new file mode 100644 index 0000000..8fbd503 --- /dev/null +++ b/docs/adr/0001-prompt-session-unifies-rule-not-signals.md @@ -0,0 +1,9 @@ +# Prompt Session unifies the scan-first rule, not the scan-state signals + +The scan-first gate was duplicated across the CLI and MCP adapters, and the two had drifted: the CLI gate is config-gated and honors a `FORERUNNER_SCAN_DONE` escape hatch, while MCP gates unconditionally with no escape; the CLI validates task existence against the Task Registry while MCP validated only that a prompt file existed on disk. We introduced a run-scoped Prompt Session that owns the single scan-first *rule* (exempt task → allow; scan satisfied → allow; else deny) and task lookup (via the Task Registry for both adapters), but it does **not** unify the scan-state *signals*: each adapter still computes its own `scan_satisfied` boolean (CLI from `.forerunner/scan.md` ∨ env ∨ absent-config; MCP from `.forerunner/scan.md`, plus an in-process flag set when the `scan` tool runs) and injects it. + +We chose this deliberately because the issue (#49) scoped out changing the scan-first invariant and required preserving `FORERUNNER_SCAN_DONE` for the CLI; fully unifying the signals would change MCP's observable behavior (it would gain config-gating and an env escape) and is deferred to a separate migration. So a future reader who sees CLI and MCP still differing on env/config signals — despite the glossary saying adapters share one policy — should know the divergence in *signal sourcing* is intentional; only the *rule and deny path* were unified here. + +## Status + +accepted diff --git a/src/codeforerunner/cli.py b/src/codeforerunner/cli.py index 9388c19..a7404ab 100644 --- a/src/codeforerunner/cli.py +++ b/src/codeforerunner/cli.py @@ -8,13 +8,21 @@ from pathlib import Path from typing import Sequence -from codeforerunner.bundle import find_prompts_root, resolve_bundle as _resolve_bundle -from codeforerunner.tasks import get as _get_task +from codeforerunner.bundle import find_prompts_root +from codeforerunner.prompt_session import Denial, PromptSession from codeforerunner.tasks import refresh_tasks as _refresh_tasks -from codeforerunner.tasks import scan_exempt_names as _scan_exempt_names SCAN_DONE_ENV = "FORERUNNER_SCAN_DONE" +def _scan_satisfied(repo_root: Path) -> bool: + """CLI scan-first signal: scan artifact present, env override set, or no config to gate.""" + return ( + (repo_root / ".forerunner" / "scan.md").is_file() + or bool(os.environ.get(SCAN_DONE_ENV)) + or not (repo_root / "forerunner.config.yaml").is_file() + ) + + def _get_bundle(args: argparse.Namespace) -> tuple[str, int]: """Resolve bundle for args.task. Returns (bundle_text, exit_code). exit_code != 0 on error.""" try: @@ -23,19 +31,13 @@ def _get_bundle(args: argparse.Namespace) -> tuple[str, int]: print(f"error: {e}", file=sys.stderr) return "", 2 - try: - _get_task(args.task) - except KeyError: - print(f"error: unknown task '{args.task}'", file=sys.stderr) - return "", 2 - repo_root = Path(args.repo).resolve() if args.repo else Path.cwd() - if ( - args.task not in _scan_exempt_names() - and (repo_root / "forerunner.config.yaml").is_file() - and not (repo_root / ".forerunner" / "scan.md").is_file() - and not os.environ.get(SCAN_DONE_ENV) - ): + session = PromptSession(prompts_root, _scan_satisfied(repo_root)) + decision = session.can_run(args.task) + if not decision.allowed: + if decision.reason is Denial.UNKNOWN_TASK: + print(f"error: unknown task '{args.task}'", file=sys.stderr) + return "", 2 print( f"error: scan-first required — run `forerunner scan` first " f"(writes .forerunner/scan.md). Set {SCAN_DONE_ENV}=1 to skip.", @@ -44,7 +46,7 @@ def _get_bundle(args: argparse.Namespace) -> tuple[str, int]: return "", 1 try: - return _resolve_bundle(prompts_root, args.task), 0 + return session.bundle_for(args.task), 0 except FileNotFoundError as e: print(f"error: {e}", file=sys.stderr) return "", 2 diff --git a/src/codeforerunner/mcp_server.py b/src/codeforerunner/mcp_server.py index 1258115..70bb68d 100644 --- a/src/codeforerunner/mcp_server.py +++ b/src/codeforerunner/mcp_server.py @@ -11,9 +11,9 @@ from typing import Any, Iterable from codeforerunner import __version__ as _pkg_version -from codeforerunner.bundle import find_prompts_root, resolve_bundle +from codeforerunner.bundle import find_prompts_root +from codeforerunner.prompt_session import Denial, PromptSession from codeforerunner.tasks import all_tasks as _all_tasks -from codeforerunner.tasks import scan_exempt_names as _scan_exempt_names PROTOCOL_VERSION = "2025-03-26" SERVER_NAME = "codeforerunner" @@ -85,11 +85,11 @@ def _handle(prompts_root: Path, msg: dict[str, Any], state: dict[str, Any]) -> d name = params.get("name") if not isinstance(name, str) or "/" in name or "\\" in name or ".." in name: return _err(req_id, -32602, f"invalid tool name: {name!r}") - task_path = prompts_root / "tasks" / f"{name}.md" - tasks_root = (prompts_root / "tasks").resolve() - if not task_path.resolve().is_relative_to(tasks_root) or not task_path.is_file(): - return _err(req_id, -32602, f"unknown tool: {name!r}") - if name not in _scan_exempt_names() and not state.get("scan_called"): + session = PromptSession(prompts_root, scan_satisfied=bool(state.get("scan_called"))) + decision = session.can_run(name) + if not decision.allowed: + if decision.reason is Denial.UNKNOWN_TASK: + return _err(req_id, -32602, f"unknown tool: {name!r}") return _err( req_id, -32000, @@ -98,7 +98,7 @@ def _handle(prompts_root: Path, msg: dict[str, Any], state: dict[str, Any]) -> d if name == "scan": state["scan_called"] = True try: - text = resolve_bundle(prompts_root, name) + text = session.bundle_for(name) except Exception as e: # pragma: no cover - defensive return _err(req_id, -32603, f"internal error: {e}") return _ok( diff --git a/src/codeforerunner/prompt_session.py b/src/codeforerunner/prompt_session.py new file mode 100644 index 0000000..4f797c5 --- /dev/null +++ b/src/codeforerunner/prompt_session.py @@ -0,0 +1,53 @@ +"""Prompt Session — run-scoped owner of task lookup and scan-first enforcement. + +CLI and MCP are thin adapters: they construct a session, ask whether a task +can run, and translate the structured Decision into their own surface +(stdout/exit codes for CLI, JSON-RPC for MCP). The session unifies the +scan-first *rule*; each adapter still computes its own scan-satisfied signal +and injects it (see docs/adr/0001). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum, auto +from pathlib import Path + +from codeforerunner import tasks +from codeforerunner.bundle import resolve_bundle + + +class Denial(Enum): + UNKNOWN_TASK = auto() + SCAN_REQUIRED = auto() + + +@dataclass(frozen=True) +class Decision: + allowed: bool + task: tasks.Task | None = None + reason: Denial | None = None + message: str | None = None + + +class PromptSession: + def __init__(self, prompts_root: Path, scan_satisfied: bool) -> None: + self._prompts_root = prompts_root + self._scan_done = scan_satisfied + + def mark_scan_done(self) -> None: + """Record that scan ran in this session (MCP adapter, after scan tool).""" + self._scan_done = True + + def can_run(self, name: str) -> Decision: + try: + task = tasks.get(name) + except KeyError: + return Decision(False, reason=Denial.UNKNOWN_TASK, message=f"unknown task: {name!r}") + if task.scan_exempt or self._scan_done: + return Decision(True, task=task) + return Decision(False, task=task, reason=Denial.SCAN_REQUIRED, message="scan-first required") + + def bundle_for(self, name: str) -> str: + """Resolve the prompt bundle text for *name*. Call only after can_run allows.""" + return resolve_bundle(self._prompts_root, name) diff --git a/tests/test_cli.py b/tests/test_cli.py index e53c413..f2f86ff 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -273,7 +273,7 @@ def test_get_bundle_catches_resolve_bundle_error(tmp_path, capsys, monkeypatch): monkeypatch.delenv("FORERUNNER_SCAN_DONE", raising=False) (tmp_path / ".forerunner").mkdir() (tmp_path / ".forerunner" / "scan.md").write_text("scan: {}\n", encoding="utf-8") - with patch("codeforerunner.cli._resolve_bundle", side_effect=FileNotFoundError("gone")): + with patch("codeforerunner.prompt_session.resolve_bundle", side_effect=FileNotFoundError("gone")): rc = main(["--repo", str(tmp_path), "doc", "readme"]) cap = capsys.readouterr() assert rc == 2 diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 6f4c007..73fd360 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -453,7 +453,7 @@ def _seed_prompts(path: Path) -> Path: (prompts / "tasks").mkdir() (prompts / "tasks" / "scan.md").write_text("# Scan\n", encoding="utf-8") (prompts / "tasks" / "init-agent-onboarding.md").write_text("# Onboarding\n", encoding="utf-8") - (prompts / "tasks" / "check-task.md").write_text("# Check Task\n", encoding="utf-8") + (prompts / "tasks" / "check.md").write_text("# Check Task\n", encoding="utf-8") return prompts @@ -466,7 +466,7 @@ def test_serve_allows_non_exempt_when_scan_artifact_present(tmp_path): out = io.StringIO() msgs = _rpc( {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}, - {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check-task"}}, + {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check"}}, ) serve(prompts_root, repo_root=repo_root, stdin=msgs, stdout=out, stderr=io.StringIO()) out.seek(0) @@ -483,7 +483,7 @@ def test_serve_blocks_non_exempt_when_scan_artifact_absent(tmp_path): out = io.StringIO() msgs = _rpc( {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}, - {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check-task"}}, + {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "check"}}, ) serve(prompts_root, repo_root=repo_root, stdin=msgs, stdout=out, stderr=io.StringIO()) out.seek(0) diff --git a/tests/test_prompt_session.py b/tests/test_prompt_session.py new file mode 100644 index 0000000..1a17b39 --- /dev/null +++ b/tests/test_prompt_session.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +from pathlib import Path + +import codeforerunner +from codeforerunner.prompt_session import Denial, PromptSession + +PROMPTS_ROOT = Path(codeforerunner.__file__).parent / "prompts" + + +def test_non_exempt_task_denied_when_scan_not_satisfied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=False) + result = session.can_run("readme") + assert result.allowed is False + assert result.reason is Denial.SCAN_REQUIRED + + +def test_exempt_task_allowed_when_scan_not_satisfied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=False) + for name in ("scan", "init-agent-onboarding"): + result = session.can_run(name) + assert result.allowed is True, name + assert result.task.name == name + + +def test_non_exempt_task_allowed_when_scan_satisfied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=True) + result = session.can_run("readme") + assert result.allowed is True + assert result.task.name == "readme" + + +def test_unknown_task_denied(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=True) + result = session.can_run("not-a-real-task") + assert result.allowed is False + assert result.reason is Denial.UNKNOWN_TASK + + +def test_mark_scan_done_unblocks_non_exempt_task(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=False) + assert session.can_run("readme").allowed is False + session.mark_scan_done() + assert session.can_run("readme").allowed is True + + +def test_bundle_for_resolves_prompt_text(): + session = PromptSession(PROMPTS_ROOT, scan_satisfied=True) + text = session.bundle_for("readme") + assert "" in text