diff --git a/crates/aionui-app/src/commands/cmd_team_guide.rs b/crates/aionui-app/src/commands/cmd_team_guide.rs index 90f699320..1e9dbcfcf 100644 --- a/crates/aionui-app/src/commands/cmd_team_guide.rs +++ b/crates/aionui-app/src/commands/cmd_team_guide.rs @@ -500,12 +500,95 @@ mod tests { assert!(!serialized.contains("conv-secret-123")); } + #[tokio::test] + async fn forward_tool_create_team_object_success_returns_success_text() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/tool")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "teamId": "team-1", + "name": "Dev Team", + "route": "/team/team-1", + "status": "team_created", + "next_step": "Team was created and the Team page is open. End this solo turn now.", + }))) + .mount(&mock_server) + .await; + + let server = guide_server_for_port(mock_server.address().port()); + let result = server + .forward_tool("aion_create_team", &json!({"summary": "redacted"})) + .await; + + assert_ne!(result.is_error, Some(true)); + let text = first_text(&result); + assert!(text.contains("team_created")); + assert!(text.contains("team-1")); + assert!(text.contains("End this solo turn now")); + } + + #[tokio::test] + async fn forward_tool_create_team_malformed_object_remains_unexpected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/tool")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "name": "Dev Team", + "route": "/team/team-1", + "status": "team_created", + "next_step": "Team was created and the Team page is open. 
End this solo turn now.", + }))) + .mount(&mock_server) + .await; + + let server = guide_server_for_port(mock_server.address().port()); + let result = server + .forward_tool("aion_create_team", &json!({"summary": "redacted"})) + .await; + + assert_eq!(result.is_error, Some(true)); + assert_eq!(first_text(&result), "unexpected local guide tool response"); + assert_eq!( + result.structured_content.as_ref().unwrap()["code"], + "MCP_TOOL_RESPONSE_UNEXPECTED" + ); + } + + #[tokio::test] + async fn forward_tool_create_team_legacy_result_body_remains_unexpected() { + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/tool")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "result": "legacy create-team success", + }))) + .mount(&mock_server) + .await; + + let server = guide_server_for_port(mock_server.address().port()); + let result = server + .forward_tool("aion_create_team", &json!({"summary": "redacted"})) + .await; + + assert_eq!(result.is_error, Some(true)); + assert_eq!(first_text(&result), "unexpected local guide tool response"); + assert_eq!( + result.structured_content.as_ref().unwrap()["code"], + "MCP_TOOL_RESPONSE_UNEXPECTED" + ); + } + #[tokio::test] async fn forward_tool_unexpected_2xx_body_returns_unexpected_without_echoing_body() { let mock_server = MockServer::start().await; Mock::given(method("POST")) .and(path("/tool")) - .respond_with(ResponseTemplate::new(200).set_body_string("unexpected raw body with team-secret-789")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "teamId": "team-secret-789", + "route": "/team/team-secret-789", + "status": "team_created", + "next_step": "Team was created and the Team page is open. 
End this solo turn now.", + }))) .mount(&mock_server) .await; @@ -520,7 +603,7 @@ mod tests { ); let serialized = serde_json::to_string(&result).unwrap(); assert!(!serialized.contains("team-secret-789")); - assert!(!serialized.contains("unexpected raw body")); + assert!(!serialized.contains("team_created")); } #[tokio::test] @@ -599,8 +682,8 @@ impl GuideServer { match resp.text().await { Ok(text) => { if let Ok(v) = serde_json::from_str::(&text) { - if let Some(result) = v.get("result").and_then(|r| r.as_str()) { - return tool_success(result.to_owned()); + if let Some(result) = parse_tool_success_text(tool_name, &v) { + return tool_success(result); } if v.get("error").is_some() { return tool_error( @@ -694,3 +777,25 @@ fn extract_nested_code(value: &serde_json::Value, path: &[&str]) -> Option None, } } + +fn parse_tool_success_text(tool_name: &str, value: &serde_json::Value) -> Option { + if tool_name == "aion_create_team" { + return is_create_team_success_body(value).then(|| serde_json::to_string(value).ok())?; + } + + if let Some(result) = value.get("result").and_then(|result| result.as_str()) { + return Some(result.to_owned()); + } + + None +} + +fn is_create_team_success_body(value: &serde_json::Value) -> bool { + value.get("status").and_then(|status| status.as_str()) == Some("team_created") + && value.get("teamId").and_then(|team_id| team_id.as_str()).is_some() + && value.get("route").and_then(|route| route.as_str()).is_some() + && value + .get("next_step") + .and_then(|next_step| next_step.as_str()) + .is_some() +} diff --git a/crates/aionui-app/src/router/team_conversation_adapters.rs b/crates/aionui-app/src/router/team_conversation_adapters.rs index 50526cddd..2eb114056 100644 --- a/crates/aionui-app/src/router/team_conversation_adapters.rs +++ b/crates/aionui-app/src/router/team_conversation_adapters.rs @@ -16,6 +16,7 @@ use aionui_team::{ TeamConversationProvisioningPort, TeamError, TeamProjectionMessageStore, }; use async_trait::async_trait; +use 
tracing::info; pub struct TeamConversationAdapters { conversation_service: ConversationService, @@ -73,18 +74,42 @@ impl AgentTurnExecutionPort for TeamConversationAdapters { > }); - let outcome = self - .conversation_service - .run_agent_turn(ConversationAgentTurnRequest { - user_id: request.user_id, - conversation_id: request.conversation_id, - content: request.content, - files: request.files, - inject_skills: Vec::new(), - on_started, - }) - .await - .map_err(map_conversation_turn_error)?; + let conversation_id = request.conversation_id.clone(); + let outcome = loop { + match self + .conversation_service + .run_agent_turn(ConversationAgentTurnRequest { + user_id: request.user_id.clone(), + conversation_id: conversation_id.clone(), + content: request.content.clone(), + files: request.files.clone(), + inject_skills: Vec::new(), + on_started: on_started.clone(), + }) + .await + { + Ok(outcome) => break outcome, + Err(error) if is_retryable_conversation_busy(&error) => { + info!( + conversation_id = %conversation_id, + team_run_id = ?request.team_run_id, + slot_id = %request.slot_id, + "team conversation turn waiting for active conversation turn to release" + ); + self.conversation_service + .runtime_state() + .wait_until_unclaimed(&conversation_id) + .await; + info!( + conversation_id = %conversation_id, + team_run_id = ?request.team_run_id, + slot_id = %request.slot_id, + "team conversation turn retrying after active conversation turn released" + ); + } + Err(error) => return Err(map_conversation_turn_error(error)), + } + }; Ok(AgentTurnOutcome { conversation_id: outcome.conversation_id, @@ -275,6 +300,10 @@ impl TeamConversationLookupPort for TeamConversationAdapters { } } +fn is_retryable_conversation_busy(error: &ConversationError) -> bool { + matches!(error, ConversationError::Busy { reason } if reason.contains("already running")) +} + fn map_conversation_create_error(error: ConversationError) -> TeamError { match error { 
ConversationError::WorkspacePathUnavailable { path } => TeamError::WorkspacePathUnavailable(path), diff --git a/crates/aionui-conversation/src/runtime_state.rs b/crates/aionui-conversation/src/runtime_state.rs index f047853d6..a0269916c 100644 --- a/crates/aionui-conversation/src/runtime_state.rs +++ b/crates/aionui-conversation/src/runtime_state.rs @@ -5,6 +5,7 @@ use std::{ use aionui_api_types::{ConversationRuntimeStateKind, ConversationRuntimeSummary}; use aionui_common::ConversationStatus; +use tokio::sync::Notify; use tracing::{info, warn}; use crate::ConversationError; @@ -12,6 +13,7 @@ use crate::ConversationError; #[derive(Debug, Default)] pub struct ConversationRuntimeStateService { state: Mutex, + release_notify: Notify, } #[derive(Debug, Default)] @@ -112,6 +114,16 @@ impl ConversationRuntimeStateService { .and_then(|state| state.active_turns.get(conversation_id).cloned()) } + pub async fn wait_until_unclaimed(&self, conversation_id: &str) { + loop { + let notified = self.release_notify.notified(); + if !self.is_claimed(conversation_id) { + return; + } + notified.await; + } + } + pub fn mark_deleting(&self, conversation_id: &str) -> bool { match self.state.lock() { Ok(mut state) => { @@ -305,6 +317,8 @@ impl ConversationRuntimeStateService { deleting = was_deleting, "conversation runtime turn claim released" ); + drop(state); + self.release_notify.notify_waiters(); was_deleting } Err(_) => { @@ -417,6 +431,39 @@ mod tests { assert!(state.try_claim_turn("conv-1", "turn-2").is_ok()); } + #[tokio::test] + async fn wait_until_unclaimed_completes_after_active_claim_releases() { + let state = Arc::new(ConversationRuntimeStateService::default()); + let mut claim = state + .try_claim_turn("conv-1", "turn-1") + .expect("claim should be created"); + + let waiter = { + let state = state.clone(); + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + state.wait_until_unclaimed("conv-1").await; + let _ = tx.send(()); + }); + rx + }; + 
tokio::pin!(waiter); + + assert!( + tokio::time::timeout(std::time::Duration::from_millis(20), &mut waiter) + .await + .is_err(), + "waiter must stay pending while the claim is active" + ); + + let _ = claim.release(); + assert!(!state.is_claimed("conv-1")); + tokio::time::timeout(std::time::Duration::from_secs(1), &mut waiter) + .await + .expect("waiter should finish after release") + .expect("waiter task should send completion"); + } + #[test] fn deleting_rejects_new_turn_claims() { let state = Arc::new(ConversationRuntimeStateService::default()); diff --git a/crates/aionui-team/src/event_loop.rs b/crates/aionui-team/src/event_loop.rs index ce271975d..62558b307 100644 --- a/crates/aionui-team/src/event_loop.rs +++ b/crates/aionui-team/src/event_loop.rs @@ -12,7 +12,8 @@ use tracing::{debug, info, warn}; use crate::mailbox::Mailbox; use crate::ports::{ - AgentTurnExecutionPort, AgentTurnRequest, AgentTurnSource, AgentTurnStarted, AgentTurnStartedCallback, + AgentTurnExecutionError, AgentTurnExecutionPort, AgentTurnRequest, AgentTurnSource, AgentTurnStarted, + AgentTurnStartedCallback, }; use crate::scheduler::TeammateManager; use crate::session::TeamSession; @@ -118,6 +119,10 @@ struct TurnExecution { turn_id: Option, } +fn is_retryable_start_skip(error: &AgentTurnExecutionError) -> bool { + matches!(error, AgentTurnExecutionError::Skipped { reason } if reason.contains("already running")) +} + /// The event loop for one agent slot. Spawned as a tokio task. 
/// /// Flow: @@ -337,6 +342,13 @@ async fn execute_turn(ctx: &AgentLoopContext, input: &crate::session::WakeInput) { if started_seen.load(Ordering::SeqCst) { ctx.session.team_run_manager().complete_failed().await; + } else if is_retryable_start_skip(&e) { + ctx.session + .team_run_manager() + .retry_child_start_later(&reservation.reservation_id, &e.to_string()) + .await; + let _ = ctx.scheduler.set_status(&ctx.slot_id, TeammateStatus::Idle).await; + return None; } else { ctx.session .team_run_manager() @@ -392,7 +404,7 @@ async fn finalize_turn(ctx: &AgentLoopContext, turn: TurnExecution, input: &crat if wake_target != ctx.slot_id && let Err(e) = ctx .session - .wake_agent_for_team_work(&wake_target, TeamWakeSource::IdleNotification, None) + .scheduler_wake_agent_for_team_work(&wake_target, TeamWakeSource::IdleNotification) .await { warn!( diff --git a/crates/aionui-team/src/guide/server.rs b/crates/aionui-team/src/guide/server.rs index a0a2cfd25..4770e05c2 100644 --- a/crates/aionui-team/src/guide/server.rs +++ b/crates/aionui-team/src/guide/server.rs @@ -169,6 +169,36 @@ async fn handle_tool_request( // Tool implementations // --------------------------------------------------------------------------- +fn build_create_team_handoff_next_step(summary: &str) -> String { + format!( + "Team was created and the UI has switched to the team conversation. End this solo turn now. \ + Do not call any `team_*` tools from this solo turn. Reply to the user only with one short \ + handoff in their language. It should mean: the Team is ready, send the next message, and I will continue from there. \ + Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state. \ + Task summary: {summary}" + ) +} + +const NO_ACTIVE_TEAM_RUN_FOR_RUN_SCOPED_WAKE: &str = "no active team run for run-scoped wake"; +const GUIDE_NO_ACTIVE_TEAM_RUN_HANDOFF_ERROR: &str = + "Team was created, but no TeamRun is active yet. 
Open the team chat and continue from there."; + +fn is_run_scoped_guide_team_tool(tool_name: &str) -> bool { + matches!( + tool_name, + "team_send_message" + | "team_spawn_agent" + | "team_task_create" + | "team_task_update" + | "team_rename_agent" + | "team_shutdown_agent" + ) +} + +fn guide_no_active_team_run_handoff_response() -> serde_json::Value { + serde_json::json!({ "error": GUIDE_NO_ACTIVE_TEAM_RUN_HANDOFF_ERROR }) +} + async fn exec_create_team( request_body: &serde_json::Value, args: &serde_json::Value, @@ -268,11 +298,7 @@ async fn exec_create_team( "name": team.name, "route": route, "status": "team_created", - "next_step": format!( - "You are now the team Leader. Your team tools (team_spawn_agent, team_send_message, etc.) are now active. \ - Immediately proceed to spawn teammates as planned. Task summary: {}", - params.summary - ) + "next_step": build_create_team_handoff_next_step(&params.summary) }) } @@ -318,6 +344,29 @@ async fn exec_team_tool( } }; + if is_run_scoped_guide_team_tool(tool_name) { + match svc.require_active_team_run_for_team_work(&team_id).await { + Ok(()) => {} + Err(crate::TeamError::InvalidRequest(message)) if message == NO_ACTIVE_TEAM_RUN_FOR_RUN_SCOPED_WAKE => { + warn!( + tool = tool_name, + team_id = %team_id, + "Guide HTTP: run-scoped team tool refused because no active TeamRun exists" + ); + return guide_no_active_team_run_handoff_response(); + } + Err(error) => { + warn!( + tool = tool_name, + team_id = %team_id, + error = %error, + "Guide HTTP: active TeamRun check failed before forwarding team tool" + ); + return serde_json::json!({"error": error.to_string()}); + } + } + } + let svc_weak = Arc::downgrade(&svc); let result = crate::mcp::server::dispatch_tool( tool_name, @@ -384,6 +433,92 @@ mod tests { use std::time::Duration; use tokio::time::timeout; + #[test] + fn create_team_next_step_tells_solo_agent_to_end_turn() { + let next_step = build_create_team_handoff_next_step("Build a research and implementation team"); + +
assert!(next_step.contains("Team was created and the UI has switched to the team conversation.")); + assert!(next_step.contains("End this solo turn now.")); + assert!(next_step.contains("Do not call any `team_*` tools from this solo turn.")); + assert!(next_step.contains( + "Reply to the user only with one short handoff in their language. It should mean: the Team is ready, send the next message, and I will continue from there." + )); + assert!( + next_step.contains( + "Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state." + ) + ); + assert!(next_step.contains("Task summary: Build a research and implementation team")); + assert!( + !next_step.contains("team_spawn_agent"), + "next_step must not name spawn as an immediately available action" + ); + assert!( + !next_step.contains("team_send_message"), + "next_step must not name send_message as an immediately available action" + ); + assert!( + !next_step.contains("tools are now active"), + "next_step must not claim Team tools are active immediately after creation" + ); + } + + #[test] + fn run_scoped_guide_team_tools_are_classified_for_handoff_guard() { + for tool_name in [ + "team_send_message", + "team_spawn_agent", + "team_task_create", + "team_task_update", + "team_rename_agent", + "team_shutdown_agent", + ] { + assert!( + is_run_scoped_guide_team_tool(tool_name), + "{tool_name} should require an active TeamRun in the Guide forwarding path" + ); + } + + for tool_name in [ + "team_members", + "team_task_list", + "team_list_models", + "team_describe_assistant", + ] { + assert!( + !is_run_scoped_guide_team_tool(tool_name), + "{tool_name} is read-only/catalog-style and should not use the run-scoped handoff guard" + ); + } + } + + #[test] + fn guide_no_active_team_run_handoff_error_is_clear() { + let response = guide_no_active_team_run_handoff_response(); + let error = response + .get("error") + .and_then(serde_json::Value::as_str) + .expect("error string"); + + assert_eq!( + 
error, + "Team was created, but no TeamRun is active yet. Open the team chat and continue from there." + ); + assert!(!error.contains("no active team run for run-scoped wake")); + } + + #[test] + fn guide_handoff_guard_is_not_a_correctness_api() { + assert!(is_run_scoped_guide_team_tool("team_send_message")); + let response = guide_no_active_team_run_handoff_response(); + let text = serde_json::to_string(&response).unwrap(); + assert!(text.contains("Open the team chat")); + assert!( + !text.contains("correctness"), + "guide handoff text must stay user-facing and not document concurrency guarantees" + ); + } + #[tokio::test] async fn start_returns_positive_port_and_token() { let server = GuideMcpServer::start().await.expect("start should succeed"); diff --git a/crates/aionui-team/src/mcp/server.rs b/crates/aionui-team/src/mcp/server.rs index 59ca23891..1136971df 100644 --- a/crates/aionui-team/src/mcp/server.rs +++ b/crates/aionui-team/src/mcp/server.rs @@ -548,10 +548,6 @@ async fn exec_send_message( let service = service .upgrade() .ok_or_else(|| "Team service not available; cannot wake target".to_string())?; - service - .require_active_team_run_for_team_work(team_id) - .await - .map_err(|e| e.to_string())?; let targets = if resolved_to == "*" { scheduler @@ -765,21 +761,7 @@ async fn exec_shutdown_agent( .upgrade() .ok_or_else(|| "Team service not available; cannot wake shutdown target".to_string())?; service - .require_active_team_run_for_team_work(team_id) - .await - .map_err(|e| e.to_string())?; - - let action = crate::scheduler::SchedulerAction::ShutdownAgent { - slot_id: target_slot_id.clone(), - reason: input.reason, - }; - scheduler - .execute_action(caller_slot_id, &action) - .await - .map_err(|e| e.to_string())?; - - service - .wake_agent_for_team_work(team_id, &target_slot_id, TeamWakeSource::McpShutdownRequest, None) + .shutdown_agent_in_session(team_id, caller_slot_id, &target_slot_id, input.reason) .await .map_err(|e| e.to_string())?; diff --git 
a/crates/aionui-team/src/prompts/team_guide.rs b/crates/aionui-team/src/prompts/team_guide.rs index 690fd9683..47adfa22f 100644 --- a/crates/aionui-team/src/prompts/team_guide.rs +++ b/crates/aionui-team/src/prompts/team_guide.rs @@ -55,11 +55,11 @@ If case 2 applies, ask at most once whether the user wants to bring in a Team. K | Tester | Write and run tests | {agent_type} | (model from list) | 4. **Output the table as a normal text message and END YOUR TURN.** Do NOT call `aion_create_team` or any other tool (including ask_user) in this turn. Wait for the user to reply in their next message with explicit confirmation (e.g. \"ok\", \"go ahead\", \"确认\") before proceeding. 5. After user confirms → call `aion_create_team`. The summary MUST include both the goal and the confirmed team configuration. (The system automatically sets the correct agent type — you do NOT need to pass agentType.) -6. After `aion_create_team` returns → you ARE now the team Leader. The system navigates to the team page automatically. **Immediately** use `team_spawn_agent` to create each teammate from the confirmed configuration table. Then use `team_send_message` to assign initial tasks to each spawned teammate. Do NOT end your turn until all teammates are spawned and tasked. +6. After `aion_create_team` returns → the Team has been created and the current conversation has been bound as Leader. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output only one brief user-facing handoff in the user's language. It should mean: the Team is ready, send the next message, and I will continue from there. Then END YOUR TURN. Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state in the user-facing handoff. 7. User declines or wants changes → adjust or proceed solo. Do not mention Team again unless the user asks. ### Tool constraint -Before team creation: use **only** `aion_create_team` and `aion_list_models`. 
After `aion_create_team` succeeds: use team tools (`team_spawn_agent`, `team_send_message`, `team_members`, `team_task_create`, etc.) to manage your team."; +Before team creation: use **only** `aion_create_team` and `aion_list_models`. After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn. Team tools are only for normal Team runtime after the Team page accepts the user's first Team message and an active `TeamRun` exists."; /// Build the Team Guide prompt for a solo agent. /// @@ -128,14 +128,43 @@ If case 2 applies, ask at most once whether the user wants to bring in a Team. K | Tester | Write and run tests | claude | (model from list) |\n\ 4. **Output the table as a normal text message and END YOUR TURN.** Do NOT call `aion_create_team` or any other tool (including ask_user) in this turn. Wait for the user to reply in their next message with explicit confirmation (e.g. \"ok\", \"go ahead\", \"确认\") before proceeding.\n\ 5. After user confirms → call `aion_create_team`. The summary MUST include both the goal and the confirmed team configuration. (The system automatically sets the correct agent type — you do NOT need to pass agentType.)\n\ -6. After `aion_create_team` returns → you ARE now the team Leader. The system navigates to the team page automatically. **Immediately** use `team_spawn_agent` to create each teammate from the confirmed configuration table. Then use `team_send_message` to assign initial tasks to each spawned teammate. Do NOT end your turn until all teammates are spawned and tasked.\n\ +6. After `aion_create_team` returns → the Team has been created and the current conversation has been bound as Leader. **Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn.** Output only one brief user-facing handoff in the user's language. It should mean: the Team is ready, send the next message, and I will continue from there. Then END YOUR TURN. 
Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state in the user-facing handoff.\n\ 7. User declines or wants changes → adjust or proceed solo. Do not mention Team again unless the user asks.\n\ \n\ ### Tool constraint\n\ -Before team creation: use **only** `aion_create_team` and `aion_list_models`. After `aion_create_team` succeeds: use team tools (`team_spawn_agent`, `team_send_message`, `team_members`, `team_task_create`, etc.) to manage your team."; +Before team creation: use **only** `aion_create_team` and `aion_list_models`. After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn. Team tools are only for normal Team runtime after the Team page accepts the user's first Team message and an active `TeamRun` exists."; assert_eq!(prompt, expected); } + #[test] + fn team_guide_prompt_hands_off_after_create_team() { + let prompt = build_team_guide_prompt("claude", None); + + assert!(prompt.contains( + "After `aion_create_team` returns → the Team has been created and the current conversation has been bound as Leader." + )); + assert!(prompt.contains( + "Do NOT call `team_spawn_agent`, `team_send_message`, or any other `team_*` tool in this solo turn." + )); + assert!(prompt.contains( + "Output only one brief user-facing handoff in the user's language. It should mean: the Team is ready, send the next message, and I will continue from there." + )); + assert!(prompt.contains( + "Do not mention the Team page, solo turn, `team_*` tools, `TeamRun`, or internal tool state in the user-facing handoff." + )); + assert!( + prompt.contains("After `aion_create_team` succeeds: do not call any `team_*` tools in this solo turn.") + ); + assert!( + !prompt.contains("Your team tools (team_spawn_agent, team_send_message, etc.) 
are now active."), + "prompt must not claim Team tools are active immediately after creation" + ); + assert!( + !prompt.contains("Immediately proceed to spawn teammates as planned"), + "prompt must not ask the solo agent to spawn teammates in the same solo turn" + ); + } + #[test] fn team_guide_prompt_with_preset_leader_label() { let prompt = build_team_guide_prompt("gemini", Some("Word Creator")); diff --git a/crates/aionui-team/src/provisioning.rs b/crates/aionui-team/src/provisioning.rs index b1c71fef9..54a8bd876 100644 --- a/crates/aionui-team/src/provisioning.rs +++ b/crates/aionui-team/src/provisioning.rs @@ -36,6 +36,7 @@ struct ProvisionedConversation { struct NewAgentProvisioning { user_id: String, team_id: String, + slot_id: String, name: String, role: TeammateRole, backend: String, @@ -44,6 +45,16 @@ struct NewAgentProvisioning { workspace: Option, } +pub(crate) struct PersistSpawnedAgentRequest { + pub user_id: String, + pub team_id: String, + pub slot_id: String, + pub name: String, + pub backend: String, + pub model: String, + pub custom_agent_id: Option, +} + pub struct TeamConversationCreateRequest { pub user_id: String, pub agent_type: AgentType, @@ -223,6 +234,7 @@ impl TeamAgentProvisioner { .provision_new_agent(NewAgentProvisioning { user_id: user_id.to_owned(), team_id: team.id.clone(), + slot_id: generate_id(), name: req.name, role, backend: req.backend, @@ -236,36 +248,29 @@ impl TeamAgentProvisioner { Ok(agent) } - pub(crate) async fn persist_spawned_agent( - &self, - user_id: &str, - team_id: &str, - name: String, - backend: String, - model: String, - custom_agent_id: Option, - ) -> Result { + pub(crate) async fn persist_spawned_agent(&self, req: PersistSpawnedAgentRequest) -> Result { let row = self .repo - .get_team(team_id) + .get_team(&req.team_id) .await? 
- .ok_or_else(|| TeamError::TeamNotFound(team_id.into()))?; + .ok_or_else(|| TeamError::TeamNotFound(req.team_id.clone()))?; let mut team = Team::from_row(&row)?; let workspace = self.workspace_resolver().resolve_for_new_agent(&row, &team).await?; let agent = self .provision_new_agent(NewAgentProvisioning { - user_id: user_id.to_owned(), - team_id: team_id.to_owned(), - name, + user_id: req.user_id, + team_id: req.team_id.clone(), + slot_id: req.slot_id, + name: req.name, role: TeammateRole::Teammate, - backend, - model, - custom_agent_id, + backend: req.backend, + model: req.model, + custom_agent_id: req.custom_agent_id, workspace: Some(workspace), }) .await?; team.agents.push(agent.clone()); - self.persist_agents(team_id, &team.agents).await?; + self.persist_agents(&req.team_id, &team.agents).await?; Ok(agent) } @@ -332,12 +337,11 @@ impl TeamAgentProvisioner { } async fn provision_new_agent(&self, input: NewAgentProvisioning) -> Result { - let slot_id = generate_id(); let conversation = self .create_or_adopt_conversation( &input.user_id, &input.team_id, - &slot_id, + &input.slot_id, input.role, &input.name, &input.backend, @@ -348,7 +352,7 @@ impl TeamAgentProvisioner { ) .await?; Ok(TeamAgent { - slot_id, + slot_id: input.slot_id, name: input.name, role: input.role, conversation_id: conversation.conversation_id, diff --git a/crates/aionui-team/src/scheduler/actions.rs b/crates/aionui-team/src/scheduler/actions.rs index 6bc173ca6..edd540991 100644 --- a/crates/aionui-team/src/scheduler/actions.rs +++ b/crates/aionui-team/src/scheduler/actions.rs @@ -2,7 +2,7 @@ use tracing::debug; use super::TeammateManager; use crate::error::TeamError; -use crate::types::{MailboxMessageType, TeammateRole}; +use crate::types::{MailboxMessage, MailboxMessageType, TeammateRole}; #[derive(Debug, Clone, PartialEq)] pub enum SchedulerAction { @@ -102,7 +102,7 @@ impl TeammateManager { Ok(None) } SchedulerAction::ShutdownAgent { slot_id, reason } => { - 
self.handle_shutdown_agent(from_slot_id, slot_id, reason.as_deref()) + self.request_shutdown_agent(from_slot_id, slot_id, reason.as_deref()) .await?; Ok(None) } @@ -169,12 +169,12 @@ impl TeammateManager { self.mark_idle(from_slot_id, summary).await } - async fn handle_shutdown_agent( + pub async fn request_shutdown_agent( &self, from_slot_id: &str, target_slot_id: &str, reason: Option<&str>, - ) -> Result<(), TeamError> { + ) -> Result { let from_role = { let slots = self.slots.lock().await; let slot = slots @@ -206,9 +206,7 @@ impl TeammateManager { reason.unwrap_or("shutdown requested"), None, ) - .await?; - - Ok(()) + .await } async fn handle_rename_agent(&self, slot_id: &str, new_name: &str) -> Result<(), TeamError> { diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index c99c363de..8e43dde6f 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -900,23 +900,6 @@ impl TeamSessionService { Ok(()) } - pub(crate) async fn wake_agent_for_team_work( - &self, - team_id: &str, - slot_id: &str, - source: TeamWakeSource, - trigger_message_id: Option, - ) -> Result<(), TeamError> { - let entry = self - .sessions - .get(team_id) - .ok_or_else(|| TeamError::SessionNotFound(team_id.into()))?; - entry - .session - .wake_agent_for_team_work(slot_id, source, trigger_message_id) - .await - } - pub(crate) async fn send_agent_message_from_agent( &self, team_id: &str, @@ -937,6 +920,23 @@ impl TeamSessionService { .await } + pub async fn shutdown_agent_in_session( + &self, + team_id: &str, + caller_slot_id: &str, + target_slot_id: &str, + reason: Option, + ) -> Result<(), TeamError> { + let session = { + let entry = self + .sessions + .get(team_id) + .ok_or_else(|| TeamError::SessionNotFound(team_id.into()))?; + Arc::clone(&entry.session) + }; + session.shutdown_agent(caller_slot_id, target_slot_id, reason).await + } + pub(crate) fn notify_reserved_wake_for_team_work( &self, team_id: &str, @@ -959,6 +959,10 @@ 
impl TeamSessionService { .notify_reserved_wake_for_team_work(slot_id, target_role, source); } + /// Friendly pre-check used by Guide MCP to return handoff copy before invoking + /// run-scoped team tools. This is not a concurrency guarantee; any operation + /// that writes mailbox, projection, scheduler, spawn, shutdown, or wake state + /// must still acquire a TeamRun operation lease in TeamSession/TeamRunManager. pub(crate) async fn require_active_team_run_for_team_work(&self, team_id: &str) -> Result<(), TeamError> { let entry = self .sessions diff --git a/crates/aionui-team/src/service/spawn_support.rs b/crates/aionui-team/src/service/spawn_support.rs index 08c8f0f94..f9135734d 100644 --- a/crates/aionui-team/src/service/spawn_support.rs +++ b/crates/aionui-team/src/service/spawn_support.rs @@ -3,6 +3,8 @@ use aionui_api_types::BehaviorPolicy; use aionui_common::AgentType; use aionui_common::constants::{TEAM_CAPABLE_BACKENDS, has_mcp_capability}; +use crate::provisioning::PersistSpawnedAgentRequest; + /// Known ACP vendor labels. Kept in lockstep with the `agent_metadata` /// seed in `005_agent_metadata.sql` — a caller hitting an unknown /// vendor should trigger a schema drift discussion, not silently fall @@ -189,7 +191,6 @@ impl TeamSessionService { caller_slot_id: &str, req: crate::session::SpawnAgentRequest, ) -> Result { - self.require_active_team_run_for_team_work(team_id).await?; let entry = self .sessions .get(team_id) @@ -214,25 +215,15 @@ impl TeamSessionService { /// The lock is *not* held across the process warmup step — callers /// (`TeamSession::spawn_agent`) wire that up separately so a slow /// `warmup` never stalls other spawns against the same team. 
- pub(crate) async fn persist_spawned_agent( - &self, - team_id: &str, - user_id: &str, - name: String, - backend: String, - model: String, - custom_agent_id: Option, - ) -> Result { + pub(crate) async fn persist_spawned_agent(&self, req: PersistSpawnedAgentRequest) -> Result { let lock = self .add_agent_locks - .entry(team_id.to_owned()) + .entry(req.team_id.clone()) .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) .clone(); let _guard = lock.lock().await; - self.provisioner() - .persist_spawned_agent(user_id, team_id, name, backend, model, custom_agent_id) - .await + self.provisioner().persist_spawned_agent(req).await } } @@ -297,14 +288,15 @@ mod tests { force_team_workspace(&team_repo, &created.id, "").await; let spawned = svc - .persist_spawned_agent( - &created.id, - "user1", - "Spawned".into(), - "acp".into(), - "claude".into(), - None, - ) + .persist_spawned_agent(PersistSpawnedAgentRequest { + team_id: created.id.clone(), + user_id: "user1".into(), + slot_id: "spawn-slot-1".into(), + name: "Spawned".into(), + backend: "acp".into(), + model: "claude".into(), + custom_agent_id: None, + }) .await .unwrap(); diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index 8113270f5..fbddeaccd 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -6,7 +6,7 @@ use aionui_api_types::{ TeamRunAckResponse, TeamRunTargetRole, TeamSendMessageDelivery, TeamSendMessageReason, TeamSendMessageTargetQueueState, TeamSlotRuntimeHealth, TeamSlotWorkPayload, }; -use aionui_common::AgentKillReason; +use aionui_common::{AgentKillReason, generate_id}; use aionui_db::ITeamRepository; use aionui_realtime::EventBroadcaster; use tracing::{info, warn}; @@ -21,10 +21,13 @@ use crate::message_projection::{ }; use crate::ports::{AgentTurnCancellationPort, AgentTurnExecutionPort}; use crate::prompts::{build_lead_prompt, build_teammate_prompt, build_wake_payload}; +use crate::provisioning::PersistSpawnedAgentRequest; use 
crate::scheduler::{TeammateManager, normalize_name}; use crate::service::TeamSessionService; use crate::task_board::TaskBoard; -use crate::team_run::{ChildCancelTarget, TeamRunManager, WakeRecordDecision, target_role_for}; +#[cfg(test)] +use crate::team_run::WakeRecordDecision; +use crate::team_run::{ChildCancelTarget, TeamRunManager, TeamRunWakeAcquireOutcome, target_role_for}; use crate::types::{MailboxMessageType, Team, TeamAgent, TeammateRole, TeammateStatus}; use crate::wake::TeamWakeSource; @@ -369,9 +372,9 @@ impl TeamSession { .ok_or_else(|| TeamError::AgentNotFound("no lead agent in team".into()))?; let lead_conv_id = self.scheduler.get_agent(&lead_slot_id).await?.conversation_id; - let mut ack = self + let (mut ack, lease) = self .team_run_manager - .accept_user_message(&lead_slot_id, TeamRunTargetRole::Lead, true, None) + .acquire_user_message_wake(&lead_slot_id, TeamRunTargetRole::Lead) .await?; let mailbox_message = match self @@ -389,7 +392,10 @@ impl TeamSession { { Ok(message) => message, Err(err) => { - self.team_run_manager.complete_failed().await; + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await; return Err(err); } }; @@ -414,12 +420,10 @@ impl TeamSession { } let _ = files; - self.wake_agent_for_team_work( - &lead_slot_id, - TeamWakeSource::UserMessage, - Some(mailbox_message.id.clone()), - ) - .await?; + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(mailbox_message.id.clone())) + .await?; + self.notify_reserved_wake_for_team_work(&lead_slot_id, lease.role.clone(), lease.wake_source); Ok(ack) } @@ -434,15 +438,9 @@ impl TeamSession { files: Option>, ) -> Result { let agent = self.scheduler.get_agent(slot_id).await?; - let has_active_run = self.team_run_manager.active_run_id().await.is_some(); - let source = if has_active_run { - TeamWakeSource::UserIntervention - } else { - TeamWakeSource::UserMessage - }; - let mut ack = self + let (mut ack, lease) = self 
.team_run_manager - .accept_user_message(slot_id, target_role_for(agent.role), has_active_run, None) + .acquire_user_message_wake(slot_id, target_role_for(agent.role)) .await?; let mailbox_message = match self @@ -460,7 +458,10 @@ impl TeamSession { { Ok(message) => message, Err(err) => { - self.team_run_manager.complete_failed().await; + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await; return Err(err); } }; @@ -485,8 +486,10 @@ impl TeamSession { } let _ = files; - self.wake_agent_for_team_work(slot_id, source, Some(mailbox_message.id.clone())) + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(mailbox_message.id.clone())) .await?; + self.notify_reserved_wake_for_team_work(slot_id, lease.role.clone(), lease.wake_source); Ok(ack) } @@ -498,8 +501,46 @@ impl TeamSession { ) -> Result { let to_agent = self.scheduler.get_agent(to_slot_id).await?; let from_agent = self.scheduler.get_agent(from_slot_id).await?; + let to_role = target_role_for(to_agent.role); + let outcome = self + .team_run_manager + .acquire_run_scoped_wake(to_slot_id, to_role.clone(), TeamWakeSource::McpSendMessage) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + let (team_run_id, work) = self + .team_run_manager + .slot_work_for_slot(to_slot_id) + .await + .ok_or_else(|| TeamError::InvalidRequest(format!("no active team run work for slot {to_slot_id}")))?; + let queue_state = classify_send_message_queue_state(&work, true); + let target = TeamSendMessageTargetQueueState { + slot_id: work.slot_id, + role: work.role, + queue_state, + pending_wake_count: work.pending_wake_count, + starting_child_count: work.starting_child_count, + active_turn_id: work.active_turn_id, + suppressed_wake_count: work.suppressed_wake_count, + }; + info!( + team_id = %self.team.id, + team_run_id = %team_run_id, + caller_slot_id = from_slot_id, + target_slot_id = to_slot_id, + target_role = ?target.role, + wake_source 
= %TeamWakeSource::McpSendMessage, + suppressed_wake_count = target.suppressed_wake_count, + reason = ?target.queue_state, + "team_agent_message_wake_suppressed" + ); + return Ok(AgentMessageQueueResult { + team_run_id, + delivery: TeamSendMessageDelivery::WakeSuppressed, + target, + }); + }; - let mailbox_message = self + let mailbox_message = match self .mailbox .write( &self.team.id, @@ -509,7 +550,17 @@ impl TeamSession { content, None, ) - .await?; + .await + { + Ok(message) => message, + Err(err) => { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await; + return Err(err); + } + }; let projection = TeamMessageProjection::new(self.projection_store.clone(), self.broadcaster.clone()); let request = TeamProjectionRequest { @@ -544,18 +595,11 @@ impl TeamSession { ); } - let to_role = target_role_for(to_agent.role); - let decision = self - .team_run_manager - .record_or_suppress_wake( - to_slot_id, - to_role.clone(), - TeamWakeSource::McpSendMessage, - Some(mailbox_message.id.clone()), - ) + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(mailbox_message.id.clone())) .await?; let target_event_loop_registered = self.event_loops.has(to_slot_id); - if matches!(decision, WakeRecordDecision::Recorded) && !target_event_loop_registered { + if !target_event_loop_registered { warn!( team_id = %self.team.id, slot_id = to_slot_id, @@ -573,14 +617,7 @@ impl TeamSession { .slot_work_for_slot(to_slot_id) .await .ok_or_else(|| TeamError::InvalidRequest(format!("no active team run work for slot {to_slot_id}")))?; - let delivery = match decision { - WakeRecordDecision::Recorded => TeamSendMessageDelivery::WakeRecorded, - WakeRecordDecision::Suppressed => TeamSendMessageDelivery::WakeSuppressed, - }; - let queue_state = classify_send_message_queue_state( - &work, - target_event_loop_registered || !matches!(delivery, TeamSendMessageDelivery::WakeRecorded), - ); + let queue_state = 
classify_send_message_queue_state(&work, target_event_loop_registered); let target = TeamSendMessageTargetQueueState { slot_id: work.slot_id, role: work.role, @@ -607,17 +644,78 @@ impl TeamSession { "team_agent_message_queued" ); - if matches!(decision, WakeRecordDecision::Recorded) && target_event_loop_registered { + if target_event_loop_registered { self.notify_reserved_wake_for_team_work(to_slot_id, to_role, TeamWakeSource::McpSendMessage); } Ok(AgentMessageQueueResult { team_run_id, - delivery, + delivery: TeamSendMessageDelivery::WakeRecorded, target, }) } + pub(crate) async fn shutdown_agent( + &self, + caller_slot_id: &str, + target_slot_id: &str, + reason: Option, + ) -> Result<(), TeamError> { + let caller = self.scheduler.get_agent(caller_slot_id).await?; + if caller.role != TeammateRole::Lead { + return Err(TeamError::LeaderOnly("team_shutdown_agent".into())); + } + let target = self.scheduler.get_agent(target_slot_id).await?; + if target.role == TeammateRole::Lead { + return Err(TeamError::InvalidRequest("cannot shutdown the team lead".into())); + } + + let outcome = self + .team_run_manager + .acquire_run_scoped_wake( + target_slot_id, + target_role_for(target.role), + TeamWakeSource::McpShutdownRequest, + ) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Err(TeamError::InvalidRequest("shutdown wake was suppressed".into())); + }; + + let shutdown_message = match self + .scheduler + .request_shutdown_agent(caller_slot_id, target_slot_id, reason.as_deref()) + .await + { + Ok(message) => message, + Err(err) => { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "shutdown_scheduler_action_failed") + .await; + return Err(err); + } + }; + + if shutdown_message.to_agent_id != target_slot_id { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "shutdown_mailbox_target_mismatch") + .await; + return Err(TeamError::InvalidRequest(format!( + "shutdown mailbox target 
mismatch: expected {target_slot_id}, got {}", + shutdown_message.to_agent_id + ))); + } + + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(shutdown_message.id.clone())) + .await?; + self.notify_reserved_wake_for_team_work(target_slot_id, lease.role.clone(), lease.wake_source); + Ok(()) + } + + #[cfg(test)] pub(crate) async fn wake_agent_for_team_work( &self, slot_id: &str, @@ -646,6 +744,7 @@ impl TeamSession { Ok(()) } + #[cfg(test)] pub(crate) async fn reserve_wake_for_team_work( &self, slot_id: &str, @@ -688,6 +787,28 @@ impl TeamSession { ); } + pub(crate) async fn scheduler_wake_agent_for_team_work( + &self, + slot_id: &str, + source: TeamWakeSource, + ) -> Result<(), TeamError> { + let agent = self.scheduler.get_agent(slot_id).await?; + let target_role = target_role_for(agent.role); + let outcome = self + .team_run_manager + .acquire_scheduler_wake(slot_id, target_role.clone(), source) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Ok(()); + }; + + self.team_run_manager + .commit_operation_lease(&lease.lease_id, None) + .await?; + self.notify_reserved_wake_for_team_work(slot_id, target_role, source); + Ok(()) + } + /// Mirror each non-user mailbox row into the target agent's conversation /// as a left bubble so the UI shows "who said what" when the user opens /// an agent's chat panel. 
@@ -936,8 +1057,7 @@ impl TeamSession { async fn notify_leader_child_interrupted(&self, slot_id: &str, reason: Option) -> Result<(), TeamError> { if let Some(lead_slot_id) = self.scheduler.find_lead_slot_id().await { let content = reason.unwrap_or_else(|| format!("Agent {slot_id} was interrupted by the user.")); - let mailbox_message = self - .mailbox + self.mailbox .write( &self.team.id, &lead_slot_id, @@ -947,12 +1067,8 @@ impl TeamSession { Some("Interrupted by user"), ) .await?; - self.wake_agent_for_team_work( - &lead_slot_id, - TeamWakeSource::InterruptedNotification, - Some(mailbox_message.id), - ) - .await?; + self.wake_leader_after_recovery_message(slot_id, TeamWakeSource::InterruptedNotification) + .await?; } Ok(()) } @@ -966,8 +1082,7 @@ impl TeamSession { return Err(TeamError::AgentNotFound("lead".into())); }; let content = format!("Spawned teammate {failed_slot_id} failed to attach its runtime. Error: {error}"); - let mailbox_message = self - .mailbox + self.mailbox .write( &self.team.id, &lead_slot_id, @@ -977,12 +1092,8 @@ impl TeamSession { None, ) .await?; - self.wake_agent_for_team_work( - &lead_slot_id, - TeamWakeSource::SpawnAttachFailure, - Some(mailbox_message.id), - ) - .await + self.wake_leader_after_recovery_message(failed_slot_id, TeamWakeSource::SpawnAttachFailure) + .await } pub(crate) async fn wake_leader_after_recovery_message( @@ -993,17 +1104,34 @@ impl TeamSession { let Some(lead_slot_id) = self.scheduler.find_lead_slot_id().await else { return Err(TeamError::AgentNotFound("lead".into())); }; - if self.team_run_manager.active_run_id().await.is_some() { - return self.wake_agent_for_team_work(&lead_slot_id, source, None).await; - } - info!( - team_id = %self.team.id, - slot_id = %lead_slot_id, - source_slot_id, - wake_source = %source, - wake_policy = "deferred_mailbox_only", - "leader recovery message deferred because no active team run exists" - ); + let outcome = match self + .team_run_manager + 
.acquire_run_scoped_wake(&lead_slot_id, TeamRunTargetRole::Lead, source) + .await + { + Ok(outcome) => outcome, + Err(TeamError::InvalidRequest(message)) if message == "no active team run for run-scoped wake" => { + info!( + team_id = %self.team.id, + slot_id = %lead_slot_id, + source_slot_id, + wake_source = %source, + wake_policy = "deferred_mailbox_only", + "leader recovery message deferred because no active team run exists" + ); + return Ok(()); + } + Err(err) => return Err(err), + }; + + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Ok(()); + }; + + self.team_run_manager + .commit_operation_lease(&lease.lease_id, None) + .await?; + self.notify_reserved_wake_for_team_work(&lead_slot_id, TeamRunTargetRole::Lead, source); Ok(()) } @@ -1101,16 +1229,36 @@ impl TeamSession { .await .unwrap_or_else(|| caller.model.clone()), }; - let new_agent = service - .persist_spawned_agent( - &self.team.id, - &self.user_id, - requested_name, + let new_slot_id = generate_id(); + let outcome = self + .team_run_manager + .acquire_run_scoped_wake(&new_slot_id, TeamRunTargetRole::Teammate, TeamWakeSource::SpawnWelcome) + .await?; + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + return Err(TeamError::InvalidRequest("spawn welcome wake was suppressed".into())); + }; + + let new_agent = match service + .persist_spawned_agent(PersistSpawnedAgentRequest { + team_id: self.team.id.clone(), + user_id: self.user_id.clone(), + slot_id: new_slot_id.clone(), + name: requested_name, backend, model, - req.custom_agent_id.clone(), - ) - .await?; + custom_agent_id: req.custom_agent_id.clone(), + }) + .await + { + Ok(agent) => agent, + Err(err) => { + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "spawn_persistence_failed") + .await; + return Err(err); + } + }; // Step 5: attach to the in-memory scheduler so wake-from-lead finds // the new slot immediately. @@ -1119,7 +1267,7 @@ impl TeamSession { // Step 6: welcome message. 
The mailbox write is the source of truth — // if the wake never fires (e.g. warmup raced), the next caller-triggered // wake will still drain this entry. - let welcome_message = self + let welcome_message = match self .mailbox .write( &self.team.id, @@ -1129,21 +1277,34 @@ impl TeamSession { "You have been spawned as a teammate. Read your mailbox and wait for instructions.", None, ) - .await?; + .await + { + Ok(message) => message, + Err(err) => { + if let Some(service) = self.service.upgrade() { + let _ = service + .remove_agent(&self.user_id, &self.team.id, &new_agent.slot_id) + .await; + } else { + let _ = self.scheduler.remove_agent(&new_agent.slot_id).await; + } + let _ = self + .team_run_manager + .abort_operation_lease(&lease.lease_id, "spawn_welcome_mailbox_write_failed") + .await; + return Err(err); + } + }; - let spawn_welcome_role = self - .reserve_wake_for_team_work( - &new_agent.slot_id, - TeamWakeSource::SpawnWelcome, - Some(welcome_message.id), - ) - .await? - .ok_or_else(|| TeamError::InvalidRequest("spawn welcome wake was suppressed".into()))?; + self.team_run_manager + .commit_operation_lease(&lease.lease_id, Some(welcome_message.id.clone())) + .await?; + let spawn_welcome_role = lease.role.clone(); info!( team_id = %self.team.id, slot_id = %new_agent.slot_id, target_role = ?spawn_welcome_role, - wake_source = %TeamWakeSource::SpawnWelcome, + wake_source = %lease.wake_source, "spawn welcome wake reserved before runtime attach" ); @@ -1273,7 +1434,7 @@ mod tests { use aionui_ai_agent::AgentError; use aionui_ai_agent::agent_task::AgentInstance; use aionui_ai_agent::types::BuildTaskOptions; - use aionui_api_types::{TeamSendMessageDelivery, TeamSendMessageReason, WebSocketMessage}; + use aionui_api_types::{TeamRunStatus, TeamSendMessageDelivery, TeamSendMessageReason, WebSocketMessage}; use aionui_common::{AgentKillReason, TimestampMs, now_ms}; use std::sync::{Arc, Mutex}; @@ -1404,6 +1565,64 @@ mod tests { } } + #[derive(Default)] + struct 
CompletingProjectionStore { + manager: Mutex>>, + } + + impl CompletingProjectionStore { + fn set_manager(&self, manager: Arc) { + *self.manager.lock().unwrap() = Some(manager); + } + } + + #[async_trait::async_trait] + impl TeamProjectionMessageStore for CompletingProjectionStore { + fn mint_message_id(&self) -> String { + "projection-completes-run".into() + } + + async fn find_projected_message( + &self, + _conversation_id: &str, + _msg_id: &str, + _msg_type: &str, + ) -> Result, TeamError> { + Ok(None) + } + + async fn insert_projected_message(&self, _row: &aionui_db::models::MessageRow) -> Result<(), TeamError> { + let manager = self.manager.lock().unwrap().clone(); + if let Some(manager) = manager { + manager.maybe_complete().await; + } + Ok(()) + } + } + + #[derive(Default)] + struct FailingProjectionStore; + + #[async_trait::async_trait] + impl TeamProjectionMessageStore for FailingProjectionStore { + fn mint_message_id(&self) -> String { + "projection-fails".into() + } + + async fn find_projected_message( + &self, + _conversation_id: &str, + _msg_id: &str, + _msg_type: &str, + ) -> Result, TeamError> { + Ok(None) + } + + async fn insert_projected_message(&self, _row: &aionui_db::models::MessageRow) -> Result<(), TeamError> { + Err(TeamError::InvalidRequest("projection failed for test".into())) + } + } + /// RecordingBroadcaster used by the D29d-1 ratification test below to /// assert that `team.agentSpawned` is *not* emitted on failed spawns. 
#[derive(Default)] @@ -1603,6 +1822,27 @@ mod tests { .unwrap() } + async fn start_session_with_repo_and_projection_store( + repo: Arc, + store: Arc, + ) -> TeamSession { + let broadcaster: Arc = Arc::new(NullBroadcaster); + TeamSession::start( + make_team(), + repo, + broadcaster, + backend_path(), + empty_task_manager(), + noop_turn_port(), + noop_cancellation_port(), + store, + "user-test".into(), + Weak::::new(), + ) + .await + .unwrap() + } + async fn start_session_with_cancellation_port( cancellation_port: Arc, ) -> TeamSession { @@ -1654,6 +1894,31 @@ mod tests { session.stop(); } + #[tokio::test] + async fn agent_send_message_survives_completion_between_projection_and_commit() { + let store = Arc::new(CompletingProjectionStore::default()); + let session = start_session_with_projection_store(store.clone()).await; + store.set_manager(session.team_run_manager().clone()); + session + .team_run_manager() + .accept_user_message("lead-1", TeamRunTargetRole::Lead, false, None) + .await + .expect("active run before agent message"); + + session + .send_agent_message_from_agent("lead-1", "worker-1", "lease protected") + .await + .expect("agent message lease should retain run while projection completes it"); + + let unread = session + .mailbox() + .peek_unread(session.team_id(), "worker-1") + .await + .unwrap(); + assert!(unread.iter().any(|msg| msg.content == "lease protected")); + session.stop(); + } + #[tokio::test] async fn agent_message_to_idle_target_returns_queued_for_idle() { let session = start_session_arc().await; @@ -1747,19 +2012,21 @@ mod tests { } #[tokio::test] - async fn leader_message_rejects_when_leader_slot_is_busy() { + async fn leader_message_queues_when_leader_slot_has_pending_wake() { let session = start_session().await; - session + let first = session .send_message("First leader message", None) .await .expect("first leader send creates run"); - let err = session + let second = session .send_message("Second leader message", None) .await - 
.expect_err("leader pending wake must make leader slot busy"); + .expect("leader pending wake should accept additional foreground message"); - assert!(matches!(err, TeamError::SlotBusy(slot_id) if slot_id == "lead-1")); + assert_eq!(second.team_run_id, first.team_run_id); + let payload = session.team_run_manager().current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 2); session.stop(); } @@ -2048,7 +2315,7 @@ mod tests { } #[tokio::test] - async fn paused_slot_suppresses_background_wake_and_keeps_mailbox_unread() { + async fn paused_slot_suppresses_mcp_send_without_mailbox_write() { let session = start_session().await; let ack = session.send_message("start", None).await.unwrap(); session @@ -2072,7 +2339,10 @@ mod tests { .peek_unread(session.team_id(), "lead-1") .await .unwrap(); - assert!(unread.iter().any(|msg| msg.content == "background update")); + assert!( + unread.iter().all(|msg| msg.content != "background update"), + "suppressed acquire must not write mailbox" + ); session.stop(); } @@ -2221,7 +2491,7 @@ mod tests { } #[tokio::test] - async fn crash_notification_without_active_run_is_deferred_mailbox_only() { + async fn recovery_wake_without_active_run_is_deferred_without_active_run_snapshot_branch() { let session = start_session().await; session @@ -2362,6 +2632,67 @@ mod tests { session.stop(); } + #[tokio::test] + async fn shutdown_without_active_run_does_not_write_mailbox() { + let session = start_session().await; + + let err = session + .shutdown_agent("lead-1", "worker-1", Some("done".into())) + .await + .expect_err("shutdown is run-scoped"); + + assert!(matches!( + err, + TeamError::InvalidRequest(message) + if message == "no active team run for run-scoped wake" + )); + let unread = session.mailbox().peek_unread("t1", "worker-1").await.unwrap(); + assert!(unread.is_empty()); + session.stop(); + } + + #[tokio::test] + async fn shutdown_busy_target_is_accepted_as_lifecycle_wake() { + let session = start_session().await; + let ack 
= session.send_message("start", None).await.unwrap(); + let reservation = session + .team_run_manager() + .claim_wake_for_turn("lead-1", TeamRunTargetRole::Lead, "c1") + .await + .unwrap(); + session + .team_run_manager() + .record_child_started( + &reservation.reservation_id, + ActiveChildTurn { + team_run_id: ack.team_run_id, + slot_id: "lead-1".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "c1".into(), + turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, + }, + ) + .await; + + session + .shutdown_agent("lead-1", "worker-1", Some("done".into())) + .await + .expect("shutdown lifecycle wake should be accepted"); + + let unread = session.mailbox().peek_unread("t1", "worker-1").await.unwrap(); + assert_eq!(unread.len(), 1); + assert_eq!(unread[0].msg_type, MailboxMessageType::ShutdownRequest); + let reservation = session + .team_run_manager() + .claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "c2") + .await + .expect("shutdown wake should be pending"); + assert_eq!(reservation.wake_source, TeamWakeSource::McpShutdownRequest); + session.stop(); + } + #[tokio::test] async fn add_and_remove_agent() { let session = start_session().await; @@ -2733,6 +3064,73 @@ mod tests { session.stop(); } + #[tokio::test] + async fn send_message_to_agent_survives_completion_between_projection_and_commit() { + let store = Arc::new(CompletingProjectionStore::default()); + let session = start_session_with_projection_store(store.clone()).await; + store.set_manager(session.team_run_manager().clone()); + session + .team_run_manager() + .accept_user_message("lead-1", TeamRunTargetRole::Lead, false, None) + .await + .expect("active run before user intervention"); + + let ack = session + .send_message_to_agent("worker-1", "race window message", None) + .await + .expect("lease should retain run while projection completes it"); + + assert_eq!(ack.accepted_slot_id, "worker-1"); + let reservation = session + .team_run_manager() + 
.claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "c2") + .await + .expect("committed user intervention should be pending"); + assert_eq!(reservation.message_id, ack.message_id); + session.stop(); + } + + #[tokio::test] + async fn send_message_projection_failure_still_commits_wake() { + let session = start_session_with_projection_store(Arc::new(FailingProjectionStore)).await; + + let ack = session + .send_message_to_agent("worker-1", "projection can fail", None) + .await + .expect("projection failure is non-fatal"); + + let reservation = session + .team_run_manager() + .claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "c2") + .await + .expect("wake should still be committed"); + assert_eq!(reservation.message_id, ack.message_id); + session.stop(); + } + + #[tokio::test] + async fn send_message_mailbox_failure_aborts_lease_and_allows_completion() { + let repo: Arc = Arc::new(MockTeamRepo::with_message_write_failure()); + let session = start_session_with_repo_and_projection_store(repo, noop_projection_store()).await; + + let err = session + .send_message("mailbox fails", None) + .await + .expect_err("mailbox write failure must be returned"); + assert!( + err.to_string().contains("forced mailbox write failure"), + "unexpected error: {err}" + ); + + let completed = session + .team_run_manager() + .maybe_complete() + .await + .expect("aborted lease should allow empty run to complete"); + assert_eq!(completed.status, TeamRunStatus::Completed); + session.stop(); + } + #[tokio::test] async fn mirror_unread_to_conversation_skips_when_service_weak_is_dangling() { let session = start_session().await; diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index 672aa7c6d..f6072992d 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -7,7 +7,7 @@ use aionui_api_types::{ }; use aionui_common::{TimestampMs, generate_id, now_ms}; use tokio::sync::Mutex; -use tracing::{debug, info, warn}; 
+use tracing::{debug, error, info, warn}; use crate::error::TeamError; use crate::events::{ @@ -15,7 +15,9 @@ use crate::events::{ TEAM_RUN_ACCEPTED_EVENT, TEAM_RUN_CANCELLED_EVENT, TEAM_RUN_COMPLETED_EVENT, TEAM_RUN_FAILED_EVENT, TEAM_RUN_STARTED_EVENT, TEAM_RUN_UPDATED_EVENT, TeamEventEmitter, }; -use crate::slot_wake_gate::{SlotWakeGate, WakeGateDecision}; +use crate::slot_wake_gate::SlotWakeGate; +#[cfg(test)] +use crate::slot_wake_gate::WakeGateDecision; use crate::types::TeammateRole; use crate::wake::TeamWakeSource; @@ -46,6 +48,8 @@ pub struct StartingChildReservation { pub slot_id: String, pub role: TeamRunTargetRole, pub conversation_id: String, + pub(crate) wake_source: TeamWakeSource, + pub(crate) message_id: Option, pub state: StartingReservationState, } @@ -70,12 +74,56 @@ pub enum ChildCancelTarget { Starting(StartingChildReservation), } +#[cfg(test)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum WakeRecordDecision { Recorded, Suppressed, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct TeamRunOperationLease { + pub lease_id: String, + pub team_run_id: String, + pub slot_id: String, + pub role: TeamRunTargetRole, + pub wake_source: TeamWakeSource, + pub accepted_as_new_run: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct ActiveOperationLease { + lease: TeamRunOperationLease, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum TeamRunWakeAcquireOutcome { + Accepted(TeamRunOperationLease), + Suppressed, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TeamRunSlotState { + Busy, + Pending, + Paused, + Idle, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AcquirePolicyDecision { + Accept, + Suppress(&'static str), + RejectSlotBusy, + RejectInvalid(&'static str), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TeamRunWakeIntent { + ExternalRequest, + SchedulerWakeTarget, +} + #[derive(Debug, Clone)] pub struct PauseSlotOutcome { pub team_run_id: String, @@ -105,6 +153,7 @@ struct 
TeamRunRecord { pending_wakes: HashMap>, slot_runtime_health: HashMap, slot_wake_gate: SlotWakeGate, + active_operation_leases: HashMap, } impl TeamRunRecord { @@ -123,6 +172,32 @@ impl TeamRunRecord { .count() } + fn active_operation_lease_count(&self) -> usize { + self.active_operation_leases.len() + } + + fn slot_run_state(&self, slot_id: &str) -> TeamRunSlotState { + if self.active_child_turns.contains_key(slot_id) || self.starting_child_count_for_slot(slot_id) > 0 { + return TeamRunSlotState::Busy; + } + if self.pending_wake_count_for_slot(slot_id) > 0 { + return TeamRunSlotState::Pending; + } + if self.slot_wake_gate.snapshot_for_slot(slot_id).paused { + return TeamRunSlotState::Paused; + } + TeamRunSlotState::Idle + } + + fn has_spawn_welcome_for_slot(&self, slot_id: &str) -> bool { + self.pending_wakes + .get(slot_id) + .is_some_and(|wakes| wakes.iter().any(|wake| wake.source == TeamWakeSource::SpawnWelcome)) + || self.active_operation_leases.values().any(|active| { + active.lease.slot_id == slot_id && active.lease.wake_source == TeamWakeSource::SpawnWelcome + }) + } + fn role_for_slot(&self, slot_id: &str) -> Option { self.active_child_turns .get(slot_id) @@ -239,6 +314,93 @@ impl TeamRunRecord { } } +fn new_operation_lease( + run: &mut TeamRunRecord, + slot_id: &str, + role: TeamRunTargetRole, + wake_source: TeamWakeSource, + accepted_as_new_run: bool, +) -> TeamRunOperationLease { + let lease = TeamRunOperationLease { + lease_id: generate_id(), + team_run_id: run.team_run_id.clone(), + slot_id: slot_id.to_owned(), + role, + wake_source, + accepted_as_new_run, + }; + run.active_operation_leases + .insert(lease.lease_id.clone(), ActiveOperationLease { lease: lease.clone() }); + lease +} + +fn push_pending_wake_locked( + run: &mut TeamRunRecord, + slot_id: String, + role: TeamRunTargetRole, + source: TeamWakeSource, + message_id: Option, +) { + run.pending_wakes + .entry(slot_id.clone()) + .or_default() + .push_back(PendingWake { + slot_id, + role, + 
source, + message_id, + }); +} + +fn acquire_policy( + source: TeamWakeSource, + slot_state: TeamRunSlotState, + has_spawn_welcome: bool, + intent: TeamRunWakeIntent, +) -> AcquirePolicyDecision { + match source { + TeamWakeSource::UserMessage | TeamWakeSource::UserIntervention => match slot_state { + TeamRunSlotState::Busy => AcquirePolicyDecision::RejectSlotBusy, + TeamRunSlotState::Pending | TeamRunSlotState::Paused | TeamRunSlotState::Idle => { + AcquirePolicyDecision::Accept + } + }, + TeamWakeSource::McpSendMessage => match slot_state { + TeamRunSlotState::Paused => AcquirePolicyDecision::Suppress("paused_slot_background_wake"), + TeamRunSlotState::Busy | TeamRunSlotState::Pending | TeamRunSlotState::Idle => { + AcquirePolicyDecision::Accept + } + }, + TeamWakeSource::SpawnWelcome => { + if has_spawn_welcome { + return AcquirePolicyDecision::Suppress("duplicate_spawn_welcome"); + } + match slot_state { + TeamRunSlotState::Busy => AcquirePolicyDecision::RejectInvalid("spawn welcome target is already busy"), + TeamRunSlotState::Pending => AcquirePolicyDecision::Suppress("spawn_welcome_already_pending"), + TeamRunSlotState::Paused | TeamRunSlotState::Idle => AcquirePolicyDecision::Accept, + } + } + TeamWakeSource::McpShutdownRequest => AcquirePolicyDecision::Accept, + TeamWakeSource::IdleNotification | TeamWakeSource::InterruptedNotification => match slot_state { + TeamRunSlotState::Idle if intent == TeamRunWakeIntent::SchedulerWakeTarget => AcquirePolicyDecision::Accept, + TeamRunSlotState::Idle => AcquirePolicyDecision::Suppress("background_notification_without_wake_target"), + TeamRunSlotState::Busy | TeamRunSlotState::Pending | TeamRunSlotState::Paused => { + AcquirePolicyDecision::Suppress("background_notification_deduped") + } + }, + TeamWakeSource::CrashNotification + | TeamWakeSource::InactivityTimeout + | TeamWakeSource::SpawnAttachFailure + | TeamWakeSource::ShutdownRejected => match slot_state { + TeamRunSlotState::Paused => 
AcquirePolicyDecision::Suppress("paused_slot_recovery_wake"), + TeamRunSlotState::Busy | TeamRunSlotState::Pending | TeamRunSlotState::Idle => { + AcquirePolicyDecision::Accept + } + }, + } +} + #[derive(Clone)] pub struct TeamRunManager { team_id: String, @@ -310,6 +472,7 @@ impl TeamRunManager { pending_wakes: HashMap::new(), slot_runtime_health: HashMap::new(), slot_wake_gate: SlotWakeGate::default(), + active_operation_leases: HashMap::new(), }; let ack = record.ack(target_slot_id, target_role, message_id); let payload = record.payload(); @@ -327,6 +490,263 @@ impl TeamRunManager { Ok(ack) } + pub(crate) async fn acquire_user_message_wake( + &self, + slot_id: &str, + role: TeamRunTargetRole, + ) -> Result<(TeamRunAckResponse, TeamRunOperationLease), TeamError> { + let mut guard = self.state.lock().await; + + if let Some(run) = guard.as_mut().filter(|r| r.is_active()) { + if matches!(run.slot_run_state(slot_id), TeamRunSlotState::Busy) { + info!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + slot_id, + role = ?role, + "team_run user wake acquire rejected because slot is busy" + ); + return Err(TeamError::SlotBusy(slot_id.to_owned())); + } + + let source = TeamWakeSource::UserIntervention; + let _ = run.slot_wake_gate.before_wake(slot_id, source, None); + let lease = new_operation_lease(run, slot_id, role.clone(), source, false); + let ack = run.ack(slot_id, role, None); + info!( + team_id = %self.team_id, + team_run_id = %lease.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + accepted_as_new_run = lease.accepted_as_new_run, + "team_run operation lease acquired" + ); + return Ok((ack, lease)); + } + + if let Some(cancelling) = guard.as_ref().filter(|r| matches!(r.status, TeamRunStatus::Cancelling)) { + return Err(TeamError::InvalidRequest(format!( + "team run {} is cancelling", + cancelling.team_run_id + ))); + } + + let mut record = TeamRunRecord { + team_run_id: generate_id(), + 
team_id: self.team_id.clone(), + target_slot_id: slot_id.to_owned(), + target_role: role.clone(), + status: TeamRunStatus::Accepted, + started_at: None, + completed_at: None, + cancelled_at: None, + cancel_reason: None, + active_child_turns: HashMap::new(), + starting_reservations: HashMap::new(), + pending_wakes: HashMap::new(), + slot_runtime_health: HashMap::new(), + slot_wake_gate: SlotWakeGate::default(), + active_operation_leases: HashMap::new(), + }; + let lease = new_operation_lease(&mut record, slot_id, role.clone(), TeamWakeSource::UserMessage, true); + let ack = record.ack(slot_id, role, None); + let payload = record.payload(); + *guard = Some(record); + drop(guard); + + info!( + team_id = %self.team_id, + team_run_id = %ack.team_run_id, + target_slot_id = %ack.target_slot_id, + target_role = ?ack.target_role, + "team_run accepted" + ); + info!( + team_id = %self.team_id, + team_run_id = %lease.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + accepted_as_new_run = lease.accepted_as_new_run, + "team_run operation lease acquired" + ); + self.emitter.broadcast_team_run(TEAM_RUN_ACCEPTED_EVENT, payload); + Ok((ack, lease)) + } + + pub(crate) async fn commit_operation_lease( + &self, + lease_id: &str, + trigger_message_id: Option, + ) -> Result<(), TeamError> { + let mut guard = self.state.lock().await; + let Some(run) = guard.as_mut().filter(|r| r.is_active()) else { + error!( + team_id = %self.team_id, + lease_id, + "team_run operation lease commit failed because no active run exists" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease missing: {lease_id}" + ))); + }; + let Some(active) = run.active_operation_leases.remove(lease_id) else { + error!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id, + "team_run operation lease commit failed because lease is missing" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease 
missing: {lease_id}" + ))); + }; + + let lease = active.lease; + push_pending_wake_locked( + run, + lease.slot_id.clone(), + lease.role.clone(), + lease.wake_source, + trigger_message_id, + ); + let payload = run.payload(); + info!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + pending_wake_count = payload.pending_wake_count, + active_operation_lease_count = run.active_operation_lease_count(), + "team_run operation lease committed" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload); + Ok(()) + } + + pub(crate) async fn abort_operation_lease(&self, lease_id: &str, reason: &str) -> Result<(), TeamError> { + let mut guard = self.state.lock().await; + let Some(run) = guard.as_mut().filter(|r| r.is_active()) else { + error!( + team_id = %self.team_id, + lease_id, + reason, + "team_run operation lease abort failed because no active run exists" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease missing: {lease_id}" + ))); + }; + let Some(active) = run.active_operation_leases.remove(lease_id) else { + error!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id, + reason, + "team_run operation lease abort failed because lease is missing" + ); + return Err(TeamError::InvalidRequest(format!( + "team run operation lease missing: {lease_id}" + ))); + }; + let payload = run.payload(); + warn!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + lease_id = %active.lease.lease_id, + slot_id = %active.lease.slot_id, + wake_source = %active.lease.wake_source, + reason, + active_operation_lease_count = run.active_operation_lease_count(), + "team_run operation lease aborted" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload); + Ok(()) + } + + async fn acquire_run_scoped_wake_with_intent( + &self, + slot_id: &str, + role: TeamRunTargetRole, + 
source: TeamWakeSource, + intent: TeamRunWakeIntent, + ) -> Result { + let mut guard = self.state.lock().await; + let Some(run) = guard.as_mut().filter(|r| r.is_active()) else { + warn!( + team_id = %self.team_id, + slot_id, + role = ?role, + wake_source = %source, + "team_run wake acquire rejected because no active run exists" + ); + return Err(TeamError::InvalidRequest( + "no active team run for run-scoped wake".into(), + )); + }; + + let slot_state = run.slot_run_state(slot_id); + let decision = acquire_policy(source, slot_state, run.has_spawn_welcome_for_slot(slot_id), intent); + match decision { + AcquirePolicyDecision::RejectSlotBusy => Err(TeamError::SlotBusy(slot_id.to_owned())), + AcquirePolicyDecision::RejectInvalid(message) => Err(TeamError::InvalidRequest(message.into())), + AcquirePolicyDecision::Suppress(reason) => { + let _ = run.slot_wake_gate.before_wake(slot_id, source, None); + let payload = run.payload(); + debug!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + slot_id, + wake_source = %source, + slot_state = ?slot_state, + reason, + "team_run wake acquire suppressed" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload); + Ok(TeamRunWakeAcquireOutcome::Suppressed) + } + AcquirePolicyDecision::Accept => { + let _ = run.slot_wake_gate.before_wake(slot_id, source, None); + let lease = new_operation_lease(run, slot_id, role, source, false); + info!( + team_id = %self.team_id, + team_run_id = %lease.team_run_id, + lease_id = %lease.lease_id, + slot_id = %lease.slot_id, + wake_source = %lease.wake_source, + accepted_as_new_run = lease.accepted_as_new_run, + "team_run operation lease acquired" + ); + Ok(TeamRunWakeAcquireOutcome::Accepted(lease)) + } + } + } + + pub(crate) async fn acquire_run_scoped_wake( + &self, + slot_id: &str, + role: TeamRunTargetRole, + source: TeamWakeSource, + ) -> Result { + self.acquire_run_scoped_wake_with_intent(slot_id, role, source, TeamRunWakeIntent::ExternalRequest) + 
.await + } + + pub(crate) async fn acquire_scheduler_wake( + &self, + slot_id: &str, + role: TeamRunTargetRole, + source: TeamWakeSource, + ) -> Result { + self.acquire_run_scoped_wake_with_intent(slot_id, role, source, TeamRunWakeIntent::SchedulerWakeTarget) + .await + } + pub async fn active_run_id(&self) -> Option { let guard = self.state.lock().await; guard.as_ref().filter(|r| r.is_active()).map(|r| r.team_run_id.clone()) @@ -478,6 +898,7 @@ impl TeamRunManager { .map(|_| ()) } + #[cfg(test)] pub(crate) async fn record_or_suppress_wake( &self, slot_id: &str, @@ -613,6 +1034,8 @@ impl TeamRunManager { slot_id: pending.slot_id.clone(), role, conversation_id: conversation_id.to_owned(), + wake_source: pending.source, + message_id: pending.message_id.clone(), state: StartingReservationState::Starting, }; run.starting_reservations @@ -636,6 +1059,50 @@ impl TeamRunManager { Some(reservation) } + pub async fn retry_child_start_later(&self, reservation_id: &str, reason: &str) -> Option { + let mut guard = self.state.lock().await; + let run = guard.as_mut().filter(|run| run.is_active())?; + let reservation = match run.starting_reservations.remove(reservation_id) { + Some(reservation) => reservation, + None => { + warn!( + team_id = %self.team_id, + reservation_id, + error = %reason, + "team_run reservation retry ignored because reservation is missing" + ); + return None; + } + }; + + let pending = PendingWake { + slot_id: reservation.slot_id.clone(), + role: reservation.role, + source: reservation.wake_source, + message_id: reservation.message_id.clone(), + }; + run.pending_wakes + .entry(reservation.slot_id.clone()) + .or_default() + .push_front(pending); + let payload = run.payload(); + info!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + reservation_id = %reservation.reservation_id, + slot_id = %reservation.slot_id, + error = %reason, + slot_pending_wake_count = run.pending_wake_count_for_slot(&reservation.slot_id), + pending_wake_count = 
payload.pending_wake_count, + starting_child_count = payload.starting_child_count, + active_child_count = payload.active_child_count, + "team_run reservation deferred for retry" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload.clone()); + Some(payload) + } + pub(crate) async fn peek_next_pending_wake(&self, slot_id: &str) -> Option { let guard = self.state.lock().await; guard @@ -1164,6 +1631,7 @@ fn maybe_complete_locked(run: &mut TeamRunRecord, emitter: &TeamEventEmitter) -> if run.pending_wake_count() > 0 || !run.starting_reservations.is_empty() || !run.active_child_turns.is_empty() + || run.active_operation_lease_count() > 0 || run.has_retained_wake_gate_work() { emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, run.payload()); @@ -1197,6 +1665,7 @@ fn maybe_cancelled_locked(run: &mut TeamRunRecord, emitter: &TeamEventEmitter) - if run.pending_wake_count() > 0 || !run.starting_reservations.is_empty() || !run.active_child_turns.is_empty() + || run.active_operation_lease_count() > 0 || run.has_retained_wake_gate_work() { emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, run.payload()); @@ -1275,6 +1744,293 @@ mod tests { .expect("slot work must exist") } + #[tokio::test] + async fn lease_keeps_run_active_until_commit() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user message should create run and lease"); + + assert_eq!(lease.team_run_id, ack.team_run_id); + assert_eq!(lease.slot_id, "lead"); + assert_eq!(lease.wake_source, TeamWakeSource::UserMessage); + assert!(lease.accepted_as_new_run); + + let completed = manager.maybe_complete().await; + assert!(completed.is_none(), "active lease must retain the run"); + assert_eq!(manager.active_run_id().await.as_deref(), Some(ack.team_run_id.as_str())); + + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-1".into())) + .await + .expect("commit should convert lease to 
pending wake"); + + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .expect("committed wake should be claimable"); + assert_eq!(reservation.message_id.as_deref(), Some("mailbox-1")); + } + + #[tokio::test] + async fn abort_lease_releases_completion_hold() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user message should create run and lease"); + + manager + .abort_operation_lease(&lease.lease_id, "mailbox_write_failed") + .await + .expect("abort should remove lease"); + + let completed = manager + .maybe_complete() + .await + .expect("run should complete after aborted only lease"); + assert_eq!(completed.team_run_id, ack.team_run_id); + assert_eq!(completed.status, TeamRunStatus::Completed); + } + + #[tokio::test] + async fn commit_missing_lease_returns_internal_consistency_error() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("create active run"); + + let err = manager + .commit_operation_lease("missing-lease", Some("mailbox-1".into())) + .await + .expect_err("missing lease is a contract violation"); + + assert!(matches!( + err, + TeamError::InvalidRequest(message) + if message == "team run operation lease missing: missing-lease" + )); + } + + #[tokio::test] + async fn run_scoped_wake_without_active_run_is_rejected() { + let (manager, _bc) = manager(); + + let err = manager + .acquire_run_scoped_wake("worker", TeamRunTargetRole::Teammate, TeamWakeSource::McpSendMessage) + .await + .expect_err("run-scoped wake must need active run"); + + assert!(matches!( + err, + TeamError::InvalidRequest(message) + if message == "no active team run for run-scoped wake" + )); + } + + #[tokio::test] + async fn user_message_busy_active_slot_is_rejected_without_lease() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + 
.acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-1".into())) + .await + .unwrap(); + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .unwrap(); + manager + .record_child_started( + &reservation.reservation_id, + ActiveChildTurn { + team_run_id: ack.team_run_id, + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "conv-lead".into(), + turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, + }, + ) + .await; + + let err = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect_err("foreground user message must reject busy active slot"); + + assert!(matches!(err, TeamError::SlotBusy(slot) if slot == "lead")); + let payload = manager.current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 0); + } + + #[tokio::test] + async fn user_message_pending_slot_is_accepted_as_additional_foreground_wake() { + let (manager, _bc) = manager(); + let (_ack, first) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .commit_operation_lease(&first.lease_id, Some("mailbox-1".into())) + .await + .unwrap(); + + let (_ack, second) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("pending foreground message must be accepted"); + manager + .commit_operation_lease(&second.lease_id, Some("mailbox-2".into())) + .await + .unwrap(); + + let payload = manager.current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 2); + } + + #[tokio::test] + async fn mcp_send_message_busy_slot_is_accepted_and_queued() { + let (manager, _bc) = manager(); + let (ack, first) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .commit_operation_lease(&first.lease_id, 
Some("mailbox-1".into())) + .await + .unwrap(); + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .unwrap(); + manager + .record_child_started( + &reservation.reservation_id, + ActiveChildTurn { + team_run_id: ack.team_run_id, + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "conv-lead".into(), + turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, + }, + ) + .await; + + let outcome = manager + .acquire_run_scoped_wake("lead", TeamRunTargetRole::Lead, TeamWakeSource::McpSendMessage) + .await + .expect("MCP message to busy slot must be accepted"); + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + panic!("MCP message should not be suppressed"); + }; + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-2".into())) + .await + .unwrap(); + + let payload = manager.current_payload().await.unwrap(); + assert_eq!(payload.pending_wake_count, 1); + } + + #[tokio::test] + async fn paused_slot_suppresses_mcp_send_message_without_lease() { + let (manager, _bc) = manager(); + let (ack, _lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + manager + .pause_slot_work("lead", Some("user stopped".into())) + .await + .unwrap(); + + let outcome = manager + .acquire_run_scoped_wake("lead", TeamRunTargetRole::Lead, TeamWakeSource::McpSendMessage) + .await + .expect("paused background wake suppresses cleanly"); + assert_eq!(outcome, TeamRunWakeAcquireOutcome::Suppressed); + assert_eq!(manager.active_run_id().await.as_deref(), Some(ack.team_run_id.as_str())); + } + + #[tokio::test] + async fn duplicate_spawn_welcome_is_suppressed() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + + let first = manager + .acquire_run_scoped_wake("new-worker", TeamRunTargetRole::Teammate, TeamWakeSource::SpawnWelcome) + 
.await + .unwrap(); + let TeamRunWakeAcquireOutcome::Accepted(first) = first else { + panic!("first spawn welcome should be accepted"); + }; + manager + .commit_operation_lease(&first.lease_id, Some("welcome-1".into())) + .await + .unwrap(); + + let second = manager + .acquire_run_scoped_wake("new-worker", TeamRunTargetRole::Teammate, TeamWakeSource::SpawnWelcome) + .await + .unwrap(); + assert_eq!(second, TeamRunWakeAcquireOutcome::Suppressed); + } + + #[tokio::test] + async fn idle_notification_idle_slot_is_suppressed_without_scheduler_wake_target() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + + let outcome = manager + .acquire_run_scoped_wake("worker", TeamRunTargetRole::Teammate, TeamWakeSource::IdleNotification) + .await + .expect("generic idle notification should suppress cleanly"); + + assert_eq!(outcome, TeamRunWakeAcquireOutcome::Suppressed); + } + + #[tokio::test] + async fn scheduler_idle_notification_idle_slot_is_accepted() { + let (manager, _bc) = manager(); + manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .unwrap(); + + let outcome = manager + .acquire_scheduler_wake("worker", TeamRunTargetRole::Teammate, TeamWakeSource::IdleNotification) + .await + .expect("scheduler-produced wake target should be accepted"); + let TeamRunWakeAcquireOutcome::Accepted(lease) = outcome else { + panic!("scheduler wake target should produce a lease"); + }; + + manager + .commit_operation_lease(&lease.lease_id, None) + .await + .expect("scheduler wake should commit"); + let reservation = manager + .claim_wake_for_turn("worker", TeamRunTargetRole::Teammate, "conv-worker") + .await + .expect("scheduler wake should be claimable"); + assert_eq!(reservation.wake_source, TeamWakeSource::IdleNotification); + } + #[tokio::test] async fn active_slot_work_includes_backend_slow_fields_after_threshold() { let (manager, _rx) = manager(); diff --git 
a/crates/aionui-team/src/test_utils.rs b/crates/aionui-team/src/test_utils.rs index 863aa11df..e28ed4a7f 100644 --- a/crates/aionui-team/src/test_utils.rs +++ b/crates/aionui-team/src/test_utils.rs @@ -7,6 +7,7 @@ use std::sync::Mutex; pub struct MockState { pub messages: Vec, pub tasks: Vec, + pub fail_message_writes: bool, } pub struct MockTeamRepo { @@ -19,6 +20,12 @@ impl MockTeamRepo { state: Mutex::new(MockState::default()), } } + + pub fn with_message_write_failure() -> Self { + let repo = Self::new(); + repo.state.lock().unwrap().fail_message_writes = true; + repo + } } #[async_trait::async_trait] @@ -47,7 +54,11 @@ impl ITeamRepository for MockTeamRepo { // ── Mailbox ───────────────────────────────────────────────────── async fn write_message(&self, row: &MailboxMessageRow) -> Result<(), DbError> { - self.state.lock().unwrap().messages.push(row.clone()); + let mut state = self.state.lock().unwrap(); + if state.fail_message_writes { + return Err(DbError::Init("forced mailbox write failure".into())); + } + state.messages.push(row.clone()); Ok(()) } diff --git a/crates/aionui-team/tests/e2e_team_flow.rs b/crates/aionui-team/tests/e2e_team_flow.rs index cc99c3df9..a92cf14da 100644 --- a/crates/aionui-team/tests/e2e_team_flow.rs +++ b/crates/aionui-team/tests/e2e_team_flow.rs @@ -121,6 +121,27 @@ impl AgentTurnExecutionPort for ErrorBeforeStartTurnPort { } } +#[derive(Default)] +struct SkippedBusyTurnPort { + requests: Arc>>, +} + +impl SkippedBusyTurnPort { + fn requests(&self) -> Arc>> { + self.requests.clone() + } +} + +#[async_trait] +impl AgentTurnExecutionPort for SkippedBusyTurnPort { + async fn run_agent_turn(&self, request: AgentTurnRequest) -> Result { + self.requests.lock().unwrap().push(request.clone()); + Err(AgentTurnExecutionError::Skipped { + reason: format!("conversation {} is already running", request.conversation_id), + }) + } +} + struct StartedThenFailedTurnPort; #[async_trait] @@ -698,6 +719,16 @@ async fn wait_for_event(broadcaster: 
&RecordingBroadcaster, name: &str) { panic!("timed out waiting for event {name}"); } +async fn wait_for_turn_request_count(requests: &Arc>>, count: usize) { + for _ in 0..100 { + if requests.lock().unwrap().len() >= count { + return; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("timed out waiting for {count} turn requests"); +} + fn register_test_event_loops(session: &Arc) { let registry = session.event_loops().clone(); for agent in two_agents() { @@ -1593,6 +1624,51 @@ async fn s9b_event_loop_fails_run_when_turn_errors_before_started() { session.stop(); } +#[tokio::test] +async fn s9b_retryable_busy_before_start_retains_team_run_without_timer_retry() { + let broadcaster = Arc::new(RecordingBroadcaster::new()); + let turn_port = Arc::new(SkippedBusyTurnPort::default()); + let requests = turn_port.requests(); + let session = + setup_session_with_runtime_ports(turn_port, Arc::new(NoopCancellationPort), broadcaster.clone()).await; + + session + .send_message("user input to team", None) + .await + .expect("send_message must succeed"); + + wait_for_turn_request_count(&requests, 1).await; + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + let request_log = requests.lock().unwrap(); + let first_team_run_id = request_log[0] + .team_run_id + .as_deref() + .expect("first attempt should belong to TeamRun") + .to_owned(); + assert_eq!( + request_log.len(), + 1, + "Team event loop should not use a timer to retry retryable busy skips" + ); + drop(request_log); + + assert_eq!( + session.team_run_manager().active_run_id().await, + Some(first_team_run_id.clone()), + "run should remain active for a state-driven retry" + ); + let payload = session + .team_run_manager() + .current_payload() + .await + .expect("retryable busy skip should keep the active run"); + assert_eq!(payload.team_run_id, first_team_run_id); + assert_eq!(payload.pending_wake_count, 1); + assert_eq!(payload.starting_child_count, 0); + + session.stop(); +} + 
#[tokio::test] async fn s9c_event_loop_fails_run_when_started_turn_returns_failed() { let broadcaster = Arc::new(RecordingBroadcaster::new()); diff --git a/crates/aionui-team/tests/session_service_integration.rs b/crates/aionui-team/tests/session_service_integration.rs index a4eb1c0ff..e8175f813 100644 --- a/crates/aionui-team/tests/session_service_integration.rs +++ b/crates/aionui-team/tests/session_service_integration.rs @@ -570,6 +570,8 @@ struct FullMockTeamRepo { inner: MockTeamRepo, teams: std::sync::Mutex>, fail_workspace_update: std::sync::Mutex, + fail_agent_update: std::sync::Mutex, + fail_message_writes: std::sync::Mutex, } impl FullMockTeamRepo { @@ -578,12 +580,22 @@ impl FullMockTeamRepo { inner: MockTeamRepo::new(), teams: std::sync::Mutex::new(Vec::new()), fail_workspace_update: std::sync::Mutex::new(false), + fail_agent_update: std::sync::Mutex::new(false), + fail_message_writes: std::sync::Mutex::new(false), } } fn fail_workspace_update(&self) { *self.fail_workspace_update.lock().unwrap() = true; } + + fn fail_agent_updates(&self) { + *self.fail_agent_update.lock().unwrap() = true; + } + + fn fail_message_writes(&self) { + *self.fail_message_writes.lock().unwrap() = true; + } } #[async_trait::async_trait] @@ -612,6 +624,9 @@ impl ITeamRepository for FullMockTeamRepo { if params.workspace.is_some() && *self.fail_workspace_update.lock().unwrap() { return Err(DbError::Init("forced workspace writeback failure".into())); } + if params.agents.is_some() && *self.fail_agent_update.lock().unwrap() { + return Err(DbError::Init("forced agent update failure".into())); + } let mut teams = self.teams.lock().unwrap(); let team = teams .iter_mut() @@ -638,6 +653,9 @@ impl ITeamRepository for FullMockTeamRepo { } async fn write_message(&self, row: &aionui_db::models::MailboxMessageRow) -> Result<(), DbError> { + if *self.fail_message_writes.lock().unwrap() { + return Err(DbError::Init("forced mailbox write failure".into())); + } 
self.inner.write_message(row).await } async fn read_unread_and_mark( @@ -2341,6 +2359,94 @@ async fn spawn_agent_in_session_rejects_without_active_team_run_before_persistin ); } +#[tokio::test] +async fn spawn_agent_in_session_aborts_lease_when_persistence_fails() { + let (svc, team_repo, _, _) = setup_with_factory_metadata_team_repo_and_conversation_repo( + success_factory(), + Arc::new(StubAgentMetadataRepo::empty()), + ); + let created = svc + .create_team( + "user1", + CreateTeamRequest { + name: "Alpha".into(), + agents: two_agent_input(), + workspace: None, + }, + ) + .await + .expect("create team"); + svc.ensure_session("user1", &created.id).await.unwrap(); + svc.send_message("user1", &created.id, "start active run", None) + .await + .expect("active run"); + team_repo.fail_agent_updates(); + + let lead_slot_id = created.lead_agent_id.clone().unwrap(); + let req = SpawnAgentRequest { + name: "Helper".into(), + agent_type: Some("claude".into()), + custom_agent_id: None, + model: Some("claude-sonnet-4".into()), + }; + + let err = svc + .spawn_agent_in_session(&created.id, &lead_slot_id, req) + .await + .expect_err("forced agent persistence failure should fail spawn"); + assert!(err.to_string().contains("forced agent update failure")); + + let after = svc.get_team("user1", &created.id).await.unwrap(); + assert!( + after.agents.iter().all(|agent| agent.name != "Helper"), + "failed spawn must not persist helper after aborted spawn lease" + ); +} + +#[tokio::test] +async fn spawn_agent_in_session_compensates_when_welcome_mailbox_write_fails() { + let (svc, team_repo, _, _) = setup_with_factory_metadata_team_repo_and_conversation_repo( + success_factory(), + Arc::new(StubAgentMetadataRepo::empty()), + ); + let created = svc + .create_team( + "user1", + CreateTeamRequest { + name: "Alpha".into(), + agents: two_agent_input(), + workspace: None, + }, + ) + .await + .expect("create team"); + svc.ensure_session("user1", &created.id).await.unwrap(); + 
svc.send_message("user1", &created.id, "start active run", None) + .await + .expect("active run"); + team_repo.fail_message_writes(); + + let lead_slot_id = created.lead_agent_id.clone().unwrap(); + let req = SpawnAgentRequest { + name: "Helper".into(), + agent_type: Some("claude".into()), + custom_agent_id: None, + model: Some("claude-sonnet-4".into()), + }; + + let err = svc + .spawn_agent_in_session(&created.id, &lead_slot_id, req) + .await + .expect_err("welcome mailbox write failure should fail spawn"); + assert!(err.to_string().contains("forced mailbox write failure")); + + let after = svc.get_team("user1", &created.id).await.unwrap(); + assert!( + after.agents.iter().all(|agent| agent.name != "Helper"), + "compensation must remove persisted helper after welcome write failure" + ); +} + #[tokio::test] async fn es2_ensure_session_is_idempotent() { let svc = setup();