From 99e058cf8d83f0b367b41a221aa222610a46d1fe Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 17:34:28 +0800 Subject: [PATCH 01/14] fix(team): hand off after single-chat team creation --- crates/aionui-team/src/prompts/team_guide.rs | 34 +++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/crates/aionui-team/src/prompts/team_guide.rs b/crates/aionui-team/src/prompts/team_guide.rs index 690fd9683..fb626bb25 100644 --- a/crates/aionui-team/src/prompts/team_guide.rs +++ b/crates/aionui-team/src/prompts/team_guide.rs @@ -55,11 +55,11 @@ If case 2 applies, ask at most once whether the user wants to bring in a Team. K | Tester | Write and run tests | {agent_type} | (model from list) | 4. **Output the table as a normal text message and END YOUR TURN.** Do NOT call `aion_create_team` or any other tool (including ask_user) in this turn. Wait for the user to reply in their next message with explicit confirmation (e.g. \"ok\", \"go ahead\", \"确认\") before proceeding. 5. After user confirms → call `aion_create_team`. The summary MUST include both the goal and the confirmed team configuration. (The system automatically sets the correct agent type — you do NOT need to pass agentType.) -6. After `aion_create_team` returns → you ARE now the team Leader. The system navigates to the team page automatically. **Immediately** use `team_spawn_agent` to create each teammate from the confirmed configuration table. Then use `team_send_message` to assign initial tasks to each spawned teammate. Do NOT end your turn until all teammates are spawned and tasked. +6. After `aion_create_team` returns → the Team has been created, the current conversation has been bound as Leader, and the system navigates to the Team page automatically. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output a brief handoff telling the user to continue from the Team page, then END YOUR TURN. The user's next message from the Team page will start the first formal `TeamRun`. 7. User declines or wants changes → adjust or proceed solo. Do not mention Team again unless the user asks. ### Tool constraint -Before team creation: use **only** `aion_create_team` and `aion_list_models`. After `aion_create_team` succeeds: use team tools (`team_spawn_agent`, `team_send_message`, `team_members`, `team_task_create`, etc.) to manage your team."; +Before team creation: use **only** `aion_create_team` and `aion_list_models`. After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn. Team tools are only for normal Team runtime after the Team page accepts the user's first Team message and an active `TeamRun` exists."; /// Build the Team Guide prompt for a solo agent. /// @@ -128,14 +128,40 @@ If case 2 applies, ask at most once whether the user wants to bring in a Team. K | Tester | Write and run tests | claude | (model from list) |\n\ 4. **Output the table as a normal text message and END YOUR TURN.** Do NOT call `aion_create_team` or any other tool (including ask_user) in this turn. Wait for the user to reply in their next message with explicit confirmation (e.g. \"ok\", \"go ahead\", \"确认\") before proceeding.\n\ 5. After user confirms → call `aion_create_team`. The summary MUST include both the goal and the confirmed team configuration. (The system automatically sets the correct agent type — you do NOT need to pass agentType.)\n\ -6. After `aion_create_team` returns → you ARE now the team Leader. The system navigates to the team page automatically. **Immediately** use `team_spawn_agent` to create each teammate from the confirmed configuration table. Then use `team_send_message` to assign initial tasks to each spawned teammate. Do NOT end your turn until all teammates are spawned and tasked.\n\ +6. After `aion_create_team` returns → the Team has been created, the current conversation has been bound as Leader, and the system navigates to the Team page automatically. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output a brief handoff telling the user to continue from the Team page, then END YOUR TURN. The user's next message from the Team page will start the first formal `TeamRun`.\n\ 7. User declines or wants changes → adjust or proceed solo. Do not mention Team again unless the user asks.\n\ \n\ ### Tool constraint\n\ -Before team creation: use **only** `aion_create_team` and `aion_list_models`. After `aion_create_team` succeeds: use team tools (`team_spawn_agent`, `team_send_message`, `team_members`, `team_task_create`, etc.) to manage your team."; +Before team creation: use **only** `aion_create_team` and `aion_list_models`. After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn. Team tools are only for normal Team runtime after the Team page accepts the user's first Team message and an active `TeamRun` exists."; assert_eq!(prompt, expected); } + #[test] + fn team_guide_prompt_hands_off_after_create_team() { + let prompt = build_team_guide_prompt("claude", None); + + assert!(prompt.contains( + "After `aion_create_team` returns → the Team has been created, the current conversation has been bound as Leader, and the system navigates to the Team page automatically." + )); + assert!(prompt.contains( + "Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn." + )); + assert!(prompt.contains( + "The user's next message from the Team page will start the first formal `TeamRun`." + )); + assert!(prompt.contains( + "After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn." + )); + assert!( + !prompt.contains("Your team tools (team_spawn_agent, team_send_message, etc.) are now active."), + "prompt must not claim Team tools are active immediately after creation" + ); + assert!( + !prompt.contains("Immediately proceed to spawn teammates as planned"), + "prompt must not ask the solo agent to spawn teammates in the same solo turn" + ); + } + #[test] fn team_guide_prompt_with_preset_leader_label() { let prompt = build_team_guide_prompt("gemini", Some("Word Creator")); From fba81c14cac5c14e6b6591d59965b114c753a043 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 17:35:18 +0800 Subject: [PATCH 02/14] fix(team): return single-chat handoff next step --- crates/aionui-team/src/guide/server.rs | 37 ++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/crates/aionui-team/src/guide/server.rs b/crates/aionui-team/src/guide/server.rs index a0a2cfd25..c6c80652a 100644 --- a/crates/aionui-team/src/guide/server.rs +++ b/crates/aionui-team/src/guide/server.rs @@ -169,6 +169,14 @@ async fn handle_tool_request( // Tool implementations // --------------------------------------------------------------------------- +fn build_create_team_handoff_next_step(summary: &str) -> String { + format!( + "Team was created and the Team page is open. End this solo turn now. \ + Do not call any `team_*` tools from this solo turn. Tell the user to continue from the Team page; \ + their next message there will start the first formal `TeamRun`. Task summary: {summary}" + ) +} + async fn exec_create_team( request_body: &serde_json::Value, args: &serde_json::Value, @@ -268,11 +276,7 @@ async fn exec_create_team( "name": team.name, "route": route, "status": "team_created", - "next_step": format!( - "You are now the team Leader. Your team tools (team_spawn_agent, team_send_message, etc.) are now active. \ - Immediately proceed to spawn teammates as planned. Task summary: {}", - params.summary - ) + "next_step": build_create_team_handoff_next_step(¶ms.summary) }) } @@ -384,6 +388,29 @@ mod tests { use std::time::Duration; use tokio::time::timeout; + #[test] + fn create_team_next_step_tells_solo_agent_to_end_turn() { + let next_step = build_create_team_handoff_next_step("Build a research and implementation team"); + + assert!(next_step.contains("Team was created and the Team page is open.")); + assert!(next_step.contains("End this solo turn now.")); + assert!(next_step.contains("Do not call any `team_*` tools from this solo turn.")); + assert!(next_step.contains("their next message there will start the first formal `TeamRun`.")); + assert!(next_step.contains("Task summary: Build a research and implementation team")); + assert!( + !next_step.contains("team_spawn_agent"), + "next_step must not name spawn as an immediately available action" + ); + assert!( + !next_step.contains("team_send_message"), + "next_step must not name send_message as an immediately available action" + ); + assert!( + !next_step.contains("tools are now active"), + "next_step must not claim Team tools are active immediately after creation" + ); + } + #[tokio::test] async fn start_returns_positive_port_and_token() { let server = GuideMcpServer::start().await.expect("start should succeed"); From 32b8ecc12348f34f365efe79f1d70677a2932920 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 17:36:30 +0800 Subject: [PATCH 03/14] fix(team): clarify guide team tool handoff guard --- crates/aionui-team/src/guide/server.rs | 87 ++++++++++++++++++++ crates/aionui-team/src/prompts/team_guide.rs | 10 +-- 2 files changed, 91 insertions(+), 6 deletions(-) diff --git a/crates/aionui-team/src/guide/server.rs b/crates/aionui-team/src/guide/server.rs index c6c80652a..29a9a9550 100644 --- a/crates/aionui-team/src/guide/server.rs +++ b/crates/aionui-team/src/guide/server.rs @@ -177,6 +177,26 @@ fn build_create_team_handoff_next_step(summary: &str) -> String { ) } +const NO_ACTIVE_TEAM_RUN_FOR_RUN_SCOPED_WAKE: &str = "no active team run for run-scoped wake"; +const GUIDE_NO_ACTIVE_TEAM_RUN_HANDOFF_ERROR: &str = + "Team was created, but no TeamRun is active yet. End this solo turn and continue from the Team page."; + +fn is_run_scoped_guide_team_tool(tool_name: &str) -> bool { + matches!( + tool_name, + "team_send_message" + | "team_spawn_agent" + | "team_task_create" + | "team_task_update" + | "team_rename_agent" + | "team_shutdown_agent" + ) +} + +fn guide_no_active_team_run_handoff_response() -> serde_json::Value { + serde_json::json!({ "error": GUIDE_NO_ACTIVE_TEAM_RUN_HANDOFF_ERROR }) +} + async fn exec_create_team( request_body: &serde_json::Value, args: &serde_json::Value, @@ -322,6 +342,29 @@ async fn exec_team_tool( } }; + if is_run_scoped_guide_team_tool(tool_name) { + match svc.require_active_team_run_for_team_work(&team_id).await { + Ok(()) => {} + Err(crate::TeamError::InvalidRequest(message)) if message == NO_ACTIVE_TEAM_RUN_FOR_RUN_SCOPED_WAKE => { + warn!( + tool = tool_name, + team_id = %team_id, + "Guide HTTP: run-scoped team tool refused because no active TeamRun exists" + ); + return guide_no_active_team_run_handoff_response(); + } + Err(error) => { + warn!( + tool = tool_name, + team_id = %team_id, + error = %error, + "Guide HTTP: active TeamRun check failed before forwarding team tool" + ); + return serde_json::json!({"error": error.to_string()}); + } + } + } + let svc_weak = Arc::downgrade(&svc); let result = crate::mcp::server::dispatch_tool( tool_name, @@ -411,6 +454,50 @@ mod tests { ); } + #[test] + fn run_scoped_guide_team_tools_are_classified_for_handoff_guard() { + for tool_name in [ + "team_send_message", + "team_spawn_agent", + "team_task_create", + "team_task_update", + "team_rename_agent", + "team_shutdown_agent", + ] { + assert!( + is_run_scoped_guide_team_tool(tool_name), + "{tool_name} should require an active TeamRun in the Guide forwarding path" + ); + } + + for tool_name in [ + "team_members", + "team_task_list", + "team_list_models", + "team_describe_assistant", + ] { + assert!( + !is_run_scoped_guide_team_tool(tool_name), + "{tool_name} is read-only/catalog-style and should not use the run-scoped handoff guard" + ); + } + } + + #[test] + fn guide_no_active_team_run_handoff_error_is_clear() { + let response = guide_no_active_team_run_handoff_response(); + let error = response + .get("error") + .and_then(serde_json::Value::as_str) + .expect("error string"); + + assert_eq!( + error, + "Team was created, but no TeamRun is active yet. End this solo turn and continue from the Team page." + ); + assert!(!error.contains("no active team run for run-scoped wake")); + } + #[tokio::test] async fn start_returns_positive_port_and_token() { let server = GuideMcpServer::start().await.expect("start should succeed"); diff --git a/crates/aionui-team/src/prompts/team_guide.rs b/crates/aionui-team/src/prompts/team_guide.rs index fb626bb25..a6f2d86ff 100644 --- a/crates/aionui-team/src/prompts/team_guide.rs +++ b/crates/aionui-team/src/prompts/team_guide.rs @@ -146,12 +146,10 @@ Before team creation: use **only** `aion_create_team` and `aion_list_models`. Af assert!(prompt.contains( "Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn." )); - assert!(prompt.contains( - "The user's next message from the Team page will start the first formal `TeamRun`." - )); - assert!(prompt.contains( - "After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn." - )); + assert!(prompt.contains("The user's next message from the Team page will start the first formal `TeamRun`.")); + assert!( + prompt.contains("After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn.") + ); assert!( !prompt.contains("Your team tools (team_spawn_agent, team_send_message, etc.) are now active."), "prompt must not claim Team tools are active immediately after creation" From da8d90f484a75d6771f5b2862819fb22035effd1 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 19:36:49 +0800 Subject: [PATCH 04/14] fix(team): accept create-team handoff response --- .../aionui-app/src/commands/cmd_team_guide.rs | 113 +++++++++++++++++- 1 file changed, 109 insertions(+), 4 deletions(-) diff --git a/crates/aionui-app/src/commands/cmd_team_guide.rs b/crates/aionui-app/src/commands/cmd_team_guide.rs index 90f699320..1e9dbcfcf 100644 --- a/crates/aionui-app/src/commands/cmd_team_guide.rs +++ b/crates/aionui-app/src/commands/cmd_team_guide.rs @@ -500,12 +500,95 @@ mod tests { assert!(!serialized.contains("conv-secret-123")); } + #[tokio::test] + async fn forward_tool_create_team_object_success_returns_success_text() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/tool")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "teamId": "team-1", + "name": "Dev Team", + "route": "/team/team-1", + "status": "team_created", + "next_step": "Team was created and the Team page is open. End this solo turn now.", + }))) + .mount(&mock_server) + .await; + + let server = guide_server_for_port(mock_server.address().port()); + let result = server + .forward_tool("aion_create_team", &json!({"summary": "redacted"})) + .await; + + assert_ne!(result.is_error, Some(true)); + let text = first_text(&result); + assert!(text.contains("team_created")); + assert!(text.contains("team-1")); + assert!(text.contains("End this solo turn now")); + } + + #[tokio::test] + async fn forward_tool_create_team_malformed_object_remains_unexpected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/tool")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "name": "Dev Team", + "route": "/team/team-1", + "status": "team_created", + "next_step": "Team was created and the Team page is open. End this solo turn now.", + }))) + .mount(&mock_server) + .await; + + let server = guide_server_for_port(mock_server.address().port()); + let result = server + .forward_tool("aion_create_team", &json!({"summary": "redacted"})) + .await; + + assert_eq!(result.is_error, Some(true)); + assert_eq!(first_text(&result), "unexpected local guide tool response"); + assert_eq!( + result.structured_content.as_ref().unwrap()["code"], + "MCP_TOOL_RESPONSE_UNEXPECTED" + ); + } + + #[tokio::test] + async fn forward_tool_create_team_legacy_result_body_remains_unexpected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/tool")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "result": "legacy create-team success", + }))) + .mount(&mock_server) + .await; + + let server = guide_server_for_port(mock_server.address().port()); + let result = server + .forward_tool("aion_create_team", &json!({"summary": "redacted"})) + .await; + + assert_eq!(result.is_error, Some(true)); + assert_eq!(first_text(&result), "unexpected local guide tool response"); + assert_eq!( + result.structured_content.as_ref().unwrap()["code"], + "MCP_TOOL_RESPONSE_UNEXPECTED" + ); + } + #[tokio::test] async fn forward_tool_unexpected_2xx_body_returns_unexpected_without_echoing_body() { let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path("/tool")) - .respond_with(ResponseTemplate::new(200).set_body_string("unexpected raw body with team-secret-789")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "teamId": "team-secret-789", + "route": "/team/team-secret-789", + "status": "team_created", + "next_step": "Team was created and the Team page is open. End this solo turn now.", + }))) .mount(&mock_server) .await; @@ -520,7 +603,7 @@ mod tests { ); let serialized = serde_json::to_string(&result).unwrap(); assert!(!serialized.contains("team-secret-789")); - assert!(!serialized.contains("unexpected raw body")); + assert!(!serialized.contains("team_created")); } #[tokio::test] @@ -599,8 +682,8 @@ impl GuideServer { match resp.text().await { Ok(text) => { if let Ok(v) = serde_json::from_str::(&text) { - if let Some(result) = v.get("result").and_then(|r| r.as_str()) { - return tool_success(result.to_owned()); + if let Some(result) = parse_tool_success_text(tool_name, &v) { + return tool_success(result); } if v.get("error").is_some() { return tool_error( @@ -694,3 +777,25 @@ fn extract_nested_code(value: &serde_json::Value, path: &[&str]) -> Option None, } } + +fn parse_tool_success_text(tool_name: &str, value: &serde_json::Value) -> Option { + if tool_name == "aion_create_team" { + return is_create_team_success_body(value).then(|| serde_json::to_string(value).ok())?; + } + + if let Some(result) = value.get("result").and_then(|result| result.as_str()) { + return Some(result.to_owned()); + } + + None +} + +fn is_create_team_success_body(value: &serde_json::Value) -> bool { + value.get("status").and_then(|status| status.as_str()) == Some("team_created") + && value.get("teamId").and_then(|team_id| team_id.as_str()).is_some() + && value.get("route").and_then(|route| route.as_str()).is_some() + && value + .get("next_step") + .and_then(|next_step| next_step.as_str()) + .is_some() +} From 5435536565ce5be165e5cdf29fbcf34e9b0eb62a Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 20:36:06 +0800 Subject: [PATCH 05/14] fix(team): retry busy handoff turns within team run --- crates/aionui-team/src/event_loop.rs | 16 ++++- crates/aionui-team/src/team_run.rs | 48 +++++++++++++ crates/aionui-team/tests/e2e_team_flow.rs | 88 +++++++++++++++++++++++ 3 files changed, 151 insertions(+), 1 deletion(-) diff --git a/crates/aionui-team/src/event_loop.rs b/crates/aionui-team/src/event_loop.rs index d684b78fc..4277ce9e3 100644 --- a/crates/aionui-team/src/event_loop.rs +++ b/crates/aionui-team/src/event_loop.rs @@ -12,7 +12,8 @@ use tracing::{debug, info, warn}; use crate::mailbox::Mailbox; use crate::ports::{ - AgentTurnExecutionPort, AgentTurnRequest, AgentTurnSource, AgentTurnStarted, AgentTurnStartedCallback, + AgentTurnExecutionError, AgentTurnExecutionPort, AgentTurnRequest, AgentTurnSource, AgentTurnStarted, + AgentTurnStartedCallback, }; use crate::scheduler::TeammateManager; use crate::session::TeamSession; @@ -118,6 +119,10 @@ struct TurnExecution { turn_id: Option, } +fn is_retryable_start_skip(error: &AgentTurnExecutionError) -> bool { + matches!(error, AgentTurnExecutionError::Skipped { reason } if reason.contains("already running")) +} + /// The event loop for one agent slot. Spawned as a tokio task. /// /// Flow: @@ -335,6 +340,15 @@ async fn execute_turn(ctx: &AgentLoopContext, input: &crate::session::WakeInput) { if started_seen.load(Ordering::SeqCst) { ctx.session.team_run_manager().complete_failed().await; + } else if is_retryable_start_skip(&e) { + ctx.session + .team_run_manager() + .retry_child_start_later(&reservation.reservation_id, &e.to_string()) + .await; + let _ = ctx.scheduler.set_status(&ctx.slot_id, TeammateStatus::Idle).await; + tokio::time::sleep(Duration::from_millis(250)).await; + ctx.registry.notify(&ctx.slot_id); + return None; } else { ctx.session .team_run_manager() diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index 522e7f144..73f35d67e 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -40,6 +40,8 @@ pub struct StartingChildReservation { pub slot_id: String, pub role: TeamRunTargetRole, pub conversation_id: String, + pub(crate) wake_source: TeamWakeSource, + pub(crate) message_id: Option, pub state: StartingReservationState, } @@ -525,6 +527,8 @@ impl TeamRunManager { slot_id: pending.slot_id.clone(), role, conversation_id: conversation_id.to_owned(), + wake_source: pending.source, + message_id: pending.message_id.clone(), state: StartingReservationState::Starting, }; run.starting_reservations @@ -548,6 +552,50 @@ impl TeamRunManager { Some(reservation) } + pub async fn retry_child_start_later(&self, reservation_id: &str, reason: &str) -> Option { + let mut guard = self.state.lock().await; + let run = guard.as_mut().filter(|run| run.is_active())?; + let reservation = match run.starting_reservations.remove(reservation_id) { + Some(reservation) => reservation, + None => { + warn!( + team_id = %self.team_id, + reservation_id, + error = %reason, + "team_run reservation retry ignored because reservation is missing" + ); + return None; + } + }; + + let pending = PendingWake { + slot_id: reservation.slot_id.clone(), + role: reservation.role, + source: reservation.wake_source, + message_id: reservation.message_id.clone(), + }; + run.pending_wakes + .entry(reservation.slot_id.clone()) + .or_default() + .push_front(pending); + let payload = run.payload(); + info!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + reservation_id = %reservation.reservation_id, + slot_id = %reservation.slot_id, + error = %reason, + slot_pending_wake_count = run.pending_wake_count_for_slot(&reservation.slot_id), + pending_wake_count = payload.pending_wake_count, + starting_child_count = payload.starting_child_count, + active_child_count = payload.active_child_count, + "team_run reservation deferred for retry" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload.clone()); + Some(payload) + } + pub(crate) async fn peek_next_pending_wake(&self, slot_id: &str) -> Option { let guard = self.state.lock().await; guard diff --git a/crates/aionui-team/tests/e2e_team_flow.rs b/crates/aionui-team/tests/e2e_team_flow.rs index cc99c3df9..4db6d6f89 100644 --- a/crates/aionui-team/tests/e2e_team_flow.rs +++ b/crates/aionui-team/tests/e2e_team_flow.rs @@ -21,6 +21,7 @@ mod common; use std::collections::HashMap; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; @@ -121,6 +122,50 @@ impl AgentTurnExecutionPort for ErrorBeforeStartTurnPort { } } +#[derive(Default)] +struct SkippedOnceThenSuccessTurnPort { + attempts: AtomicUsize, + requests: Arc>>, +} + +impl SkippedOnceThenSuccessTurnPort { + fn requests(&self) -> Arc>> { + self.requests.clone() + } +} + +#[async_trait] +impl AgentTurnExecutionPort for SkippedOnceThenSuccessTurnPort { + async fn run_agent_turn(&self, request: AgentTurnRequest) -> Result { + let attempt = self.attempts.fetch_add(1, Ordering::SeqCst); + self.requests.lock().unwrap().push(request.clone()); + if attempt == 0 { + return Err(AgentTurnExecutionError::Skipped { + reason: format!("conversation {} is already running", request.conversation_id), + }); + } + + let turn_id = format!("turn-retry-{attempt}"); + if let (Some(team_run_id), Some(on_started)) = (request.team_run_id.clone(), request.on_started.as_ref()) { + on_started(AgentTurnStarted { + team_run_id, + slot_id: request.slot_id.clone(), + role: request.role.clone(), + conversation_id: request.conversation_id.clone(), + turn_id: turn_id.clone(), + }) + .await; + } + + Ok(AgentTurnOutcome { + conversation_id: request.conversation_id, + turn_id, + status: AgentTurnStatus::Completed, + runtime: None, + }) + } +} + struct StartedThenFailedTurnPort; #[async_trait] @@ -698,6 +743,16 @@ async fn wait_for_event(broadcaster: &RecordingBroadcaster, name: &str) { panic!("timed out waiting for event {name}"); } +async fn wait_for_turn_request_count(requests: &Arc>>, count: usize) { + for _ in 0..100 { + if requests.lock().unwrap().len() >= count { + return; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("timed out waiting for {count} turn requests"); +} + fn register_test_event_loops(session: &Arc) { let registry = session.event_loops().clone(); for agent in two_agents() { @@ -1593,6 +1648,39 @@ async fn s9b_event_loop_fails_run_when_turn_errors_before_started() { session.stop(); } +#[tokio::test] +async fn s9b_retryable_busy_before_start_retains_team_run_for_retry() { + let broadcaster = Arc::new(RecordingBroadcaster::new()); + let turn_port = Arc::new(SkippedOnceThenSuccessTurnPort::default()); + let requests = turn_port.requests(); + let session = + setup_session_with_runtime_ports(turn_port, Arc::new(NoopCancellationPort), broadcaster.clone()).await; + + session + .send_message("user input to team", None) + .await + .expect("send_message must succeed"); + + wait_for_turn_request_count(&requests, 2).await; + let request_log = requests.lock().unwrap(); + let first_team_run_id = request_log[0] + .team_run_id + .as_deref() + .expect("first attempt should belong to TeamRun"); + let second_team_run_id = request_log[1] + .team_run_id + .as_deref() + .expect("retry must still belong to TeamRun"); + assert_eq!(second_team_run_id, first_team_run_id); + assert_eq!( + session.team_run_manager().active_run_id().await, + None, + "run should complete after the retry succeeds" + ); + + session.stop(); +} + #[tokio::test] async fn s9c_event_loop_fails_run_when_started_turn_returns_failed() { let broadcaster = Arc::new(RecordingBroadcaster::new()); From 844e026a5e9b845ffb7e6aa40a01aded4dacc4c0 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 20:56:39 +0800 Subject: [PATCH 06/14] fix(team): retry handoff turns after runtime release --- .../src/router/team_conversation_adapters.rs | 53 +++++++++++---- .../aionui-conversation/src/runtime_state.rs | 47 +++++++++++++ crates/aionui-team/src/event_loop.rs | 2 - crates/aionui-team/tests/e2e_team_flow.rs | 68 ++++++++----------- 4 files changed, 116 insertions(+), 54 deletions(-) diff --git a/crates/aionui-app/src/router/team_conversation_adapters.rs b/crates/aionui-app/src/router/team_conversation_adapters.rs index 50526cddd..2eb114056 100644 --- a/crates/aionui-app/src/router/team_conversation_adapters.rs +++ b/crates/aionui-app/src/router/team_conversation_adapters.rs @@ -16,6 +16,7 @@ use aionui_team::{ TeamConversationProvisioningPort, TeamError, TeamProjectionMessageStore, }; use async_trait::async_trait; +use tracing::info; pub struct TeamConversationAdapters { conversation_service: ConversationService, @@ -73,18 +74,42 @@ impl AgentTurnExecutionPort for TeamConversationAdapters { > }); - let outcome = self - .conversation_service - .run_agent_turn(ConversationAgentTurnRequest { - user_id: request.user_id, - conversation_id: request.conversation_id, - content: request.content, - files: request.files, - inject_skills: Vec::new(), - on_started, - }) - .await - .map_err(map_conversation_turn_error)?; + let conversation_id = request.conversation_id.clone(); + let outcome = loop { + match self + .conversation_service + .run_agent_turn(ConversationAgentTurnRequest { + user_id: request.user_id.clone(), + conversation_id: conversation_id.clone(), + content: request.content.clone(), + files: request.files.clone(), + inject_skills: Vec::new(), + on_started: on_started.clone(), + }) + .await + { + Ok(outcome) => break outcome, + Err(error) if is_retryable_conversation_busy(&error) => { + info!( + conversation_id = %conversation_id, + team_run_id = ?request.team_run_id, + slot_id = %request.slot_id, + "team conversation turn waiting for active conversation turn to release" + ); + self.conversation_service + .runtime_state() + .wait_until_unclaimed(&conversation_id) + .await; + info!( + conversation_id = %conversation_id, + team_run_id = ?request.team_run_id, + slot_id = %request.slot_id, + "team conversation turn retrying after active conversation turn released" + ); + } + Err(error) => return Err(map_conversation_turn_error(error)), + } + }; Ok(AgentTurnOutcome { conversation_id: outcome.conversation_id, @@ -275,6 +300,10 @@ impl TeamConversationLookupPort for TeamConversationAdapters { } } +fn is_retryable_conversation_busy(error: &ConversationError) -> bool { + matches!(error, ConversationError::Busy { reason } if reason.contains("already running")) +} + fn map_conversation_create_error(error: ConversationError) -> TeamError { match error { ConversationError::WorkspacePathUnavailable { path } => TeamError::WorkspacePathUnavailable(path), diff --git a/crates/aionui-conversation/src/runtime_state.rs b/crates/aionui-conversation/src/runtime_state.rs index f047853d6..a0269916c 100644 --- a/crates/aionui-conversation/src/runtime_state.rs +++ b/crates/aionui-conversation/src/runtime_state.rs @@ -5,6 +5,7 @@ use std::{ use aionui_api_types::{ConversationRuntimeStateKind, ConversationRuntimeSummary}; use aionui_common::ConversationStatus; +use tokio::sync::Notify; use tracing::{info, warn}; use crate::ConversationError; @@ -12,6 +13,7 @@ use crate::ConversationError; #[derive(Debug, Default)] pub struct ConversationRuntimeStateService { state: Mutex, + release_notify: Notify, } #[derive(Debug, Default)] @@ -112,6 +114,16 @@ impl ConversationRuntimeStateService { .and_then(|state| state.active_turns.get(conversation_id).cloned()) } + pub async fn wait_until_unclaimed(&self, conversation_id: &str) { + loop { + let notified = self.release_notify.notified(); + if !self.is_claimed(conversation_id) { + return; + } + notified.await; + } + } + pub fn mark_deleting(&self, conversation_id: &str) -> bool { match self.state.lock() { Ok(mut state) => { @@ -305,6 +317,8 @@ impl ConversationRuntimeStateService { deleting = was_deleting, "conversation runtime turn claim released" ); + drop(state); + self.release_notify.notify_waiters(); was_deleting } Err(_) => { @@ -417,6 +431,39 @@ mod tests { assert!(state.try_claim_turn("conv-1", "turn-2").is_ok()); } + #[tokio::test] + async fn wait_until_unclaimed_completes_after_active_claim_releases() { + let state = Arc::new(ConversationRuntimeStateService::default()); + let mut claim = state + .try_claim_turn("conv-1", "turn-1") + .expect("claim should be created"); + + let waiter = { + let state = state.clone(); + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + state.wait_until_unclaimed("conv-1").await; + let _ = tx.send(()); + }); + rx + }; + tokio::pin!(waiter); + + assert!( + tokio::time::timeout(std::time::Duration::from_millis(20), &mut waiter) + .await + .is_err(), + "waiter must stay pending while the claim is active" + ); + + let _ = claim.release(); + assert!(!state.is_claimed("conv-1")); + tokio::time::timeout(std::time::Duration::from_secs(1), &mut waiter) + .await + .expect("waiter should finish after release") + .expect("waiter task should send completion"); + } + #[test] fn deleting_rejects_new_turn_claims() { let state = Arc::new(ConversationRuntimeStateService::default()); diff --git a/crates/aionui-team/src/event_loop.rs b/crates/aionui-team/src/event_loop.rs index 4277ce9e3..5b24c5e4c 100644 --- a/crates/aionui-team/src/event_loop.rs +++ b/crates/aionui-team/src/event_loop.rs @@ -346,8 +346,6 @@ async fn execute_turn(ctx: &AgentLoopContext, input: &crate::session::WakeInput) .retry_child_start_later(&reservation.reservation_id, &e.to_string()) .await; let _ = ctx.scheduler.set_status(&ctx.slot_id, TeammateStatus::Idle).await; - tokio::time::sleep(Duration::from_millis(250)).await; - ctx.registry.notify(&ctx.slot_id); return None; } else { ctx.session diff --git a/crates/aionui-team/tests/e2e_team_flow.rs b/crates/aionui-team/tests/e2e_team_flow.rs index 4db6d6f89..a92cf14da 100644 --- a/crates/aionui-team/tests/e2e_team_flow.rs +++ b/crates/aionui-team/tests/e2e_team_flow.rs @@ -21,7 +21,6 @@ mod common; use std::collections::HashMap; use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; @@ -123,45 +122,22 @@ impl AgentTurnExecutionPort for ErrorBeforeStartTurnPort { } #[derive(Default)] -struct SkippedOnceThenSuccessTurnPort { - attempts: AtomicUsize, +struct SkippedBusyTurnPort { requests: Arc>>, } -impl SkippedOnceThenSuccessTurnPort { +impl SkippedBusyTurnPort { fn requests(&self) -> Arc>> { self.requests.clone() } } #[async_trait] -impl AgentTurnExecutionPort for SkippedOnceThenSuccessTurnPort { +impl AgentTurnExecutionPort for SkippedBusyTurnPort { async fn run_agent_turn(&self, request: AgentTurnRequest) -> Result { - let attempt = self.attempts.fetch_add(1, Ordering::SeqCst); self.requests.lock().unwrap().push(request.clone()); - if attempt == 0 { - return Err(AgentTurnExecutionError::Skipped { - reason: format!("conversation {} is already running", request.conversation_id), - }); - } - - let turn_id = format!("turn-retry-{attempt}"); - if let (Some(team_run_id), Some(on_started)) = (request.team_run_id.clone(), request.on_started.as_ref()) { - on_started(AgentTurnStarted { - team_run_id, - slot_id: request.slot_id.clone(), - role: request.role.clone(), - conversation_id: request.conversation_id.clone(), - turn_id: turn_id.clone(), - }) - .await; - } - - Ok(AgentTurnOutcome { - conversation_id: request.conversation_id, - turn_id, - status: AgentTurnStatus::Completed, - runtime: None, + Err(AgentTurnExecutionError::Skipped { + reason: format!("conversation {} is already running", request.conversation_id), }) } } @@ -1649,9 +1625,9 @@ async fn s9b_event_loop_fails_run_when_turn_errors_before_started() { } #[tokio::test] -async fn s9b_retryable_busy_before_start_retains_team_run_for_retry() { +async fn s9b_retryable_busy_before_start_retains_team_run_without_timer_retry() { let broadcaster = Arc::new(RecordingBroadcaster::new()); - let turn_port = Arc::new(SkippedOnceThenSuccessTurnPort::default()); + let turn_port = Arc::new(SkippedBusyTurnPort::default()); let requests = turn_port.requests(); let session = setup_session_with_runtime_ports(turn_port, Arc::new(NoopCancellationPort), broadcaster.clone()).await; @@ -1661,22 +1637,34 @@ async fn s9b_retryable_busy_before_start_retains_team_run_for_retry() { .await .expect("send_message must succeed"); - wait_for_turn_request_count(&requests, 2).await; + wait_for_turn_request_count(&requests, 1).await; + tokio::time::sleep(std::time::Duration::from_millis(350)).await; let request_log = requests.lock().unwrap(); let first_team_run_id = request_log[0] .team_run_id .as_deref() - .expect("first attempt should belong to TeamRun"); - let second_team_run_id = request_log[1] - .team_run_id - .as_deref() - .expect("retry must still belong to TeamRun"); - assert_eq!(second_team_run_id, first_team_run_id); + .expect("first attempt should belong to TeamRun") + .to_owned(); + assert_eq!( + request_log.len(), + 1, + "Team event loop should not use a timer to retry retryable busy skips" + ); + drop(request_log); + assert_eq!( session.team_run_manager().active_run_id().await, - None, - "run should complete after the retry succeeds" + Some(first_team_run_id.clone()), + "run should remain active for a state-driven retry" ); + let payload = session + .team_run_manager() + .current_payload() + .await + .expect("retryable busy skip should keep the active run"); + assert_eq!(payload.team_run_id, first_team_run_id); + assert_eq!(payload.pending_wake_count, 1); + assert_eq!(payload.starting_child_count, 0); session.stop(); } From a829eed54fac124b8dc16ea1c8cf486edd9038c7 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 00:23:44 +0800 Subject: [PATCH 07/14] feat(team): add team run operation leases --- crates/aionui-team/src/team_run.rs | 329 ++++++++++++++++++++++++++++- 1 file changed, 328 insertions(+), 1 deletion(-) diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index 73f35d67e..96fa94867 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -6,7 +6,7 @@ use aionui_api_types::{ }; use aionui_common::{TimestampMs, generate_id, now_ms}; use tokio::sync::Mutex; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use crate::error::TeamError; use crate::events::{ @@ -72,6 +72,27 @@ pub(crate) enum WakeRecordDecision { Suppressed, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct TeamRunOperationLease { + pub lease_id: String, + pub team_run_id: String, + pub slot_id: String, + pub role: TeamRunTargetRole, + pub wake_source: TeamWakeSource, + pub accepted_as_new_run: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct ActiveOperationLease { + lease: TeamRunOperationLease, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum TeamRunWakeAcquireOutcome { + Accepted(TeamRunOperationLease), + Suppressed, +} + #[derive(Debug, Clone)] pub struct PauseSlotOutcome { pub team_run_id: String, @@ -100,6 +121,7 @@ struct TeamRunRecord { starting_reservations: HashMap, pending_wakes: HashMap>, slot_wake_gate: SlotWakeGate, + active_operation_leases: HashMap, } impl TeamRunRecord { @@ -118,6 +140,10 @@ impl TeamRunRecord { .count() } + fn active_operation_lease_count(&self) -> usize { + self.active_operation_leases.len() + } + fn role_for_slot(&self, slot_id: &str) -> Option { self.active_child_turns .get(slot_id) @@ -219,6 +245,48 @@ impl TeamRunRecord { } } +fn new_operation_lease( + run: &mut TeamRunRecord, + slot_id: &str, + role: TeamRunTargetRole, + wake_source: TeamWakeSource, + accepted_as_new_run: bool, +) -> TeamRunOperationLease { + let lease = TeamRunOperationLease { + lease_id: generate_id(), + team_run_id: run.team_run_id.clone(), + slot_id: slot_id.to_owned(), + role, + wake_source, + accepted_as_new_run, + }; + run.active_operation_leases.insert( + lease.lease_id.clone(), + ActiveOperationLease { + lease: lease.clone(), + }, + ); + lease +} + +fn push_pending_wake_locked( + run: &mut TeamRunRecord, + slot_id: String, + role: TeamRunTargetRole, + source: TeamWakeSource, + message_id: Option, +) { + run.pending_wakes + .entry(slot_id.clone()) + .or_default() + .push_back(PendingWake { + slot_id, + role, + source, + message_id, + }); +} + #[derive(Clone)] pub struct TeamRunManager { team_id: String, @@ -289,6 +357,7 @@ impl TeamRunManager { starting_reservations: HashMap::new(), pending_wakes: HashMap::new(), slot_wake_gate: SlotWakeGate::default(), + active_operation_leases: HashMap::new(), }; let ack = record.ack(target_slot_id, target_role, message_id); let payload = record.payload(); @@ -306,6 +375,189 @@ impl TeamRunManager { Ok(ack) } + pub(crate) async fn acquire_user_message_wake( + &self, + slot_id: &str, + role: TeamRunTargetRole, + ) -> Result<(TeamRunAckResponse, TeamRunOperationLease), TeamError> { + let mut guard = self.state.lock().await; + + if let Some(run) = guard.as_mut().filter(|r| r.is_active()) { + if run.active_child_turns.contains_key(slot_id) || run.starting_child_count_for_slot(slot_id) > 0 { + info!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + slot_id, + role = ?role, + "team_run user wake acquire rejected because slot is busy" + ); + return Err(TeamError::SlotBusy(slot_id.to_owned())); + } + + let source = TeamWakeSource::UserIntervention; + let _ = run.slot_wake_gate.before_wake(slot_id, source, None); + let lease = new_operation_lease(run, slot_id, role.clone(), source, false); + let ack = run.ack(slot_id, role, None); + info!( + team_id = %self.team_id, + team_run_id = %lease.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + accepted_as_new_run = lease.accepted_as_new_run, + "team_run operation lease acquired" + ); + return Ok((ack, lease)); + } + + if let Some(cancelling) = guard.as_ref().filter(|r| matches!(r.status, TeamRunStatus::Cancelling)) { + return Err(TeamError::InvalidRequest(format!( + "team run {} is cancelling", + cancelling.team_run_id + ))); + } + + let mut record = TeamRunRecord { + team_run_id: generate_id(), + team_id: self.team_id.clone(), + target_slot_id: slot_id.to_owned(), + target_role: role.clone(), + status: TeamRunStatus::Accepted, + started_at: None, + completed_at: None, + cancelled_at: None, + cancel_reason: None, + active_child_turns: HashMap::new(), + starting_reservations: HashMap::new(), + pending_wakes: HashMap::new(), + slot_wake_gate: SlotWakeGate::default(), + active_operation_leases: HashMap::new(), + }; + let lease = new_operation_lease( + &mut record, + slot_id, + role.clone(), + TeamWakeSource::UserMessage, + true, + ); + let ack = record.ack(slot_id, role, None); + let payload = record.payload(); + *guard = Some(record); + drop(guard); + + info!( + team_id = %self.team_id, + team_run_id = %ack.team_run_id, + target_slot_id = %ack.target_slot_id, + target_role = ?ack.target_role, + "team_run accepted" + ); + info!( + team_id = %self.team_id, + team_run_id = %lease.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + accepted_as_new_run = lease.accepted_as_new_run, + "team_run operation lease acquired" + ); + self.emitter.broadcast_team_run(TEAM_RUN_ACCEPTED_EVENT, payload); + Ok((ack, lease)) + } + + pub(crate) async fn commit_operation_lease( + &self, + lease_id: &str, + trigger_message_id: Option, + ) -> Result<(), TeamError> { + let mut guard = self.state.lock().await; + let Some(run) = guard.as_mut().filter(|r| r.is_active()) else { + error!( + team_id = %self.team_id, + lease_id, + "team_run operation lease commit failed because no active run exists" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease missing: {lease_id}" + ))); + }; + let Some(active) = run.active_operation_leases.remove(lease_id) else { + error!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id, + "team_run operation lease commit failed because lease is missing" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease missing: {lease_id}" + ))); + }; + + let lease = active.lease; + push_pending_wake_locked( + run, + lease.slot_id.clone(), + lease.role.clone(), + lease.wake_source, + trigger_message_id, + ); + let payload = run.payload(); + info!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + pending_wake_count = payload.pending_wake_count, + active_operation_lease_count = run.active_operation_lease_count(), + "team_run operation lease committed" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload); + Ok(()) + } + + pub(crate) async fn abort_operation_lease(&self, lease_id: &str, reason: &str) -> Result<(), TeamError> { + let mut guard = self.state.lock().await; + let Some(run) = guard.as_mut().filter(|r| r.is_active()) else { + error!( + team_id = %self.team_id, + lease_id, + reason, + "team_run operation lease abort failed because no active run exists" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease missing: {lease_id}" + ))); + }; + let Some(active) = run.active_operation_leases.remove(lease_id) else { + error!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id, + reason, + "team_run operation lease abort failed because lease is missing" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease missing: {lease_id}" + ))); + }; + let payload = run.payload(); + warn!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id = %active.lease.lease_id, + slot_id = %active.lease.slot_id, + wake_source = %active.lease.wake_source, + reason, + active_operation_lease_count = run.active_operation_lease_count(), + "team_run operation lease aborted" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload); + Ok(()) + } + pub async fn active_run_id(&self) -> Option { let guard = self.state.lock().await; guard.as_ref().filter(|r| r.is_active()).map(|r| r.team_run_id.clone()) @@ -1124,6 +1376,7 @@ fn maybe_complete_locked(run: &mut TeamRunRecord, emitter: &TeamEventEmitter) -> if run.pending_wake_count() > 0 || !run.starting_reservations.is_empty() || !run.active_child_turns.is_empty() + || run.active_operation_lease_count() > 0 || run.has_retained_wake_gate_work() { emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, run.payload()); @@ -1157,6 +1410,7 @@ fn maybe_cancelled_locked(run: &mut TeamRunRecord, emitter: &TeamEventEmitter) - if run.pending_wake_count() > 0 || !run.starting_reservations.is_empty() || !run.active_child_turns.is_empty() + || run.active_operation_lease_count() > 0 || run.has_retained_wake_gate_work() { emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, run.payload()); @@ -1235,6 +1489,79 @@ mod tests { .expect("slot work must exist") } + #[tokio::test] + async fn lease_keeps_run_active_until_commit() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user message should create run and lease"); + + assert_eq!(lease.team_run_id, ack.team_run_id); + assert_eq!(lease.slot_id, "lead"); + assert_eq!(lease.wake_source, TeamWakeSource::UserMessage); + assert!(lease.accepted_as_new_run); + + let completed = manager.maybe_complete().await; + assert!(completed.is_none(), "active lease must retain the run"); + assert_eq!( + manager.active_run_id().await.as_deref(), + Some(ack.team_run_id.as_str()) + ); + + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-1".into())) + .await + .expect("commit should convert lease to pending wake"); + + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .expect("committed wake should be claimable"); + assert_eq!(reservation.message_id.as_deref(), Some("mailbox-1")); + } + + #[tokio::test] + async fn abort_lease_releases_completion_hold() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user message should create run and lease"); + + manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await + .expect("abort should remove lease"); + + let completed = manager + .maybe_complete() + .await + .expect("run should complete after aborted only lease"); + assert_eq!(completed.team_run_id, ack.team_run_id); + assert_eq!(completed.status, TeamRunStatus::Completed); + } + + #[tokio::test] + async fn commit_missing_lease_returns_internal_consistency_error() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("create active run"); + + let err = manager + .commit_operation_lease("missing-lease", Some("mailbox-1".into())) + .await + .expect_err("missing lease is a contract violation"); + + assert!(matches!( + err, + TeamError::InvalidRequest(message) + if message == "team run operation lease missing: missing-lease" + )); + } + #[tokio::test] async fn cancel_run_clears_paused_gate_and_reaches_cancelled_terminal() { let (manager, bc) = manager(); From b7f7ec669137183ee345478d605dddc7da49f43d Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 00:26:28 +0800 Subject: [PATCH 08/14] feat(team): centralize wake acquire policy --- crates/aionui-team/src/team_run.rs | 420 ++++++++++++++++++++++++++++- 1 file changed, 419 insertions(+), 1 deletion(-) diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index 96fa94867..cfe174087 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -93,6 +93,28 @@ pub(crate) enum TeamRunWakeAcquireOutcome { Suppressed, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TeamRunSlotState { + Busy, + Pending, + Paused, + Idle, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AcquirePolicyDecision { + Accept, + Suppress(&'static str), + RejectSlotBusy, + RejectInvalid(&'static str), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TeamRunWakeIntent { + ExternalRequest, + SchedulerWakeTarget, +} + #[derive(Debug, Clone)] pub struct PauseSlotOutcome { pub team_run_id: String, @@ -144,6 +166,29 @@ impl TeamRunRecord { self.active_operation_leases.len() } + fn slot_run_state(&self, slot_id: &str) -> TeamRunSlotState { + if self.active_child_turns.contains_key(slot_id) || self.starting_child_count_for_slot(slot_id) > 0 { + return TeamRunSlotState::Busy; + } + if self.pending_wake_count_for_slot(slot_id) > 0 { + return TeamRunSlotState::Pending; + } + if self.slot_wake_gate.snapshot_for_slot(slot_id).paused { + return TeamRunSlotState::Paused; + } + TeamRunSlotState::Idle + } + + fn has_spawn_welcome_for_slot(&self, slot_id: &str) -> bool { + self.pending_wakes.get(slot_id).is_some_and(|wakes| { + wakes + .iter() + .any(|wake| wake.source == TeamWakeSource::SpawnWelcome) + }) || self.active_operation_leases.values().any(|active| { + active.lease.slot_id == slot_id && active.lease.wake_source == TeamWakeSource::SpawnWelcome + }) + } + fn role_for_slot(&self, slot_id: &str) -> Option { self.active_child_turns .get(slot_id) @@ -287,6 +332,59 @@ fn push_pending_wake_locked( }); } +fn acquire_policy( + source: TeamWakeSource, + slot_state: TeamRunSlotState, + has_spawn_welcome: bool, + intent: TeamRunWakeIntent, +) -> AcquirePolicyDecision { + match source { + TeamWakeSource::UserMessage | TeamWakeSource::UserIntervention => match slot_state { + TeamRunSlotState::Busy => AcquirePolicyDecision::RejectSlotBusy, + TeamRunSlotState::Pending | TeamRunSlotState::Paused | TeamRunSlotState::Idle => { + AcquirePolicyDecision::Accept + } + }, + TeamWakeSource::McpSendMessage => match slot_state { + TeamRunSlotState::Paused => AcquirePolicyDecision::Suppress("paused_slot_background_wake"), + TeamRunSlotState::Busy | TeamRunSlotState::Pending | TeamRunSlotState::Idle => { + AcquirePolicyDecision::Accept + } + }, + TeamWakeSource::SpawnWelcome => { + if has_spawn_welcome { + return AcquirePolicyDecision::Suppress("duplicate_spawn_welcome"); + } + match slot_state { + TeamRunSlotState::Busy => AcquirePolicyDecision::RejectInvalid("spawn welcome target is already busy"), + TeamRunSlotState::Pending => AcquirePolicyDecision::Suppress("spawn_welcome_already_pending"), + TeamRunSlotState::Paused | TeamRunSlotState::Idle => AcquirePolicyDecision::Accept, + } + } + TeamWakeSource::McpShutdownRequest => AcquirePolicyDecision::Accept, + TeamWakeSource::IdleNotification | TeamWakeSource::InterruptedNotification => match slot_state { + TeamRunSlotState::Idle if intent == TeamRunWakeIntent::SchedulerWakeTarget => { + AcquirePolicyDecision::Accept + } + TeamRunSlotState::Idle => { + AcquirePolicyDecision::Suppress("background_notification_without_wake_target") + } + TeamRunSlotState::Busy | TeamRunSlotState::Pending | TeamRunSlotState::Paused => { + AcquirePolicyDecision::Suppress("background_notification_deduped") + } + }, + TeamWakeSource::CrashNotification + | TeamWakeSource::InactivityTimeout + | TeamWakeSource::SpawnAttachFailure + | TeamWakeSource::ShutdownRejected => match slot_state { + TeamRunSlotState::Paused => AcquirePolicyDecision::Suppress("paused_slot_recovery_wake"), + TeamRunSlotState::Busy | TeamRunSlotState::Pending | TeamRunSlotState::Idle => { + AcquirePolicyDecision::Accept + } + }, + } +} + #[derive(Clone)] pub struct TeamRunManager { team_id: String, @@ -383,7 +481,7 @@ impl TeamRunManager { let mut guard = self.state.lock().await; if let Some(run) = guard.as_mut().filter(|r| r.is_active()) { - if run.active_child_turns.contains_key(slot_id) || run.starting_child_count_for_slot(slot_id) > 0 { + if matches!(run.slot_run_state(slot_id), TeamRunSlotState::Busy) { info!( team_id = %self.team_id, team_run_id = %run.team_run_id, @@ -558,6 +656,90 @@ impl TeamRunManager { Ok(()) } + async fn acquire_run_scoped_wake_with_intent( + &self, + slot_id: &str, + role: TeamRunTargetRole, + source: TeamWakeSource, + intent: TeamRunWakeIntent, + ) -> Result { + let mut guard = self.state.lock().await; + let Some(run) = guard.as_mut().filter(|r| r.is_active()) else { + warn!( + team_id = %self.team_id, + slot_id, + role = ?role, + wake_source = %source, + "team_run wake acquire rejected because no active run exists" + ); + return Err(TeamError::InvalidRequest( + "no active team run for run-scoped wake".into(), + )); + }; + + let slot_state = run.slot_run_state(slot_id); + let decision = acquire_policy( + source, + slot_state, + run.has_spawn_welcome_for_slot(slot_id), + intent, + ); + match decision { + AcquirePolicyDecision::RejectSlotBusy => Err(TeamError::SlotBusy(slot_id.to_owned())), + AcquirePolicyDecision::RejectInvalid(message) => Err(TeamError::InvalidRequest(message.into())), + AcquirePolicyDecision::Suppress(reason) => { + let _ = run.slot_wake_gate.before_wake(slot_id, source, None); + let payload = run.payload(); + debug!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + slot_id, + wake_source = %source, + slot_state = ?slot_state, + reason, + "team_run wake acquire suppressed" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload); + Ok(TeamRunWakeAcquireOutcome::Suppressed) + } + AcquirePolicyDecision::Accept => { + let _ = run.slot_wake_gate.before_wake(slot_id, source, None); + let lease = new_operation_lease(run, slot_id, role, source, false); + info!( + team_id = %self.team_id, + team_run_id = %lease.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + accepted_as_new_run = lease.accepted_as_new_run, + "team_run operation lease acquired" + ); + Ok(TeamRunWakeAcquireOutcome::Accepted(lease)) + } + } + } + + pub(crate) async fn acquire_run_scoped_wake( + &self, + slot_id: &str, + role: TeamRunTargetRole, + source: TeamWakeSource, + ) -> Result { + self.acquire_run_scoped_wake_with_intent(slot_id, role, source, TeamRunWakeIntent::ExternalRequest) + .await + } + + pub(crate) async fn acquire_scheduler_wake( + &self, + slot_id: &str, + role: TeamRunTargetRole, + source: TeamWakeSource, + ) -> Result { + self.acquire_run_scoped_wake_with_intent(slot_id, role, source, TeamRunWakeIntent::SchedulerWakeTarget) + .await + } + pub async fn active_run_id(&self) -> Option { let guard = self.state.lock().await; guard.as_ref().filter(|r| r.is_active()).map(|r| r.team_run_id.clone()) @@ -1562,6 +1744,242 @@ mod tests { )); } + #[tokio::test] + async fn run_scoped_wake_without_active_run_is_rejected() { + let (manager, _bc) = manager(); + + let err = manager + .acquire_run_scoped_wake( + "worker", + TeamRunTargetRole::Teammate, + TeamWakeSource::McpSendMessage, + ) + .await + .expect_err("run-scoped wake must need active run"); + + assert!(matches!( + err, + TeamError::InvalidRequest(message) + if message == "no active team run for run-scoped wake" + )); + } + + #[tokio::test] + async fn user_message_busy_active_slot_is_rejected_without_lease() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-1".into())) + .await + .unwrap(); + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .unwrap(); + manager + .record_child_started( + &reservation.reservation_id, + ActiveChildTurn { + team_run_id: ack.team_run_id, + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "conv-lead".into(), + turn_id: "turn-lead".into(), + }, + ) + .await; + + let err = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect_err("foreground user message must reject busy active slot"); + + assert!(matches!(err, TeamError::SlotBusy(slot) if slot == "lead")); + let payload = manager.current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 0); + } + + #[tokio::test] + async fn user_message_pending_slot_is_accepted_as_additional_foreground_wake() { + let (manager, _bc) = manager(); + let (_ack, first) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .commit_operation_lease(&first.lease_id, Some("mailbox-1".into())) + .await + .unwrap(); + + let (_ack, second) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("pending foreground message must be accepted"); + manager + .commit_operation_lease(&second.lease_id, Some("mailbox-2".into())) + .await + .unwrap(); + + let payload = manager.current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 2); + } + + #[tokio::test] + async fn mcp_send_message_busy_slot_is_accepted_and_queued() { + let (manager, _bc) = manager(); + let (ack, first) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .commit_operation_lease(&first.lease_id, Some("mailbox-1".into())) + .await + .unwrap(); + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .unwrap(); + manager + .record_child_started( + &reservation.reservation_id, + ActiveChildTurn { + team_run_id: ack.team_run_id, + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "conv-lead".into(), + turn_id: "turn-lead".into(), + }, + ) + .await; + + let outcome = manager + .acquire_run_scoped_wake("lead", TeamRunTargetRole::Lead, TeamWakeSource::McpSendMessage) + .await + .expect("MCP message to busy slot must be accepted"); + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + panic!("MCP message should not be suppressed"); + }; + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-2".into())) + .await + .unwrap(); + + let payload = manager.current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 1); + } + + #[tokio::test] + async fn paused_slot_suppresses_mcp_send_message_without_lease() { + let (manager, _bc) = manager(); + let (ack, _lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .pause_slot_work("lead", Some("user stopped".into())) + .await + .unwrap(); + + let outcome = manager + .acquire_run_scoped_wake("lead", TeamRunTargetRole::Lead, TeamWakeSource::McpSendMessage) + .await + .expect("paused background wake suppresses cleanly"); + assert_eq!(outcome, TeamRunWakeAcquireOutcome::Suppressed); + assert_eq!( + manager.active_run_id().await.as_deref(), + Some(ack.team_run_id.as_str()) + ); + } + + #[tokio::test] + async fn duplicate_spawn_welcome_is_suppressed() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + + let first = manager + .acquire_run_scoped_wake( + "new-worker", + TeamRunTargetRole::Teammate, + TeamWakeSource::SpawnWelcome, + ) + .await + .unwrap(); + let TeamRunWakeAcquireOutcome::Accepted(first) = first else { + panic!("first spawn welcome should be accepted"); + }; + manager + .commit_operation_lease(&first.lease_id, Some("welcome-1".into())) + .await + .unwrap(); + + let second = manager + .acquire_run_scoped_wake( + "new-worker", + TeamRunTargetRole::Teammate, + TeamWakeSource::SpawnWelcome, + ) + .await + .unwrap(); + assert_eq!(second, TeamRunWakeAcquireOutcome::Suppressed); + } + + #[tokio::test] + async fn idle_notification_idle_slot_is_suppressed_without_scheduler_wake_target() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + + let outcome = manager + .acquire_run_scoped_wake( + "worker", + TeamRunTargetRole::Teammate, + TeamWakeSource::IdleNotification, + ) + .await + .expect("generic idle notification should suppress cleanly"); + + assert_eq!(outcome, TeamRunWakeAcquireOutcome::Suppressed); + } + + #[tokio::test] + async fn scheduler_idle_notification_idle_slot_is_accepted() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + + let outcome = manager + .acquire_scheduler_wake( + "worker", + TeamRunTargetRole::Teammate, + TeamWakeSource::IdleNotification, + ) + .await + .expect("scheduler-produced wake target should be accepted"); + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + panic!("scheduler wake target should produce a lease"); + }; + + manager + .commit_operation_lease(&lease.lease_id, None) + .await + .expect("scheduler wake should commit"); + let reservation = manager + .claim_wake_for_turn("worker", TeamRunTargetRole::Teammate, "conv-worker") + .await + .expect("scheduler wake should be claimable"); + assert_eq!(reservation.wake_source, TeamWakeSource::IdleNotification); + } + #[tokio::test] async fn cancel_run_clears_paused_gate_and_reaches_cancelled_terminal() { let (manager, bc) = manager(); From 00a4105466b3254a462fe57f2fbabd1de5c5568a Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 00:29:18 +0800 Subject: [PATCH 09/14] feat(team): use leases for user message wakes --- crates/aionui-team/src/session.rs | 186 ++++++++++++++++++++++++--- crates/aionui-team/src/test_utils.rs | 13 +- 2 files changed, 178 insertions(+), 21 deletions(-) diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index fa219bce3..a88841898 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -359,9 +359,9 @@ impl TeamSession { .ok_or_else(|| TeamError::AgentNotFound("no lead agent in team".into()))?; let lead_conv_id = self.scheduler.get_agent(&lead_slot_id).await?.conversation_id; - let mut ack = self + let (mut ack, lease) = self .team_run_manager - .accept_user_message(&lead_slot_id, TeamRunTargetRole::Lead, true, None) + .acquire_user_message_wake(&lead_slot_id, TeamRunTargetRole::Lead) .await?; let mailbox_message = match self @@ -379,7 +379,10 @@ impl TeamSession { { Ok(message) => message, Err(err) => { - self.team_run_manager.complete_failed().await; + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await; return Err(err); } }; @@ -404,12 +407,10 @@ impl TeamSession { } let _ = files; - self.wake_agent_for_team_work( - &lead_slot_id, - TeamWakeSource::UserMessage, - Some(mailbox_message.id.clone()), - ) - .await?; + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(mailbox_message.id.clone())) + .await?; + self.notify_reserved_wake_for_team_work(&lead_slot_id, lease.role.clone(), lease.wake_source); Ok(ack) } @@ -424,15 +425,9 @@ impl TeamSession { files: Option>, ) -> Result { let agent = self.scheduler.get_agent(slot_id).await?; - let has_active_run = self.team_run_manager.active_run_id().await.is_some(); - let source = if has_active_run { - TeamWakeSource::UserIntervention - } else { - TeamWakeSource::UserMessage - }; - let mut ack = self + let (mut ack, lease) = self .team_run_manager - .accept_user_message(slot_id, target_role_for(agent.role), has_active_run, None) + .acquire_user_message_wake(slot_id, target_role_for(agent.role)) .await?; let mailbox_message = match self @@ -450,7 +445,10 @@ impl TeamSession { { Ok(message) => message, Err(err) => { - self.team_run_manager.complete_failed().await; + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await; return Err(err); } }; @@ -475,8 +473,10 @@ impl TeamSession { } let _ = files; - self.wake_agent_for_team_work(slot_id, source, Some(mailbox_message.id.clone())) + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(mailbox_message.id.clone())) .await?; + self.notify_reserved_wake_for_team_work(slot_id, lease.role.clone(), lease.wake_source); Ok(ack) } @@ -1174,7 +1174,7 @@ mod tests { use aionui_ai_agent::AgentError; use aionui_ai_agent::agent_task::AgentInstance; use aionui_ai_agent::types::BuildTaskOptions; - use aionui_api_types::WebSocketMessage; + use aionui_api_types::{TeamRunStatus, WebSocketMessage}; use aionui_common::{AgentKillReason, TimestampMs}; use std::sync::{Arc, Mutex}; @@ -1305,6 +1305,64 @@ mod tests { } } + #[derive(Default)] + struct CompletingProjectionStore { + manager: Mutex>>, + } + + impl CompletingProjectionStore { + fn set_manager(&self, manager: Arc) { + *self.manager.lock().unwrap() = Some(manager); + } + } + + #[async_trait::async_trait] + impl TeamProjectionMessageStore for CompletingProjectionStore { + fn mint_message_id(&self) -> String { + "projection-completes-run".into() + } + + async fn find_projected_message( + &self, + _conversation_id: &str, + _msg_id: &str, + _msg_type: &str, + ) -> Result, TeamError> { + Ok(None) + } + + async fn insert_projected_message(&self, _row: &aionui_db::models::MessageRow) -> Result<(), TeamError> { + let manager = self.manager.lock().unwrap().clone(); + if let Some(manager) = manager { + manager.maybe_complete().await; + } + Ok(()) + } + } + + #[derive(Default)] + struct FailingProjectionStore; + + #[async_trait::async_trait] + impl TeamProjectionMessageStore for FailingProjectionStore { + fn mint_message_id(&self) -> String { + "projection-fails".into() + } + + async fn find_projected_message( + &self, + _conversation_id: &str, + _msg_id: &str, + _msg_type: &str, + ) -> Result, TeamError> { + Ok(None) + } + + async fn insert_projected_message(&self, _row: &aionui_db::models::MessageRow) -> Result<(), TeamError> { + Err(TeamError::InvalidRequest("projection failed for test".into())) + } + } + /// RecordingBroadcaster used by the D29d-1 ratification test below to /// assert that `team.agentSpawned` is *not* emitted on failed spawns. #[derive(Default)] @@ -1484,6 +1542,27 @@ mod tests { .unwrap() } + async fn start_session_with_repo_and_projection_store( + repo: Arc, + store: Arc, + ) -> TeamSession { + let broadcaster: Arc = Arc::new(NullBroadcaster); + TeamSession::start( + make_team(), + repo, + broadcaster, + backend_path(), + empty_task_manager(), + noop_turn_port(), + noop_cancellation_port(), + store, + "user-test".into(), + Weak::::new(), + ) + .await + .unwrap() + } + async fn start_session_with_cancellation_port( cancellation_port: Arc, ) -> TeamSession { @@ -2536,6 +2615,73 @@ mod tests { session.stop(); } + #[tokio::test] + async fn send_message_to_agent_survives_completion_between_projection_and_commit() { + let store = Arc::new(CompletingProjectionStore::default()); + let session = start_session_with_projection_store(store.clone()).await; + store.set_manager(session.team_run_manager().clone()); + session + .team_run_manager() + .accept_user_message("lead-1", TeamRunTargetRole::Lead, false, None) + .await + .expect("active run before user intervention"); + + let ack = session + .send_message_to_agent("worker-1", "race window message", None) + .await + .expect("lease should retain run while projection completes it"); + + assert_eq!(ack.accepted_slot_id, "worker-1"); + let reservation = session + .team_run_manager() + .claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "c2") + .await + .expect("committed user intervention should be pending"); + assert_eq!(reservation.message_id, ack.message_id); + session.stop(); + } + + #[tokio::test] + async fn send_message_projection_failure_still_commits_wake() { + let session = start_session_with_projection_store(Arc::new(FailingProjectionStore)).await; + + let ack = session + .send_message_to_agent("worker-1", "projection can fail", None) + .await + .expect("projection failure is non-fatal"); + + let reservation = session + .team_run_manager() + .claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "c2") + .await + .expect("wake should still be committed"); + assert_eq!(reservation.message_id, ack.message_id); + session.stop(); + } + + #[tokio::test] + async fn send_message_mailbox_failure_aborts_lease_and_allows_completion() { + let repo: Arc = Arc::new(MockTeamRepo::with_message_write_failure()); + let session = start_session_with_repo_and_projection_store(repo, noop_projection_store()).await; + + let err = session + .send_message("mailbox fails", None) + .await + .expect_err("mailbox write failure must be returned"); + assert!( + err.to_string().contains("forced mailbox write failure"), + "unexpected error: {err}" + ); + + let completed = session + .team_run_manager() + .maybe_complete() + .await + .expect("aborted lease should allow empty run to complete"); + assert_eq!(completed.status, TeamRunStatus::Completed); + session.stop(); + } + #[tokio::test] async fn mirror_unread_to_conversation_skips_when_service_weak_is_dangling() { let session = start_session().await; diff --git a/crates/aionui-team/src/test_utils.rs b/crates/aionui-team/src/test_utils.rs index 863aa11df..e28ed4a7f 100644 --- a/crates/aionui-team/src/test_utils.rs +++ b/crates/aionui-team/src/test_utils.rs @@ -7,6 +7,7 @@ use std::sync::Mutex; pub struct MockState { pub messages: Vec, pub tasks: Vec, + pub fail_message_writes: bool, } pub struct MockTeamRepo { @@ -19,6 +20,12 @@ impl MockTeamRepo { state: Mutex::new(MockState::default()), } } + + pub fn with_message_write_failure() -> Self { + let repo = Self::new(); + repo.state.lock().unwrap().fail_message_writes = true; + repo + } } #[async_trait::async_trait] @@ -47,7 +54,11 @@ impl ITeamRepository for MockTeamRepo { // ── Mailbox ───────────────────────────────────────────────────── async fn write_message(&self, row: &MailboxMessageRow) -> Result<(), DbError> { - self.state.lock().unwrap().messages.push(row.clone()); + let mut state = self.state.lock().unwrap(); + if state.fail_message_writes { + return Err(DbError::Init("forced mailbox write failure".into())); + } + state.messages.push(row.clone()); Ok(()) } From e6f56e5329f0e986a6ee3a689a3bb6c076da7516 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 00:34:39 +0800 Subject: [PATCH 10/14] feat(team): use leases for mcp send message --- crates/aionui-team/src/mcp/server.rs | 4 -- crates/aionui-team/src/service.rs | 1 - crates/aionui-team/src/session.rs | 82 ++++++++++++++++++++++------ 3 files changed, 66 insertions(+), 21 deletions(-) diff --git a/crates/aionui-team/src/mcp/server.rs b/crates/aionui-team/src/mcp/server.rs index 71cbf954f..3acf231a5 100644 --- a/crates/aionui-team/src/mcp/server.rs +++ b/crates/aionui-team/src/mcp/server.rs @@ -546,10 +546,6 @@ async fn exec_send_message( let service = service .upgrade() .ok_or_else(|| "Team service not available; cannot wake target".to_string())?; - service - .require_active_team_run_for_team_work(team_id) - .await - .map_err(|e| e.to_string())?; let targets = if resolved_to == "*" { scheduler diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index 5758958e2..6999c3eaa 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -901,7 +901,6 @@ impl TeamSessionService { to_slot_id: &str, content: &str, ) -> Result<(), TeamError> { - self.require_active_team_run_for_team_work(team_id).await?; let session = { let entry = self .sessions diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index a88841898..4c20372ee 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -21,7 +21,7 @@ use crate::prompts::{build_lead_prompt, build_teammate_prompt, build_wake_payloa use crate::scheduler::{TeammateManager, normalize_name}; use crate::service::TeamSessionService; use crate::task_board::TaskBoard; -use crate::team_run::{ChildCancelTarget, TeamRunManager, WakeRecordDecision, target_role_for}; +use crate::team_run::{ChildCancelTarget, TeamRunManager, TeamRunWakeAcquireOutcome, WakeRecordDecision, target_role_for}; use crate::types::{MailboxMessageType, Team, TeamAgent, TeammateRole, TeammateStatus}; use crate::wake::TeamWakeSource; @@ -488,8 +488,19 @@ impl TeamSession { ) -> Result<(), TeamError> { let to_agent = self.scheduler.get_agent(to_slot_id).await?; let from_agent = self.scheduler.get_agent(from_slot_id).await?; + let outcome = self + .team_run_manager + .acquire_run_scoped_wake( + to_slot_id, + target_role_for(to_agent.role), + TeamWakeSource::McpSendMessage, + ) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Ok(()); + }; - let mailbox_message = self + let mailbox_message = match self .mailbox .write( &self.team.id, @@ -499,7 +510,17 @@ impl TeamSession { content, None, ) - .await?; + .await + { + Ok(message) => message, + Err(err) => { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await; + return Err(err); + } + }; let projection = TeamMessageProjection::new(self.projection_store.clone(), self.broadcaster.clone()); let request = TeamProjectionRequest { @@ -534,12 +555,11 @@ impl TeamSession { ); } - self.wake_agent_for_team_work( - to_slot_id, - TeamWakeSource::McpSendMessage, - Some(mailbox_message.id.clone()), - ) - .await + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(mailbox_message.id.clone())) + .await?; + self.notify_reserved_wake_for_team_work(to_slot_id, lease.role.clone(), lease.wake_source); + Ok(()) } pub(crate) async fn wake_agent_for_team_work( @@ -1614,6 +1634,31 @@ mod tests { session.stop(); } + #[tokio::test] + async fn agent_send_message_survives_completion_between_projection_and_commit() { + let store = Arc::new(CompletingProjectionStore::default()); + let session = start_session_with_projection_store(store.clone()).await; + store.set_manager(session.team_run_manager().clone()); + session + .team_run_manager() + .accept_user_message("lead-1", TeamRunTargetRole::Lead, false, None) + .await + .expect("active run before agent message"); + + session + .send_agent_message_from_agent("lead-1", "worker-1", "lease protected") + .await + .expect("agent message lease should retain run while projection completes it"); + + let unread = session + .mailbox() + .peek_unread(session.team_id(), "worker-1") + .await + .unwrap(); + assert!(unread.iter().any(|msg| msg.content == "lease protected")); + session.stop(); + } + #[tokio::test] async fn leader_message_reuses_active_run_when_leader_slot_is_free() { let session = start_session().await; @@ -1641,19 +1686,21 @@ mod tests { } #[tokio::test] - async fn leader_message_rejects_when_leader_slot_is_busy() { + async fn leader_message_queues_when_leader_slot_has_pending_wake() { let session = start_session().await; - session + let first = session .send_message("First leader message", None) .await .expect("first leader send creates run"); - let err = session + let second = session .send_message("Second leader message", None) .await - .expect_err("leader pending wake must make leader slot busy"); + .expect("leader pending wake should accept additional foreground message"); - assert!(matches!(err, TeamError::SlotBusy(slot_id) if slot_id == "lead-1")); + assert_eq!(second.team_run_id, first.team_run_id); + let payload = session.team_run_manager().current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 2); session.stop(); } @@ -1930,7 +1977,7 @@ mod tests { } #[tokio::test] - async fn paused_slot_suppresses_background_wake_and_keeps_mailbox_unread() { + async fn paused_slot_suppresses_mcp_send_without_mailbox_write() { let session = start_session().await; let ack = session.send_message("start", None).await.unwrap(); session @@ -1954,7 +2001,10 @@ mod tests { .peek_unread(session.team_id(), "lead-1") .await .unwrap(); - assert!(unread.iter().any(|msg| msg.content == "background update")); + assert!( + unread.iter().all(|msg| msg.content != "background update"), + "suppressed acquire must not write mailbox" + ); session.stop(); } From 2e3b9303028e5f0fd0b5da8c26b2302b1bdbad05 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 00:36:45 +0800 Subject: [PATCH 11/14] feat(team): use lifecycle leases for shutdown --- crates/aionui-team/src/mcp/server.rs | 16 +-- crates/aionui-team/src/scheduler/actions.rs | 12 +- crates/aionui-team/src/service.rs | 17 +++ crates/aionui-team/src/session.rs | 119 ++++++++++++++++++++ 4 files changed, 142 insertions(+), 22 deletions(-) diff --git a/crates/aionui-team/src/mcp/server.rs b/crates/aionui-team/src/mcp/server.rs index 3acf231a5..07301b406 100644 --- a/crates/aionui-team/src/mcp/server.rs +++ b/crates/aionui-team/src/mcp/server.rs @@ -740,21 +740,7 @@ async fn exec_shutdown_agent( .upgrade() .ok_or_else(|| "Team service not available; cannot wake shutdown target".to_string())?; service - .require_active_team_run_for_team_work(team_id) - .await - .map_err(|e| e.to_string())?; - - let action = crate::scheduler::SchedulerAction::ShutdownAgent { - slot_id: target_slot_id.clone(), - reason: input.reason, - }; - scheduler - .execute_action(caller_slot_id, &action) - .await - .map_err(|e| e.to_string())?; - - service - .wake_agent_for_team_work(team_id, &target_slot_id, TeamWakeSource::McpShutdownRequest, None) + .shutdown_agent_in_session(team_id, caller_slot_id, &target_slot_id, input.reason) .await .map_err(|e| e.to_string())?; diff --git a/crates/aionui-team/src/scheduler/actions.rs b/crates/aionui-team/src/scheduler/actions.rs index 6bc173ca6..edd540991 100644 --- a/crates/aionui-team/src/scheduler/actions.rs +++ b/crates/aionui-team/src/scheduler/actions.rs @@ -2,7 +2,7 @@ use tracing::debug; use super::TeammateManager; use crate::error::TeamError; -use crate::types::{MailboxMessageType, TeammateRole}; +use crate::types::{MailboxMessage, MailboxMessageType, TeammateRole}; #[derive(Debug, Clone, PartialEq)] pub enum SchedulerAction { @@ -102,7 +102,7 @@ impl TeammateManager { Ok(None) } SchedulerAction::ShutdownAgent { slot_id, reason } => { - self.handle_shutdown_agent(from_slot_id, slot_id, reason.as_deref()) + self.request_shutdown_agent(from_slot_id, slot_id, reason.as_deref()) .await?; Ok(None) } @@ -169,12 +169,12 @@ impl TeammateManager { self.mark_idle(from_slot_id, summary).await } - async fn handle_shutdown_agent( + pub async fn request_shutdown_agent( &self, from_slot_id: &str, target_slot_id: &str, reason: Option<&str>, - ) -> Result<(), TeamError> { + ) -> Result { let from_role = { let slots = self.slots.lock().await; let slot = slots @@ -206,9 +206,7 @@ impl TeammateManager { reason.unwrap_or("shutdown requested"), None, ) - .await?; - - Ok(()) + .await } async fn handle_rename_agent(&self, slot_id: &str, new_name: &str) -> Result<(), TeamError> { diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index 6999c3eaa..233fe83dd 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -913,6 +913,23 @@ impl TeamSessionService { .await } + pub async fn shutdown_agent_in_session( + &self, + team_id: &str, + caller_slot_id: &str, + target_slot_id: &str, + reason: Option, + ) -> Result<(), TeamError> { + let session = { + let entry = self + .sessions + .get(team_id) + .ok_or_else(|| TeamError::SessionNotFound(team_id.into()))?; + Arc::clone(&entry.session) + }; + session.shutdown_agent(caller_slot_id, target_slot_id, reason).await + } + pub(crate) fn notify_reserved_wake_for_team_work( &self, team_id: &str, diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index 4c20372ee..608d6773b 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -562,6 +562,66 @@ impl TeamSession { Ok(()) } + pub(crate) async fn shutdown_agent( + &self, + caller_slot_id: &str, + target_slot_id: &str, + reason: Option, + ) -> Result<(), TeamError> { + let caller = self.scheduler.get_agent(caller_slot_id).await?; + if caller.role != TeammateRole::Lead { + return Err(TeamError::LeaderOnly("team_shutdown_agent".into())); + } + let target = self.scheduler.get_agent(target_slot_id).await?; + if target.role == TeammateRole::Lead { + return Err(TeamError::InvalidRequest("cannot shutdown the team lead".into())); + } + + let outcome = self + .team_run_manager + .acquire_run_scoped_wake( + target_slot_id, + target_role_for(target.role), + TeamWakeSource::McpShutdownRequest, + ) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Err(TeamError::InvalidRequest("shutdown wake was suppressed".into())); + }; + + let shutdown_message = match self + .scheduler + .request_shutdown_agent(caller_slot_id, target_slot_id, reason.as_deref()) + .await + { + Ok(message) => message, + Err(err) => { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "shutdown_scheduler_action_failed") + .await; + return Err(err); + } + }; + + if shutdown_message.to_agent_id != target_slot_id { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "shutdown_mailbox_target_mismatch") + .await; + return Err(TeamError::InvalidRequest(format!( + "shutdown mailbox target mismatch: expected {target_slot_id}, got {}", + shutdown_message.to_agent_id + ))); + } + + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(shutdown_message.id.clone())) + .await?; + self.notify_reserved_wake_for_team_work(target_slot_id, lease.role.clone(), lease.wake_source); + Ok(()) + } + pub(crate) async fn wake_agent_for_team_work( &self, slot_id: &str, @@ -2294,6 +2354,65 @@ mod tests { session.stop(); } + #[tokio::test] + async fn shutdown_without_active_run_does_not_write_mailbox() { + let session = start_session().await; + + let err = session + .shutdown_agent("lead-1", "worker-1", Some("done".into())) + .await + .expect_err("shutdown is run-scoped"); + + assert!(matches!( + err, + TeamError::InvalidRequest(message) + if message == "no active team run for run-scoped wake" + )); + let unread = session.mailbox().peek_unread("t1", "worker-1").await.unwrap(); + assert!(unread.is_empty()); + session.stop(); + } + + #[tokio::test] + async fn shutdown_busy_target_is_accepted_as_lifecycle_wake() { + let session = start_session().await; + let ack = session.send_message("start", None).await.unwrap(); + let reservation = session + .team_run_manager() + .claim_wake_for_turn("lead-1", TeamRunTargetRole::Lead, "c1") + .await + .unwrap(); + session + .team_run_manager() + .record_child_started( + &reservation.reservation_id, + ActiveChildTurn { + team_run_id: ack.team_run_id, + slot_id: "lead-1".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "c1".into(), + turn_id: "turn-lead".into(), + }, + ) + .await; + + session + .shutdown_agent("lead-1", "worker-1", Some("done".into())) + .await + .expect("shutdown lifecycle wake should be accepted"); + + let unread = session.mailbox().peek_unread("t1", "worker-1").await.unwrap(); + assert_eq!(unread.len(), 1); + assert_eq!(unread[0].msg_type, MailboxMessageType::ShutdownRequest); + let reservation = session + .team_run_manager() + .claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "c2") + .await + .expect("shutdown wake should be pending"); + assert_eq!(reservation.wake_source, TeamWakeSource::McpShutdownRequest); + session.stop(); + } + #[tokio::test] async fn add_and_remove_agent() { let session = start_session().await; From 505eeefad9dee84f50f20da470735ce31a120942 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 00:41:03 +0800 Subject: [PATCH 12/14] feat(team): acquire spawn welcome lease before persistence --- crates/aionui-team/src/provisioning.rs | 9 +- .../aionui-team/src/service/spawn_support.rs | 5 +- crates/aionui-team/src/session.rs | 65 ++++++++--- .../tests/session_service_integration.rs | 106 ++++++++++++++++++ 4 files changed, 166 insertions(+), 19 deletions(-) diff --git a/crates/aionui-team/src/provisioning.rs b/crates/aionui-team/src/provisioning.rs index b1c71fef9..401a5c0cf 100644 --- a/crates/aionui-team/src/provisioning.rs +++ b/crates/aionui-team/src/provisioning.rs @@ -36,6 +36,7 @@ struct ProvisionedConversation { struct NewAgentProvisioning { user_id: String, team_id: String, + slot_id: String, name: String, role: TeammateRole, backend: String, @@ -223,6 +224,7 @@ impl TeamAgentProvisioner { .provision_new_agent(NewAgentProvisioning { user_id: user_id.to_owned(), team_id: team.id.clone(), + slot_id: generate_id(), name: req.name, role, backend: req.backend, @@ -240,6 +242,7 @@ impl TeamAgentProvisioner { &self, user_id: &str, team_id: &str, + slot_id: String, name: String, backend: String, model: String, @@ -256,6 +259,7 @@ impl TeamAgentProvisioner { .provision_new_agent(NewAgentProvisioning { user_id: user_id.to_owned(), team_id: team_id.to_owned(), + slot_id, name, role: TeammateRole::Teammate, backend, @@ -332,12 +336,11 @@ impl TeamAgentProvisioner { } async fn provision_new_agent(&self, input: NewAgentProvisioning) -> Result { - let slot_id = generate_id(); let conversation = self .create_or_adopt_conversation( &input.user_id, &input.team_id, - &slot_id, + &input.slot_id, input.role, &input.name, &input.backend, @@ -348,7 +351,7 @@ impl TeamAgentProvisioner { ) .await?; Ok(TeamAgent { - slot_id, + slot_id: input.slot_id, name: input.name, role: input.role, conversation_id: conversation.conversation_id, diff --git a/crates/aionui-team/src/service/spawn_support.rs b/crates/aionui-team/src/service/spawn_support.rs index 08c8f0f94..93c526d37 100644 --- a/crates/aionui-team/src/service/spawn_support.rs +++ b/crates/aionui-team/src/service/spawn_support.rs @@ -189,7 +189,6 @@ impl TeamSessionService { caller_slot_id: &str, req: crate::session::SpawnAgentRequest, ) -> Result { - self.require_active_team_run_for_team_work(team_id).await?; let entry = self .sessions .get(team_id) @@ -218,6 +217,7 @@ impl TeamSessionService { &self, team_id: &str, user_id: &str, + slot_id: String, name: String, backend: String, model: String, @@ -231,7 +231,7 @@ impl TeamSessionService { let _guard = lock.lock().await; self.provisioner() - .persist_spawned_agent(user_id, team_id, name, backend, model, custom_agent_id) + .persist_spawned_agent(user_id, team_id, slot_id, name, backend, model, custom_agent_id) .await } } @@ -300,6 +300,7 @@ mod tests { .persist_spawned_agent( &created.id, "user1", + "spawn-slot-1".into(), "Spawned".into(), "acp".into(), "claude".into(), diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index 608d6773b..e085cf9b1 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Weak}; use aionui_ai_agent::IWorkerTaskManager; use aionui_api_types::{TeamRunAckResponse, TeamRunTargetRole}; -use aionui_common::AgentKillReason; +use aionui_common::{AgentKillReason, generate_id}; use aionui_db::ITeamRepository; use aionui_realtime::EventBroadcaster; use tracing::{info, warn}; @@ -1105,16 +1105,40 @@ impl TeamSession { .await .unwrap_or_else(|| caller.model.clone()), }; - let new_agent = service + let new_slot_id = generate_id(); + let outcome = self + .team_run_manager + .acquire_run_scoped_wake( + &new_slot_id, + TeamRunTargetRole::Teammate, + TeamWakeSource::SpawnWelcome, + ) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Err(TeamError::InvalidRequest("spawn welcome wake was suppressed".into())); + }; + + let new_agent = match service .persist_spawned_agent( &self.team.id, &self.user_id, + new_slot_id.clone(), requested_name, backend, model, req.custom_agent_id.clone(), ) - .await?; + .await + { + Ok(agent) => agent, + Err(err) => { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "spawn_persistence_failed") + .await; + return Err(err); + } + }; // Step 5: attach to the in-memory scheduler so wake-from-lead finds // the new slot immediately. @@ -1123,7 +1147,7 @@ impl TeamSession { // Step 6: welcome message. The mailbox write is the source of truth — // if the wake never fires (e.g. warmup raced), the next caller-triggered // wake will still drain this entry. - let welcome_message = self + let welcome_message = match self .mailbox .write( &self.team.id, @@ -1133,21 +1157,34 @@ impl TeamSession { "You have been spawned as a teammate. Read your mailbox and wait for instructions.", None, ) - .await?; + .await + { + Ok(message) => message, + Err(err) => { + if let Some(service) = self.service.upgrade() { + let _ = service + .remove_agent(&self.user_id, &self.team.id, &new_agent.slot_id) + .await; + } else { + let _ = self.scheduler.remove_agent(&new_agent.slot_id).await; + } + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "spawn_welcome_mailbox_write_failed") + .await; + return Err(err); + } + }; - let spawn_welcome_role = self - .reserve_wake_for_team_work( - &new_agent.slot_id, - TeamWakeSource::SpawnWelcome, - Some(welcome_message.id), - ) - .await? - .ok_or_else(|| TeamError::InvalidRequest("spawn welcome wake was suppressed".into()))?; + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(welcome_message.id.clone())) + .await?; + let spawn_welcome_role = lease.role.clone(); info!( team_id = %self.team.id, slot_id = %new_agent.slot_id, target_role = ?spawn_welcome_role, - wake_source = %TeamWakeSource::SpawnWelcome, + wake_source = %lease.wake_source, "spawn welcome wake reserved before runtime attach" ); diff --git a/crates/aionui-team/tests/session_service_integration.rs b/crates/aionui-team/tests/session_service_integration.rs index a4eb1c0ff..e8175f813 100644 --- a/crates/aionui-team/tests/session_service_integration.rs +++ b/crates/aionui-team/tests/session_service_integration.rs @@ -570,6 +570,8 @@ struct FullMockTeamRepo { inner: MockTeamRepo, teams: std::sync::Mutex>, fail_workspace_update: std::sync::Mutex, + fail_agent_update: std::sync::Mutex, + fail_message_writes: std::sync::Mutex, } impl FullMockTeamRepo { @@ -578,12 +580,22 @@ impl FullMockTeamRepo { inner: MockTeamRepo::new(), teams: std::sync::Mutex::new(Vec::new()), fail_workspace_update: std::sync::Mutex::new(false), + fail_agent_update: std::sync::Mutex::new(false), + fail_message_writes: std::sync::Mutex::new(false), } } fn fail_workspace_update(&self) { *self.fail_workspace_update.lock().unwrap() = true; } + + fn fail_agent_updates(&self) { + *self.fail_agent_update.lock().unwrap() = true; + } + + fn fail_message_writes(&self) { + *self.fail_message_writes.lock().unwrap() = true; + } } #[async_trait::async_trait] @@ -612,6 +624,9 @@ impl ITeamRepository for FullMockTeamRepo { if params.workspace.is_some() && *self.fail_workspace_update.lock().unwrap() { return Err(DbError::Init("forced workspace writeback failure".into())); } + if params.agents.is_some() && *self.fail_agent_update.lock().unwrap() { + return Err(DbError::Init("forced agent update failure".into())); + } let mut teams = self.teams.lock().unwrap(); let team = teams .iter_mut() @@ -638,6 +653,9 @@ impl ITeamRepository for FullMockTeamRepo { } async fn write_message(&self, row: &aionui_db::models::MailboxMessageRow) -> Result<(), DbError> { + if *self.fail_message_writes.lock().unwrap() { + return Err(DbError::Init("forced mailbox write failure".into())); + } self.inner.write_message(row).await } async fn read_unread_and_mark( @@ -2341,6 +2359,94 @@ async fn spawn_agent_in_session_rejects_without_active_team_run_before_persistin ); } +#[tokio::test] +async fn spawn_agent_in_session_aborts_lease_when_persistence_fails() { + let (svc, team_repo, _, _) = setup_with_factory_metadata_team_repo_and_conversation_repo( + success_factory(), + Arc::new(StubAgentMetadataRepo::empty()), + ); + let created = svc + .create_team( + "user1", + CreateTeamRequest { + name: "Alpha".into(), + agents: two_agent_input(), + workspace: None, + }, + ) + .await + .expect("create team"); + svc.ensure_session("user1", &created.id).await.unwrap(); + svc.send_message("user1", &created.id, "start active run", None) + .await + .expect("active run"); + team_repo.fail_agent_updates(); + + let lead_slot_id = created.lead_agent_id.clone().unwrap(); + let req = SpawnAgentRequest { + name: "Helper".into(), + agent_type: Some("claude".into()), + custom_agent_id: None, + model: Some("claude-sonnet-4".into()), + }; + + let err = svc + .spawn_agent_in_session(&created.id, &lead_slot_id, req) + .await + .expect_err("forced agent persistence failure should fail spawn"); + assert!(err.to_string().contains("forced agent update failure")); + + let after = svc.get_team("user1", &created.id).await.unwrap(); + assert!( + after.agents.iter().all(|agent| agent.name != "Helper"), + "failed spawn must not persist helper after aborted spawn lease" + ); +} + +#[tokio::test] +async fn spawn_agent_in_session_compensates_when_welcome_mailbox_write_fails() { + let (svc, team_repo, _, _) = setup_with_factory_metadata_team_repo_and_conversation_repo( + success_factory(), + Arc::new(StubAgentMetadataRepo::empty()), + ); + let created = svc + .create_team( + "user1", + CreateTeamRequest { + name: "Alpha".into(), + agents: two_agent_input(), + workspace: None, + }, + ) + .await + .expect("create team"); + svc.ensure_session("user1", &created.id).await.unwrap(); + svc.send_message("user1", &created.id, "start active run", None) + .await + .expect("active run"); + team_repo.fail_message_writes(); + + let lead_slot_id = created.lead_agent_id.clone().unwrap(); + let req = SpawnAgentRequest { + name: "Helper".into(), + agent_type: Some("claude".into()), + custom_agent_id: None, + model: Some("claude-sonnet-4".into()), + }; + + let err = svc + .spawn_agent_in_session(&created.id, &lead_slot_id, req) + .await + .expect_err("welcome mailbox write failure should fail spawn"); + assert!(err.to_string().contains("forced mailbox write failure")); + + let after = svc.get_team("user1", &created.id).await.unwrap(); + assert!( + after.agents.iter().all(|agent| agent.name != "Helper"), + "compensation must remove persisted helper after welcome write failure" + ); +} + #[tokio::test] async fn es2_ensure_session_is_idempotent() { let svc = setup(); From e486d03c017214780ee1a91e70dc9a45a32eb27f Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 00:47:48 +0800 Subject: [PATCH 13/14] feat(team): lease recovery and scheduler wakes --- crates/aionui-team/src/event_loop.rs | 2 +- crates/aionui-team/src/guide/server.rs | 16 ++- crates/aionui-team/src/provisioning.rs | 41 +++---- crates/aionui-team/src/service.rs | 21 +--- .../aionui-team/src/service/spawn_support.rs | 37 +++--- crates/aionui-team/src/session.rs | 112 +++++++++++------- crates/aionui-team/src/team_run.rs | 90 ++++---------- 7 files changed, 149 insertions(+), 170 deletions(-) diff --git a/crates/aionui-team/src/event_loop.rs b/crates/aionui-team/src/event_loop.rs index 5b24c5e4c..d8f04d677 100644 --- a/crates/aionui-team/src/event_loop.rs +++ b/crates/aionui-team/src/event_loop.rs @@ -402,7 +402,7 @@ async fn finalize_turn(ctx: &AgentLoopContext, turn: TurnExecution, input: &crat if wake_target != ctx.slot_id && let Err(e) = ctx .session - .wake_agent_for_team_work(&wake_target, TeamWakeSource::IdleNotification, None) + .scheduler_wake_agent_for_team_work(&wake_target, TeamWakeSource::IdleNotification) .await { warn!( diff --git a/crates/aionui-team/src/guide/server.rs b/crates/aionui-team/src/guide/server.rs index 29a9a9550..a7bff29d1 100644 --- a/crates/aionui-team/src/guide/server.rs +++ b/crates/aionui-team/src/guide/server.rs @@ -179,7 +179,7 @@ fn build_create_team_handoff_next_step(summary: &str) -> String { const NO_ACTIVE_TEAM_RUN_FOR_RUN_SCOPED_WAKE: &str = "no active team run for run-scoped wake"; const GUIDE_NO_ACTIVE_TEAM_RUN_HANDOFF_ERROR: &str = - "Team was created, but no TeamRun is active yet. End this solo turn and continue from the Team page."; + "Team was created, but no TeamRun is active yet. Open the team chat and continue from there."; fn is_run_scoped_guide_team_tool(tool_name: &str) -> bool { matches!( @@ -493,11 +493,23 @@ mod tests { assert_eq!( error, - "Team was created, but no TeamRun is active yet. End this solo turn and continue from the Team page." + "Team was created, but no TeamRun is active yet. Open the team chat and continue from there." ); assert!(!error.contains("no active team run for run-scoped wake")); } + #[test] + fn guide_handoff_guard_is_not_a_correctness_api() { + assert!(is_run_scoped_guide_team_tool("team_send_message")); + let response = guide_no_active_team_run_handoff_response(); + let text = serde_json::to_string(&response).unwrap(); + assert!(text.contains("Open the team chat")); + assert!( + !text.contains("correctness"), + "guide handoff text must stay user-facing and not document concurrency guarantees" + ); + } + #[tokio::test] async fn start_returns_positive_port_and_token() { let server = GuideMcpServer::start().await.expect("start should succeed"); diff --git a/crates/aionui-team/src/provisioning.rs b/crates/aionui-team/src/provisioning.rs index 401a5c0cf..54a8bd876 100644 --- a/crates/aionui-team/src/provisioning.rs +++ b/crates/aionui-team/src/provisioning.rs @@ -45,6 +45,16 @@ struct NewAgentProvisioning { workspace: Option, } +pub(crate) struct PersistSpawnedAgentRequest { + pub user_id: String, + pub team_id: String, + pub slot_id: String, + pub name: String, + pub backend: String, + pub model: String, + pub custom_agent_id: Option, +} + pub struct TeamConversationCreateRequest { pub user_id: String, pub agent_type: AgentType, @@ -238,38 +248,29 @@ impl TeamAgentProvisioner { Ok(agent) } - pub(crate) async fn persist_spawned_agent( - &self, - user_id: &str, - team_id: &str, - slot_id: String, - name: String, - backend: String, - model: String, - custom_agent_id: Option, - ) -> Result { + pub(crate) async fn persist_spawned_agent(&self, req: PersistSpawnedAgentRequest) -> Result { let row = self .repo - .get_team(team_id) + .get_team(&req.team_id) .await? - .ok_or_else(|| TeamError::TeamNotFound(team_id.into()))?; + .ok_or_else(|| TeamError::TeamNotFound(req.team_id.clone()))?; let mut team = Team::from_row(&row)?; let workspace = self.workspace_resolver().resolve_for_new_agent(&row, &team).await?; let agent = self .provision_new_agent(NewAgentProvisioning { - user_id: user_id.to_owned(), - team_id: team_id.to_owned(), - slot_id, - name, + user_id: req.user_id, + team_id: req.team_id.clone(), + slot_id: req.slot_id, + name: req.name, role: TeammateRole::Teammate, - backend, - model, - custom_agent_id, + backend: req.backend, + model: req.model, + custom_agent_id: req.custom_agent_id, workspace: Some(workspace), }) .await?; team.agents.push(agent.clone()); - self.persist_agents(team_id, &team.agents).await?; + self.persist_agents(&req.team_id, &team.agents).await?; Ok(agent) } diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index 233fe83dd..acadb27ea 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -877,23 +877,6 @@ impl TeamSessionService { Ok(()) } - pub(crate) async fn wake_agent_for_team_work( - &self, - team_id: &str, - slot_id: &str, - source: TeamWakeSource, - trigger_message_id: Option, - ) -> Result<(), TeamError> { - let entry = self - .sessions - .get(team_id) - .ok_or_else(|| TeamError::SessionNotFound(team_id.into()))?; - entry - .session - .wake_agent_for_team_work(slot_id, source, trigger_message_id) - .await - } - pub async fn send_agent_message_from_agent( &self, team_id: &str, @@ -952,6 +935,10 @@ impl TeamSessionService { .notify_reserved_wake_for_team_work(slot_id, target_role, source); } + /// Friendly pre-check used by Guide MCP to return handoff copy before invoking + /// run-scoped team tools. This is not a concurrency guarantee; any operation + /// that writes mailbox, projection, scheduler, spawn, shutdown, or wake state + /// must still acquire a TeamRun operation lease in TeamSession/TeamRunManager. pub(crate) async fn require_active_team_run_for_team_work(&self, team_id: &str) -> Result<(), TeamError> { let entry = self .sessions diff --git a/crates/aionui-team/src/service/spawn_support.rs b/crates/aionui-team/src/service/spawn_support.rs index 93c526d37..f9135734d 100644 --- a/crates/aionui-team/src/service/spawn_support.rs +++ b/crates/aionui-team/src/service/spawn_support.rs @@ -3,6 +3,8 @@ use aionui_api_types::BehaviorPolicy; use aionui_common::AgentType; use aionui_common::constants::{TEAM_CAPABLE_BACKENDS, has_mcp_capability}; +use crate::provisioning::PersistSpawnedAgentRequest; + /// Known ACP vendor labels. Kept in lockstep with the `agent_metadata` /// seed in `005_agent_metadata.sql` — a caller hitting an unknown /// vendor should trigger a schema drift discussion, not silently fall @@ -213,26 +215,15 @@ impl TeamSessionService { /// The lock is *not* held across the process warmup step — callers /// (`TeamSession::spawn_agent`) wire that up separately so a slow /// `warmup` never stalls other spawns against the same team. - pub(crate) async fn persist_spawned_agent( - &self, - team_id: &str, - user_id: &str, - slot_id: String, - name: String, - backend: String, - model: String, - custom_agent_id: Option, - ) -> Result { + pub(crate) async fn persist_spawned_agent(&self, req: PersistSpawnedAgentRequest) -> Result { let lock = self .add_agent_locks - .entry(team_id.to_owned()) + .entry(req.team_id.clone()) .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) .clone(); let _guard = lock.lock().await; - self.provisioner() - .persist_spawned_agent(user_id, team_id, slot_id, name, backend, model, custom_agent_id) - .await + self.provisioner().persist_spawned_agent(req).await } } @@ -297,15 +288,15 @@ mod tests { force_team_workspace(&team_repo, &created.id, "").await; let spawned = svc - .persist_spawned_agent( - &created.id, - "user1", - "spawn-slot-1".into(), - "Spawned".into(), - "acp".into(), - "claude".into(), - None, - ) + .persist_spawned_agent(PersistSpawnedAgentRequest { + team_id: created.id.clone(), + user_id: "user1".into(), + slot_id: "spawn-slot-1".into(), + name: "Spawned".into(), + backend: "acp".into(), + model: "claude".into(), + custom_agent_id: None, + }) .await .unwrap(); diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index e085cf9b1..77d957128 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -18,10 +18,13 @@ use crate::message_projection::{ }; use crate::ports::{AgentTurnCancellationPort, AgentTurnExecutionPort}; use crate::prompts::{build_lead_prompt, build_teammate_prompt, build_wake_payload}; +use crate::provisioning::PersistSpawnedAgentRequest; use crate::scheduler::{TeammateManager, normalize_name}; use crate::service::TeamSessionService; use crate::task_board::TaskBoard; -use crate::team_run::{ChildCancelTarget, TeamRunManager, TeamRunWakeAcquireOutcome, WakeRecordDecision, target_role_for}; +#[cfg(test)] +use crate::team_run::WakeRecordDecision; +use crate::team_run::{ChildCancelTarget, TeamRunManager, TeamRunWakeAcquireOutcome, target_role_for}; use crate::types::{MailboxMessageType, Team, TeamAgent, TeammateRole, TeammateStatus}; use crate::wake::TeamWakeSource; @@ -622,6 +625,7 @@ impl TeamSession { Ok(()) } + #[cfg(test)] pub(crate) async fn wake_agent_for_team_work( &self, slot_id: &str, @@ -650,6 +654,7 @@ impl TeamSession { Ok(()) } + #[cfg(test)] pub(crate) async fn reserve_wake_for_team_work( &self, slot_id: &str, @@ -692,6 +697,28 @@ impl TeamSession { ); } + pub(crate) async fn scheduler_wake_agent_for_team_work( + &self, + slot_id: &str, + source: TeamWakeSource, + ) -> Result<(), TeamError> { + let agent = self.scheduler.get_agent(slot_id).await?; + let target_role = target_role_for(agent.role); + let outcome = self + .team_run_manager + .acquire_scheduler_wake(slot_id, target_role.clone(), source) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Ok(()); + }; + + self.team_run_manager + .commit_operation_lease(&lease.lease_id, None) + .await?; + self.notify_reserved_wake_for_team_work(slot_id, target_role, source); + Ok(()) + } + /// Mirror each non-user mailbox row into the target agent's conversation /// as a left bubble so the UI shows "who said what" when the user opens /// an agent's chat panel. @@ -940,8 +967,7 @@ impl TeamSession { async fn notify_leader_child_interrupted(&self, slot_id: &str, reason: Option) -> Result<(), TeamError> { if let Some(lead_slot_id) = self.scheduler.find_lead_slot_id().await { let content = reason.unwrap_or_else(|| format!("Agent {slot_id} was interrupted by the user.")); - let mailbox_message = self - .mailbox + self.mailbox .write( &self.team.id, &lead_slot_id, @@ -951,12 +977,8 @@ impl TeamSession { Some("Interrupted by user"), ) .await?; - self.wake_agent_for_team_work( - &lead_slot_id, - TeamWakeSource::InterruptedNotification, - Some(mailbox_message.id), - ) - .await?; + self.wake_leader_after_recovery_message(slot_id, TeamWakeSource::InterruptedNotification) + .await?; } Ok(()) } @@ -970,8 +992,7 @@ impl TeamSession { return Err(TeamError::AgentNotFound("lead".into())); }; let content = format!("Spawned teammate {failed_slot_id} failed to attach its runtime. Error: {error}"); - let mailbox_message = self - .mailbox + self.mailbox .write( &self.team.id, &lead_slot_id, @@ -981,12 +1002,8 @@ impl TeamSession { None, ) .await?; - self.wake_agent_for_team_work( - &lead_slot_id, - TeamWakeSource::SpawnAttachFailure, - Some(mailbox_message.id), - ) - .await + self.wake_leader_after_recovery_message(failed_slot_id, TeamWakeSource::SpawnAttachFailure) + .await } pub(crate) async fn wake_leader_after_recovery_message( @@ -997,17 +1014,34 @@ impl TeamSession { let Some(lead_slot_id) = self.scheduler.find_lead_slot_id().await else { return Err(TeamError::AgentNotFound("lead".into())); }; - if self.team_run_manager.active_run_id().await.is_some() { - return self.wake_agent_for_team_work(&lead_slot_id, source, None).await; - } - info!( - team_id = %self.team.id, - slot_id = %lead_slot_id, - source_slot_id, - wake_source = %source, - wake_policy = "deferred_mailbox_only", - "leader recovery message deferred because no active team run exists" - ); + let outcome = match self + .team_run_manager + .acquire_run_scoped_wake(&lead_slot_id, TeamRunTargetRole::Lead, source) + .await + { + Ok(outcome) => outcome, + Err(TeamError::InvalidRequest(message)) if message == "no active team run for run-scoped wake" => { + info!( + team_id = %self.team.id, + slot_id = %lead_slot_id, + source_slot_id, + wake_source = %source, + wake_policy = "deferred_mailbox_only", + "leader recovery message deferred because no active team run exists" + ); + return Ok(()); + } + Err(err) => return Err(err), + }; + + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Ok(()); + }; + + self.team_run_manager + .commit_operation_lease(&lease.lease_id, None) + .await?; + self.notify_reserved_wake_for_team_work(&lead_slot_id, TeamRunTargetRole::Lead, source); Ok(()) } @@ -1108,26 +1142,22 @@ impl TeamSession { let new_slot_id = generate_id(); let outcome = self .team_run_manager - .acquire_run_scoped_wake( - &new_slot_id, - TeamRunTargetRole::Teammate, - TeamWakeSource::SpawnWelcome, - ) + .acquire_run_scoped_wake(&new_slot_id, TeamRunTargetRole::Teammate, TeamWakeSource::SpawnWelcome) .await?; let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { return Err(TeamError::InvalidRequest("spawn welcome wake was suppressed".into())); }; let new_agent = match service - .persist_spawned_agent( - &self.team.id, - &self.user_id, - new_slot_id.clone(), - requested_name, + .persist_spawned_agent(PersistSpawnedAgentRequest { + team_id: self.team.id.clone(), + user_id: self.user_id.clone(), + slot_id: new_slot_id.clone(), + name: requested_name, backend, model, - req.custom_agent_id.clone(), - ) + custom_agent_id: req.custom_agent_id.clone(), + }) .await { Ok(agent) => agent, @@ -2250,7 +2280,7 @@ mod tests { } #[tokio::test] - async fn crash_notification_without_active_run_is_deferred_mailbox_only() { + async fn recovery_wake_without_active_run_is_deferred_without_active_run_snapshot_branch() { let session = start_session().await; session diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index cfe174087..b0494737a 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -14,7 +14,9 @@ use crate::events::{ TEAM_RUN_ACCEPTED_EVENT, TEAM_RUN_CANCELLED_EVENT, TEAM_RUN_COMPLETED_EVENT, TEAM_RUN_FAILED_EVENT, TEAM_RUN_STARTED_EVENT, TEAM_RUN_UPDATED_EVENT, TeamEventEmitter, }; -use crate::slot_wake_gate::{SlotWakeGate, WakeGateDecision}; +use crate::slot_wake_gate::SlotWakeGate; +#[cfg(test)] +use crate::slot_wake_gate::WakeGateDecision; use crate::types::TeammateRole; use crate::wake::TeamWakeSource; @@ -66,6 +68,7 @@ pub enum ChildCancelTarget { Starting(StartingChildReservation), } +#[cfg(test)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum WakeRecordDecision { Recorded, @@ -180,13 +183,12 @@ impl TeamRunRecord { } fn has_spawn_welcome_for_slot(&self, slot_id: &str) -> bool { - self.pending_wakes.get(slot_id).is_some_and(|wakes| { - wakes - .iter() - .any(|wake| wake.source == TeamWakeSource::SpawnWelcome) - }) || self.active_operation_leases.values().any(|active| { - active.lease.slot_id == slot_id && active.lease.wake_source == TeamWakeSource::SpawnWelcome - }) + self.pending_wakes + .get(slot_id) + .is_some_and(|wakes| wakes.iter().any(|wake| wake.source == TeamWakeSource::SpawnWelcome)) + || self.active_operation_leases.values().any(|active| { + active.lease.slot_id == slot_id && active.lease.wake_source == TeamWakeSource::SpawnWelcome + }) } fn role_for_slot(&self, slot_id: &str) -> Option { @@ -305,12 +307,8 @@ fn new_operation_lease( wake_source, accepted_as_new_run, }; - run.active_operation_leases.insert( - lease.lease_id.clone(), - ActiveOperationLease { - lease: lease.clone(), - }, - ); + run.active_operation_leases + .insert(lease.lease_id.clone(), ActiveOperationLease { lease: lease.clone() }); lease } @@ -363,12 +361,8 @@ fn acquire_policy( } TeamWakeSource::McpShutdownRequest => AcquirePolicyDecision::Accept, TeamWakeSource::IdleNotification | TeamWakeSource::InterruptedNotification => match slot_state { - TeamRunSlotState::Idle if intent == TeamRunWakeIntent::SchedulerWakeTarget => { - AcquirePolicyDecision::Accept - } - TeamRunSlotState::Idle => { - AcquirePolicyDecision::Suppress("background_notification_without_wake_target") - } + TeamRunSlotState::Idle if intent == TeamRunWakeIntent::SchedulerWakeTarget => AcquirePolicyDecision::Accept, + TeamRunSlotState::Idle => AcquirePolicyDecision::Suppress("background_notification_without_wake_target"), TeamRunSlotState::Busy | TeamRunSlotState::Pending | TeamRunSlotState::Paused => { AcquirePolicyDecision::Suppress("background_notification_deduped") } @@ -531,13 +525,7 @@ impl TeamRunManager { slot_wake_gate: SlotWakeGate::default(), active_operation_leases: HashMap::new(), }; - let lease = new_operation_lease( - &mut record, - slot_id, - role.clone(), - TeamWakeSource::UserMessage, - true, - ); + let lease = new_operation_lease(&mut record, slot_id, role.clone(), TeamWakeSource::UserMessage, true); let ack = record.ack(slot_id, role, None); let payload = record.payload(); *guard = Some(record); @@ -678,12 +666,7 @@ impl TeamRunManager { }; let slot_state = run.slot_run_state(slot_id); - let decision = acquire_policy( - source, - slot_state, - run.has_spawn_welcome_for_slot(slot_id), - intent, - ); + let decision = acquire_policy(source, slot_state, run.has_spawn_welcome_for_slot(slot_id), intent); match decision { AcquirePolicyDecision::RejectSlotBusy => Err(TeamError::SlotBusy(slot_id.to_owned())), AcquirePolicyDecision::RejectInvalid(message) => Err(TeamError::InvalidRequest(message.into())), @@ -827,6 +810,7 @@ impl TeamRunManager { .map(|_| ()) } + #[cfg(test)] pub(crate) async fn record_or_suppress_wake( &self, slot_id: &str, @@ -1686,10 +1670,7 @@ mod tests { let completed = manager.maybe_complete().await; assert!(completed.is_none(), "active lease must retain the run"); - assert_eq!( - manager.active_run_id().await.as_deref(), - Some(ack.team_run_id.as_str()) - ); + assert_eq!(manager.active_run_id().await.as_deref(), Some(ack.team_run_id.as_str())); manager .commit_operation_lease(&lease.lease_id, Some("mailbox-1".into())) @@ -1749,11 +1730,7 @@ mod tests { let (manager, _bc) = manager(); let err = manager - .acquire_run_scoped_wake( - "worker", - TeamRunTargetRole::Teammate, - TeamWakeSource::McpSendMessage, - ) + .acquire_run_scoped_wake("worker", TeamRunTargetRole::Teammate, TeamWakeSource::McpSendMessage) .await .expect_err("run-scoped wake must need active run"); @@ -1888,10 +1865,7 @@ mod tests { .await .expect("paused background wake suppresses cleanly"); assert_eq!(outcome, TeamRunWakeAcquireOutcome::Suppressed); - assert_eq!( - manager.active_run_id().await.as_deref(), - Some(ack.team_run_id.as_str()) - ); + assert_eq!(manager.active_run_id().await.as_deref(), Some(ack.team_run_id.as_str())); } #[tokio::test] @@ -1903,11 +1877,7 @@ mod tests { .unwrap(); let first = manager - .acquire_run_scoped_wake( - "new-worker", - TeamRunTargetRole::Teammate, - TeamWakeSource::SpawnWelcome, - ) + .acquire_run_scoped_wake("new-worker", TeamRunTargetRole::Teammate, TeamWakeSource::SpawnWelcome) .await .unwrap(); let TeamRunWakeAcquireOutcome::Accepted(first) = first else { @@ -1919,11 +1889,7 @@ mod tests { .unwrap(); let second = manager - .acquire_run_scoped_wake( - "new-worker", - TeamRunTargetRole::Teammate, - TeamWakeSource::SpawnWelcome, - ) + .acquire_run_scoped_wake("new-worker", TeamRunTargetRole::Teammate, TeamWakeSource::SpawnWelcome) .await .unwrap(); assert_eq!(second, TeamRunWakeAcquireOutcome::Suppressed); @@ -1938,11 +1904,7 @@ mod tests { .unwrap(); let outcome = manager - .acquire_run_scoped_wake( - "worker", - TeamRunTargetRole::Teammate, - TeamWakeSource::IdleNotification, - ) + .acquire_run_scoped_wake("worker", TeamRunTargetRole::Teammate, TeamWakeSource::IdleNotification) .await .expect("generic idle notification should suppress cleanly"); @@ -1958,11 +1920,7 @@ mod tests { .unwrap(); let outcome = manager - .acquire_scheduler_wake( - "worker", - TeamRunTargetRole::Teammate, - TeamWakeSource::IdleNotification, - ) + .acquire_scheduler_wake("worker", TeamRunTargetRole::Teammate, TeamWakeSource::IdleNotification) .await .expect("scheduler-produced wake target should be accepted"); let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { From 25c4eea85566421e19f4919d31416e5c2591ac79 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 10:49:02 +0800 Subject: [PATCH 14/14] chore(team): simplify guide handoff copy --- crates/aionui-team/src/guide/server.rs | 19 ++++++++++++++----- crates/aionui-team/src/prompts/team_guide.rs | 13 +++++++++---- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/crates/aionui-team/src/guide/server.rs b/crates/aionui-team/src/guide/server.rs index a7bff29d1..4770e05c2 100644 --- a/crates/aionui-team/src/guide/server.rs +++ b/crates/aionui-team/src/guide/server.rs @@ -171,9 +171,11 @@ async fn handle_tool_request( fn build_create_team_handoff_next_step(summary: &str) -> String { format!( - "Team was created and the Team page is open. End this solo turn now. \ - Do not call any `team_*` tools from this solo turn. Tell the user to continue from the Team page; \ - their next message there will start the first formal `TeamRun`. Task summary: {summary}" + "Team was created and the UI has switched to the team conversation. End this solo turn now. \ + Do not call any `team_*` tools from this solo turn. Reply to the user only with one short \ + handoff in their language. It should mean: the Team is ready, send the next message, and I will continue from there. \ + Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state. \ + Task summary: {summary}" ) } @@ -435,10 +437,17 @@ mod tests { fn create_team_next_step_tells_solo_agent_to_end_turn() { let next_step = build_create_team_handoff_next_step("Build a research and implementation team"); - assert!(next_step.contains("Team was created and the Team page is open.")); + assert!(next_step.contains("Team was created and the UI has switched to the team conversation.")); assert!(next_step.contains("End this solo turn now.")); assert!(next_step.contains("Do not call any `team_*` tools from this solo turn.")); - assert!(next_step.contains("their next message there will start the first formal `TeamRun`.")); + assert!(next_step.contains( + "Reply to the user only with one short handoff in their language. It should mean: the Team is ready, send the next message, and I will continue from there." + )); + assert!( + next_step.contains( + "Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state." + ) + ); assert!(next_step.contains("Task summary: Build a research and implementation team")); assert!( !next_step.contains("team_spawn_agent"), diff --git a/crates/aionui-team/src/prompts/team_guide.rs b/crates/aionui-team/src/prompts/team_guide.rs index a6f2d86ff..47adfa22f 100644 --- a/crates/aionui-team/src/prompts/team_guide.rs +++ b/crates/aionui-team/src/prompts/team_guide.rs @@ -55,7 +55,7 @@ If case 2 applies, ask at most once whether the user wants to bring in a Team. K | Tester | Write and run tests | {agent_type} | (model from list) | 4. **Output the table as a normal text message and END YOUR TURN.** Do NOT call `aion_create_team` or any other tool (including ask_user) in this turn. Wait for the user to reply in their next message with explicit confirmation (e.g. \"ok\", \"go ahead\", \"确认\") before proceeding. 5. After user confirms → call `aion_create_team`. The summary MUST include both the goal and the confirmed team configuration. (The system automatically sets the correct agent type — you do NOT need to pass agentType.) -6. After `aion_create_team` returns → the Team has been created, the current conversation has been bound as Leader, and the system navigates to the Team page automatically. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output a brief handoff telling the user to continue from the Team page, then END YOUR TURN. The user's next message from the Team page will start the first formal `TeamRun`. +6. After `aion_create_team` returns → the Team has been created and the current conversation has been bound as Leader. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output only one brief user-facing handoff in the user's language. It should mean: the Team is ready, send the next message, and I will continue from there. Then END YOUR TURN. Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state in the user-facing handoff. 7. User declines or wants changes → adjust or proceed solo. Do not mention Team again unless the user asks. ### Tool constraint @@ -128,7 +128,7 @@ If case 2 applies, ask at most once whether the user wants to bring in a Team. K | Tester | Write and run tests | claude | (model from list) |\n\ 4. **Output the table as a normal text message and END YOUR TURN.** Do NOT call `aion_create_team` or any other tool (including ask_user) in this turn. Wait for the user to reply in their next message with explicit confirmation (e.g. \"ok\", \"go ahead\", \"确认\") before proceeding.\n\ 5. After user confirms → call `aion_create_team`. The summary MUST include both the goal and the confirmed team configuration. (The system automatically sets the correct agent type — you do NOT need to pass agentType.)\n\ -6. After `aion_create_team` returns → the Team has been created, the current conversation has been bound as Leader, and the system navigates to the Team page automatically. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output a brief handoff telling the user to continue from the Team page, then END YOUR TURN. The user's next message from the Team page will start the first formal `TeamRun`.\n\ +6. After `aion_create_team` returns → the Team has been created and the current conversation has been bound as Leader. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output only one brief user-facing handoff in the user's language. It should mean: the Team is ready, send the next message, and I will continue from there. Then END YOUR TURN. Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state in the user-facing handoff.\n\ 7. User declines or wants changes → adjust or proceed solo. Do not mention Team again unless the user asks.\n\ \n\ ### Tool constraint\n\ @@ -141,12 +141,17 @@ Before team creation: use **only** `aion_create_team` and `aion_list_models`. Af let prompt = build_team_guide_prompt("claude", None); assert!(prompt.contains( - "After `aion_create_team` returns → the Team has been created, the current conversation has been bound as Leader, and the system navigates to the Team page automatically." + "After `aion_create_team` returns → the Team has been created and the current conversation has been bound as Leader." )); assert!(prompt.contains( "Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn." )); - assert!(prompt.contains("The user's next message from the Team page will start the first formal `TeamRun`.")); + assert!(prompt.contains( + "Output only one brief user-facing handoff in the user's language. It should mean: the Team is ready, send the next message, and I will continue from there." + )); + assert!(prompt.contains( + "Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state in the user-facing handoff." + )); assert!( prompt.contains("After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn.") );