diff --git a/README.md b/README.md index 5936d00..238a980 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,9 @@ turn merges made by that token into ordinary `push` workflow runs, so DeployBot dispatches each configured CI workflow once after it merges a batch. GitHub can also suppress the usual `workflow_run` handoff after that token-driven CI run, so DeployBot explicitly dispatches each configured deployment workflow after -exact-main CI succeeds. CI workflows must accept `workflow_dispatch`. +exact-main CI succeeds. A compare-and-set Git ref leases each exact CI dispatch, +so concurrent followers cannot launch the same production deployment twice. +CI workflows must accept `workflow_dispatch`. Deployment workflows must accept `workflow_dispatch` inputs named `ci_sha` and `ci_run_id`, verify that run through the GitHub API, and deploy only when it is successful CI for the current base-branch head. Skipped deployment wake-ups diff --git a/adapters/claude-code/skills/deploybot/SKILL.md b/adapters/claude-code/skills/deploybot/SKILL.md index 268e465..e784689 100644 --- a/adapters/claude-code/skills/deploybot/SKILL.md +++ b/adapters/claude-code/skills/deploybot/SKILL.md @@ -7,6 +7,9 @@ description: Inspect and operate the full DeployBot delivery pipeline. Use when Read `.mergequeue.toml` before acting. Prefer the DeployBot MCP tools; fall back to the `deploybot` CLI. Treat GitHub as the durable source of truth. +If the task worktree does not contain the policy, use the repository's known +configured checkout or pass its path with `--config`; never run `deploybot init` +inside an existing task worktree merely to read status. ## Read Status diff --git a/adapters/codex/agent-merge-queue/skills/deploybot/SKILL.md b/adapters/codex/agent-merge-queue/skills/deploybot/SKILL.md index fa26b69..1180d09 100644 --- a/adapters/codex/agent-merge-queue/skills/deploybot/SKILL.md +++ b/adapters/codex/agent-merge-queue/skills/deploybot/SKILL.md @@ -7,6 +7,9 @@ description: Inspect and operate the full DeployBot delivery pipeline. Use when Read `.mergequeue.toml` before acting. Use the `deploybot` CLI directly and treat GitHub as the durable source of truth. +If the task worktree does not contain the policy, use the repository's known +configured checkout or pass its path with `--config`; never run `deploybot init` +inside an existing task worktree merely to read status. ## Read Status diff --git a/adapters/cursor/AGENTS.md b/adapters/cursor/AGENTS.md index 6a49494..a377a04 100644 --- a/adapters/cursor/AGENTS.md +++ b/adapters/cursor/AGENTS.md @@ -3,6 +3,8 @@ Read `.mergequeue.toml` and use the `deploybot` MCP tools. Keep changing pull requests draft and make the final ready head immutable. Address valid feedback from the configured review providers. +If a task worktree lacks the policy, pass the repository's configured policy +path explicitly; never initialize a second policy just to read status. For read-only status, use `pipeline_status` for the full delivery path, `queue_plan` for the merge queue, and `inspect_pull_request` for one PR. Report diff --git a/docs/reference.md b/docs/reference.md index d92b6e9..82df27b 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -15,6 +15,8 @@ current directory. `--repository` defaults to the repository resolved by the GitHub CLI. `--version` prints the installed version. Wherever `[PR]` appears, the selector may be a pull-request number, URL, or branch; when omitted, DeployBot resolves the pull request for the current branch. +If an existing task worktree lacks the policy, run from the configured checkout +or pass `--config PATH`; do not initialize a second policy just to inspect it. ### Setup and read-only inspection diff --git a/skills/deploybot/SKILL.md b/skills/deploybot/SKILL.md index 268e465..e784689 100644 --- a/skills/deploybot/SKILL.md +++ b/skills/deploybot/SKILL.md @@ -7,6 +7,9 @@ description: Inspect and operate the full DeployBot delivery pipeline. Use when Read `.mergequeue.toml` before acting. Prefer the DeployBot MCP tools; fall back to the `deploybot` CLI. Treat GitHub as the durable source of truth. +If the task worktree does not contain the policy, use the repository's known +configured checkout or pass its path with `--config`; never run `deploybot init` +inside an existing task worktree merely to read status. ## Read Status diff --git a/src/agent_merge_queue/cli.py b/src/agent_merge_queue/cli.py index 27e4575..99f9573 100755 --- a/src/agent_merge_queue/cli.py +++ b/src/agent_merge_queue/cli.py @@ -87,6 +87,8 @@ ) PULL_REQUEST_NUMBER = re.compile(r"#(\d+)\b") RELEASE_REPAIR_LEASE_PREFIX = "DeployBot release repair lease v1 " +DEPLOY_DISPATCH_LEASE_PREFIX = "DeployBot deployment dispatch lease v1 " +DEPLOY_DISPATCH_LEASE_BRANCH = "deploybot/deploy-dispatch" FAILED_CHECK_STATES = { "ACTION_REQUIRED", "CANCELLED", @@ -1289,6 +1291,13 @@ def dispatch_deploy_workflows( "to one active workflow" ) workflow_id = int(matches[0]["id"]) + claim_id = self.claim_deploy_dispatch( + ci_run=ci_run, + workflow_id=workflow_id, + workflow_name=name, + ) + if claim_id is None: + continue self._run( "workflow", "run", @@ -1302,6 +1311,12 @@ def dispatch_deploy_workflows( "-f", f"ci_run_id={ci_run_id}", ) + self.complete_deploy_dispatch( + ci_run=ci_run, + workflow_id=workflow_id, + workflow_name=name, + claim_id=claim_id, + ) dispatched.append( { "id": workflow_id, @@ -1312,6 +1327,196 @@ def dispatch_deploy_workflows( ) return dispatched + def deploy_dispatch_lease_branch( + self, *, ci_run: dict[str, Any], workflow_id: int + ) -> str: + ci_sha = str(ci_run.get("head_sha") or "") + ci_run_id = int(ci_run.get("id") or 0) + if not re.fullmatch(r"[0-9a-f]{40}", ci_sha) or not ci_run_id or not workflow_id: + raise QueueError("successful CI or deployment workflow identity is incomplete") + return ( + f"{DEPLOY_DISPATCH_LEASE_BRANCH}/{ci_sha[:12]}-" + f"{ci_run_id}-{workflow_id}" + ) + + def claim_deploy_dispatch( + self, + *, + ci_run: dict[str, Any], + workflow_id: int, + workflow_name: str, + ) -> str | None: + """Atomically reserve one exact deployment workflow dispatch.""" + self.require_actor(self.coordinator_logins, "dispatch a deployment") + ci_sha = str(ci_run.get("head_sha") or "") + ci_run_id = int(ci_run.get("id") or 0) + branch = self.deploy_dispatch_lease_branch( + ci_run=ci_run, workflow_id=workflow_id + ) + claimed_at = utc_now() + claimed_time = parse_time(claimed_at) + if claimed_time is None: # pragma: no cover - utc_now owns this invariant. + raise QueueError("deployment dispatch lease timestamp is invalid") + + base_commit = self._json( + "api", f"repos/{self.repository}/git/commits/{ci_sha}" + ) + base_tree = str((base_commit.get("tree") or {}).get("sha") or "") + if not base_tree: + raise QueueError("GitHub did not return the successful CI commit tree") + + claim_id = secrets.token_hex(16) + payload = { + "claimed_at": claimed_at, + "claim_id": claim_id, + "ci_run_id": ci_run_id, + "main_sha": ci_sha, + "schema": 1, + "state": "claimed", + "workflow_id": workflow_id, + "workflow_name": workflow_name, + } + for _attempt in range(3): + try: + ref = self._json( + "api", f"repos/{self.repository}/git/ref/heads/{branch}" + ) + except QueueError as error: + if "404" not in str(error) and "Not Found" not in str(error): + raise + lease_commit = self._json( + "api", + "--method", + "POST", + f"repos/{self.repository}/git/commits", + "-f", + f"message={DEPLOY_DISPATCH_LEASE_PREFIX}" + f"{json.dumps(payload, sort_keys=True)}", + "-f", + f"tree={base_tree}", + "-f", + f"parents[]={ci_sha}", + ) + lease_sha = str(lease_commit.get("sha") or "") + if not lease_sha: + raise QueueError("GitHub did not create the deployment dispatch lease") + try: + self._json( + "api", + "--method", + "POST", + f"repos/{self.repository}/git/refs", + "-f", + f"ref=refs/heads/{branch}", + "-f", + f"sha={lease_sha}", + ) + return claim_id + except QueueError as create_error: + if "Reference already exists" not in str(create_error): + raise + continue + + current_sha = str((ref.get("object") or {}).get("sha") or "") + if not current_sha: + raise QueueError("deployment dispatch lease has no readable head") + current_commit = self._json( + "api", f"repos/{self.repository}/git/commits/{current_sha}" + ) + current = self._deploy_dispatch_payload(current_commit) + identity = ( + current.get("schema") == 1 + and current.get("main_sha") == ci_sha + and int(current.get("ci_run_id") or 0) == ci_run_id + and int(current.get("workflow_id") or 0) == workflow_id + and current.get("workflow_name") == workflow_name + ) + if not identity: + raise QueueError("deployment dispatch lease has invalid ownership") + if current.get("state") == "dispatched": + return None + # A claimed reservation is deliberately permanent. GitHub's + # dispatch API returns no run ID, so after a process crash there + # is no exact evidence that distinguishes "accepted but not + # recorded" from "never sent". Automatic expiry could duplicate + # production work; recovery must use a new CI identity or explicit + # operator repair of this per-release ref. + return None + raise QueueError("deployment dispatch lease changed repeatedly; retry later") + + def _deploy_dispatch_payload(self, commit: dict[str, Any]) -> dict[str, Any]: + message = str(commit.get("message") or "") + if not message.startswith(DEPLOY_DISPATCH_LEASE_PREFIX): + raise QueueError("deployment dispatch lease has invalid ownership") + try: + payload = json.loads(message[len(DEPLOY_DISPATCH_LEASE_PREFIX) :]) + except json.JSONDecodeError as error: + raise QueueError("deployment dispatch lease has invalid ownership") from error + if not isinstance(payload, dict): + raise QueueError("deployment dispatch lease has invalid ownership") + return payload + + def complete_deploy_dispatch( + self, + *, + ci_run: dict[str, Any], + workflow_id: int, + workflow_name: str, + claim_id: str, + ) -> None: + branch = self.deploy_dispatch_lease_branch( + ci_run=ci_run, workflow_id=workflow_id + ) + ref = self._json("api", f"repos/{self.repository}/git/ref/heads/{branch}") + current_sha = str((ref.get("object") or {}).get("sha") or "") + if not current_sha: + raise QueueError("deployment dispatch lease has no readable head") + current_commit = self._json( + "api", f"repos/{self.repository}/git/commits/{current_sha}" + ) + current = self._deploy_dispatch_payload(current_commit) + if current.get("state") == "dispatched": + return + if current.get("claim_id") != claim_id: + raise QueueError("deployment dispatch lease ownership changed") + if ( + current.get("main_sha") != str(ci_run.get("head_sha") or "") + or int(current.get("ci_run_id") or 0) != int(ci_run.get("id") or 0) + or int(current.get("workflow_id") or 0) != workflow_id + or current.get("workflow_name") != workflow_name + ): + raise QueueError("deployment dispatch lease has invalid ownership") + completed = {**current, "completed_at": utc_now(), "state": "dispatched"} + current_tree = str((current_commit.get("tree") or {}).get("sha") or "") + if not current_tree: + raise QueueError("deployment dispatch lease has no readable tree") + lease_commit = self._json( + "api", + "--method", + "POST", + f"repos/{self.repository}/git/commits", + "-f", + f"message={DEPLOY_DISPATCH_LEASE_PREFIX}" + f"{json.dumps(completed, sort_keys=True)}", + "-f", + f"tree={current_tree}", + "-f", + f"parents[]={current_sha}", + ) + lease_sha = str(lease_commit.get("sha") or "") + if not lease_sha: + raise QueueError("GitHub did not complete the deployment dispatch lease") + self._json( + "api", + "--method", + "PATCH", + f"repos/{self.repository}/git/refs/heads/{branch}", + "-f", + f"sha={lease_sha}", + "-F", + "force=false", + ) + def recent_merged_pull_requests(self, limit: int) -> list[dict[str, Any]]: merged: list[dict[str, Any]] = [] page = 1 @@ -1684,12 +1889,21 @@ def create_integration_pull_request( except QueueError as error: if "Reference already exists" not in str(error): raise + ref = self._json("api", f"repos/{self.repository}/git/ref/heads/{branch}") + branch_head = str((ref.get("object") or {}).get("sha") or "") + if not branch_head: + raise QueueError(f"integration branch {branch} has no readable head") + else: + branch_head = base_sha merged_heads: list[str] = [] conflict: dict[str, Any] | None = None for entry in entries: + if self.is_ancestor(entry.head_sha, branch_head): + merged_heads.append(entry.head_sha) + continue try: - self._json( + merged = self._json( "api", "--method", "POST", @@ -1701,8 +1915,18 @@ def create_integration_pull_request( "-f", f"commit_message=DeployBot batch {batch_id}: PR #{entry.number}", ) + branch_head = str((merged or {}).get("sha") or branch_head) merged_heads.append(entry.head_sha) except QueueError as error: + if str(error) == "GitHub returned invalid JSON": + ref = self._json( + "api", f"repos/{self.repository}/git/ref/heads/{branch}" + ) + current_head = str((ref.get("object") or {}).get("sha") or "") + if current_head and self.is_ancestor(entry.head_sha, current_head): + branch_head = current_head + merged_heads.append(entry.head_sha) + continue conflict = { "number": entry.number, "head_sha": entry.head_sha, @@ -1970,6 +2194,8 @@ def snapshot( known_source_paths: list[str] | None = None, known_generated_paths: list[str] | None = None, defer_paths_until_ready: bool = False, + include_intent_paths: bool = False, + defer_review_details_for_irrelevant_drafts: bool = False, ) -> QueueEntry: fields = ",".join( ( @@ -2002,13 +2228,27 @@ def snapshot( comments = ( known_comments if known_comments is not None else self.comments(number) ) - reviews = self.reviews(number) if self.config.review_providers else [] + labels = sorted(str(label["name"]) for label in pull.get("labels") or []) + marker = queue_marker_for_client(self, comments) + intent = latest_intent(comments, self.trusted_logins) + defer_review_details = ( + defer_review_details_for_irrelevant_drafts + and bool(pull["isDraft"]) + and self.config.pipeline.intent_label not in labels + and self.config.queue_label not in labels + and not (intent and intent.get("state") == "requested") + and marker is None + ) + needs_reviews = any( + provider.kind == "github-approvals" + or (provider.kind == "bot" and provider.require_formal_review) + for provider in self.config.review_providers + ) + reviews = self.reviews(number) if needs_reviews and not defer_review_details else [] needs_threads = any( provider.kind == "bot" and provider.require_resolved_threads for provider in self.config.review_providers ) - threads = self.review_threads(number) if needs_threads else [] - marker = queue_marker_for_client(self, comments) head_sha = str(pull["headRefOid"]) check_rollup = list(pull.get("statusCheckRollup") or []) checks = merge_known_check_states(check_states(check_rollup), known_checks) @@ -2033,6 +2273,20 @@ def snapshot( source_paths, generated_paths = self.changed_paths(number) body = str(pull.get("body") or "") + threads = ( + self.review_threads(number) + if needs_threads and not defer_review_details + else [] + ) + review_verdicts = evaluate_reviews( + self.config.review_providers, + head_sha=head_sha, + checks=checks, + comments=comments, + reviews=reviews, + threads=threads, + ) + entry = QueueEntry( number=int(pull["number"]), title=str(pull["title"]), @@ -2045,16 +2299,9 @@ def snapshot( base_branch=str(pull["baseRefName"]), mergeable=str(pull.get("mergeable") or "UNKNOWN").upper(), merge_state=str(pull.get("mergeStateStatus") or "UNKNOWN").upper(), - labels=sorted(str(label["name"]) for label in pull.get("labels") or []), + labels=labels, checks=checks, - review_verdicts=evaluate_reviews( - self.config.review_providers, - head_sha=head_sha, - checks=checks, - comments=comments, - reviews=reviews, - threads=threads, - ), + review_verdicts=review_verdicts, source_paths=sorted(set(source_paths)), generated_paths=sorted(set(generated_paths)), dependencies=structured_dependencies( @@ -2072,7 +2319,17 @@ def snapshot( require_marker=require_marker, allow_blocked_label=allow_blocked_label, ) - if not paths_are_known and defer_paths_until_ready and entry.state == "ready": + if ( + not paths_are_known + and defer_paths_until_ready + and ( + entry.state == "ready" + or ( + include_intent_paths + and self.config.pipeline.intent_label in entry.labels + ) + ) + ): entry.source_paths, entry.generated_paths = self.changed_paths(number) return entry @@ -2401,6 +2658,9 @@ def inspect( require_marker=False, allow_blocked_label=True, known_comments=comments, + defer_paths_until_ready=True, + include_intent_paths=True, + defer_review_details_for_irrelevant_drafts=True, ), ) diff --git a/src/agent_merge_queue/config.py b/src/agent_merge_queue/config.py index 4934a61..52c74d4 100644 --- a/src/agent_merge_queue/config.py +++ b/src/agent_merge_queue/config.py @@ -575,7 +575,9 @@ def load_config(path: str | None = None, *, cwd: Path | None = None) -> QueueCon raw = config_path.read_bytes() except FileNotFoundError as error: raise ConfigError( - f"merge queue config not found: {config_path}; run `deploybot init`" + f"merge queue config not found: {config_path}; run from the configured " + "checkout, pass `--config PATH`, or use `deploybot init` only for a " + "new repository" ) from error try: payload = tomllib.loads(raw.decode("utf-8")) diff --git a/src/agent_merge_queue/pipeline.py b/src/agent_merge_queue/pipeline.py index 88b0e0d..35a8795 100644 --- a/src/agent_merge_queue/pipeline.py +++ b/src/agent_merge_queue/pipeline.py @@ -57,25 +57,30 @@ def release_state( ci_fence = parse_time( str((ci or {}).get("updated_at") or (ci or {}).get("created_at") or "") ) - deploy = latest_run( - [ - run - for run in runs - if not ( - str(run.get("status") or "") == "completed" - and str(run.get("conclusion") or "") == "skipped" - ) - and ( - ci_fence is None - or ( - (created_at := parse_time(str(run.get("created_at") or ""))) - is not None - and created_at >= ci_fence - ) + eligible_deploys = [ + run + for run in runs + if not ( + str(run.get("status") or "") == "completed" + and str(run.get("conclusion") or "") == "skipped" + ) + and ( + ci_fence is None + or ( + (created_at := parse_time(str(run.get("created_at") or ""))) + is not None + and created_at >= ci_fence ) - ], - config.deploy_workflows, - main_sha, + ) + ] + deploys = { + name: latest_run(eligible_deploys, (name,), main_sha) + for name in config.deploy_workflows + } + deploy = max( + (run for run in deploys.values() if run is not None), + key=lambda run: (str(run.get("created_at") or ""), int(run.get("id") or 0)), + default=None, ) active_ci = [ workflow_run(run) @@ -91,18 +96,31 @@ def release_state( ] ci_status = str((ci or {}).get("status") or "") ci_conclusion = str((ci or {}).get("conclusion") or "") - deploy_status = str((deploy or {}).get("status") or "") - deploy_conclusion = str((deploy or {}).get("conclusion") or "") + deploy_statuses = { + name: str((run or {}).get("status") or "") + for name, run in deploys.items() + } + deploy_conclusions = { + name: str((run or {}).get("conclusion") or "") + for name, run in deploys.items() + } if ci is None or ci_status != "completed": state = "testing" elif ci_conclusion != "success": state = "ci-failed" - elif deploy is None: + elif any( + deploy_statuses[name] == "completed" + and deploy_conclusions[name] != "success" + for name in config.deploy_workflows + if deploys[name] is not None + ): + state = "deploy-failed" + elif any(deploys[name] is None for name in config.deploy_workflows): state = "awaiting-deploy" - elif deploy_status != "completed": + elif any( + deploy_statuses[name] != "completed" for name in config.deploy_workflows + ): state = "deploying" - elif deploy_conclusion != "success": - state = "deploy-failed" else: state = "verified" return { @@ -110,6 +128,9 @@ def release_state( "main_sha": main_sha, "latest_ci": workflow_run(ci) if ci else None, "latest_deploy": workflow_run(deploy) if deploy else None, + "deployments": { + name: workflow_run(run) if run else None for name, run in deploys.items() + }, "active_ci": active_ci, "active_deployments": active_deploys, } diff --git a/tests/test_cli.py b/tests/test_cli.py index bb065c7..1710e3c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1495,6 +1495,196 @@ def test_waiting_promotion_defers_changed_file_fetch(self) -> None: self.assertEqual(value.state, "waiting") client.changed_paths.assert_not_called() + def test_status_keeps_intent_paths_while_deferring_other_waiting_paths(self) -> None: + client = object.__new__(GitHub) + client.config = CONFIG + client.repository = "example/repo" + client.trusted_logins = {"trusted"} + client.coordinator_logins = {"trusted"} + client.comments = Mock(return_value=[]) + client.changed_paths = Mock(return_value=(["a.py"], [])) + client._json = Mock( + return_value={ + "baseRefName": "main", + "body": "", + "headRefOid": "a" * 40, + "isDraft": False, + "labels": [{"name": "deploy-requested"}], + "mergeStateStatus": "CLEAN", + "mergeable": "MERGEABLE", + "number": 1, + "state": "OPEN", + "statusCheckRollup": [], + "title": "Waiting", + "url": "https://example.test/1", + } + ) + + value = client.snapshot( + 1, + require_marker=False, + allow_blocked_label=True, + defer_paths_until_ready=True, + include_intent_paths=True, + ) + + self.assertEqual(value.source_paths, ["a.py"]) + client.changed_paths.assert_called_once_with(1) + + def test_snapshot_defers_review_details_for_irrelevant_draft(self) -> None: + config = parse_config( + { + "queue": {"required_checks": ["CI"], "trusted_actors": ["trusted"]}, + "review": { + "providers": [ + { + "kind": "bot", + "name": "Review bot", + "login": "review-bot", + "check_name": "Review Bot", + "minimum_score": 4, + "score_pattern": r"Score:\s*(\d)", + "require_resolved_threads": True, + } + ] + }, + } + ) + client = object.__new__(GitHub) + client.config = config + client.repository = "example/repo" + client.trusted_logins = {"trusted"} + client.coordinator_logins = {"trusted"} + client.comments = Mock(return_value=[]) + client.reviews = Mock() + client.review_threads = Mock() + client.changed_paths = Mock(return_value=([], [])) + client._json = Mock( + return_value={ + "baseRefName": "main", + "body": "", + "headRefOid": "a" * 40, + "isDraft": True, + "labels": [], + "mergeStateStatus": "CLEAN", + "mergeable": "MERGEABLE", + "number": 1, + "state": "OPEN", + "statusCheckRollup": [], + "title": "Draft", + "url": "https://example.test/1", + } + ) + + value = client.snapshot( + 1, + require_marker=False, + defer_review_details_for_irrelevant_drafts=True, + ) + + self.assertEqual(value.state, "blocked") + client.reviews.assert_not_called() + client.review_threads.assert_not_called() + + client.comments.return_value = [ + { + "created_at": "2026-06-20T00:00:00Z", + "user": {"login": "trusted"}, + "body": intent_body( + intent_id="intent-1", + state="requested", + requested_at="2026-06-20T00:00:00Z", + requested_head="a" * 40, + ), + } + ] + client.review_threads.return_value = [] + client.snapshot( + 1, + require_marker=False, + defer_review_details_for_irrelevant_drafts=True, + ) + + client.review_threads.assert_called_once_with(1) + + def test_snapshot_reads_ready_bot_threads_while_another_provider_waits(self) -> None: + head = "a" * 40 + config = parse_config( + { + "queue": {"required_checks": ["CI"], "trusted_actors": ["trusted"]}, + "review": { + "providers": [ + { + "kind": "bot", + "name": "Review bot", + "login": "review-bot", + "check_name": "Review Bot", + "minimum_score": 4, + "score_pattern": r"Score:\s*(\d)", + "require_resolved_threads": True, + }, + { + "kind": "check", + "name": "Other review", + "check_name": "Other Review", + }, + ] + }, + } + ) + client = object.__new__(GitHub) + client.config = config + client.repository = "example/repo" + client.trusted_logins = {"trusted"} + client.coordinator_logins = {"trusted"} + client.comments = Mock( + return_value=[ + { + "created_at": "2026-06-20T00:00:00Z", + "user": {"login": "review-bot[bot]"}, + "body": f"Score: 5\ncommit {head}", + } + ] + ) + client.review_threads = Mock( + return_value=[ + { + "isResolved": False, + "isOutdated": False, + "comments": { + "nodes": [{"author": {"login": "review-bot[bot]"}}] + }, + } + ] + ) + client.changed_paths = Mock(return_value=([], [])) + client._json = Mock( + return_value={ + "baseRefName": "main", + "body": "", + "headRefOid": head, + "isDraft": False, + "labels": [], + "mergeStateStatus": "CLEAN", + "mergeable": "MERGEABLE", + "number": 1, + "state": "OPEN", + "statusCheckRollup": [ + {"name": "CI", "conclusion": "SUCCESS"}, + {"name": "Review Bot", "conclusion": "SUCCESS"}, + ], + "title": "Reviewed", + "url": "https://example.test/1", + } + ) + + value = client.snapshot(1, require_marker=False) + + self.assertEqual(value.state, "blocked") + self.assertIn("unresolved Review bot", "; ".join(value.reasons or [])) + self.assertIn("Other Review is not complete", value.reasons or []) + client.review_threads.assert_called_once_with(1) + def test_integration_snapshot_uses_exact_commit_check_fallback(self) -> None: head_sha = "a" * 40 marker = { @@ -2428,6 +2618,7 @@ def test_integration_can_require_non_actions_author(self) -> None: client.owner = "example" client.coordinator_logins = {"trusted"} client.base_sha = Mock(return_value="b" * 40) + client.is_ancestor = Mock(return_value=False) client._json = Mock( side_effect=[ {}, @@ -2462,6 +2653,7 @@ def test_integration_requires_app_author_to_be_a_coordinator(self) -> None: client.owner = "example" client.coordinator_logins = {"trusted"} client.base_sha = Mock(return_value="b" * 40) + client.is_ancestor = Mock(return_value=False) client._json = Mock( side_effect=[ {}, @@ -3621,6 +3813,7 @@ def test_integration_pr_contains_every_frozen_head(self) -> None: client.owner = "example" client.name = "repo" client.base_sha = Mock(return_value="b" * 40) + client.is_ancestor = Mock(return_value=False) client._json = Mock( side_effect=[ {}, @@ -3672,6 +3865,7 @@ def test_conflicted_integration_keeps_source_intents_discoverable(self) -> None: client.owner = "example" client.name = "repo" client.base_sha = Mock(return_value="b" * 40) + client.is_ancestor = Mock(return_value=False) client._json = Mock( side_effect=[ {}, @@ -3873,6 +4067,7 @@ def test_failed_integration_pr_creation_removes_its_orphan_branch(self) -> None: client.owner = "example" client.name = "repo" client.base_sha = Mock(return_value="b" * 40) + client.is_ancestor = Mock(return_value=False) client._json = Mock( side_effect=[ {}, @@ -5216,6 +5411,8 @@ def test_github_dispatches_deploy_with_exact_successful_ci(self) -> None: return_value=[{"id": 8, "name": "Deploy", "state": "active"}] ) client._run = Mock(return_value="") + client.claim_deploy_dispatch = Mock(return_value="claim-1") + client.complete_deploy_dispatch = Mock() sha = "a" * 40 result = client.dispatch_deploy_workflows(ci_run={"id": 42, "head_sha": sha}) @@ -5237,6 +5434,168 @@ def test_github_dispatches_deploy_with_exact_successful_ci(self) -> None: "-f", "ci_run_id=42", ) + client.complete_deploy_dispatch.assert_called_once_with( + ci_run={"id": 42, "head_sha": sha}, + workflow_id=8, + workflow_name="Deploy", + claim_id="claim-1", + ) + + def test_deploy_dispatch_lease_allows_only_the_first_follower(self) -> None: + client = object.__new__(GitHub) + client.repository = "example/repo" + client.coordinator_logins = {"trusted"} + client.require_actor = Mock(return_value="trusted") + sha = "a" * 40 + tree = "t" * 40 + lease = "l" * 40 + client._json = Mock( + side_effect=[ + {"tree": {"sha": tree}}, + QueueError("HTTP 404: Not Found"), + {"sha": lease}, + {}, + ] + ) + + claimed = client.claim_deploy_dispatch( + ci_run={"id": 42, "head_sha": sha}, + workflow_id=8, + workflow_name="Deploy", + ) + + self.assertIsNotNone(claimed) + client.require_actor.assert_called_once() + + def test_deploy_dispatch_lease_rejects_a_duplicate_follower(self) -> None: + client = object.__new__(GitHub) + client.repository = "example/repo" + client.coordinator_logins = {"trusted"} + client.require_actor = Mock(return_value="trusted") + sha = "a" * 40 + tree = "t" * 40 + lease = "l" * 40 + payload = { + "claimed_at": "2026-06-20T00:00:00Z", + "claim_id": "claim-1", + "ci_run_id": 42, + "main_sha": sha, + "schema": 1, + "state": "claimed", + "workflow_id": 8, + "workflow_name": "Deploy", + } + client._json = Mock( + side_effect=[ + {"tree": {"sha": tree}}, + {"object": {"sha": lease}}, + { + "message": "DeployBot deployment dispatch lease v1 " + + json.dumps(payload, sort_keys=True), + "tree": {"sha": tree}, + }, + ] + ) + + claimed = client.claim_deploy_dispatch( + ci_run={"id": 42, "head_sha": sha}, + workflow_id=8, + workflow_name="Deploy", + ) + + self.assertIsNone(claimed) + + def test_deploy_dispatch_leases_are_isolated_by_release_and_workflow(self) -> None: + client = object.__new__(GitHub) + first = client.deploy_dispatch_lease_branch( + ci_run={"id": 42, "head_sha": "a" * 40}, workflow_id=8 + ) + second_release = client.deploy_dispatch_lease_branch( + ci_run={"id": 43, "head_sha": "b" * 40}, workflow_id=8 + ) + second_workflow = client.deploy_dispatch_lease_branch( + ci_run={"id": 42, "head_sha": "a" * 40}, workflow_id=9 + ) + + self.assertEqual(len({first, second_release, second_workflow}), 3) + + def test_completed_deploy_dispatch_is_never_reclaimed(self) -> None: + client = object.__new__(GitHub) + client.repository = "example/repo" + client.coordinator_logins = {"trusted"} + client.require_actor = Mock(return_value="trusted") + sha = "a" * 40 + tree = "t" * 40 + lease = "l" * 40 + payload = { + "claimed_at": "2026-06-20T00:00:00Z", + "claim_id": "claim-1", + "ci_run_id": 42, + "main_sha": sha, + "schema": 1, + "state": "dispatched", + "workflow_id": 8, + "workflow_name": "Deploy", + } + client._json = Mock( + side_effect=[ + {"tree": {"sha": tree}}, + {"object": {"sha": lease}}, + { + "message": "DeployBot deployment dispatch lease v1 " + + json.dumps(payload, sort_keys=True), + "tree": {"sha": tree}, + }, + ] + ) + + claimed = client.claim_deploy_dispatch( + ci_run={"id": 42, "head_sha": sha}, + workflow_id=8, + workflow_name="Deploy", + ) + + self.assertIsNone(claimed) + + def test_claimed_dispatch_is_never_automatically_reclaimed(self) -> None: + client = object.__new__(GitHub) + client.repository = "example/repo" + client.coordinator_logins = {"trusted"} + client.require_actor = Mock(return_value="trusted") + sha = "a" * 40 + tree = "t" * 40 + lease = "l" * 40 + payload = { + "claimed_at": "2026-06-20T00:00:00Z", + "claim_id": "claim-1", + "ci_run_id": 42, + "main_sha": sha, + "schema": 1, + "state": "claimed", + "workflow_id": 8, + "workflow_name": "Deploy", + } + client._json = Mock( + side_effect=[ + {"tree": {"sha": tree}}, + {"object": {"sha": lease}}, + { + "message": "DeployBot deployment dispatch lease v1 " + + json.dumps(payload, sort_keys=True), + "tree": {"sha": tree}, + }, + ] + ) + client.workflow_runs_for_workflows = Mock() + + claimed = client.claim_deploy_dispatch( + ci_run={"id": 42, "head_sha": sha}, + workflow_id=8, + workflow_name="Deploy", + ) + + self.assertIsNone(claimed) + client.workflow_runs_for_workflows.assert_not_called() def test_pipeline_pause_blocks_every_merge_path(self) -> None: client = Mock() diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index c062175..bc7769c 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -77,6 +77,54 @@ def test_release_state_ignores_skipped_deployment_wakeups(self) -> None: self.assertEqual(value["state"], "awaiting-deploy") self.assertIsNone(value["latest_deploy"]) + def test_release_state_requires_every_configured_deployment_workflow(self) -> None: + sha = "a" * 40 + config = parse_config( + { + "queue": {"required_checks": ["CI"], "trusted_actors": ["trusted"]}, + "pipeline": {"deploy_workflows": ["Deploy API", "Deploy UI"]}, + } + ) + ci = { + "id": 1, + "name": "CI", + "head_sha": sha, + "status": "completed", + "conclusion": "success", + "created_at": "2026-06-20T00:00:00Z", + } + api = { + "id": 2, + "name": "Deploy API", + "head_sha": sha, + "status": "completed", + "conclusion": "success", + "created_at": "2026-06-20T00:01:00Z", + } + ui = { + "id": 3, + "name": "Deploy UI", + "head_sha": sha, + "status": "in_progress", + "conclusion": None, + "created_at": "2026-06-20T00:01:30Z", + } + + waiting = release_state(main_sha=sha, runs=[ci, api], config=config.pipeline) + deploying = release_state( + main_sha=sha, runs=[ci, api, ui], config=config.pipeline + ) + verified = release_state( + main_sha=sha, + runs=[ci, api, {**ui, "status": "completed", "conclusion": "success"}], + config=config.pipeline, + ) + + self.assertEqual(waiting["state"], "awaiting-deploy") + self.assertIsNone(waiting["deployments"]["Deploy UI"]) + self.assertEqual(deploying["state"], "deploying") + self.assertEqual(verified["state"], "verified") + def test_new_successful_ci_supersedes_an_older_failed_deploy(self) -> None: sha = "a" * 40 runs = [ @@ -156,6 +204,7 @@ def test_follow_dispatches_deploy_after_token_dispatched_ci(self) -> None: client.config = CONFIG client.base_sha.return_value = sha client.workflow_runs.side_effect = [[ci], [ci, deploy]] + client.claim_deploy_dispatch.return_value = True client.dispatch_deploy_workflows.return_value = [ {"id": 9, "name": "Deploy", "ci_sha": sha, "ci_run_id": 1} ] @@ -172,6 +221,41 @@ def test_follow_dispatches_deploy_after_token_dispatched_ci(self) -> None: self.assertEqual(result["state"], "verified") self.assertEqual(result["dispatched_deployments"][0]["id"], 9) + def test_follow_accepts_an_idempotent_empty_dispatch_result(self) -> None: + sha = "a" * 40 + ci = { + "id": 1, + "name": "CI", + "head_sha": sha, + "status": "completed", + "conclusion": "success", + "event": "workflow_dispatch", + "created_at": "2026-06-20T00:00:00Z", + } + deploy = { + "id": 2, + "name": "Deploy", + "head_sha": sha, + "status": "completed", + "conclusion": "success", + "event": "workflow_dispatch", + "created_at": "2026-06-20T00:01:00Z", + } + client = Mock() + client.config = CONFIG + client.base_sha.return_value = sha + client.workflow_runs.side_effect = [[ci], [ci, deploy]] + client.dispatch_deploy_workflows.return_value = [] + + with ( + patch("agent_merge_queue.pipeline.time.sleep"), + patch("agent_merge_queue.pipeline.time.monotonic", side_effect=[0, 1]), + ): + result = follow_release(client, timeout_seconds=10, poll_seconds=1) + + self.assertEqual(result["state"], "verified") + client.dispatch_deploy_workflows.assert_called_once() + def test_follow_absorbs_a_ci_rerun_during_failure_grace(self) -> None: sha = "a" * 40 failed = {