From 1e192d398b07250531eca9c7e276d38a3c13788a Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 15:26:44 +0800 Subject: [PATCH 1/5] feat(team): add queued send message payload types --- crates/aionui-api-types/src/lib.rs | 4 +- crates/aionui-api-types/src/team.rs | 135 ++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 1 deletion(-) diff --git a/crates/aionui-api-types/src/lib.rs b/crates/aionui-api-types/src/lib.rs index b1e2444d..647faac6 100644 --- a/crates/aionui-api-types/src/lib.rs +++ b/crates/aionui-api-types/src/lib.rs @@ -151,7 +151,9 @@ pub use team::{ TeamAgentRemovedPayload, TeamAgentRenamedPayload, TeamAgentResponse, TeamAgentSpawnedPayload, TeamAgentStatusPayload, TeamChildTurnPayload, TeamListResponse, TeamMcpPhase, TeamMcpRuntimeConfig, TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole, - TeamRuntimeSeed, TeamSessionBinding, TeamSlotWorkPayload, TeammateMessagePayload, + TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason, + TeamSendMessageStatus, TeamSendMessageTargetQueueState, TeamSessionBinding, TeamSlotRuntimeHealth, + TeamSlotWorkPayload, TeammateMessagePayload, }; pub use team_mcp::{GuideMcpConfig, TEAM_MCP_SERVER_NAME, TeamMcpStdioConfig}; pub use websocket::WebSocketMessage; diff --git a/crates/aionui-api-types/src/team.rs b/crates/aionui-api-types/src/team.rs index 9e784052..4418a2ee 100644 --- a/crates/aionui-api-types/src/team.rs +++ b/crates/aionui-api-types/src/team.rs @@ -234,6 +234,13 @@ pub struct TeamRunAckResponse { pub message_id: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TeamSlotRuntimeHealth { + Disconnected, + Unhealthy, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct TeamSlotWorkPayload { pub slot_id: String, @@ -246,6 +253,16 @@ pub struct TeamSlotWorkPayload { pub suppressed_wake_count: usize, #[serde(default, skip_serializing_if = "Option::is_none")] pub active_turn_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active_turn_started_at_ms: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active_turn_elapsed_ms: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active_turn_slow: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active_turn_slow_threshold_ms: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub runtime_health: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -273,6 +290,57 @@ pub struct TeamChildTurnPayload { pub status: TeamRunStatus, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TeamSendMessageStatus { + Queued, + Rejected, + Error, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TeamSendMessageDelivery { + WakeRecorded, + WakeSuppressed, + NotRecorded, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TeamSendMessageReason { + QueuedForIdle, + BehindStartingTurn, + BehindActiveTurn, + SuppressedByPause, + NoActiveTeamRun, + TargetNotFound, + TargetDisconnected, + TargetUnhealthy, + InternalError, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct TeamSendMessageTargetQueueState { + pub slot_id: String, + pub role: TeamRunTargetRole, + pub queue_state: TeamSendMessageReason, + pub pending_wake_count: usize, + pub starting_child_count: usize, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active_turn_id: Option, + pub suppressed_wake_count: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct TeamSendMessageQueuedResponse { + pub status: TeamSendMessageStatus, + pub delivery: TeamSendMessageDelivery, + pub reason: TeamSendMessageReason, + pub team_run_id: String, + pub targets: Vec, +} + // --------------------------------------------------------------------------- // E. Team management — Response DTOs // --------------------------------------------------------------------------- @@ -1117,6 +1185,11 @@ mod tests { paused: false, suppressed_wake_count: 0, active_turn_id: Some("turn-lead".into()), + active_turn_started_at_ms: None, + active_turn_elapsed_ms: None, + active_turn_slow: None, + active_turn_slow_threshold_ms: None, + runtime_health: None, }, TeamSlotWorkPayload { slot_id: "worker-1".into(), @@ -1126,6 +1199,11 @@ mod tests { paused: false, suppressed_wake_count: 0, active_turn_id: None, + active_turn_started_at_ms: None, + active_turn_elapsed_ms: None, + active_turn_slow: None, + active_turn_slow_threshold_ms: None, + runtime_health: None, }, ], }; @@ -1159,6 +1237,11 @@ mod tests { active_turn_id: None, paused: true, suppressed_wake_count: 2, + active_turn_started_at_ms: None, + active_turn_elapsed_ms: None, + active_turn_slow: None, + active_turn_slow_threshold_ms: None, + runtime_health: None, }; let value = serde_json::to_value(&payload).unwrap(); @@ -1168,6 +1251,58 @@ mod tests { assert!(value.get(format!("{}_pending_count", "background")).is_none()); } + #[test] + fn team_slot_work_payload_serializes_active_turn_slow_fields() { + let payload = TeamSlotWorkPayload { + slot_id: "worker-1".into(), + role: TeamRunTargetRole::Teammate, + pending_wake_count: 0, + starting_child_count: 0, + paused: false, + suppressed_wake_count: 0, + active_turn_id: Some("turn-worker".into()), + active_turn_started_at_ms: Some(1_000), + active_turn_elapsed_ms: Some(600_001), + active_turn_slow: Some(true), + active_turn_slow_threshold_ms: Some(600_000), + runtime_health: Some(TeamSlotRuntimeHealth::Unhealthy), + }; + + let value = serde_json::to_value(payload).unwrap(); + + assert_eq!(value["active_turn_started_at_ms"], 1_000); + assert_eq!(value["active_turn_elapsed_ms"], 600_001); + assert_eq!(value["active_turn_slow"], true); + assert_eq!(value["active_turn_slow_threshold_ms"], 600_000); + assert_eq!(value["runtime_health"], "unhealthy"); + } + + #[test] + fn team_send_message_queued_response_serializes_stable_contract() { + let response = TeamSendMessageQueuedResponse { + status: TeamSendMessageStatus::Queued, + delivery: TeamSendMessageDelivery::WakeRecorded, + reason: TeamSendMessageReason::BehindActiveTurn, + team_run_id: "run-1".into(), + targets: vec![TeamSendMessageTargetQueueState { + slot_id: "worker-1".into(), + role: TeamRunTargetRole::Teammate, + queue_state: TeamSendMessageReason::BehindActiveTurn, + pending_wake_count: 1, + starting_child_count: 0, + active_turn_id: Some("turn-worker".into()), + suppressed_wake_count: 0, + }], + }; + + let value = serde_json::to_value(response).unwrap(); + + assert_eq!(value["status"], "queued"); + assert_eq!(value["delivery"], "wake_recorded"); + assert_eq!(value["reason"], "behind_active_turn"); + assert_eq!(value["targets"][0]["queue_state"], "behind_active_turn"); + } + #[test] fn team_slot_work_payload_defaults_pause_fields_for_old_payloads() { let decoded: TeamSlotWorkPayload = serde_json::from_value(serde_json::json!({ From 295787efc04e49c92c4bf04a4a696991d6ae0ac7 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 15:31:50 +0800 Subject: [PATCH 2/5] feat(team): derive slow state for active child turns --- crates/aionui-team/src/event_loop.rs | 2 + crates/aionui-team/src/session.rs | 14 ++- crates/aionui-team/src/team_run.rs | 178 ++++++++++++++++++++++++++- 3 files changed, 190 insertions(+), 4 deletions(-) diff --git a/crates/aionui-team/src/event_loop.rs b/crates/aionui-team/src/event_loop.rs index d684b78f..ce271975 100644 --- a/crates/aionui-team/src/event_loop.rs +++ b/crates/aionui-team/src/event_loop.rs @@ -267,6 +267,8 @@ async fn execute_turn(ctx: &AgentLoopContext, input: &crate::session::WakeInput) role: started.role, conversation_id: started.conversation_id, turn_id: started.turn_id, + started_at_ms: aionui_common::now_ms(), + last_slow_notified_at_ms: None, }; match team_run_manager .record_child_started(&reservation_id, child.clone()) diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index fa219bce..3d75acd1 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -1175,7 +1175,7 @@ mod tests { use aionui_ai_agent::agent_task::AgentInstance; use aionui_ai_agent::types::BuildTaskOptions; use aionui_api_types::WebSocketMessage; - use aionui_common::{AgentKillReason, TimestampMs}; + use aionui_common::{AgentKillReason, TimestampMs, now_ms}; use std::sync::{Arc, Mutex}; struct NullBroadcaster; @@ -1593,6 +1593,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "c1".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }; let worker = ActiveChildTurn { team_run_id: ack.team_run_id.clone(), @@ -1600,6 +1602,8 @@ mod tests { role: TeamRunTargetRole::Teammate, conversation_id: "c2".into(), turn_id: "turn-worker".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }; session @@ -1673,6 +1677,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "c1".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1697,6 +1703,8 @@ mod tests { role: TeamRunTargetRole::Teammate, conversation_id: "c2".into(), turn_id: "turn-worker".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1767,6 +1775,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "c1".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1822,6 +1832,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "c1".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index 522e7f14..16c50bcf 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -2,7 +2,8 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use aionui_api_types::{ - TeamChildTurnPayload, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole, TeamSlotWorkPayload, + TeamChildTurnPayload, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole, TeamSlotRuntimeHealth, + TeamSlotWorkPayload, }; use aionui_common::{TimestampMs, generate_id, now_ms}; use tokio::sync::Mutex; @@ -18,6 +19,9 @@ use crate::slot_wake_gate::{SlotWakeGate, WakeGateDecision}; use crate::types::TeammateRole; use crate::wake::TeamWakeSource; +const ACTIVE_CHILD_SLOW_THRESHOLD_MS: u64 = 10 * 60 * 1000; +const ACTIVE_CHILD_SLOW_REPEAT_MS: u64 = 10 * 60 * 1000; + #[derive(Debug, Clone, PartialEq, Eq)] pub struct ActiveChildTurn { pub team_run_id: String, @@ -25,6 +29,8 @@ pub struct ActiveChildTurn { pub role: TeamRunTargetRole, pub conversation_id: String, pub turn_id: String, + pub started_at_ms: TimestampMs, + pub last_slow_notified_at_ms: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -97,6 +103,7 @@ struct TeamRunRecord { active_child_turns: HashMap, starting_reservations: HashMap, pending_wakes: HashMap>, + slot_runtime_health: HashMap, slot_wake_gate: SlotWakeGate, } @@ -137,6 +144,7 @@ impl TeamRunRecord { } fn slot_work(&self) -> Vec { + let now = now_ms(); let mut slot_ids = self .pending_wakes .keys() @@ -147,6 +155,7 @@ impl TeamRunRecord { .map(|reservation| reservation.slot_id.clone()), ) .chain(self.active_child_turns.keys().cloned()) + .chain(self.slot_runtime_health.keys().cloned()) .chain(self.slot_wake_gate.slot_ids().cloned()) .collect::>(); slot_ids.sort(); @@ -157,12 +166,25 @@ impl TeamRunRecord { .filter_map(|slot_id| { let role = self.role_for_slot(&slot_id)?; let gate = self.slot_wake_gate.snapshot_for_slot(&slot_id); + let active_child = self.active_child_turns.get(&slot_id); + let active_elapsed_ms = active_child.map(|child| { + child + .last_slow_notified_at_ms + .unwrap_or(now) + .saturating_sub(child.started_at_ms) + .max(0) as u64 + }); Some(TeamSlotWorkPayload { pending_wake_count: self.pending_wake_count_for_slot(&slot_id), starting_child_count: self.starting_child_count_for_slot(&slot_id), paused: gate.paused, suppressed_wake_count: gate.suppressed_wake_count, - active_turn_id: self.active_child_turns.get(&slot_id).map(|child| child.turn_id.clone()), + active_turn_id: active_child.map(|child| child.turn_id.clone()), + active_turn_started_at_ms: active_child.map(|child| child.started_at_ms), + active_turn_elapsed_ms: active_elapsed_ms, + active_turn_slow: active_elapsed_ms.map(|elapsed| elapsed >= ACTIVE_CHILD_SLOW_THRESHOLD_MS), + active_turn_slow_threshold_ms: active_child.map(|_| ACTIVE_CHILD_SLOW_THRESHOLD_MS), + runtime_health: self.slot_runtime_health.get(&slot_id).cloned(), slot_id, role, }) @@ -286,6 +308,7 @@ impl TeamRunManager { active_child_turns: HashMap::new(), starting_reservations: HashMap::new(), pending_wakes: HashMap::new(), + slot_runtime_health: HashMap::new(), slot_wake_gate: SlotWakeGate::default(), }; let ack = record.ack(target_slot_id, target_role, message_id); @@ -324,7 +347,63 @@ impl TeamRunManager { pub async fn current_payload(&self) -> Option { let guard = self.state.lock().await; - guard.as_ref().map(TeamRunRecord::payload) + guard.as_ref().filter(|run| run.is_active()).map(TeamRunRecord::payload) + } + + pub async fn mark_slot_runtime_health( + &self, + slot_id: &str, + health: TeamSlotRuntimeHealth, + ) -> Option { + let mut guard = self.state.lock().await; + let run = guard.as_mut().filter(|run| run.is_active())?; + run.slot_runtime_health.insert(slot_id.to_owned(), health); + let payload = run.payload(); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload.clone()); + Some(payload) + } + + pub async fn observe_slow_child_turns(&self, now: TimestampMs) -> Option { + let mut guard = self.state.lock().await; + let run = guard.as_mut().filter(|run| run.is_active())?; + let mut observed = false; + + for child in run.active_child_turns.values_mut() { + let elapsed_ms = now.saturating_sub(child.started_at_ms).max(0) as u64; + if elapsed_ms < ACTIVE_CHILD_SLOW_THRESHOLD_MS { + continue; + } + let due = child + .last_slow_notified_at_ms + .map(|last| now.saturating_sub(last).max(0) as u64 >= ACTIVE_CHILD_SLOW_REPEAT_MS) + .unwrap_or(true); + if !due { + continue; + } + child.last_slow_notified_at_ms = Some(now); + observed = true; + info!( + team_id = %self.team_id, + team_run_id = %child.team_run_id, + slot_id = %child.slot_id, + role = ?child.role, + conversation_id = %child.conversation_id, + turn_id = %child.turn_id, + elapsed_ms, + slow_threshold_ms = ACTIVE_CHILD_SLOW_THRESHOLD_MS, + "team_child_turn slow" + ); + } + + if !observed { + return None; + } + + let payload = run.payload(); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload.clone()); + Some(payload) } pub async fn pause_slot_work(&self, slot_id: &str, reason: Option) -> Result { @@ -498,6 +577,7 @@ impl TeamRunManager { if run.pending_wakes.get(slot_id).is_some_and(VecDeque::is_empty) { run.pending_wakes.remove(slot_id); } + run.slot_runtime_health.remove(slot_id); if pending.slot_id != slot_id { warn!( team_id = %self.team_id, @@ -1187,6 +1267,76 @@ mod tests { .expect("slot work must exist") } + #[tokio::test] + async fn active_slot_work_includes_backend_slow_fields_after_threshold() { + let (manager, _rx) = manager(); + let ack = manager + .accept_user_message("lead", TeamRunTargetRole::Lead, true, Some("msg-1".into())) + .await + .unwrap(); + manager + .record_pending_wake("lead", TeamRunTargetRole::Lead, TeamWakeSource::UserMessage) + .await + .unwrap(); + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .unwrap(); + + let child = ActiveChildTurn { + team_run_id: ack.team_run_id.clone(), + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "conv-lead".into(), + turn_id: "turn-lead".into(), + started_at_ms: 10_000, + last_slow_notified_at_ms: None, + }; + manager.record_child_started(&reservation.reservation_id, child).await; + + manager.observe_slow_child_turns(610_001).await; + let payload = manager.current_payload().await.unwrap(); + let lead = slot_work(&payload, "lead"); + + assert_eq!(lead.active_turn_started_at_ms, Some(10_000)); + assert_eq!(lead.active_turn_elapsed_ms, Some(600_001)); + assert_eq!(lead.active_turn_slow, Some(true)); + assert_eq!(lead.active_turn_slow_threshold_ms, Some(600_000)); + assert_eq!(lead.runtime_health, None); + } + + #[tokio::test] + async fn slow_observation_is_rate_limited_per_active_child() { + let (manager, _rx) = manager(); + let ack = manager + .accept_user_message("lead", TeamRunTargetRole::Lead, true, Some("msg-1".into())) + .await + .unwrap(); + manager + .record_pending_wake("lead", TeamRunTargetRole::Lead, TeamWakeSource::UserMessage) + .await + .unwrap(); + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .unwrap(); + + let child = ActiveChildTurn { + team_run_id: ack.team_run_id.clone(), + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "conv-lead".into(), + turn_id: "turn-lead".into(), + started_at_ms: 0, + last_slow_notified_at_ms: None, + }; + manager.record_child_started(&reservation.reservation_id, child).await; + + assert!(manager.observe_slow_child_turns(600_001).await.is_some()); + assert!(manager.observe_slow_child_turns(900_000).await.is_none()); + assert!(manager.observe_slow_child_turns(1_200_001).await.is_some()); + } + #[tokio::test] async fn cancel_run_clears_paused_gate_and_reaches_cancelled_terminal() { let (manager, bc) = manager(); @@ -1254,6 +1404,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv-lead".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1295,6 +1447,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv-lead".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1338,6 +1492,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv-lead".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }; manager .record_child_started(&reservation.reservation_id, child.clone()) @@ -1500,6 +1656,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv-lead".into(), turn_id: "turn-user".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await, @@ -1532,6 +1690,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv-lead".into(), turn_id: "turn-background".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await, @@ -1626,6 +1786,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv-lead".into(), turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1803,6 +1965,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv".into(), turn_id: "turn".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1867,6 +2031,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv".into(), turn_id: "turn".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -1905,6 +2071,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv".into(), turn_id: "turn".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await, @@ -1979,6 +2147,8 @@ mod tests { role: TeamRunTargetRole::Teammate, conversation_id: "conv-worker".into(), turn_id: "turn-worker".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; @@ -2015,6 +2185,8 @@ mod tests { role: TeamRunTargetRole::Lead, conversation_id: "conv".into(), turn_id: "late-turn".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, }, ) .await; From fe3ea18eea3e45a79f7774f22618a6acdf6af58c Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 15:39:00 +0800 Subject: [PATCH 3/5] feat(team): return queued state from team_send_message --- crates/aionui-team/src/mcp/server.rs | 58 ++++- crates/aionui-team/src/service.rs | 6 +- crates/aionui-team/src/session.rs | 203 +++++++++++++++++- crates/aionui-team/src/team_run.rs | 8 + .../tests/mcp_server_integration.rs | 6 +- 5 files changed, 262 insertions(+), 19 deletions(-) diff --git a/crates/aionui-team/src/mcp/server.rs b/crates/aionui-team/src/mcp/server.rs index 71cbf954..59ca2389 100644 --- a/crates/aionui-team/src/mcp/server.rs +++ b/crates/aionui-team/src/mcp/server.rs @@ -1,7 +1,9 @@ use std::net::SocketAddr; use std::sync::{Arc, Weak}; -use aionui_api_types::{TeamMcpPhase, TeamMcpStatusPayload, WebSocketMessage}; +use aionui_api_types::{ + TeamMcpPhase, TeamMcpStatusPayload, TeamSendMessageQueuedResponse, TeamSendMessageStatus, WebSocketMessage, +}; use aionui_realtime::EventBroadcaster; use serde_json::{Value, json}; use tokio::net::{TcpListener, TcpStream}; @@ -12,7 +14,7 @@ use crate::error::TeamError; use crate::events::TEAM_MCP_STATUS_EVENT; use crate::scheduler::TeammateManager; use crate::service::TeamSessionService; -use crate::session::SpawnAgentRequest; +use crate::session::{AgentMessageQueueResult, SpawnAgentRequest}; use crate::types::{TeammateRole, TeammateStatus}; use crate::wake::TeamWakeSource; @@ -562,14 +564,33 @@ async fn exec_send_message( } else { vec![resolved_to.clone()] }; + let mut target_results = Vec::with_capacity(targets.len()); for target in &targets { - service + let result = service .send_agent_message_from_agent(team_id, caller_slot_id, target, &input.message) .await .map_err(|e| e.to_string())?; + target_results.push(result); } - Ok(format!("Message sent to {}", input.to)) + let response = build_send_message_queued_response(target_results)?; + + serde_json::to_string(&response).map_err(|e| format!("Serialization error: {e}")) +} + +fn build_send_message_queued_response( + target_results: Vec, +) -> Result { + let first = target_results + .first() + .ok_or_else(|| "No message targets resolved".to_string())?; + Ok(TeamSendMessageQueuedResponse { + status: TeamSendMessageStatus::Queued, + delivery: first.delivery.clone(), + reason: first.target.queue_state.clone(), + team_run_id: first.team_run_id.clone(), + targets: target_results.into_iter().map(|result| result.target).collect(), + }) } async fn exec_spawn_agent( @@ -888,6 +909,35 @@ async fn http_mcp_loop( #[cfg(test)] mod tests { use super::*; + use aionui_api_types::{ + TeamRunTargetRole, TeamSendMessageDelivery, TeamSendMessageReason, TeamSendMessageTargetQueueState, + }; + + #[test] + fn build_send_message_queued_response_serializes_json_contract() { + let response = build_send_message_queued_response(vec![AgentMessageQueueResult { + team_run_id: "run-1".into(), + delivery: TeamSendMessageDelivery::WakeRecorded, + target: TeamSendMessageTargetQueueState { + slot_id: "worker-1".into(), + role: TeamRunTargetRole::Teammate, + queue_state: TeamSendMessageReason::QueuedForIdle, + pending_wake_count: 1, + starting_child_count: 0, + active_turn_id: None, + suppressed_wake_count: 0, + }, + }]) + .unwrap(); + + let text = serde_json::to_string(&response).unwrap(); + let payload: serde_json::Value = serde_json::from_str(&text).expect("team_send_message result must be JSON"); + assert_eq!(payload["status"], "queued"); + assert_eq!(payload["delivery"], "wake_recorded"); + assert_eq!(payload["reason"], "queued_for_idle"); + assert_eq!(payload["targets"][0]["slot_id"], "worker-1"); + assert_eq!(payload["targets"][0]["queue_state"], "queued_for_idle"); + } /// Non-Lead callers are rejected at the dispatch layer with the /// "Only Lead ..." phrasing. Service weak is never upgraded because diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index 5758958e..a831a153 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -24,7 +24,7 @@ use crate::ports::{ AgentTurnCancellationPort, AgentTurnExecutionPort, TeamConversationBindingLookup, TeamConversationLookupPort, }; use crate::provisioning::{TeamAgentProvisioner, TeamConversationProvisioningPort}; -use crate::session::TeamSession; +use crate::session::{AgentMessageQueueResult, TeamSession}; use crate::types::{Team, TeamAgent, TeammateRole}; use crate::wake::TeamWakeSource; use crate::workspace::validate_create_workspace_path; @@ -894,13 +894,13 @@ impl TeamSessionService { .await } - pub async fn send_agent_message_from_agent( + pub(crate) async fn send_agent_message_from_agent( &self, team_id: &str, from_slot_id: &str, to_slot_id: &str, content: &str, - ) -> Result<(), TeamError> { + ) -> Result { self.require_active_team_run_for_team_work(team_id).await?; let session = { let entry = self diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index 3d75acd1..8113270f 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -2,7 +2,10 @@ use std::path::PathBuf; use std::sync::{Arc, Weak}; use aionui_ai_agent::IWorkerTaskManager; -use aionui_api_types::{TeamRunAckResponse, TeamRunTargetRole}; +use aionui_api_types::{ + TeamRunAckResponse, TeamRunTargetRole, TeamSendMessageDelivery, TeamSendMessageReason, + TeamSendMessageTargetQueueState, TeamSlotRuntimeHealth, TeamSlotWorkPayload, +}; use aionui_common::AgentKillReason; use aionui_db::ITeamRepository; use aionui_realtime::EventBroadcaster; @@ -48,6 +51,13 @@ pub struct WakeInput { pub(crate) trigger_message_id: Option, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct AgentMessageQueueResult { + pub team_run_id: String, + pub delivery: TeamSendMessageDelivery, + pub target: TeamSendMessageTargetQueueState, +} + /// Input for [`TeamSession::spawn_agent`]. Populated by the lead agent when /// it calls the `spawn_agent` MCP tool. #[derive(Debug, Clone)] @@ -485,7 +495,7 @@ impl TeamSession { from_slot_id: &str, to_slot_id: &str, content: &str, - ) -> Result<(), TeamError> { + ) -> Result { let to_agent = self.scheduler.get_agent(to_slot_id).await?; let from_agent = self.scheduler.get_agent(from_slot_id).await?; @@ -534,12 +544,78 @@ impl TeamSession { ); } - self.wake_agent_for_team_work( - to_slot_id, - TeamWakeSource::McpSendMessage, - Some(mailbox_message.id.clone()), - ) - .await + let to_role = target_role_for(to_agent.role); + let decision = self + .team_run_manager + .record_or_suppress_wake( + to_slot_id, + to_role.clone(), + TeamWakeSource::McpSendMessage, + Some(mailbox_message.id.clone()), + ) + .await?; + let target_event_loop_registered = self.event_loops.has(to_slot_id); + if matches!(decision, WakeRecordDecision::Recorded) && !target_event_loop_registered { + warn!( + team_id = %self.team.id, + slot_id = to_slot_id, + target_role = ?to_role, + wake_source = %TeamWakeSource::McpSendMessage, + "team wake recorded but event loop is not registered; pending wake retained" + ); + self.team_run_manager + .mark_slot_runtime_health(to_slot_id, TeamSlotRuntimeHealth::Unhealthy) + .await; + } + + let (team_run_id, work) = self + .team_run_manager + .slot_work_for_slot(to_slot_id) + .await + .ok_or_else(|| TeamError::InvalidRequest(format!("no active team run work for slot {to_slot_id}")))?; + let delivery = match decision { + WakeRecordDecision::Recorded => TeamSendMessageDelivery::WakeRecorded, + WakeRecordDecision::Suppressed => TeamSendMessageDelivery::WakeSuppressed, + }; + let queue_state = classify_send_message_queue_state( + &work, + target_event_loop_registered || !matches!(delivery, TeamSendMessageDelivery::WakeRecorded), + ); + let target = TeamSendMessageTargetQueueState { + slot_id: work.slot_id, + role: work.role, + queue_state, + pending_wake_count: work.pending_wake_count, + starting_child_count: work.starting_child_count, + active_turn_id: work.active_turn_id, + suppressed_wake_count: work.suppressed_wake_count, + }; + + info!( + team_id = %self.team.id, + team_run_id = %team_run_id, + caller_slot_id = from_slot_id, + target_slot_id = to_slot_id, + target_role = ?target.role, + wake_source = %TeamWakeSource::McpSendMessage, + message_id = %mailbox_message.id, + slot_pending_wake_count = target.pending_wake_count, + starting_child_count = target.starting_child_count, + active_turn_id = ?target.active_turn_id.as_deref(), + suppressed_wake_count = target.suppressed_wake_count, + reason = ?target.queue_state, + "team_agent_message_queued" + ); + + if matches!(decision, WakeRecordDecision::Recorded) && target_event_loop_registered { + self.notify_reserved_wake_for_team_work(to_slot_id, to_role, TeamWakeSource::McpSendMessage); + } + + Ok(AgentMessageQueueResult { + team_run_id, + delivery, + target, + }) } pub(crate) async fn wake_agent_for_team_work( @@ -1165,16 +1241,39 @@ impl TeamSession { } } +fn classify_send_message_queue_state( + work: &TeamSlotWorkPayload, + target_event_loop_registered: bool, +) -> TeamSendMessageReason { + if matches!(work.runtime_health, Some(TeamSlotRuntimeHealth::Disconnected)) { + return TeamSendMessageReason::TargetDisconnected; + } + if !target_event_loop_registered || matches!(work.runtime_health, Some(TeamSlotRuntimeHealth::Unhealthy)) { + return TeamSendMessageReason::TargetUnhealthy; + } + if work.paused && work.suppressed_wake_count > 0 { + return TeamSendMessageReason::SuppressedByPause; + } + if work.active_turn_id.is_some() { + return TeamSendMessageReason::BehindActiveTurn; + } + if work.starting_child_count > 0 { + return TeamSendMessageReason::BehindStartingTurn; + } + TeamSendMessageReason::QueuedForIdle +} + #[cfg(test)] mod tests { use super::*; + use crate::event_loop::AgentLoopContext; use crate::team_run::ActiveChildTurn; use crate::test_utils::MockTeamRepo; use crate::types::{Team, TeamAgent, TeammateRole}; use aionui_ai_agent::AgentError; use aionui_ai_agent::agent_task::AgentInstance; use aionui_ai_agent::types::BuildTaskOptions; - use aionui_api_types::WebSocketMessage; + use aionui_api_types::{TeamSendMessageDelivery, TeamSendMessageReason, WebSocketMessage}; use aionui_common::{AgentKillReason, TimestampMs, now_ms}; use std::sync::{Arc, Mutex}; @@ -1465,6 +1564,26 @@ mod tests { .unwrap() } + async fn start_session_arc() -> Arc { + Arc::new(start_session().await) + } + + fn register_test_event_loop(session: &Arc, slot_id: &str) { + session.event_loops().spawn( + slot_id, + AgentLoopContext { + team_id: session.team_id().to_owned(), + slot_id: slot_id.to_owned(), + user_id: session.user_id().to_owned(), + session: session.clone(), + scheduler: session.scheduler().clone(), + mailbox: session.mailbox().clone(), + turn_port: session.turn_port().clone(), + registry: session.event_loops().clone(), + }, + ); + } + async fn start_session_with_projection_store(store: Arc) -> TeamSession { let repo: Arc = Arc::new(MockTeamRepo::new()); let broadcaster: Arc = Arc::new(NullBroadcaster); @@ -1535,6 +1654,72 @@ mod tests { session.stop(); } + #[tokio::test] + async fn agent_message_to_idle_target_returns_queued_for_idle() { + let session = start_session_arc().await; + register_test_event_loop(&session, "worker-1"); + let ack = session.send_message("start", None).await.unwrap(); + + let result = session + .send_agent_message_from_agent("lead-1", "worker-1", "Do the implementation") + .await + .unwrap(); + + assert_eq!(result.team_run_id, ack.team_run_id); + assert_eq!(result.delivery, TeamSendMessageDelivery::WakeRecorded); + assert_eq!(result.target.queue_state, TeamSendMessageReason::QueuedForIdle); + assert_eq!(result.target.pending_wake_count, 1); + session.event_loops().shutdown(); + session.stop(); + } + + #[tokio::test] + async fn agent_message_to_active_target_returns_behind_active_turn() { + let session = start_session_arc().await; + register_test_event_loop(&session, "worker-1"); + let ack = session + .team_run_manager() + .accept_user_message("worker-1", TeamRunTargetRole::Teammate, false, None) + .await + .unwrap(); + session + .team_run_manager() + .record_pending_wake("worker-1", TeamRunTargetRole::Teammate, TeamWakeSource::UserMessage) + .await + .unwrap(); + let reservation = session + .team_run_manager() + .claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "conv-worker") + .await + .unwrap(); + session + .team_run_manager() + .record_child_started( + &reservation.reservation_id, + ActiveChildTurn { + team_run_id: ack.team_run_id.clone(), + slot_id: "worker-1".into(), + role: TeamRunTargetRole::Teammate, + conversation_id: "conv-worker".into(), + turn_id: "turn-worker".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, + }, + ) + .await; + + let result = session + .send_agent_message_from_agent("lead-1", "worker-1", "Second item") + .await + .unwrap(); + + assert_eq!(result.target.queue_state, TeamSendMessageReason::BehindActiveTurn); + assert_eq!(result.target.pending_wake_count, 1); + assert_eq!(result.target.active_turn_id.as_deref(), Some("turn-worker")); + session.event_loops().shutdown(); + session.stop(); + } + #[tokio::test] async fn leader_message_reuses_active_run_when_leader_slot_is_free() { let session = start_session().await; diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index 16c50bcf..672aa7c6 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -350,6 +350,14 @@ impl TeamRunManager { guard.as_ref().filter(|run| run.is_active()).map(TeamRunRecord::payload) } + pub(crate) async fn slot_work_for_slot(&self, slot_id: &str) -> Option<(String, TeamSlotWorkPayload)> { + let guard = self.state.lock().await; + let run = guard.as_ref().filter(|run| run.is_active())?; + let payload = run.payload(); + let work = payload.slot_work.iter().find(|work| work.slot_id == slot_id).cloned()?; + Some((payload.team_run_id, work)) + } + pub async fn mark_slot_runtime_health( &self, slot_id: &str, diff --git a/crates/aionui-team/tests/mcp_server_integration.rs b/crates/aionui-team/tests/mcp_server_integration.rs index 6efdf794..07c26b41 100644 --- a/crates/aionui-team/tests/mcp_server_integration.rs +++ b/crates/aionui-team/tests/mcp_server_integration.rs @@ -336,7 +336,7 @@ async fn ts3_send_message_to_nonexistent_agent() { } #[tokio::test] -async fn ts_shutdown_approved_intercepted() { +async fn team_send_message_shutdown_approved_intercepted() { let env = setup().await; let mut stream = connect_and_init(env.server.port(), "test-token-123", "worker-1").await; @@ -357,7 +357,7 @@ async fn ts_shutdown_approved_intercepted() { } #[tokio::test] -async fn ts_shutdown_rejected_intercepted() { +async fn team_send_message_shutdown_rejected_intercepted() { let env = setup().await; let mut stream = connect_and_init(env.server.port(), "test-token-123", "worker-1").await; @@ -377,7 +377,7 @@ async fn ts_shutdown_rejected_intercepted() { } #[tokio::test] -async fn ts_regular_message_rejects_without_live_team_run_service() { +async fn team_send_message_regular_message_rejects_without_live_team_run_service() { let env = setup().await; let mut stream = connect_and_init(env.server.port(), "test-token-123", "worker-1").await; From f9e4c757683097b1396e5ab29706449f91e19804 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 15:41:23 +0800 Subject: [PATCH 4/5] feat(team): monitor slow child turns per session --- crates/aionui-team/src/service.rs | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index a831a153..c99c363d 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -37,6 +37,7 @@ pub(crate) fn inherit_team_workspace(extra: &mut serde_json::Value, workspace: & struct SessionEntry { session: Arc, + slow_monitor_handle: tokio::task::JoinHandle<()>, } pub struct TeamSessionService { @@ -544,8 +545,10 @@ impl TeamSessionService { // Spawn per-agent event loops self.spawn_event_loops(&session, &user_id, &agents_snapshot); + let slow_monitor_handle = Self::spawn_slow_monitor(session.clone()); let entry = SessionEntry { session: session.clone(), + slow_monitor_handle, }; self.sessions.insert(team_id.to_owned(), entry); @@ -596,6 +599,17 @@ impl TeamSessionService { self.broadcaster.broadcast(event); } + fn spawn_slow_monitor(session: Arc) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + interval.tick().await; + session.team_run_manager().observe_slow_child_turns(now_ms()).await; + } + }) + } + fn broadcast_team_created(&self, team_id: &str, team_name: &str) { info!(team_id = %team_id, event_name = TEAM_CREATED_EVENT, "team event broadcast"); self.broadcaster.broadcast(WebSocketMessage::new( @@ -735,6 +749,14 @@ impl TeamSessionService { self.sessions.get(team_id).map(|e| e.session.scheduler().clone()) } + #[cfg(test)] + fn session_has_slow_monitor(&self, team_id: &str) -> bool { + self.sessions + .get(team_id) + .map(|entry| !entry.slow_monitor_handle.is_finished()) + .unwrap_or(false) + } + pub async fn stop_session(&self, user_id: &str, team_id: &str) -> Result<(), TeamError> { self.load_owned_team(user_id, team_id).await?; self.stop_session_unchecked(team_id); @@ -743,6 +765,7 @@ impl TeamSessionService { fn stop_session_unchecked(&self, team_id: &str) { if let Some((_, entry)) = self.sessions.remove(team_id) { + entry.slow_monitor_handle.abort(); entry.session.event_loops().shutdown(); entry.session.stop(); } @@ -981,3 +1004,24 @@ impl TeamSessionService { .await } } + +#[cfg(test)] +mod tests { + use crate::test_utils::workspace_harness::{ + setup_with_factory_metadata_team_repo_and_conversation_repo, single_agent_team_request, + }; + + #[tokio::test] + async fn session_has_slow_monitor() { + let (svc, _repo, _task_manager, _conv_repo) = setup_with_factory_metadata_team_repo_and_conversation_repo(); + let created = svc + .create_team("user-test", single_agent_team_request("Slow Monitor")) + .await + .unwrap(); + + svc.ensure_session("user-test", &created.id).await.unwrap(); + + assert!(svc.session_has_slow_monitor(&created.id)); + svc.stop_session("user-test", &created.id).await.unwrap(); + } +} From a4526cc04fbabbb61792da85d3f273ed9470e276 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Tue, 16 Jun 2026 15:44:20 +0800 Subject: [PATCH 5/5] fix(agent): classify busy and disconnected errors --- .../src/protocol/send_error.rs | 31 ++++++++++++++++--- .../src/routes/error_mapping.rs | 4 +-- crates/aionui-team/src/routes.rs | 8 ++++- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/crates/aionui-ai-agent/src/protocol/send_error.rs b/crates/aionui-ai-agent/src/protocol/send_error.rs index 28151355..5c4a69a9 100644 --- a/crates/aionui-ai-agent/src/protocol/send_error.rs +++ b/crates/aionui-ai-agent/src/protocol/send_error.rs @@ -298,14 +298,14 @@ impl AgentSendError { ), AcpError::NotConnected => Self::new( "AionUI lost its Agent protocol connection", - AgentErrorCode::AionuiInternalError, - AgentErrorOwnership::Aionui, + AgentErrorCode::UserAgentDisconnected, + AgentErrorOwnership::UserAgent, Some(detail), true, - true, + false, resolution( - AgentErrorResolutionKind::SendFeedback, - Some(AgentErrorResolutionTarget::Feedback), + AgentErrorResolutionKind::ReconnectAgent, + Some(AgentErrorResolutionTarget::AgentSettings), ), ), AcpError::AgentInternal { .. } => unknown_upstream_error(detail), @@ -1182,6 +1182,27 @@ mod tests { assert_eq!(err.stream_error().feedback_recommended, Some(false)); } + #[test] + fn not_connected_maps_to_user_agent_disconnected() { + let disconnected = AgentSendError::from(AcpError::NotConnected); + assert_eq!( + disconnected.stream_error.code, + Some(AgentErrorCode::UserAgentDisconnected) + ); + assert_eq!( + disconnected.stream_error.ownership, + Some(AgentErrorOwnership::UserAgent) + ); + assert_eq!( + disconnected + .stream_error + .resolution + .as_ref() + .map(|resolution| resolution.kind), + Some(AgentErrorResolutionKind::ReconnectAgent) + ); + } + #[test] fn classifies_acp_internal_provider_failure_from_structured_message() { assert_acp_classification( diff --git a/crates/aionui-ai-agent/src/routes/error_mapping.rs b/crates/aionui-ai-agent/src/routes/error_mapping.rs index 1d87c53f..c635f7da 100644 --- a/crates/aionui-ai-agent/src/routes/error_mapping.rs +++ b/crates/aionui-ai-agent/src/routes/error_mapping.rs @@ -32,7 +32,7 @@ fn acp_error_to_api_error(err: AcpError) -> ApiError { AcpError::MethodNotFound { .. } => ApiError::BadRequest(acp_error_public_message(&err)), AcpError::InvalidParams { .. } => ApiError::BadRequest(acp_error_public_message(&err)), AcpError::AgentInternal { .. } => ApiError::BadGateway(acp_error_public_message(&err)), - AcpError::NotConnected => ApiError::Internal("ACP protocol not connected".into()), + AcpError::NotConnected => ApiError::BadGateway(acp_error_public_message(&err)), AcpError::InitTimeout { .. } => ApiError::BadGateway(acp_error_public_message(&err)), } } @@ -76,7 +76,7 @@ mod tests { }, StatusCode::BAD_GATEWAY, ), - (AcpError::NotConnected, StatusCode::INTERNAL_SERVER_ERROR), + (AcpError::NotConnected, StatusCode::BAD_GATEWAY), (AcpError::InitTimeout { timeout_secs: 30 }, StatusCode::BAD_GATEWAY), ]; diff --git a/crates/aionui-team/src/routes.rs b/crates/aionui-team/src/routes.rs index 983d0e7b..d706b65f 100644 --- a/crates/aionui-team/src/routes.rs +++ b/crates/aionui-team/src/routes.rs @@ -42,7 +42,7 @@ impl From for ApiError { TeamError::AgentNotFound(msg) => ApiError::NotFound(msg), TeamError::TaskNotFound(msg) => ApiError::NotFound(msg), TeamError::InvalidRequest(msg) => ApiError::BadRequest(msg), - TeamError::SlotBusy(msg) => ApiError::BadRequest(format!("Team slot is busy: {msg}")), + TeamError::SlotBusy(msg) => ApiError::Conflict(format!("Team slot is busy: {msg}")), TeamError::LeaderOnly(msg) => ApiError::Forbidden(msg), TeamError::Forbidden(msg) => ApiError::Forbidden(msg), TeamError::SessionNotFound(msg) => ApiError::NotFound(msg), @@ -326,6 +326,12 @@ mod tests { assert!(matches!(err, ApiError::BadRequest(_))); } + #[test] + fn slot_busy_maps_to_conflict() { + let api_error: ApiError = TeamError::SlotBusy("lead-1".into()).into(); + assert_eq!(api_error.status_code(), StatusCode::CONFLICT); + } + #[test] fn leader_only_maps_to_forbidden() { let err: ApiError = TeamError::LeaderOnly("spawn_agent".into()).into();