diff --git a/agent-workspace/agent_helpers.py b/agent-workspace/agent_helpers.py index 2d493c17..01aeb541 100644 --- a/agent-workspace/agent_helpers.py +++ b/agent-workspace/agent_helpers.py @@ -5,3 +5,267 @@ repo's default agent-workspace exists. """ +from __future__ import annotations + +import json as _json +import os as _os +import time as _time +from contextlib import contextmanager as _contextmanager +from pathlib import Path as _Path +from typing import Any as _Any + + +REVIEWOPS_TERMINAL_DOWNLOAD_STATUSES = frozenset( + { + "captured", + "downloaded", + "captured_markdown_export", + "captured_native_markdown", + } +) + + +class ReviewOpsRetrieverLockError(RuntimeError): + """Raised when another guarded ReviewOps retriever already owns the run lock.""" + + +class ReviewOpsRetrieverLock: + """Small atomic lock for one retriever per run directory. + + This deliberately does not send prompts or click UI. It only prevents a second + retrieval worker from running the same loop concurrently, preserving the + single-retriever / no-duplicate-send guard used by ReviewOps scripts. + """ + + def __init__(self, path: str | _os.PathLike[str], *, stale_after: float | None = None): + self.path = _Path(path) + self.stale_after = stale_after + self.acquired = False + + def acquire(self) -> "ReviewOpsRetrieverLock": + self.path.parent.mkdir(parents=True, exist_ok=True) + flags = _os.O_CREAT | _os.O_EXCL | _os.O_WRONLY + payload = { + "pid": _os.getpid(), + "created_at": _time.time(), + "argv_redacted": True, + } + try: + fd = _os.open(self.path, flags) + except FileExistsError as exc: + if self._break_stale_lock(): + return self.acquire() + raise ReviewOpsRetrieverLockError( + f"retriever lock already exists: {self.path}" + ) from exc + with _os.fdopen(fd, "w", encoding="utf-8") as f: + _json.dump(payload, f, ensure_ascii=False, indent=2) + f.write("\n") + self.acquired = True + return self + + def _break_stale_lock(self) -> bool: + if self.stale_after is None: + return False + try: + age = _time.time() - self.path.stat().st_mtime + except OSError: + return False + if age < self.stale_after: + return False + try: + self.path.unlink() + return True + except OSError: + return False + + def release(self) -> None: + if not self.acquired: + return + try: + self.path.unlink() + except FileNotFoundError: + pass + finally: + self.acquired = False + + def __enter__(self) -> "ReviewOpsRetrieverLock": + return self.acquire() + + def __exit__(self, exc_type, exc, tb) -> None: + self.release() + + +@_contextmanager +def reviewops_single_retriever(lock_file: str | _os.PathLike[str], *, stale_after: float | None = None): + """Context manager enforcing one guarded ReviewOps retriever at a time.""" + + lock = ReviewOpsRetrieverLock(lock_file, stale_after=stale_after) + lock.acquire() + try: + yield lock + finally: + lock.release() + + +def read_reviewops_status_file(status_file: str | _os.PathLike[str]) -> dict[str, _Any] | None: + """Read a JSON ReviewOps status file, returning None while it is absent/partial. + + Background retrievers can update a status file while a watcher polls it; this + helper treats FileNotFoundError and JSONDecodeError as transient states instead + of crashing the watchdog. + """ + + path = _Path(status_file) + try: + text = path.read_text(encoding="utf-8") + except FileNotFoundError: + return None + try: + value = _json.loads(text) + except _json.JSONDecodeError: + return None + if not isinstance(value, dict): + return None + return value + + +def _status_output_path(status: dict[str, _Any], explicit_output: str | _os.PathLike[str] | None) -> _Path | None: + if explicit_output is not None: + return _Path(explicit_output) + for key in ("output_file", "out_file", "path"): + value = status.get(key) + if value: + return _Path(str(value)) + return None + + +def _identity_guard_allows_completion( + status: dict[str, _Any], *, require_accepted: bool = False +) -> bool: + """Return False when a reported identity guard exists but did not accept. + + Older/local retrievers may not write identity-guard fields; those remain + caller-governed by default. For ReviewOps invocation templates that must prove + the current response before exiting, pass ``require_accepted=True`` so missing + or disabled identity-guard evidence fails closed too. If the fields are + present, the watchdog must not turn a stale or rejected export into a + successful completion merely because a file exists. + """ + + result = status.get("export_identity_guard_last_result") + accepted = isinstance(result, dict) and result.get("accepted") is True + guard_enabled = status.get("export_identity_guard_enabled") is True + if require_accepted: + return guard_enabled and accepted + if not guard_enabled: + return True + return accepted + + +def reviewops_download_is_complete( + status: dict[str, _Any] | None, + out_file: str | _os.PathLike[str] | None = None, + *, + terminal_statuses=REVIEWOPS_TERMINAL_DOWNLOAD_STATUSES, + min_bytes: int = 1, + require_identity_guard_accepted: bool = False, +) -> bool: + """True only when status says downloaded/captured and the output file exists. + + This is intentionally narrow. It does not click, export, send, or bypass + current-response identity checks. If the status file reports an enabled + identity guard, completion is allowed only when that guard accepted. Pass + ``require_identity_guard_accepted=True`` from ReviewOps retriever invocation + templates so completion requires all three conditions: terminal + downloaded/captured status, non-empty output file, and accepted identity guard. + """ + + if not status: + return False + if str(status.get("status", "")).strip() not in set(terminal_statuses): + return False + if not _identity_guard_allows_completion( + status, require_accepted=require_identity_guard_accepted + ): + return False + output = _status_output_path(status, out_file) + if output is None: + return False + try: + return output.is_file() and output.stat().st_size >= min_bytes + except OSError: + return False + + +def wait_for_reviewops_download( + status_file: str | _os.PathLike[str], + out_file: str | _os.PathLike[str] | None = None, + *, + timeout: float = 30.0, + interval: float = 1.0, + terminal_statuses=REVIEWOPS_TERMINAL_DOWNLOAD_STATUSES, + min_bytes: int = 1, + require_identity_guard_accepted: bool = False, +) -> dict[str, _Any]: + """Short-poll a retriever status file until download completion or timeout. + + Use this as an outer watchdog around long-running ReviewOps/Deep Research + retrievers. If the retriever writes `status: downloaded` (or a compatible + captured status) and the output file exists, this returns immediately so the + parent process can exit 0 and Hermes `notify_on_complete` fires. On timeout it + returns the last status dict with `watchdog_status: timeout` and does not click + or send anything. New ReviewOps invocation templates should pass + ``require_identity_guard_accepted=True`` so this exits only after the stale + export/current-response identity guard has accepted the output. + """ + + deadline = _time.time() + timeout + last_status: dict[str, _Any] | None = None + while _time.time() < deadline: + last_status = read_reviewops_status_file(status_file) + if reviewops_download_is_complete( + last_status, + out_file, + terminal_statuses=terminal_statuses, + min_bytes=min_bytes, + require_identity_guard_accepted=require_identity_guard_accepted, + ): + result = dict(last_status or {}) + result["watchdog_status"] = "downloaded" + return result + _time.sleep(interval) + result = dict(last_status or {}) + result.setdefault("status", "unknown") + result["watchdog_status"] = "timeout" + result["status_file"] = str(status_file) + output = _status_output_path(result, out_file) + if output is not None: + result["output_file"] = str(output) + return result + + +def exit_if_reviewops_downloaded( + status_file: str | _os.PathLike[str], + out_file: str | _os.PathLike[str] | None = None, + *, + code: int = 0, + require_identity_guard_accepted: bool = False, +) -> None: + """Exit the current retriever once status/output prove the download is done. + + Drop this near the top of a retriever loop after the identity/stale-export + guards have written a terminal status. It fixes the lifecycle bug where the + status file says downloaded and the output exists, but a background process + keeps polling forever. New ReviewOps invocation templates should pass + ``require_identity_guard_accepted=True`` and call this after the identity/stale + guard status has been updated. + """ + + status = read_reviewops_status_file(status_file) + if reviewops_download_is_complete( + status, + out_file, + require_identity_guard_accepted=require_identity_guard_accepted, + ): + raise SystemExit(code) diff --git a/agent-workspace/reviewops_retriever_invocation_template.py b/agent-workspace/reviewops_retriever_invocation_template.py new file mode 100644 index 00000000..61704188 --- /dev/null +++ b/agent-workspace/reviewops_retriever_invocation_template.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +"""Template wrapper for the next ReviewOps/Deep Research retriever invocation. + +This wrapper is lifecycle-only: it does not click UI, send prompts, export files, +read browser profiles, or inspect credentials. It preserves the single-retriever +lock and exits only when the retriever status proves all required completion +conditions: + +1. status-file status is downloaded/captured (or the supported captured variants), +2. out-file exists and is non-empty, +3. export_identity_guard_enabled is true and export_identity_guard_last_result.accepted is true. + +Example shape for the next invocation: + + python agent-workspace/reviewops_retriever_invocation_template.py \ + --status-file "$RUN_DIR/retrieve-status.json" \ + --out-file "$RUN_DIR/deep-research-response.md" \ + --lock-file "$RUN_DIR/.retriever-wrapper.lock" \ + -- python path/to/existing_retriever.py --same --args --as-before + +The command after ``--`` is the existing retriever. Use a wrapper-specific lock +file if that existing retriever already takes its own single-retriever lock; do +not point both wrapper and child at the same lock file. This template only +observes its status/output files and stops waiting once identity-guarded +completion is visible, instead of relying solely on an outer Hermes +notify_on_complete wait. +""" + +from __future__ import annotations + +import argparse +import subprocess +import sys +import time +from pathlib import Path + +from agent_helpers import ( + read_reviewops_status_file, + reviewops_download_is_complete, + reviewops_single_retriever, + wait_for_reviewops_download, +) + + +def _complete(status_file: Path, out_file: Path) -> bool: + status = read_reviewops_status_file(status_file) + return reviewops_download_is_complete( + status, + out_file, + require_identity_guard_accepted=True, + ) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--status-file", required=True, type=Path) + parser.add_argument("--out-file", required=True, type=Path) + parser.add_argument("--lock-file", type=Path) + parser.add_argument("--watchdog-timeout", type=float, default=30.0) + parser.add_argument("--watchdog-interval", type=float, default=1.0) + parser.add_argument("--stale-lock-after", type=float) + parser.add_argument("command", nargs=argparse.REMAINDER) + args = parser.parse_args(argv) + + lock_file = args.lock_file or (args.status_file.parent / ".retriever-wrapper.lock") + command = args.command[1:] if args.command[:1] == ["--"] else args.command + + with reviewops_single_retriever(lock_file, stale_after=args.stale_lock_after): + # Fast path: if a prior guarded retriever already completed this run, + # exit successfully rather than launching a duplicate retrieve/export. + if _complete(args.status_file, args.out_file): + return 0 + + if not command: + result = wait_for_reviewops_download( + args.status_file, + args.out_file, + timeout=args.watchdog_timeout, + interval=args.watchdog_interval, + require_identity_guard_accepted=True, + ) + return 0 if result.get("watchdog_status") == "downloaded" else 124 + + proc = subprocess.Popen(command) + try: + while proc.poll() is None: + if _complete(args.status_file, args.out_file): + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=5) + return 0 + time.sleep(args.watchdog_interval) + + # A successful child exit is not sufficient for ReviewOps completion: + # the wrapper exists to require terminal status + non-empty output + + # accepted identity guard. Fail closed if the child exits without that + # proof, while preserving non-zero child failures for debugging. + if _complete(args.status_file, args.out_file): + return 0 + return int(proc.returncode) if proc.returncode else 124 + finally: + if proc.poll() is None: + proc.terminate() + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/tests/unit/test_agent_helpers.py b/tests/unit/test_agent_helpers.py new file mode 100644 index 00000000..275ff36f --- /dev/null +++ b/tests/unit/test_agent_helpers.py @@ -0,0 +1,316 @@ +import json +import os +import sys +import time +import importlib.util +from pathlib import Path + +import pytest + + +ROOT = Path(__file__).resolve().parents[2] +AGENT_HELPERS = ROOT / "agent-workspace" / "agent_helpers.py" +REVIEWOPS_TEMPLATE = ROOT / "agent-workspace" / "reviewops_retriever_invocation_template.py" + + +def load_agent_helpers(): + spec = importlib.util.spec_from_file_location("agent_helpers_under_test", AGENT_HELPERS) + assert spec is not None + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + + +def load_reviewops_template(): + sys.path.insert(0, str(REVIEWOPS_TEMPLATE.parent)) + try: + spec = importlib.util.spec_from_file_location("reviewops_template_under_test", REVIEWOPS_TEMPLATE) + assert spec is not None + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + finally: + sys.path.remove(str(REVIEWOPS_TEMPLATE.parent)) + + +def test_reviewops_download_is_complete_requires_terminal_status_and_output(tmp_path): + h = load_agent_helpers() + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + + assert h.reviewops_download_is_complete({"status": "downloaded", "output_file": str(out)}) is True + assert h.reviewops_download_is_complete({"status": "watching", "output_file": str(out)}) is False + assert h.reviewops_download_is_complete({"status": "downloaded", "output_file": str(tmp_path / "missing.md")}) is False + + +def test_reviewops_download_respects_enabled_identity_guard(tmp_path): + h = load_agent_helpers() + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + + rejected = { + "status": "downloaded", + "output_file": str(out), + "export_identity_guard_enabled": True, + "export_identity_guard_last_result": {"accepted": False, "reason": "rejected-heading:B3a"}, + } + assert h.reviewops_download_is_complete(rejected) is False + + accepted = { + "status": "downloaded", + "output_file": str(out), + "export_identity_guard_enabled": True, + "export_identity_guard_last_result": {"accepted": True}, + } + assert h.reviewops_download_is_complete(accepted) is True + + +def test_reviewops_download_template_mode_requires_accepted_identity_guard(tmp_path): + h = load_agent_helpers() + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + + missing_guard = {"status": "captured", "output_file": str(out)} + disabled_guard = { + "status": "captured", + "output_file": str(out), + "export_identity_guard_enabled": False, + "export_identity_guard_last_result": {"accepted": True}, + } + assert ( + h.reviewops_download_is_complete( + missing_guard, + require_identity_guard_accepted=True, + ) + is False + ) + assert ( + h.reviewops_download_is_complete( + disabled_guard, + require_identity_guard_accepted=True, + ) + is False + ) + + accepted = { + "status": "captured", + "output_file": str(out), + "export_identity_guard_enabled": True, + "export_identity_guard_last_result": {"accepted": True}, + } + assert ( + h.reviewops_download_is_complete( + accepted, + require_identity_guard_accepted=True, + ) + is True + ) + + +def test_exit_if_reviewops_downloaded_raises_system_exit_only_when_complete(tmp_path): + h = load_agent_helpers() + status_file = tmp_path / "status.json" + out = tmp_path / "response.md" + + status_file.write_text(json.dumps({"status": "downloaded", "output_file": str(out)}), encoding="utf-8") + h.exit_if_reviewops_downloaded(status_file) # no output file yet: do not exit + + out.write_text("ok", encoding="utf-8") + with pytest.raises(SystemExit) as exc: + h.exit_if_reviewops_downloaded(status_file) + assert exc.value.code == 0 + + +def test_wait_for_reviewops_download_returns_immediately_when_status_and_output_exist(tmp_path): + h = load_agent_helpers() + status_file = tmp_path / "status.json" + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + status_file.write_text(json.dumps({"status": "downloaded", "output_file": str(out)}), encoding="utf-8") + + result = h.wait_for_reviewops_download(status_file, timeout=5, interval=0.01) + + assert result["status"] == "downloaded" + assert result["watchdog_status"] == "downloaded" + + +def test_wait_for_reviewops_download_template_mode_waits_for_identity_acceptance(tmp_path): + h = load_agent_helpers() + status_file = tmp_path / "status.json" + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + status_file.write_text( + json.dumps({"status": "captured", "output_file": str(out)}), + encoding="utf-8", + ) + + result = h.wait_for_reviewops_download( + status_file, + timeout=0.01, + interval=0.01, + require_identity_guard_accepted=True, + ) + + assert result["status"] == "captured" + assert result["watchdog_status"] == "timeout" + + status_file.write_text( + json.dumps( + { + "status": "captured", + "output_file": str(out), + "export_identity_guard_enabled": True, + "export_identity_guard_last_result": {"accepted": True}, + } + ), + encoding="utf-8", + ) + + result = h.wait_for_reviewops_download( + status_file, + timeout=5, + interval=0.01, + require_identity_guard_accepted=True, + ) + + assert result["status"] == "captured" + assert result["watchdog_status"] == "downloaded" + + +def test_wait_for_reviewops_download_times_out_without_side_effects(tmp_path): + h = load_agent_helpers() + status_file = tmp_path / "status.json" + status_file.write_text(json.dumps({"status": "watching"}), encoding="utf-8") + + result = h.wait_for_reviewops_download(status_file, timeout=0.01, interval=0.01) + + assert result["status"] == "watching" + assert result["watchdog_status"] == "timeout" + assert result["status_file"] == str(status_file) + + +def test_reviewops_invocation_template_no_command_requires_identity_guard(tmp_path): + template = load_reviewops_template() + status_file = tmp_path / "status.json" + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + status_file.write_text(json.dumps({"status": "captured", "output_file": str(out)}), encoding="utf-8") + + code = template.main( + [ + "--status-file", + str(status_file), + "--out-file", + str(out), + "--watchdog-timeout", + "0.01", + "--watchdog-interval", + "0.01", + ] + ) + assert code == 124 + + status_file.write_text( + json.dumps( + { + "status": "captured", + "output_file": str(out), + "export_identity_guard_enabled": True, + "export_identity_guard_last_result": {"accepted": True}, + } + ), + encoding="utf-8", + ) + + code = template.main( + [ + "--status-file", + str(status_file), + "--out-file", + str(out), + "--watchdog-timeout", + "5", + "--watchdog-interval", + "0.01", + ] + ) + assert code == 0 + + +def test_reviewops_invocation_template_command_success_without_guard_fails_closed(tmp_path): + template = load_reviewops_template() + status_file = tmp_path / "status.json" + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + status_file.write_text(json.dumps({"status": "captured", "output_file": str(out)}), encoding="utf-8") + + code = template.main( + [ + "--status-file", + str(status_file), + "--out-file", + str(out), + "--", + sys.executable, + "-c", + "import sys; sys.exit(0)", + ] + ) + + assert code == 124 + + +def test_reviewops_invocation_template_default_lock_is_wrapper_specific(tmp_path): + template = load_reviewops_template() + status_file = tmp_path / "status.json" + out = tmp_path / "response.md" + out.write_text("ok", encoding="utf-8") + status_file.write_text(json.dumps({"status": "watching"}), encoding="utf-8") + child_lock = tmp_path / ".retriever.lock" + child_lock.write_text("child retriever owns this lock", encoding="utf-8") + + code = template.main( + [ + "--status-file", + str(status_file), + "--out-file", + str(out), + "--watchdog-timeout", + "0.01", + "--watchdog-interval", + "0.01", + ] + ) + + assert code == 124 + assert child_lock.read_text(encoding="utf-8") == "child retriever owns this lock" + assert not (tmp_path / ".retriever-wrapper.lock").exists() + + +def test_reviewops_single_retriever_lock_rejects_duplicate(tmp_path): + h = load_agent_helpers() + lock_file = tmp_path / "retriever.lock" + + with h.reviewops_single_retriever(lock_file): + assert lock_file.exists() + with pytest.raises(h.ReviewOpsRetrieverLockError): + with h.reviewops_single_retriever(lock_file): + pass + + assert not lock_file.exists() + + +def test_reviewops_single_retriever_lock_can_break_stale_lock(tmp_path): + h = load_agent_helpers() + lock_file = tmp_path / "retriever.lock" + lock_file.write_text("stale", encoding="utf-8") + old = time.time() - 120 + os.utime(lock_file, (old, old)) + + with h.reviewops_single_retriever(lock_file, stale_after=1): + payload = json.loads(lock_file.read_text(encoding="utf-8")) + assert payload["pid"] == os.getpid() + assert payload["argv_redacted"] is True + assert "argv" not in payload