From 1479342e6bbc6d8643accb926333b46e230c5365 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 22 Jun 2026 18:48:29 +0000 Subject: [PATCH] Fix DeployBot recovery deadlock from stale re-pause A failed exact-main release that an operator explicitly unpaused could never land its repair. The original CI failure lingers on the failed SHA until the repair merges, so any coordinator run (especially a workflow pinned to an older release) rereads that result, writes a fresh, byte identical pause, and overwrites the running recovery. The repair then sits behind a release that can only turn green once the repair merges. Three coupled fixes break the deadlock: - records.latest_control: a recovery now carries the exact SHA and reason it resumed, and an unconditional pause that merely restates that same already-recovered failure is ignored. A genuinely new failure (a different SHA, or a different reason such as a later deploy failure) still pauses normally, so concurrent-pause ownership is preserved. - command_react: while a recovery owns the current failed main, the release-admission fence no longer holds that SHA. The elected repair drains and advances main past the failed revision; the new main is then followed and verified normally. - command_unpause: after recording the durable recovery, DeployBot reacts immediately so the repair merges without waiting for the next delivery event or the five-minute reconciliation sweep. --no-wake opts out, and --follow/--dispatch-ci/--timeout shape the wake-up reaction. The recovery is durable, so a transient wake-up error is reported but never re-pauses the pipeline. Adds regressions covering the stale re-pause race, the reactor merging a repair during recovery, the scoping of that bypass, and the unpause wake. Co-authored-by: mberman84 --- docs/reference.md | 2 +- src/agent_merge_queue/cli.py | 57 ++++++- src/agent_merge_queue/records.py | 44 +++++- tests/test_cli.py | 261 ++++++++++++++++++++++++++++++- 4 files changed, 360 insertions(+), 4 deletions(-) diff --git a/docs/reference.md b/docs/reference.md index 6a6d078..b79e26a 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -61,7 +61,7 @@ never substitute its own thread ID. | `deploybot integrate [--all]` | Scaffold a cumulative integration PR for overlap groups, or the whole frozen batch with `--all`. | | `deploybot follow [--timeout SECONDS] [--poll SECONDS] [--json]` | Follow the newest exact base-branch head through CI, deployment, and HTTP verification. Defaults: 1800-second timeout and 10-second poll. | | `deploybot pause --reason TEXT` | Pause merging after a delivery failure. | -| `deploybot unpause --sha SHA --control-id ID` | Conditionally resume the matching failed release after fresh status revalidation and verified repair; a running record can clear only that unique pause, so changed control or advanced main fails closed. The original deploy instruction remains sufficient unless rollback, bypass, or mismatched recovery expands authority. | +| `deploybot unpause --sha SHA --control-id ID [--follow] [--dispatch-ci] [--timeout SECONDS] [--no-wake]` | Conditionally resume the matching failed release after fresh status revalidation and verified repair; a running record can clear only that unique pause, so changed control or advanced main fails closed. The recovery records the exact SHA and reason it resumed so a stale reread of the same failure cannot re-pause it. After recording the recovery, DeployBot immediately reacts so the elected repair merges without waiting for the next event or the five-minute sweep; `--no-wake` records the recovery only, and `--follow`/`--dispatch-ci`/`--timeout` shape that wake-up reaction. The original deploy instruction remains sufficient unless rollback, bypass, or mismatched recovery expands authority. | | `deploybot claim-release-repair --provider CLIENT --thread-id ID [--thread-url URL] [--sha SHA]` | Atomically claim the owner-encoded deterministic repair branch for the current failed exact-main release. Other threads recover the same owner from the ref instead of creating duplicate repair PRs. | Only a configured coordinator should run these operations. `react diff --git a/src/agent_merge_queue/cli.py b/src/agent_merge_queue/cli.py index cac0e07..711d54a 100755 --- a/src/agent_merge_queue/cli.py +++ b/src/agent_merge_queue/cli.py @@ -5456,6 +5456,16 @@ def command_react( if not isinstance(workflow_runs, list): workflow_runs = [] current_main_sha = client.base_sha() + # A failed release that an operator explicitly unpaused is being + # replaced, not verified. Its only fix is the elected repair, whose + # merge advances main past the failed SHA. Holding admission for that + # exact SHA would strand the repair behind a release that can never + # turn green, so let it drain while the recovery owns this main. + recovering_current_main = ( + control.get("state") == "running" + and bool(control.get("recovered_main_sha")) + and control.get("recovered_main_sha") == current_main_sha + ) release_before_merge = release_state( main_sha=current_main_sha, runs=workflow_runs, @@ -5527,7 +5537,7 @@ def command_react( release_is_verified = release_before_merge["state"] == "verified" if release_is_verified: client.record_verified_main(current_main_sha) - if has_release_owner and not release_is_verified: + if has_release_owner and not release_is_verified and not recovering_current_main: release = release_before_merge if follow: release = command_follow( @@ -5786,6 +5796,10 @@ def command_unpause( *, main_sha: str, control_id: str, + wake: bool = True, + follow: bool = False, + timeout_seconds: int = 1800, + dispatch_ci: bool = False, ) -> None: control = client.pipeline_control() if control.get("state") != "paused": @@ -5830,6 +5844,23 @@ def command_unpause( # verification; binding `running` to the failed SHA forever would instead # re-pause the repair merge and strand takeover workers. print(f"DeployBot pipeline is running for recovered main {main_sha}") + if not wake: + return + # The recovery control is a registry comment, which does not raise a + # coordinator wake-up event. Drive one reaction now so the elected repair + # merges immediately instead of idling until the next delivery event or the + # five-minute reconciliation sweep. The recovery is already durable, so a + # transient reaction error must not strand it; the normal event flow still + # completes the merge on the next wake-up. + try: + command_react( + client, + follow=follow, + timeout_seconds=timeout_seconds, + dispatch_ci=dispatch_ci, + ) + except QueueError as error: + print(f"recovery wake-up reaction deferred to the next event: {error}") def command_claim_release_repair( @@ -6018,6 +6049,24 @@ def build_parser() -> argparse.ArgumentParser: ) unpause.add_argument("--sha", required=True, dest="main_sha") unpause.add_argument("--control-id", required=True) + unpause.add_argument( + "--no-wake", + action="store_false", + dest="wake", + help="only record the recovery; do not react immediately", + ) + unpause.add_argument( + "--follow", + action="store_true", + help="follow the recovered main through deployment in the wake reaction", + ) + unpause.add_argument( + "--dispatch-ci", + action="store_true", + dest="dispatch_ci", + help="dispatch configured CI after the wake reaction merges the repair", + ) + unpause.add_argument("--timeout", type=int, default=1800) claim_repair = subparsers.add_parser( "claim-release-repair", help="atomically claim ownership of the current failed release", @@ -6143,10 +6192,16 @@ def main(argv: list[str] | None = None) -> int: elif arguments.command == "pause": command_control(client, state="paused", reason=arguments.reason) elif arguments.command == "unpause": + if arguments.timeout < 1: + raise QueueError("--timeout must be positive") command_unpause( client, main_sha=arguments.main_sha, control_id=arguments.control_id, + wake=arguments.wake, + follow=arguments.follow, + timeout_seconds=arguments.timeout, + dispatch_ci=arguments.dispatch_ci, ) elif arguments.command == "claim-release-repair": command_claim_release_repair( diff --git a/src/agent_merge_queue/records.py b/src/agent_merge_queue/records.py index e4c0ea5..62ababa 100644 --- a/src/agent_merge_queue/records.py +++ b/src/agent_merge_queue/records.py @@ -158,6 +158,13 @@ def latest_control( and state.get("control_id") == requires_control_id ): continue + if not requires_control_id and _restates_recovered_failure(state, value): + # A lagging worker (for example a workflow pinned to an older + # release) can reread the already-recovered failure and write a + # fresh unconditional pause for the same SHA. Preserve the active + # recovery so the elected repair can still merge instead of + # latching the pipeline back onto the failed release. + continue state = value continue if value.get("state") != "running": @@ -177,10 +184,45 @@ def latest_control( state.get("state") == "paused" and state.get("control_id") == resumed_control_id ): - state = value + state = _resume_with_recovery_context(state, value) return state +def _resume_with_recovery_context( + resumed_pause: dict[str, Any], resume: dict[str, Any] +) -> dict[str, Any]: + """Record which failure a recovery resumed so a stale re-pause cannot win.""" + recovery = dict(resume) + if "recovered_main_sha" not in recovery and resumed_pause.get("main_sha"): + recovery["recovered_main_sha"] = resumed_pause["main_sha"] + if "recovered_reason" not in recovery and resumed_pause.get("reason"): + recovery["recovered_reason"] = resumed_pause["reason"] + return recovery + + +def _restates_recovered_failure( + state: dict[str, Any], pause: dict[str, Any] +) -> bool: + """True when an unconditional pause merely repeats an already-recovered failure. + + A recovery (the running record an unpause writes) records the exact SHA and + failure reason it resumed. The original failing CI result lingers until the + repair merges, so any worker that rereads it produces a byte-identical pause + for that same SHA and reason. That restatement is stale and must not clear + the recovery; a genuinely new failure (a different SHA or a different + reason such as a later deploy failure) still pauses normally. + """ + recovered_main_sha = state.get("recovered_main_sha") + recovered_reason = state.get("recovered_reason") + return bool( + state.get("state") == "running" + and recovered_main_sha + and recovered_reason + and pause.get("main_sha") == recovered_main_sha + and pause.get("reason") == recovered_reason + ) + + @dataclass(frozen=True) class ThreadRecord: provider: str diff --git a/tests/test_cli.py b/tests/test_cli.py index 2fef07c..d7301df 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2168,6 +2168,95 @@ def test_reactor_ignores_rerun_after_same_main_was_verified(self) -> None: self.assertNotEqual(result.get("state"), "release-held") promote.assert_called_once() + def test_reactor_merges_repair_while_recovering_failed_main(self) -> None: + # Regression: after an operator unpauses a failed main, the reactor must + # let the elected repair drain even though the current main CI is still + # red. Holding admission for the recovered SHA would strand the repair + # behind a release that can only go green once that repair merges. + sha = "79cbedf65707fe2cbb0de92d7a933f45e550cc44" + client = Mock() + client.config = CONFIG + client.pipeline_control.return_value = { + "state": "running", + "control_id": "resume-1", + "resumes_control_id": "pause-1", + "recovered_main_sha": sha, + } + client.base_sha.return_value = sha + client.verified_main_sha.return_value = "f" * 40 + client.workflow_runs.return_value = [ + { + "id": 7, + "name": "CI", + "head_sha": sha, + "status": "completed", + "conclusion": "failure", + } + ] + frozen = FreezeResult(None, [], [], [], []) + with ( + patch( + "agent_merge_queue.cli.reconcile_externally_merged_threads", + return_value=[], + ), + patch("agent_merge_queue.cli.settle_integration_checks", return_value=[]), + patch("agent_merge_queue.cli.promote_integrations", return_value=[]), + patch( + "agent_merge_queue.cli.command_promote", + return_value={"promoted": [], "waiting": [], "blocked": []}, + ) as promote, + patch( + "agent_merge_queue.cli.command_drain", + return_value={"merged": []}, + ), + patch("agent_merge_queue.cli.command_follow") as follow, + patch("agent_merge_queue.cli.freeze_queue", return_value=frozen), + redirect_stdout(io.StringIO()), + ): + result = command_react(client, follow=False, timeout_seconds=10) + + self.assertNotEqual(result.get("state"), "release-held") + promote.assert_called_once() + # The recovered SHA must not be re-followed (and thus re-paused) inside + # the admission fence; that is exactly the stale pause that deadlocked. + follow.assert_not_called() + client.set_pipeline_control.assert_not_called() + + def test_reactor_still_holds_failed_main_without_recovery(self) -> None: + # Scoping guard: the recovery bypass only applies to the exact unpaused + # SHA. A plain running pipeline that finds a red main still holds. + sha = "a" * 40 + client = Mock() + client.config = CONFIG + client.pipeline_control.return_value = {"state": "running"} + client.base_sha.return_value = sha + client.verified_main_sha.return_value = "f" * 40 + client.workflow_runs.return_value = [ + { + "id": 7, + "name": "CI", + "head_sha": sha, + "status": "completed", + "conclusion": "failure", + } + ] + with ( + patch( + "agent_merge_queue.cli.reconcile_externally_merged_threads", + return_value=[], + ), + patch("agent_merge_queue.cli.command_promote") as promote, + patch( + "agent_merge_queue.cli.command_follow", + return_value={"state": "ci-failed", "main_sha": sha}, + ), + redirect_stdout(io.StringIO()), + ): + result = command_react(client, follow=True, timeout_seconds=10) + + self.assertEqual(result["state"], "release-held") + promote.assert_not_called() + def test_reactor_holds_newly_merged_revision_before_ci_is_visible(self) -> None: sha = "a" * 40 client = Mock() @@ -6007,7 +6096,10 @@ def test_unpause_compare_and_sets_matching_failed_release(self) -> None: client.base_sha.return_value = sha client.set_pipeline_control.return_value = "resume-1" - with redirect_stdout(io.StringIO()): + with ( + patch("agent_merge_queue.cli.command_react") as react, + redirect_stdout(io.StringIO()), + ): command_unpause( client, main_sha=sha, @@ -6017,6 +6109,85 @@ def test_unpause_compare_and_sets_matching_failed_release(self) -> None: client.set_pipeline_control.assert_called_once_with( "running", None, resumes_control_id=control_id ) + # The recovery wakes the coordinator immediately so the elected repair + # merges without waiting for another delivery event or the sweep. + react.assert_called_once_with( + client, follow=False, timeout_seconds=1800, dispatch_ci=False + ) + + def test_unpause_can_skip_the_immediate_wake(self) -> None: + sha = "a" * 40 + control_id = "pause-1" + client = Mock() + client.pipeline_control.side_effect = [ + { + "state": "paused", + "reason": f"ci-failed on {sha}", + "control_id": control_id, + "main_sha": sha, + }, + { + "state": "running", + "control_id": "resume-1", + "resumes_control_id": control_id, + }, + ] + client.base_sha.return_value = sha + client.set_pipeline_control.return_value = "resume-1" + + with ( + patch("agent_merge_queue.cli.command_react") as react, + redirect_stdout(io.StringIO()), + ): + command_unpause( + client, + main_sha=sha, + control_id=control_id, + wake=False, + ) + + react.assert_not_called() + + def test_unpause_wake_failure_does_not_undo_durable_recovery(self) -> None: + sha = "a" * 40 + control_id = "pause-1" + client = Mock() + client.pipeline_control.side_effect = [ + { + "state": "paused", + "reason": f"ci-failed on {sha}", + "control_id": control_id, + "main_sha": sha, + }, + { + "state": "running", + "control_id": "resume-1", + "resumes_control_id": control_id, + }, + ] + client.base_sha.return_value = sha + client.set_pipeline_control.return_value = "resume-1" + + stream = io.StringIO() + with ( + patch( + "agent_merge_queue.cli.command_react", + side_effect=QueueError("transient GitHub error"), + ), + redirect_stdout(stream), + ): + command_unpause( + client, + main_sha=sha, + control_id=control_id, + ) + + # The recovery is already durable; a wake failure is reported but does + # not raise or re-pause the pipeline. + self.assertIn("deferred to the next event", stream.getvalue()) + client.set_pipeline_control.assert_called_once_with( + "running", None, resumes_control_id=control_id + ) def test_unpause_rejects_changed_pause_record(self) -> None: sha = "a" * 40 @@ -6257,6 +6428,94 @@ def test_pipeline_control_ignores_stale_conditional_repause(self) -> None: self.assertEqual(control["state"], "paused") self.assertEqual(control["control_id"], "pause-2") + def test_pipeline_control_preserves_recovery_against_stale_repause(self) -> None: + # Regression: a workflow pinned to an older DeployBot rereads the still + # failing CI for an already-recovered SHA and writes a fresh, byte + # identical pause. That stale restatement must not clear the recovery, + # or the elected repair can never merge to produce a passing main. + sha = "79cbedf65707fe2cbb0de92d7a933f45e550cc44" + client = object.__new__(GitHub) + client.coordinator_logins = {"coordinator"} + records = [ + control_body( + state="paused", + control_id="pause-1", + reason=f"ci-failed on {sha}", + main_sha=sha, + ), + control_body( + state="running", + control_id="resume-1", + resumes_control_id="pause-1", + ), + control_body( + state="paused", + control_id="pause-stale", + reason=f"ci-failed on {sha}", + main_sha=sha, + ), + ] + client.registry_comments = Mock( + return_value=[ + { + "id": index, + "created_at": f"2026-06-21T17:17:{index:02d}Z", + "user": {"login": "coordinator"}, + "body": body, + } + for index, body in enumerate(records, start=1) + ] + ) + + control = client.pipeline_control() + + self.assertEqual(control["state"], "running") + self.assertEqual(control["resumes_control_id"], "pause-1") + self.assertEqual(control["recovered_main_sha"], sha) + + def test_pipeline_control_repauses_when_a_new_release_fails(self) -> None: + # A genuinely new failure (a different SHA, or a different reason such as + # a later deploy failure) must still pause even after a prior recovery. + sha = "a" * 40 + newer = "b" * 40 + client = object.__new__(GitHub) + client.coordinator_logins = {"coordinator"} + records = [ + control_body( + state="paused", + control_id="pause-1", + reason=f"ci-failed on {sha}", + main_sha=sha, + ), + control_body( + state="running", + control_id="resume-1", + resumes_control_id="pause-1", + ), + control_body( + state="paused", + control_id="pause-2", + reason=f"ci-failed on {newer}", + main_sha=newer, + ), + ] + client.registry_comments = Mock( + return_value=[ + { + "id": index, + "created_at": f"2026-06-21T17:17:{index:02d}Z", + "user": {"login": "coordinator"}, + "body": body, + } + for index, body in enumerate(records, start=1) + ] + ) + + control = client.pipeline_control() + + self.assertEqual(control["state"], "paused") + self.assertEqual(control["control_id"], "pause-2") + def test_pipeline_control_migrates_legacy_pause_with_comment_identity(self) -> None: sha = "a" * 40 client = object.__new__(GitHub)