Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ never substitute its own thread ID.
| `deploybot integrate [--all]` | Scaffold a cumulative integration PR for overlap groups, or the whole frozen batch with `--all`. |
| `deploybot follow [--timeout SECONDS] [--poll SECONDS] [--json]` | Follow the newest exact base-branch head through CI, deployment, and HTTP verification. Defaults: 1800-second timeout and 10-second poll. |
| `deploybot pause --reason TEXT` | Pause merging after a delivery failure. |
| `deploybot unpause --sha SHA --control-id ID` | Conditionally resume the matching failed release after fresh status revalidation and verified repair; a running record can clear only that unique pause, so changed control or advanced main fails closed. The original deploy instruction remains sufficient unless rollback, bypass, or mismatched recovery expands authority. |
| `deploybot unpause --sha SHA --control-id ID [--follow] [--dispatch-ci] [--timeout SECONDS] [--no-wake]` | Conditionally resume the matching failed release after fresh status revalidation and verified repair; a running record can clear only that unique pause, so changed control or advanced main fails closed. The recovery records the exact SHA and reason it resumed so a stale reread of the same failure cannot re-pause it. After recording the recovery, DeployBot immediately reacts so the elected repair merges without waiting for the next event or the five-minute sweep; `--no-wake` records the recovery only, and `--follow`/`--dispatch-ci`/`--timeout` shape that wake-up reaction. The original deploy instruction remains sufficient unless rollback, bypass, or mismatched recovery expands authority. |
| `deploybot claim-release-repair --provider CLIENT --thread-id ID [--thread-url URL] [--sha SHA]` | Atomically claim the owner-encoded deterministic repair branch for the current failed exact-main release. Other threads recover the same owner from the ref instead of creating duplicate repair PRs. |

Only a configured coordinator should run these operations. `react
Expand Down
57 changes: 56 additions & 1 deletion src/agent_merge_queue/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5456,6 +5456,16 @@ def command_react(
if not isinstance(workflow_runs, list):
workflow_runs = []
current_main_sha = client.base_sha()
# A failed release that an operator explicitly unpaused is being
# replaced, not verified. Its only fix is the elected repair, whose
# merge advances main past the failed SHA. Holding admission for that
# exact SHA would strand the repair behind a release that can never
# turn green, so let it drain while the recovery owns this main.
recovering_current_main = (
control.get("state") == "running"
and bool(control.get("recovered_main_sha"))
and control.get("recovered_main_sha") == current_main_sha
)
release_before_merge = release_state(
main_sha=current_main_sha,
runs=workflow_runs,
Expand Down Expand Up @@ -5527,7 +5537,7 @@ def command_react(
release_is_verified = release_before_merge["state"] == "verified"
if release_is_verified:
client.record_verified_main(current_main_sha)
if has_release_owner and not release_is_verified:
if has_release_owner and not release_is_verified and not recovering_current_main:
release = release_before_merge
if follow:
release = command_follow(
Expand Down Expand Up @@ -5786,6 +5796,10 @@ def command_unpause(
*,
main_sha: str,
control_id: str,
wake: bool = True,
follow: bool = False,
timeout_seconds: int = 1800,
dispatch_ci: bool = False,
) -> None:
control = client.pipeline_control()
if control.get("state") != "paused":
Expand Down Expand Up @@ -5830,6 +5844,23 @@ def command_unpause(
# verification; binding `running` to the failed SHA forever would instead
# re-pause the repair merge and strand takeover workers.
print(f"DeployBot pipeline is running for recovered main {main_sha}")
if not wake:
return
# The recovery control is a registry comment, which does not raise a
# coordinator wake-up event. Drive one reaction now so the elected repair
# merges immediately instead of idling until the next delivery event or the
# five-minute reconciliation sweep. The recovery is already durable, so a
# transient reaction error must not strand it; the normal event flow still
# completes the merge on the next wake-up.
try:
command_react(
client,
follow=follow,
timeout_seconds=timeout_seconds,
dispatch_ci=dispatch_ci,
)
except QueueError as error:
print(f"recovery wake-up reaction deferred to the next event: {error}")


def command_claim_release_repair(
Expand Down Expand Up @@ -6018,6 +6049,24 @@ def build_parser() -> argparse.ArgumentParser:
)
unpause.add_argument("--sha", required=True, dest="main_sha")
unpause.add_argument("--control-id", required=True)
unpause.add_argument(
"--no-wake",
action="store_false",
dest="wake",
help="only record the recovery; do not react immediately",
)
unpause.add_argument(
"--follow",
action="store_true",
help="follow the recovered main through deployment in the wake reaction",
)
unpause.add_argument(
"--dispatch-ci",
action="store_true",
dest="dispatch_ci",
help="dispatch configured CI after the wake reaction merges the repair",
)
unpause.add_argument("--timeout", type=int, default=1800)
claim_repair = subparsers.add_parser(
"claim-release-repair",
help="atomically claim ownership of the current failed release",
Expand Down Expand Up @@ -6143,10 +6192,16 @@ def main(argv: list[str] | None = None) -> int:
elif arguments.command == "pause":
command_control(client, state="paused", reason=arguments.reason)
elif arguments.command == "unpause":
if arguments.timeout < 1:
raise QueueError("--timeout must be positive")
command_unpause(
client,
main_sha=arguments.main_sha,
control_id=arguments.control_id,
wake=arguments.wake,
follow=arguments.follow,
timeout_seconds=arguments.timeout,
dispatch_ci=arguments.dispatch_ci,
)
elif arguments.command == "claim-release-repair":
command_claim_release_repair(
Expand Down
44 changes: 43 additions & 1 deletion src/agent_merge_queue/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ def latest_control(
and state.get("control_id") == requires_control_id
):
continue
if not requires_control_id and _restates_recovered_failure(state, value):
# A lagging worker (for example a workflow pinned to an older
# release) can reread the already-recovered failure and write a
# fresh unconditional pause for the same SHA. Preserve the active
# recovery so the elected repair can still merge instead of
# latching the pipeline back onto the failed release.
continue
state = value
continue
if value.get("state") != "running":
Expand All @@ -177,10 +184,45 @@ def latest_control(
state.get("state") == "paused"
and state.get("control_id") == resumed_control_id
):
state = value
state = _resume_with_recovery_context(state, value)
return state


def _resume_with_recovery_context(
    resumed_pause: dict[str, Any], resume: dict[str, Any]
) -> dict[str, Any]:
    """Return a copy of ``resume`` annotated with the failure it recovered.

    The copy gains ``recovered_main_sha`` and ``recovered_reason``, taken from
    the ``main_sha`` and ``reason`` of the pause being resumed, so that a stale
    re-pause restating that same failure can be recognised later.

    Args:
        resumed_pause: The paused control record that ``resume`` clears.
        resume: The running control record written by the unpause.

    Returns:
        A new dict; neither argument is mutated. A non-empty recovery value
        already carried by ``resume`` is kept as is.
    """
    recovery = dict(resume)
    for recovery_key, pause_key in (
        ("recovered_main_sha", "main_sha"),
        ("recovered_reason", "reason"),
    ):
        pause_value = resumed_pause.get(pause_key)
        # Backfill when the field is missing *or* empty: the readers of these
        # fields only honour truthy values, so a None/"" placeholder must not
        # block the pause's real SHA/reason from being recorded.
        if pause_value and not recovery.get(recovery_key):
            recovery[recovery_key] = pause_value
    return recovery


def _restates_recovered_failure(
    state: dict[str, Any], pause: dict[str, Any]
) -> bool:
    """True when an unconditional pause merely repeats an already-recovered failure.

    A recovery (the running record an unpause writes) records the exact SHA and
    failure reason it resumed. The original failing CI result lingers until the
    repair merges, so any worker that rereads it produces a pause for that same
    SHA and reason. That restatement is stale and must not clear the recovery;
    a genuinely new failure (a different SHA or a different reason, such as a
    later deploy failure) still pauses normally.

    Args:
        state: The control record currently in effect.
        pause: The candidate pause record being considered.

    Returns:
        ``True`` only when ``state`` is a running record carrying non-empty
        ``recovered_main_sha`` and ``recovered_reason`` values that both equal
        the pause's ``main_sha`` and ``reason``.
    """
    # Only an active recovery can shadow a pause.
    if state.get("state") != "running":
        return False
    recovered_main_sha = state.get("recovered_main_sha")
    recovered_reason = state.get("recovered_reason")
    # Without both recorded values there is nothing to compare against, and
    # two missing fields must never match each other by accident.
    if not recovered_main_sha or not recovered_reason:
        return False
    return bool(
        pause.get("main_sha") == recovered_main_sha
        and pause.get("reason") == recovered_reason
    )


@dataclass(frozen=True)
class ThreadRecord:
provider: str
Expand Down
Loading