Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions docs/verifier-schema.v0.1.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,56 @@
"title": "VerifierCapabilityReview",
"type": "object"
},
"VerifierFixTask": {
"additionalProperties": false,
"description": "The single repair task a verify run hands to whoever acts next.\n\nRouting is deterministic and projected from the head scan \u2014 never an LLM\njudgment. ``coding_agent`` + ``safe_to_attempt=True`` means the gating\ngaps are mechanical (every gating finding is ``autofix_safe``): the agent\nmay fix them and re-run ``verification_command``. ``human`` +\n``safe_to_attempt=False`` means an authority gap a coding agent must not\ninvent its way past \u2014 missing approval/idempotency evidence, a weakened\npolicy, or a touched trust root. ``forbidden_shortcuts`` are the\nreward-hacking moves that are never acceptable for either actor.",
"properties": {
"actor": {
"enum": [
"coding_agent",
"human"
],
"title": "Actor",
"type": "string"
},
"forbidden_shortcuts": {
"items": {
"type": "string"
},
"title": "Forbidden Shortcuts",
"type": "array"
},
"instructions": {
"items": {
"type": "string"
},
"title": "Instructions",
"type": "array"
},
"safe_to_attempt": {
"title": "Safe To Attempt",
"type": "boolean"
},
"verification_command": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Verification Command"
}
},
"required": [
"actor",
"safe_to_attempt"
],
"title": "VerifierFixTask",
"type": "object"
},
"VerifierHumanReview": {
"additionalProperties": false,
"description": "Whether a human must review before merge, and why.",
Expand Down Expand Up @@ -331,6 +381,17 @@
],
"default": null
},
"fix_task": {
"anyOf": [
{
"$ref": "#/$defs/VerifierFixTask"
},
{
"type": "null"
}
],
"default": null
},
"head_exit_code": {
"default": 0,
"title": "Head Exit Code",
Expand Down
174 changes: 174 additions & 0 deletions src/agents_shipgate/cli/verify/fix_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""Deterministic ``fix_task`` projection for ``agents-shipgate verify``.

The fix_task is the single repair instruction a verify run hands to whoever
must act next. It is a pure projection of the head scan: *who* acts and
*whether it is safe* is decided by the release decision plus the per-finding
``autofix_safe`` / ``requires_human_review`` signals — never by a model. The
agent route fires only when every gating finding is mechanically fixable; any
authority gap (missing approval/idempotency evidence, a weakened policy, a
touched trust root, or degraded evidence) routes to a human so a coding agent
cannot invent its way past it.
"""

from __future__ import annotations

import shlex

from agents_shipgate.schemas.report import Finding, ReadinessReport
from agents_shipgate.schemas.verifier import (
MergeVerdict,
VerifierCapabilityReview,
VerifierFixTask,
)

# Shortcuts that game the gate instead of repairing the change. They are off
# limits to the coding agent and to the human alike. The wording is meant to
# stay aligned with the PR-comment guardrail text (cli/verify/pr_comment.py).
FORBIDDEN_SHORTCUTS: tuple[str, ...] = (
    "Do not suppress the finding (checks.ignore in shipgate.yaml).",
    "Do not lower severity or add a waiver just to pass the gate.",
    # Multi-line entries are parenthesised so that each tuple element is
    # visibly a single string rather than an accidental concatenation.
    (
        "Do not invent or assume approval, idempotency, or audit evidence "
        "you cannot prove from the code."
    ),
    (
        "Do not weaken the release policy, CI gate, or agent instructions "
        "that evaluate this change."
    ),
)

# Upper bound on how many instruction strings a single fix_task carries.
_MAX_INSTRUCTIONS = 5


def build_fix_task(
    report: ReadinessReport | None,
    *,
    merge_verdict: MergeVerdict,
    capability_review: VerifierCapabilityReview | None,
    base_ref: str | None,
    head_ref: str,
) -> VerifierFixTask | None:
    """Project the head scan onto a single repair task.

    Args:
        report: Head-scan readiness report, or ``None`` when none is available.
        merge_verdict: Merge verdict already derived for this PR.
        capability_review: Capability review for the head scan, or ``None``
            when none could be built.
        base_ref: Base ref for the re-verification command.
        head_ref: Head ref for the re-verification command.

    Returns:
        ``None`` only when ``merge_verdict`` is ``"mergeable"``. Every other
        verdict yields a task:

        * a ``human`` task (``safe_to_attempt=False``) when there is no
          report, no release decision, or no capability review to reason
          about;
        * a ``coding_agent`` task (``safe_to_attempt=True``) when every gating
          item resolves to a finding that is explicitly mechanical and no
          authority escalation applies;
        * a ``human`` task (``safe_to_attempt=False``) otherwise.
    """
    if merge_verdict == "mergeable":
        return None

    verification_command = _verification_command(base_ref, head_ref)

    # No completed head decision (scan skipped/failed → ``unknown``) but the PR
    # is not mergeable: there are no findings to route on, so fail closed to a
    # human who must investigate why the scan did not complete. Emitting a task
    # here (rather than None) keeps the contract uniform — every non-mergeable
    # verdict carries a fix_task.
    if report is None or report.release_decision is None or capability_review is None:
        return VerifierFixTask(
            actor="human",
            safe_to_attempt=False,
            instructions=[
                "Shipgate could not produce a release decision for this PR; a "
                "human must investigate why the scan did not complete and "
                "re-run before merge."
            ],
            forbidden_shortcuts=list(FORBIDDEN_SHORTCUTS),
            verification_command=verification_command,
        )

    gating = _gating_findings(report)

    # The coding-agent route is the only non-human outcome and it MUST fail
    # closed: every gating finding has to be explicitly mechanical
    # (``autofix_safe is True`` AND ``requires_human_review is False``). A
    # finding whose routing fields are ``None``/``False`` — stale, plugin, or
    # legacy — is treated as an authority gap and never silently marked
    # agent-safe.
    #
    # A blocker / review item that matches no finding carries no routing
    # signal at all. It must not be ignored: otherwise the ``all(...)`` below
    # would only see the resolvable findings and could mark the task
    # agent-safe while an unexamined item is still gating the PR.
    mechanical = (
        bool(gating)
        and not _has_unresolved_gating_item(report)
        and all(
            finding.autofix_safe is True and finding.requires_human_review is False
            for finding in gating
        )
    )
    authority_escalation = (
        capability_review.policy_weakened
        or capability_review.trust_root_touched
        or merge_verdict in {"insufficient_evidence", "unknown"}
    )
    if mechanical and not authority_escalation:
        return VerifierFixTask(
            actor="coding_agent",
            safe_to_attempt=True,
            instructions=_mechanical_instructions(gating),
            forbidden_shortcuts=list(FORBIDDEN_SHORTCUTS),
            verification_command=verification_command,
        )

    return VerifierFixTask(
        actor="human",
        safe_to_attempt=False,
        instructions=_human_instructions(report, capability_review, gating),
        forbidden_shortcuts=list(FORBIDDEN_SHORTCUTS),
        verification_command=verification_command,
    )


def _has_unresolved_gating_item(report: ReadinessReport) -> bool:
    """Return True when a blocker / review item matches no finding in the report.

    An item counts as resolved when its ``id`` or its ``fingerprint`` equals
    that of some finding in ``report.findings``.
    """
    decision = report.release_decision
    if decision is None:
        return False
    known_ids = {f.id for f in report.findings if f.id}
    known_fingerprints = {f.fingerprint for f in report.findings if f.fingerprint}
    return any(
        not (
            (item.id and item.id in known_ids)
            or (item.fingerprint and item.fingerprint in known_fingerprints)
        )
        for item in (*decision.blockers, *decision.review_items)
    )


def _gating_findings(report: ReadinessReport) -> list[Finding]:
    """Resolve the decision's blockers and review items to report findings.

    Items are walked blockers first, then review items. Each item is matched
    by ``id`` and, failing that, by ``fingerprint``. Items that match no
    finding are skipped, and a finding referenced more than once is returned
    only at its first position.
    """
    decision = report.release_decision
    assert decision is not None  # guarded by build_fix_task

    # Index the findings once; a later finding with the same key wins.
    findings_by_id = {}
    findings_by_fingerprint = {}
    for candidate in report.findings:
        if candidate.id:
            findings_by_id[candidate.id] = candidate
        if candidate.fingerprint:
            findings_by_fingerprint[candidate.fingerprint] = candidate

    resolved: list[Finding] = []
    emitted: set[int] = set()  # object identities already placed in ``resolved``
    for entry in (*decision.blockers, *decision.review_items):
        match = findings_by_id.get(entry.id) if entry.id else None
        if not match:
            match = (
                findings_by_fingerprint.get(entry.fingerprint)
                if entry.fingerprint
                else None
            )
        if match is None or id(match) in emitted:
            continue
        emitted.add(id(match))
        resolved.append(match)
    return resolved


def _human_instructions(
    report: ReadinessReport,
    capability_review: VerifierCapabilityReview,
    gating: list[Finding],
) -> list[str]:
    """Assemble the instruction list for a human-routed task.

    Order is: the release decision's reason, then the policy / trust-root
    notes that apply, then the recommendation of every gating finding. The
    result is de-duplicated and capped by ``_dedupe_cap``.
    """
    decision = report.release_decision
    assert decision is not None
    headline = decision.reason

    authority_notes: list[str] = []
    if capability_review.policy_weakened:
        authority_notes.append(
            "A human must approve the release-policy change in this PR; the "
            "coding agent that made the change cannot self-approve it."
        )
    if capability_review.trust_root_touched:
        authority_notes.append(
            "A human must review the touched release trust root (manifest, CI "
            "gate, agent instructions, or trigger catalog) before merge."
        )

    # A human-routed task owns the whole decision, so every gating finding's
    # recommendation is listed, including findings whose routing fields were
    # ambiguous.
    recommendations = [
        finding.recommendation for finding in gating if finding.recommendation
    ]
    return _dedupe_cap([headline, *authority_notes, *recommendations])


def _mechanical_instructions(gating: list[Finding]) -> list[str]:
    """Return the gating findings' non-empty recommendations, de-duplicated and capped."""
    recommendations = [
        finding.recommendation for finding in gating if finding.recommendation
    ]
    return _dedupe_cap(recommendations)


def _dedupe_cap(items: list[str]) -> list[str]:
    """Drop empty and repeated entries, then keep the first ``_MAX_INSTRUCTIONS``.

    The first occurrence of each entry keeps its original position.
    """
    # ``dict.fromkeys`` preserves insertion order, so it de-duplicates while
    # keeping each entry where it first appeared.
    unique = dict.fromkeys(item for item in items if item)
    return list(unique)[:_MAX_INSTRUCTIONS]


def _verification_command(base_ref: str | None, head_ref: str) -> str:
    """Build the ``agents-shipgate verify`` command for the given refs.

    A missing or empty ``base_ref`` falls back to ``origin/main``; an empty
    ``head_ref`` falls back to ``HEAD``. Both refs are shell-quoted: they come
    from CLI / GitHub branch inputs, and a valid git ref may contain shell
    metacharacters (e.g. ``;``), so the emitted command must stay safe when an
    agent or human copies it verbatim.
    """
    base_arg = shlex.quote(base_ref if base_ref else "origin/main")
    head_arg = shlex.quote(head_ref if head_ref else "HEAD")
    return f"agents-shipgate verify --base {base_arg} --head {head_arg} --json"


# Public surface of this module; the underscore-prefixed helpers are internal.
__all__ = ["FORBIDDEN_SHORTCUTS", "build_fix_task"]
62 changes: 45 additions & 17 deletions src/agents_shipgate/cli/verify/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
MergeVerdict,
VerifierArtifact,
VerifierBaseStatus,
VerifierFixTask,
VerifierHumanReview,
VerifierNextAction,
map_merge_verdict,
merge_verdict_for,
)
from agents_shipgate.triggers import evaluate

from .capability_review import build_capability_review
from .fix_task import build_fix_task
from .git import (
archive_tree,
diff_context,
Expand Down Expand Up @@ -513,12 +515,6 @@ def _map_optional_tree_path(
return tree_dir / relative


def _merge_verdict(*, decision: str | None, head_status: str) -> MergeVerdict:
if decision is not None:
return map_merge_verdict(decision)
return "mergeable" if head_status == "skipped" else "unknown"


def _can_merge_without_human(
*, merge_verdict: MergeVerdict, release_decision: ReleaseDecision | None
) -> bool:
Expand Down Expand Up @@ -552,7 +548,7 @@ def _human_review(
def _first_next_action(
*,
merge_verdict: MergeVerdict,
human_review_required: bool,
fix_task: VerifierFixTask | None,
agent_summary: AgentSummary | None,
reason: str | None,
) -> VerifierNextAction:
Expand All @@ -563,17 +559,40 @@ def _first_next_action(
command=None,
why="No agent-capability changes gate this PR; safe to merge.",
)
actor = "human" if human_review_required else "coding_agent"
# The fix_task is the single repair contract; the headline next-step must
# not contradict it. Borrow the agent summary's concrete action (e.g. an
# apply-patches command) only when its implied actor agrees with the
# fix_task routing — otherwise derive the pointer from the fix_task so that
# actor, command, and why all come from one source.
actor = fix_task.actor if fix_task is not None else "human"
recommended = (
agent_summary.first_recommended_action if agent_summary is not None else None
)
if recommended is not None:
return VerifierNextAction(
actor=actor,
kind=recommended.kind,
command=recommended.command,
why=recommended.why,
# The PR comment infers a recommendation's actor the same way: a
# runnable command implies the coding agent, an info note a human.
recommended_actor = "coding_agent" if recommended.kind == "command" else "human"
if fix_task is None or recommended_actor == actor:
return VerifierNextAction(
actor=actor,
kind=recommended.kind,
command=recommended.command,
why=recommended.why,
)
if fix_task is not None:
why = (
fix_task.instructions[0]
if fix_task.instructions
else (reason or "Human review required before merge.")
)
if actor == "coding_agent":
return VerifierNextAction(
actor=actor,
kind="command",
command=fix_task.verification_command,
why=why,
)
return VerifierNextAction(actor=actor, kind="review", command=None, why=why)
return VerifierNextAction(
actor=actor,
kind="review",
Expand Down Expand Up @@ -642,11 +661,19 @@ def _build_verifier(
include_scan_artifacts=report is not None,
)
decision = release_decision_model.decision if release_decision_model else None
merge_verdict = _merge_verdict(decision=decision, head_status=head_status)
merge_verdict = merge_verdict_for(decision=decision, head_status=head_status)
human_review = _human_review(
merge_verdict=merge_verdict, release_decision=release_decision_model
)
agent_summary_model = report.agent_summary if report is not None else None
capability_review = build_capability_review(report) if report is not None else None
fix_task = build_fix_task(
report,
merge_verdict=merge_verdict,
capability_review=capability_review,
base_ref=base,
head_ref=head,
)
return VerifierArtifact(
workspace=str(git_root),
config=_display_path(config_path, git_root),
Expand Down Expand Up @@ -675,7 +702,7 @@ def _build_verifier(
if report is not None and report.reviewer_summary is not None
else None
),
capability_review=build_capability_review(report) if report is not None else {},
capability_review=capability_review if capability_review is not None else {},
mode=_verifier_mode(
ci_mode=ci_mode,
report=report,
Expand All @@ -696,10 +723,11 @@ def _build_verifier(
human_review=human_review,
first_next_action=_first_next_action(
merge_verdict=merge_verdict,
human_review_required=human_review.required,
fix_task=fix_task,
agent_summary=agent_summary_model,
reason=release_decision_model.reason if release_decision_model else None,
),
fix_task=fix_task,
artifacts=artifacts,
)

Expand Down
Loading