diff --git a/docs/verifier-schema.v0.1.json b/docs/verifier-schema.v0.1.json index a64fa8cb..6c881798 100644 --- a/docs/verifier-schema.v0.1.json +++ b/docs/verifier-schema.v0.1.json @@ -135,6 +135,56 @@ "title": "VerifierCapabilityReview", "type": "object" }, + "VerifierFixTask": { + "additionalProperties": false, + "description": "The single repair task a verify run hands to whoever acts next.\n\nRouting is deterministic and projected from the head scan \u2014 never an LLM\njudgment. ``coding_agent`` + ``safe_to_attempt=True`` means the gating\ngaps are mechanical (every gating finding is ``autofix_safe``): the agent\nmay fix them and re-run ``verification_command``. ``human`` +\n``safe_to_attempt=False`` means an authority gap a coding agent must not\ninvent its way past \u2014 missing approval/idempotency evidence, a weakened\npolicy, or a touched trust root. ``forbidden_shortcuts`` are the\nreward-hacking moves that are never acceptable for either actor.", + "properties": { + "actor": { + "enum": [ + "coding_agent", + "human" + ], + "title": "Actor", + "type": "string" + }, + "forbidden_shortcuts": { + "items": { + "type": "string" + }, + "title": "Forbidden Shortcuts", + "type": "array" + }, + "instructions": { + "items": { + "type": "string" + }, + "title": "Instructions", + "type": "array" + }, + "safe_to_attempt": { + "title": "Safe To Attempt", + "type": "boolean" + }, + "verification_command": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Verification Command" + } + }, + "required": [ + "actor", + "safe_to_attempt" + ], + "title": "VerifierFixTask", + "type": "object" + }, "VerifierHumanReview": { "additionalProperties": false, "description": "Whether a human must review before merge, and why.", @@ -331,6 +381,17 @@ ], "default": null }, + "fix_task": { + "anyOf": [ + { + "$ref": "#/$defs/VerifierFixTask" + }, + { + "type": "null" + } + ], + "default": null + }, "head_exit_code": { "default": 0, "title": "Head Exit Code", diff --git a/src/agents_shipgate/cli/verify/fix_task.py b/src/agents_shipgate/cli/verify/fix_task.py new file mode 100644 index 00000000..f4c88197 --- /dev/null +++ b/src/agents_shipgate/cli/verify/fix_task.py @@ -0,0 +1,174 @@ +"""Deterministic ``fix_task`` projection for ``agents-shipgate verify``. + +The fix_task is the single repair instruction a verify run hands to whoever +must act next. It is a pure projection of the head scan: *who* acts and +*whether it is safe* is decided by the release decision plus the per-finding +``autofix_safe`` / ``requires_human_review`` signals — never by a model. The +agent route fires only when every gating finding is mechanically fixable; any +authority gap (missing approval/idempotency evidence, a weakened policy, a +touched trust root, or degraded evidence) routes to a human so a coding agent +cannot invent its way past it. +""" + +from __future__ import annotations + +import shlex + +from agents_shipgate.schemas.report import Finding, ReadinessReport +from agents_shipgate.schemas.verifier import ( + MergeVerdict, + VerifierCapabilityReview, + VerifierFixTask, +) + +# Reward-hacking moves that are never acceptable, for either actor. Kept in +# sync with the PR-comment guardrail language (cli/verify/pr_comment.py). +FORBIDDEN_SHORTCUTS: tuple[str, ...] = ( + "Do not suppress the finding (checks.ignore in shipgate.yaml).", + "Do not lower severity or add a waiver just to pass the gate.", + "Do not invent or assume approval, idempotency, or audit evidence you " + "cannot prove from the code.", + "Do not weaken the release policy, CI gate, or agent instructions that " + "evaluate this change.", +) + +_MAX_INSTRUCTIONS = 5 + + +def build_fix_task( + report: ReadinessReport | None, + *, + merge_verdict: MergeVerdict, + capability_review: VerifierCapabilityReview | None, + base_ref: str | None, + head_ref: str, +) -> VerifierFixTask | None: + """Project the head scan onto a single repair task. + + Returns ``None`` when there is nothing to fix (mergeable, or no head + release decision to reason about). + """ + if merge_verdict == "mergeable": + return None + + verification_command = _verification_command(base_ref, head_ref) + + # No completed head decision (scan skipped/failed → ``unknown``) but the PR + # is not mergeable: there are no findings to route on, so fail closed to a + # human who must investigate why the scan did not complete. Emitting a task + # here (rather than None) keeps the contract uniform — every non-mergeable + # verdict carries a fix_task. + if report is None or report.release_decision is None or capability_review is None: + return VerifierFixTask( + actor="human", + safe_to_attempt=False, + instructions=[ + "Shipgate could not produce a release decision for this PR; a " + "human must investigate why the scan did not complete and " + "re-run before merge." + ], + forbidden_shortcuts=list(FORBIDDEN_SHORTCUTS), + verification_command=verification_command, + ) + + gating = _gating_findings(report) + + # The coding-agent route is the only non-human outcome and it MUST fail + # closed: every gating finding has to be explicitly mechanical + # (``autofix_safe is True`` AND ``requires_human_review is False``). A + # finding whose routing fields are ``None``/``False`` — stale, plugin, or + # legacy — is treated as an authority gap and never silently marked + # agent-safe. + mechanical = bool(gating) and all( + finding.autofix_safe is True and finding.requires_human_review is False + for finding in gating + ) + authority_escalation = ( + capability_review.policy_weakened + or capability_review.trust_root_touched + or merge_verdict in {"insufficient_evidence", "unknown"} + ) + if mechanical and not authority_escalation: + return VerifierFixTask( + actor="coding_agent", + safe_to_attempt=True, + instructions=_mechanical_instructions(gating), + forbidden_shortcuts=list(FORBIDDEN_SHORTCUTS), + verification_command=verification_command, + ) + + return VerifierFixTask( + actor="human", + safe_to_attempt=False, + instructions=_human_instructions(report, capability_review, gating), + forbidden_shortcuts=list(FORBIDDEN_SHORTCUTS), + verification_command=verification_command, + ) + + +def _gating_findings(report: ReadinessReport) -> list[Finding]: + """The active findings driving blockers / review_items, in decision order.""" + decision = report.release_decision + assert decision is not None # guarded by build_fix_task + by_id = {f.id: f for f in report.findings if f.id} + by_fingerprint = {f.fingerprint: f for f in report.findings if f.fingerprint} + out: list[Finding] = [] + seen: set[int] = set() + for item in [*decision.blockers, *decision.review_items]: + finding = (by_id.get(item.id) if item.id else None) or ( + by_fingerprint.get(item.fingerprint) if item.fingerprint else None + ) + if finding is not None and id(finding) not in seen: + out.append(finding) + seen.add(id(finding)) + return out + + +def _human_instructions( + report: ReadinessReport, + capability_review: VerifierCapabilityReview, + gating: list[Finding], +) -> list[str]: + decision = report.release_decision + assert decision is not None + out: list[str] = [decision.reason] + if capability_review.policy_weakened: + out.append( + "A human must approve the release-policy change in this PR; the " + "coding agent that made the change cannot self-approve it." + ) + if capability_review.trust_root_touched: + out.append( + "A human must review the touched release trust root (manifest, CI " + "gate, agent instructions, or trigger catalog) before merge." + ) + # List every gating finding's recommendation — a human-routed task owns the + # whole decision, including findings whose routing fields were ambiguous. + out.extend(finding.recommendation for finding in gating if finding.recommendation) + return _dedupe_cap(out) + + +def _mechanical_instructions(gating: list[Finding]) -> list[str]: + return _dedupe_cap([finding.recommendation for finding in gating if finding.recommendation]) + + +def _dedupe_cap(items: list[str]) -> list[str]: + seen: set[str] = set() + out: list[str] = [] + for item in items: + if item and item not in seen: + seen.add(item) + out.append(item) + return out[:_MAX_INSTRUCTIONS] + + +def _verification_command(base_ref: str | None, head_ref: str) -> str: + # Refs come from CLI / GitHub branch inputs and a valid git ref may contain + # shell metacharacters (e.g. ``;``); quote them so the emitted command is + # safe to run when an agent or human copies it verbatim. + base = shlex.quote(base_ref or "origin/main") + head = shlex.quote(head_ref or "HEAD") + return f"agents-shipgate verify --base {base} --head {head} --json" + + +__all__ = ["FORBIDDEN_SHORTCUTS", "build_fix_task"] diff --git a/src/agents_shipgate/cli/verify/orchestrator.py b/src/agents_shipgate/cli/verify/orchestrator.py index fff58504..73a7bcdf 100644 --- a/src/agents_shipgate/cli/verify/orchestrator.py +++ b/src/agents_shipgate/cli/verify/orchestrator.py @@ -20,13 +20,15 @@ MergeVerdict, VerifierArtifact, VerifierBaseStatus, + VerifierFixTask, VerifierHumanReview, VerifierNextAction, - map_merge_verdict, + merge_verdict_for, ) from agents_shipgate.triggers import evaluate from .capability_review import build_capability_review +from .fix_task import build_fix_task from .git import ( archive_tree, diff_context, @@ -513,12 +515,6 @@ def _map_optional_tree_path( return tree_dir / relative -def _merge_verdict(*, decision: str | None, head_status: str) -> MergeVerdict: - if decision is not None: - return map_merge_verdict(decision) - return "mergeable" if head_status == "skipped" else "unknown" - - def _can_merge_without_human( *, merge_verdict: MergeVerdict, release_decision: ReleaseDecision | None ) -> bool: @@ -552,7 +548,7 @@ def _human_review( def _first_next_action( *, merge_verdict: MergeVerdict, - human_review_required: bool, + fix_task: VerifierFixTask | None, agent_summary: AgentSummary | None, reason: str | None, ) -> VerifierNextAction: @@ -563,17 +559,40 @@ def _first_next_action( command=None, why="No agent-capability changes gate this PR; safe to merge.", ) - actor = "human" if human_review_required else "coding_agent" + # The fix_task is the single repair contract; the headline next-step must + # not contradict it. Borrow the agent summary's concrete action (e.g. an + # apply-patches command) only when its implied actor agrees with the + # fix_task routing — otherwise derive the pointer from the fix_task so that + # actor, command, and why all come from one source. + actor = fix_task.actor if fix_task is not None else "human" recommended = ( agent_summary.first_recommended_action if agent_summary is not None else None ) if recommended is not None: - return VerifierNextAction( - actor=actor, - kind=recommended.kind, - command=recommended.command, - why=recommended.why, + # The PR comment infers a recommendation's actor the same way: a + # runnable command implies the coding agent, an info note a human. + recommended_actor = "coding_agent" if recommended.kind == "command" else "human" + if fix_task is None or recommended_actor == actor: + return VerifierNextAction( + actor=actor, + kind=recommended.kind, + command=recommended.command, + why=recommended.why, + ) + if fix_task is not None: + why = ( + fix_task.instructions[0] + if fix_task.instructions + else (reason or "Human review required before merge.") ) + if actor == "coding_agent": + return VerifierNextAction( + actor=actor, + kind="command", + command=fix_task.verification_command, + why=why, + ) + return VerifierNextAction(actor=actor, kind="review", command=None, why=why) return VerifierNextAction( actor=actor, kind="review", @@ -642,11 +661,19 @@ def _build_verifier( include_scan_artifacts=report is not None, ) decision = release_decision_model.decision if release_decision_model else None - merge_verdict = _merge_verdict(decision=decision, head_status=head_status) + merge_verdict = merge_verdict_for(decision=decision, head_status=head_status) human_review = _human_review( merge_verdict=merge_verdict, release_decision=release_decision_model ) agent_summary_model = report.agent_summary if report is not None else None + capability_review = build_capability_review(report) if report is not None else None + fix_task = build_fix_task( + report, + merge_verdict=merge_verdict, + capability_review=capability_review, + base_ref=base, + head_ref=head, + ) return VerifierArtifact( workspace=str(git_root), config=_display_path(config_path, git_root), @@ -675,7 +702,7 @@ def _build_verifier( if report is not None and report.reviewer_summary is not None else None ), - capability_review=build_capability_review(report) if report is not None else {}, + capability_review=capability_review if capability_review is not None else {}, mode=_verifier_mode( ci_mode=ci_mode, report=report, @@ -696,10 +723,11 @@ def _build_verifier( human_review=human_review, first_next_action=_first_next_action( merge_verdict=merge_verdict, - human_review_required=human_review.required, + fix_task=fix_task, agent_summary=agent_summary_model, reason=release_decision_model.reason if release_decision_model else None, ), + fix_task=fix_task, artifacts=artifacts, ) diff --git a/src/agents_shipgate/cli/verify/pr_comment.py b/src/agents_shipgate/cli/verify/pr_comment.py index 92dc1d8f..de3b234f 100644 --- a/src/agents_shipgate/cli/verify/pr_comment.py +++ b/src/agents_shipgate/cli/verify/pr_comment.py @@ -7,6 +7,7 @@ VerifierArtifact, VerifierCapabilityChange, VerifierCapabilityReview, + VerifierFixTask, ) from .capability_review import TRUST_ROOT_CHECK_ID @@ -82,7 +83,7 @@ def _render_capability_review_comment( lines.extend(_capability_change_table(review)) lines.extend(_trust_root_warning_lines(review, report)) - lines.extend(_required_before_merge_lines(report, review)) + lines.extend(_required_before_merge_lines(report, review, verifier.fix_task)) lines.extend(_artifact_lines(verifier)) return _truncate("\n".join(lines), 6000) @@ -233,8 +234,25 @@ def _trust_root_warning_lines( def _required_before_merge_lines( report: ReadinessReport, review: VerifierCapabilityReview, + fix_task: VerifierFixTask | None = None, ) -> list[str]: lines = ["", "### Required before merge"] + # When verify produced a fix_task it is the authoritative repair contract; + # render it verbatim so the human surface and verifier.json never tell + # different stories about who acts next and whether it is safe. + if fix_task is not None: + who = "Coding agent" if fix_task.actor == "coding_agent" else "Human" + safety = ( + "safe for the coding agent to attempt" + if fix_task.safe_to_attempt + else "human authority required — a coding agent must not self-resolve" + ) + lines.append(f"Actor: {who} ({safety}).") + for index, instruction in enumerate(fix_task.instructions[:6], start=1): + lines.append(f"{index}. {_escape(instruction)}") + if fix_task.verification_command: + lines.append(f"Then re-verify: {_code(fix_task.verification_command)}") + return lines items: list[str] = [] if review.trust_root_touched or review.policy_weakened: items.append("Human: review the release-gate or policy change before merge.") diff --git a/src/agents_shipgate/schemas/capability_change.py b/src/agents_shipgate/schemas/capability_change.py index 3f9fb03b..9fa055fb 100644 --- a/src/agents_shipgate/schemas/capability_change.py +++ b/src/agents_shipgate/schemas/capability_change.py @@ -35,7 +35,7 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator -from agents_shipgate.schemas.common import Confidence +from agents_shipgate.schemas.common import Confidence, ReleaseDecisionStatus # --- Capability change ------------------------------------------------------- @@ -327,12 +327,11 @@ def _sort_lists(self) -> HumanAck: # --- Verifier summary -------------------------------------------------------- -VerifierVerdict = Literal[ - "blocked", - "review_required", - "insufficient_evidence", - "passed", -] +# Back-compat alias for the canonical verdict vocabulary. ``VerifierSummary.verdict`` +# MUST mirror ``release_decision.decision`` (Principle 2), so it reuses the exact +# same ``ReleaseDecisionStatus`` enum rather than re-spelling it. Kept as a named +# alias because callers and tests import ``VerifierVerdict`` directly. +VerifierVerdict = ReleaseDecisionStatus class VerifierCapabilityDeltaSummary(BaseModel): diff --git a/src/agents_shipgate/schemas/common.py b/src/agents_shipgate/schemas/common.py index 40571e76..82d5279b 100644 --- a/src/agents_shipgate/schemas/common.py +++ b/src/agents_shipgate/schemas/common.py @@ -7,6 +7,19 @@ Severity = Literal["info", "low", "medium", "high", "critical"] Confidence = Literal["low", "medium", "high"] BaselineStatus = Literal["new", "matched", "resolved"] +# The canonical release-verdict vocabulary — the ONE enum the whole system +# gates on. ``build_release_decision()`` is the only place that computes it; +# every other "verdict"/"decision" field (AgentSummary, ReviewerSummary, +# VerifierSummary, ReleaseConsequence) re-uses this exact alias so the +# vocabulary can never be re-spelled or drift out of lockstep. The +# agent-facing ``MergeVerdict`` (schemas/verifier.py) is a deterministic +# projection of this via ``map_merge_verdict()``. +ReleaseDecisionStatus = Literal[ + "blocked", + "review_required", + "insufficient_evidence", + "passed", +] # v0.15: per-finding provenance kind. Independent of `confidence` — # `confidence` records how sure a rule is; `provenance_kind` records # *what kind of rule fired* (and what artifact it inspected). Lets diff --git a/src/agents_shipgate/schemas/report.py b/src/agents_shipgate/schemas/report.py index 5632dd46..f0d075f6 100644 --- a/src/agents_shipgate/schemas/report.py +++ b/src/agents_shipgate/schemas/report.py @@ -17,6 +17,7 @@ BaselineStatus, Confidence, ProvenanceKind, + ReleaseDecisionStatus, Severity, SourceReference, ) @@ -149,14 +150,11 @@ class BaselineSummary(BaseModel): # v0.8: release_decision block — see docs/STABILITY.md for the -# divergence contract with summary.status (which stays baseline-blind -# for backwards compatibility). -ReleaseDecisionStatus = Literal[ - "blocked", - "review_required", - "insufficient_evidence", - "passed", -] +# divergence contract with summary.status (which stays baseline-blind for +# backwards compatibility). The ``ReleaseDecisionStatus`` verdict vocabulary +# is now defined once in schemas/common.py and imported above, so the report +# summaries, ReleaseConsequence, and the verifier projection all share the +# exact same enum (one decision engine — no re-spelling, no drift). class ReleaseDecisionItem(BaseModel): @@ -386,12 +384,7 @@ class AgentSummary(BaseModel): model_config = ConfigDict(extra="forbid") - verdict: Literal[ - "blocked", - "review_required", - "insufficient_evidence", - "passed", - ] + verdict: ReleaseDecisionStatus headline: str blocker_count: int = 0 review_item_count: int = 0 @@ -464,15 +457,11 @@ class ReviewerSummary(BaseModel): model_config = ConfigDict(extra="forbid") - # Mirror the release verdict for at-a-glance context. Same enum as - # ``AgentSummary.verdict`` so a downstream consumer can switch on - # either block without re-deriving. - verdict: Literal[ - "blocked", - "review_required", - "insufficient_evidence", - "passed", - ] + # Mirror the release verdict for at-a-glance context. The exact same + # ``ReleaseDecisionStatus`` alias as ``AgentSummary.verdict`` and + # ``release_decision.decision`` so a downstream consumer can switch on + # any block without re-deriving — and the vocabulary cannot drift. + verdict: ReleaseDecisionStatus headline: str # Per-lens activity counts. Each is the cheapest "did this lens diff --git a/src/agents_shipgate/schemas/verifier.py b/src/agents_shipgate/schemas/verifier.py index c84191d1..5b9f030b 100644 --- a/src/agents_shipgate/schemas/verifier.py +++ b/src/agents_shipgate/schemas/verifier.py @@ -2,7 +2,9 @@ from typing import Any, Literal -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, model_validator + +from agents_shipgate.schemas.common import ReleaseDecisionStatus VerifierBaseStatus = Literal[ "not_requested", @@ -32,7 +34,14 @@ "none", ] -_DECISION_TO_VERDICT: dict[str, MergeVerdict] = { +# The projection from the canonical release verdict (``ReleaseDecisionStatus``, +# the ONE thing ``build_release_decision`` computes) onto the agent-facing +# ``MergeVerdict``. Keyed with ``ReleaseDecisionStatus`` so a key that is not a +# real release status is a type error, and covered by a totality test +# (tests/test_verdict_contract.py) so adding a release status without a mapping +# fails CI rather than silently falling back. This dict is the only bridge +# between the two vocabularies. +_DECISION_TO_VERDICT: dict[ReleaseDecisionStatus, MergeVerdict] = { "passed": "mergeable", "review_required": "human_review_required", "insufficient_evidence": "insufficient_evidence", @@ -41,10 +50,30 @@ def map_merge_verdict(decision: str | None) -> MergeVerdict: - """Project ``release_decision.decision`` onto a merge verdict.""" + """Project ``release_decision.decision`` onto a merge verdict. + + ``None`` (no head scan / no decision) is ``unknown``. A decision string + outside the canonical vocabulary fails safe to ``human_review_required`` + rather than ``mergeable`` — an unrecognized verdict must never auto-pass. + """ if decision is None: return "unknown" - return _DECISION_TO_VERDICT.get(decision, "human_review_required") + return _DECISION_TO_VERDICT.get(decision, "human_review_required") # type: ignore[arg-type] + + +def merge_verdict_for(*, decision: str | None, head_status: str) -> MergeVerdict: + """Single authority for deriving a ``MergeVerdict`` for a verify run. + + When the head scan produced a ``release_decision`` the verdict is a pure + projection of it (``map_merge_verdict``). With no decision the verdict + reflects *why*: a skipped head (Shipgate had nothing to gate) is + ``mergeable``; any other no-decision state (scan failed, or not yet run) + is ``unknown``. Centralized here so the orchestrator — or any future + caller — cannot invent a second, inconsistent rule. + """ + if decision is not None: + return map_merge_verdict(decision) + return "mergeable" if head_status == "skipped" else "unknown" class VerifierNextAction(BaseModel): @@ -67,6 +96,39 @@ class VerifierHumanReview(BaseModel): why: str | None = None +class VerifierFixTask(BaseModel): + """The single repair task a verify run hands to whoever acts next. + + Routing is deterministic and projected from the head scan — never an LLM + judgment. ``coding_agent`` + ``safe_to_attempt=True`` means the gating + gaps are mechanical (every gating finding is ``autofix_safe``): the agent + may fix them and re-run ``verification_command``. ``human`` + + ``safe_to_attempt=False`` means an authority gap a coding agent must not + invent its way past — missing approval/idempotency evidence, a weakened + policy, or a touched trust root. ``forbidden_shortcuts`` are the + reward-hacking moves that are never acceptable for either actor. + """ + + model_config = ConfigDict(extra="forbid") + + actor: Literal["coding_agent", "human"] + safe_to_attempt: bool + instructions: list[str] = Field(default_factory=list) + forbidden_shortcuts: list[str] = Field(default_factory=list) + verification_command: str | None = None + + @model_validator(mode="after") + def _human_tasks_are_never_agent_safe(self) -> VerifierFixTask: + # The anti-reward-hacking guarantee: an authority gap routed to a + # human can never be marked safe for a coding agent to attempt. + if self.actor == "human" and self.safe_to_attempt: + raise ValueError( + "VerifierFixTask with actor='human' must have " + "safe_to_attempt=False (authority gaps are not agent-safe)." + ) + return self + + class VerifierCapabilityChange(BaseModel): """One reviewer-facing capability change projected for verifier output.""" @@ -139,8 +201,39 @@ class VerifierArtifact(BaseModel): headline: str | None = None human_review: VerifierHumanReview | None = None first_next_action: VerifierNextAction | None = None + fix_task: VerifierFixTask | None = None artifacts: dict[str, str] = Field(default_factory=dict) + @model_validator(mode="after") + def _verdict_projects_release_decision(self) -> VerifierArtifact: + """Lock the one-decision-engine contract structurally. + + Whenever a head ``release_decision`` is present, the agent-facing + ``merge_verdict`` and the convenience ``decision`` copy MUST be exact + projections of it — never an independently computed second opinion. + Construction-time enforcement makes an inconsistent artifact + impossible to emit. (No release_decision — skipped / failed / preview + — is left unconstrained: there is no substrate to project.) + """ + if self.release_decision is None: + return self + substrate = self.release_decision.get("decision") + if self.decision != substrate: + raise ValueError( + "VerifierArtifact.decision must equal " + "release_decision['decision'] (one decision engine): " + f"{self.decision!r} != {substrate!r}" + ) + expected = map_merge_verdict(substrate) + if self.merge_verdict != expected: + raise ValueError( + "VerifierArtifact.merge_verdict must be the projection of " + f"release_decision['decision']={substrate!r} via " + f"map_merge_verdict (expected {expected!r}, got " + f"{self.merge_verdict!r})" + ) + return self + __all__ = [ "CapabilityChangeBucket", @@ -150,8 +243,10 @@ class VerifierArtifact(BaseModel): "VerifierBaseStatus", "VerifierCapabilityChange", "VerifierCapabilityReview", + "VerifierFixTask", "VerifierHeadStatus", "VerifierHumanReview", "VerifierNextAction", "map_merge_verdict", + "merge_verdict_for", ] diff --git a/tests/test_fix_task_contract.py b/tests/test_fix_task_contract.py new file mode 100644 index 00000000..1c382389 --- /dev/null +++ b/tests/test_fix_task_contract.py @@ -0,0 +1,400 @@ +"""PR-B contract: deterministic fix_task routing (mechanical vs authority). + +`fix_task` is the single repair instruction a verify run hands to whoever +acts next. Its routing is a pure projection of the head scan — never a model +judgment — and the split is the product's core safety boundary: a coding +agent may fix mechanical gaps, but an authority gap (approval/idempotency +evidence it cannot prove, a weakened policy, a touched trust root, degraded +evidence) must route to a human so the agent cannot invent its way to green. +""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from agents_shipgate.cli.verify.fix_task import build_fix_task +from agents_shipgate.cli.verify.orchestrator import _first_next_action +from agents_shipgate.schemas.report import ( + AgentSummary, + AgentSummaryAction, + BaselineDelta, + EvidenceCoverageDecision, + FailPolicy, + Finding, + ReadinessReport, + ReleaseDecision, + ReleaseDecisionItem, + ReportSummary, + ToolSurfaceSummary, +) +from agents_shipgate.schemas.verifier import ( + VerifierCapabilityReview, + VerifierFixTask, + map_merge_verdict, +) + + +def _finding( + fid: str, + *, + requires_human_review: bool, + autofix_safe: bool, + recommendation: str = "Add the missing control.", + severity: str = "medium", + blocks_release: bool = False, +) -> Finding: + return Finding( + id=fid, + check_id="SHIP-TEST", + title=f"finding {fid}", + severity=severity, # type: ignore[arg-type] + category="action_surface", + evidence={}, + recommendation=recommendation, + blocks_release=blocks_release, + requires_human_review=requires_human_review, + autofix_safe=autofix_safe, + ) + + +def _item(finding: Finding) -> ReleaseDecisionItem: + return ReleaseDecisionItem( + id=finding.id, + check_id=finding.check_id, + severity=finding.severity, + title=finding.title, + ) + + +def _report(*, decision: str, findings, blockers=(), review_items=()) -> ReadinessReport: + return ReadinessReport( + run_id="r", + project={"name": "p"}, + agent={"name": "a"}, + environment={"target": "local"}, + summary=ReportSummary(status="clean"), + release_decision=ReleaseDecision( + decision=decision, # type: ignore[arg-type] + reason="2 findings require review.", + blockers=[_item(f) for f in blockers], + review_items=[_item(f) for f in review_items], + evidence_coverage=EvidenceCoverageDecision( + level="static", + human_review_recommended=False, + source_warning_count=0, + low_confidence_tool_count=0, + ), + baseline_delta=BaselineDelta(enabled=False), + fail_policy=FailPolicy( + ci_mode="advisory", + fail_on=["critical"], + new_findings_only=False, + would_fail_ci=False, + exit_code=0, + ), + ), + tool_surface=ToolSurfaceSummary(total_tools=0, high_risk_tools=0), + findings=list(findings), + ) + + +def _review(*, policy_weakened=False, trust_root_touched=False) -> VerifierCapabilityReview: + return VerifierCapabilityReview( + policy_weakened=policy_weakened, trust_root_touched=trust_root_touched + ) + + +def _fix_task(report, *, capability_review=None, base_ref="origin/main", head_ref="HEAD"): + decision = report.release_decision.decision + return build_fix_task( + report, + merge_verdict=map_merge_verdict(decision), + capability_review=capability_review or _review(), + base_ref=base_ref, + head_ref=head_ref, + ) + + +# --- Routing ---------------------------------------------------------------- + + +def test_mergeable_has_no_fix_task() -> None: + report = _report(decision="passed", findings=[]) + assert ( + build_fix_task( + report, + merge_verdict="mergeable", + capability_review=_review(), + base_ref="origin/main", + head_ref="HEAD", + ) + is None + ) + + +def test_mechanical_review_routes_to_coding_agent() -> None: + f = _finding( + "F1", + requires_human_review=False, + autofix_safe=True, + recommendation="Add an owner field from CODEOWNERS.", + ) + task = _fix_task(_report(decision="review_required", findings=[f], review_items=[f])) + assert task is not None + assert task.actor == "coding_agent" + assert task.safe_to_attempt is True + assert "Add an owner field from CODEOWNERS." in task.instructions + + +def test_authority_review_routes_to_human() -> None: + f = _finding("F1", requires_human_review=True, autofix_safe=False) + task = _fix_task(_report(decision="review_required", findings=[f], review_items=[f])) + assert task is not None + assert task.actor == "human" + assert task.safe_to_attempt is False + + +def test_blocked_but_mechanical_routes_by_autofix_not_verdict() -> None: + # Routing is by the per-finding autofix_safe signal, not the verdict label. + f = _finding( + "F1", + requires_human_review=False, + autofix_safe=True, + severity="critical", + blocks_release=True, + ) + task = _fix_task(_report(decision="blocked", findings=[f], blockers=[f])) + assert task is not None + assert task.actor == "coding_agent" + + +def test_policy_weakened_forces_human_even_when_mechanical() -> None: + f = _finding("F1", requires_human_review=False, autofix_safe=True) + task = _fix_task( + _report(decision="review_required", findings=[f], review_items=[f]), + capability_review=_review(policy_weakened=True), + ) + assert task is not None + assert task.actor == "human" + assert task.safe_to_attempt is False + assert any("self-approve" in line for line in task.instructions) + + +def test_trust_root_touched_forces_human_even_when_mechanical() -> None: + f = _finding("F1", requires_human_review=False, autofix_safe=True) + task = _fix_task( + _report(decision="review_required", findings=[f], review_items=[f]), + capability_review=_review(trust_root_touched=True), + ) + assert task is not None + assert task.actor == "human" + + +def test_insufficient_evidence_forces_human() -> None: + task = _fix_task(_report(decision="insufficient_evidence", findings=[])) + assert task is not None + assert task.actor == "human" + assert task.safe_to_attempt is False + + +# --- Instructions / guardrails / verification ------------------------------- + + +def test_forbidden_shortcuts_and_verification_command_present() -> None: + f = _finding("F1", requires_human_review=True, autofix_safe=False) + task = _fix_task( + _report(decision="blocked", findings=[f], blockers=[f]), + base_ref="origin/main", + head_ref="HEAD", + ) + assert task is not None + assert task.verification_command == ( + "agents-shipgate verify --base origin/main --head HEAD --json" + ) + assert task.forbidden_shortcuts + assert any("suppress" in shortcut for shortcut in task.forbidden_shortcuts) + + +def test_instructions_are_deduped_and_capped() -> None: + findings = [ + _finding(f"F{i}", requires_human_review=True, autofix_safe=False, recommendation="Same rec.") + for i in range(8) + ] + task = _fix_task(_report(decision="blocked", findings=findings, blockers=findings)) + assert task is not None + assert task.instructions.count("Same rec.") == 1 + assert len(task.instructions) <= 5 + + +# --- Consistency with first_next_action ------------------------------------- + + +def test_first_next_action_actor_matches_fix_task() -> None: + mech = _finding("F1", requires_human_review=False, autofix_safe=True) + mech_task = _fix_task( + _report(decision="review_required", findings=[mech], review_items=[mech]) + ) + assert ( + _first_next_action( + merge_verdict="human_review_required", + fix_task=mech_task, + agent_summary=None, + reason="r", + ).actor + == "coding_agent" + ) + + auth = _finding("F2", requires_human_review=True, autofix_safe=False) + auth_task = _fix_task(_report(decision="blocked", findings=[auth], blockers=[auth])) + assert ( + _first_next_action( + merge_verdict="blocked", + fix_task=auth_task, + agent_summary=None, + reason="r", + ).actor + == "human" + ) + + +# --- VerifierFixTask schema validator (anti-reward-hacking) ------------------ + + +def test_human_fix_task_cannot_be_agent_safe() -> None: + with pytest.raises(ValidationError): + VerifierFixTask(actor="human", safe_to_attempt=True) + + +def test_human_fix_task_unsafe_is_valid() -> None: + VerifierFixTask(actor="human", safe_to_attempt=False) + + +def test_coding_agent_fix_task_safe_is_valid() -> None: + VerifierFixTask(actor="coding_agent", safe_to_attempt=True) + + +# --- Fail-closed routing (review feedback) ---------------------------------- + + +def test_finding_with_unknown_routing_fields_fails_closed_to_human() -> None: + # autofix_safe / requires_human_review default to None on stale, plugin, or + # legacy findings; an unresolved finding must never be marked agent-safe. + f = Finding( + id="F1", + check_id="SHIP-TEST", + title="legacy finding", + severity="medium", + category="action_surface", + evidence={}, + recommendation="Investigate.", + blocks_release=False, + # autofix_safe and requires_human_review left at their None defaults. + ) + task = _fix_task(_report(decision="review_required", findings=[f], review_items=[f])) + assert task is not None + assert task.actor == "human" + assert task.safe_to_attempt is False + + +def test_finding_autofix_false_routes_to_human() -> None: + f = _finding("F1", requires_human_review=False, autofix_safe=False) + task = _fix_task(_report(decision="review_required", findings=[f], review_items=[f])) + assert task is not None + assert task.actor == "human" + + +def test_unknown_verdict_without_report_emits_human_fix_task() -> None: + # No head report (scan failed → unknown) still yields a human fix_task so + # the contract is uniform: every non-mergeable verdict carries one. + task = build_fix_task( + None, + merge_verdict="unknown", + capability_review=None, + base_ref="origin/main", + head_ref="HEAD", + ) + assert task is not None + assert task.actor == "human" + assert task.safe_to_attempt is False + assert task.verification_command is not None + + +def test_mergeable_without_report_has_no_fix_task() -> None: + assert ( + build_fix_task( + None, + merge_verdict="mergeable", + capability_review=None, + base_ref="origin/main", + head_ref="HEAD", + ) + is None + ) + + +def test_verification_command_quotes_shell_metacharacters() -> None: + # ';' is a valid git ref character, so an unquoted command would be + # injectable when an agent or human runs the suggested string. + f = _finding("F1", requires_human_review=True, autofix_safe=False) + task = _fix_task( + _report(decision="blocked", findings=[f], blockers=[f]), + head_ref="foo;rm -rf /", + ) + assert task is not None + assert task.verification_command is not None + assert "--head foo;rm" not in task.verification_command + assert "'foo;rm -rf /'" in task.verification_command + + +# --- first_next_action / fix_task coherence --------------------------------- + + +def test_first_next_action_ignores_recommendation_that_contradicts_fix_task() -> None: + # fix_task says the coding agent can act, but the agent summary points a + # human at a review surface (info kind). The next-action must follow the + # fix_task, not borrow the contradictory recommendation. + mech = _finding("F1", requires_human_review=False, autofix_safe=True) + task = _fix_task( + _report(decision="review_required", findings=[mech], review_items=[mech]) + ) + summary = AgentSummary( + verdict="review_required", + headline="h", + first_recommended_action=AgentSummaryAction( + kind="info", command=None, why="A human must look at this." + ), + ) + action = _first_next_action( + merge_verdict="human_review_required", + fix_task=task, + agent_summary=summary, + reason="r", + ) + assert action.actor == "coding_agent" + assert action.why != "A human must look at this." + assert action.command == task.verification_command + + +def test_first_next_action_borrows_consistent_recommendation() -> None: + mech = _finding("F1", requires_human_review=False, autofix_safe=True) + task = _fix_task( + _report(decision="review_required", findings=[mech], review_items=[mech]) + ) + summary = AgentSummary( + verdict="review_required", + headline="h", + first_recommended_action=AgentSummaryAction( + kind="command", + command="agents-shipgate apply-patches --confidence high --apply", + why="Apply the safe manifest patches.", + ), + ) + action = _first_next_action( + merge_verdict="human_review_required", + fix_task=task, + agent_summary=summary, + reason="r", + ) + assert action.actor == "coding_agent" + assert action.command == "agents-shipgate apply-patches --confidence high --apply" diff --git a/tests/test_verdict_contract.py b/tests/test_verdict_contract.py new file mode 100644 index 00000000..11a5883a --- /dev/null +++ b/tests/test_verdict_contract.py @@ -0,0 +1,167 @@ +"""PR-A contract lock: one verdict engine, one drift-proof projection. + +These tests are the structural guarantee behind the "one decision engine" +discipline. ``build_release_decision()`` computes the canonical +``ReleaseDecisionStatus``; everything else — the report summary blocks +(``AgentSummary`` / ``ReviewerSummary`` / ``VerifierSummary``), the +``ReleaseConsequence`` lens, and the agent-facing ``MergeVerdict`` in +verifier.json — must be a *projection* of it that cannot silently drift. If +any of these fail, two parts of the system can disagree about whether a PR +can merge, which is exactly the failure this product exists to prevent. +""" + +from __future__ import annotations + +from typing import get_args + +import pytest +from pydantic import ValidationError + +from agents_shipgate.schemas.capability_change import VerifierSummary, VerifierVerdict +from agents_shipgate.schemas.common import ReleaseDecisionStatus +from agents_shipgate.schemas.report import ( + AgentSummary, + ReleaseConsequence, + ReleaseDecision, + ReviewerSummary, +) +from agents_shipgate.schemas.verifier import ( + _DECISION_TO_VERDICT, + VerifierArtifact, + map_merge_verdict, + merge_verdict_for, +) + +CANONICAL = set(get_args(ReleaseDecisionStatus)) + + +# --- One vocabulary: every verdict surface shares the canonical enum -------- + + +def test_canonical_vocabulary_is_the_expected_four() -> None: + assert CANONICAL == { + "blocked", + "review_required", + "insufficient_evidence", + "passed", + } + + +@pytest.mark.parametrize( + "model, field", + [ + (ReleaseDecision, "decision"), + (ReleaseConsequence, "decision"), + (AgentSummary, "verdict"), + (ReviewerSummary, "verdict"), + (VerifierSummary, "verdict"), + ], +) +def test_every_verdict_field_uses_the_canonical_enum(model, field) -> None: + members = set(get_args(model.model_fields[field].annotation)) + assert members == CANONICAL, ( + f"{model.__name__}.{field} re-spells the verdict vocabulary instead " + "of reusing ReleaseDecisionStatus; the enums can now drift apart." + ) + + +def test_verifier_verdict_alias_is_the_canonical_enum() -> None: + assert set(get_args(VerifierVerdict)) == CANONICAL + + +# --- The projection is total and drift-proof -------------------------------- + + +def test_projection_is_total_over_release_status() -> None: + for status in get_args(ReleaseDecisionStatus): + assert status in _DECISION_TO_VERDICT, ( + f"release status {status!r} has no explicit MergeVerdict mapping; " + "add it to _DECISION_TO_VERDICT (do not rely on the fail-safe " + "fallback for a known status)." + ) + + +def test_projection_table_is_pinned() -> None: + # Pin the exact bridge so changing a mapping is a deliberate, reviewed edit. + assert _DECISION_TO_VERDICT == { + "passed": "mergeable", + "review_required": "human_review_required", + "insufficient_evidence": "insufficient_evidence", + "blocked": "blocked", + } + + +def test_map_merge_verdict_none_is_unknown() -> None: + assert map_merge_verdict(None) == "unknown" + + +def test_map_merge_verdict_unknown_status_fails_safe_not_mergeable() -> None: + # An out-of-contract decision string must never auto-pass. + assert map_merge_verdict("definitely-not-a-status") == "human_review_required" + + +@pytest.mark.parametrize( + "decision, head_status, expected", + [ + ("passed", "succeeded", "mergeable"), + ("review_required", "succeeded", "human_review_required"), + ("insufficient_evidence", "succeeded", "insufficient_evidence"), + ("blocked", "succeeded", "blocked"), + (None, "skipped", "mergeable"), # nothing to gate + (None, "succeeded", "unknown"), # ran but produced no decision + (None, "failed", "unknown"), # scan failed + ], +) +def test_merge_verdict_for_matrix(decision, head_status, expected) -> None: + assert merge_verdict_for(decision=decision, head_status=head_status) == expected + + +# --- The artifact cannot disagree with its substrate (structural lock) ------ + + +def _artifact(**overrides) -> VerifierArtifact: + base: dict = { + "workspace": "/tmp/w", + "config": "shipgate.yaml", + "head_status": "succeeded", + } + base.update(overrides) + return VerifierArtifact(**base) + + +@pytest.mark.parametrize("status", sorted(CANONICAL)) +def test_consistent_artifact_is_accepted(status) -> None: + art = _artifact( + release_decision={"decision": status}, + decision=status, + merge_verdict=map_merge_verdict(status), + ) + assert art.merge_verdict == map_merge_verdict(status) + + +def test_artifact_rejects_merge_verdict_inconsistent_with_decision() -> None: + with pytest.raises(ValidationError): + _artifact( + release_decision={"decision": "blocked"}, + decision="blocked", + merge_verdict="mergeable", # lie: blocked must project to "blocked" + ) + + +def test_artifact_rejects_top_level_decision_mismatch() -> None: + with pytest.raises(ValidationError): + _artifact( + release_decision={"decision": "passed"}, + decision="blocked", # disagrees with the substrate + merge_verdict="mergeable", + ) + + +def test_artifact_without_release_decision_is_unconstrained() -> None: + # preview / skipped / failed paths have no substrate to project. + art = _artifact(release_decision=None, merge_verdict="unknown") + assert art.merge_verdict == "unknown" + art2 = _artifact( + release_decision=None, head_status="skipped", merge_verdict="mergeable" + ) + assert art2.merge_verdict == "mergeable"