Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions crates/aionui-ai-agent/src/protocol/send_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,14 @@ impl AgentSendError {
),
AcpError::NotConnected => Self::new(
"AionUI lost its Agent protocol connection",
AgentErrorCode::AionuiInternalError,
AgentErrorOwnership::Aionui,
AgentErrorCode::UserAgentDisconnected,
AgentErrorOwnership::UserAgent,
Some(detail),
true,
true,
false,
resolution(
AgentErrorResolutionKind::SendFeedback,
Some(AgentErrorResolutionTarget::Feedback),
AgentErrorResolutionKind::ReconnectAgent,
Some(AgentErrorResolutionTarget::AgentSettings),
),
),
AcpError::AgentInternal { .. } => unknown_upstream_error(detail),
Expand Down Expand Up @@ -1182,6 +1182,27 @@ mod tests {
assert_eq!(err.stream_error().feedback_recommended, Some(false));
}

#[test]
fn not_connected_maps_to_user_agent_disconnected() {
let disconnected = AgentSendError::from(AcpError::NotConnected);
assert_eq!(
disconnected.stream_error.code,
Some(AgentErrorCode::UserAgentDisconnected)
);
assert_eq!(
disconnected.stream_error.ownership,
Some(AgentErrorOwnership::UserAgent)
);
assert_eq!(
disconnected
.stream_error
.resolution
.as_ref()
.map(|resolution| resolution.kind),
Some(AgentErrorResolutionKind::ReconnectAgent)
);
}

#[test]
fn classifies_acp_internal_provider_failure_from_structured_message() {
assert_acp_classification(
Expand Down
4 changes: 2 additions & 2 deletions crates/aionui-ai-agent/src/routes/error_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn acp_error_to_api_error(err: AcpError) -> ApiError {
AcpError::MethodNotFound { .. } => ApiError::BadRequest(acp_error_public_message(&err)),
AcpError::InvalidParams { .. } => ApiError::BadRequest(acp_error_public_message(&err)),
AcpError::AgentInternal { .. } => ApiError::BadGateway(acp_error_public_message(&err)),
AcpError::NotConnected => ApiError::Internal("ACP protocol not connected".into()),
AcpError::NotConnected => ApiError::BadGateway(acp_error_public_message(&err)),
AcpError::InitTimeout { .. } => ApiError::BadGateway(acp_error_public_message(&err)),
}
}
Expand Down Expand Up @@ -76,7 +76,7 @@ mod tests {
},
StatusCode::BAD_GATEWAY,
),
(AcpError::NotConnected, StatusCode::INTERNAL_SERVER_ERROR),
(AcpError::NotConnected, StatusCode::BAD_GATEWAY),
(AcpError::InitTimeout { timeout_secs: 30 }, StatusCode::BAD_GATEWAY),
];

Expand Down
4 changes: 3 additions & 1 deletion crates/aionui-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ pub use team::{
TeamAgentRemovedPayload, TeamAgentRenamedPayload, TeamAgentResponse, TeamAgentSpawnedPayload,
TeamAgentStatusPayload, TeamChildTurnPayload, TeamListResponse, TeamMcpPhase, TeamMcpRuntimeConfig,
TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole,
TeamRuntimeSeed, TeamSessionBinding, TeamSlotWorkPayload, TeammateMessagePayload,
TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason,
TeamSendMessageStatus, TeamSendMessageTargetQueueState, TeamSessionBinding, TeamSlotRuntimeHealth,
TeamSlotWorkPayload, TeammateMessagePayload,
};
pub use team_mcp::{GuideMcpConfig, TEAM_MCP_SERVER_NAME, TeamMcpStdioConfig};
pub use websocket::WebSocketMessage;
Expand Down
135 changes: 135 additions & 0 deletions crates/aionui-api-types/src/team.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ pub struct TeamRunAckResponse {
pub message_id: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TeamSlotRuntimeHealth {
Disconnected,
Unhealthy,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TeamSlotWorkPayload {
pub slot_id: String,
Expand All @@ -246,6 +253,16 @@ pub struct TeamSlotWorkPayload {
pub suppressed_wake_count: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_turn_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_turn_started_at_ms: Option<TimestampMs>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_turn_elapsed_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_turn_slow: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_turn_slow_threshold_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime_health: Option<TeamSlotRuntimeHealth>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -273,6 +290,57 @@ pub struct TeamChildTurnPayload {
pub status: TeamRunStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TeamSendMessageStatus {
Queued,
Rejected,
Error,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TeamSendMessageDelivery {
WakeRecorded,
WakeSuppressed,
NotRecorded,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TeamSendMessageReason {
QueuedForIdle,
BehindStartingTurn,
BehindActiveTurn,
SuppressedByPause,
NoActiveTeamRun,
TargetNotFound,
TargetDisconnected,
TargetUnhealthy,
InternalError,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TeamSendMessageTargetQueueState {
pub slot_id: String,
pub role: TeamRunTargetRole,
pub queue_state: TeamSendMessageReason,
pub pending_wake_count: usize,
pub starting_child_count: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_turn_id: Option<String>,
pub suppressed_wake_count: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TeamSendMessageQueuedResponse {
pub status: TeamSendMessageStatus,
pub delivery: TeamSendMessageDelivery,
pub reason: TeamSendMessageReason,
pub team_run_id: String,
pub targets: Vec<TeamSendMessageTargetQueueState>,
}

// ---------------------------------------------------------------------------
// E. Team management — Response DTOs
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1117,6 +1185,11 @@ mod tests {
paused: false,
suppressed_wake_count: 0,
active_turn_id: Some("turn-lead".into()),
active_turn_started_at_ms: None,
active_turn_elapsed_ms: None,
active_turn_slow: None,
active_turn_slow_threshold_ms: None,
runtime_health: None,
},
TeamSlotWorkPayload {
slot_id: "worker-1".into(),
Expand All @@ -1126,6 +1199,11 @@ mod tests {
paused: false,
suppressed_wake_count: 0,
active_turn_id: None,
active_turn_started_at_ms: None,
active_turn_elapsed_ms: None,
active_turn_slow: None,
active_turn_slow_threshold_ms: None,
runtime_health: None,
},
],
};
Expand Down Expand Up @@ -1159,6 +1237,11 @@ mod tests {
active_turn_id: None,
paused: true,
suppressed_wake_count: 2,
active_turn_started_at_ms: None,
active_turn_elapsed_ms: None,
active_turn_slow: None,
active_turn_slow_threshold_ms: None,
runtime_health: None,
};

let value = serde_json::to_value(&payload).unwrap();
Expand All @@ -1168,6 +1251,58 @@ mod tests {
assert!(value.get(format!("{}_pending_count", "background")).is_none());
}

#[test]
fn team_slot_work_payload_serializes_active_turn_slow_fields() {
let payload = TeamSlotWorkPayload {
slot_id: "worker-1".into(),
role: TeamRunTargetRole::Teammate,
pending_wake_count: 0,
starting_child_count: 0,
paused: false,
suppressed_wake_count: 0,
active_turn_id: Some("turn-worker".into()),
active_turn_started_at_ms: Some(1_000),
active_turn_elapsed_ms: Some(600_001),
active_turn_slow: Some(true),
active_turn_slow_threshold_ms: Some(600_000),
runtime_health: Some(TeamSlotRuntimeHealth::Unhealthy),
};

let value = serde_json::to_value(payload).unwrap();

assert_eq!(value["active_turn_started_at_ms"], 1_000);
assert_eq!(value["active_turn_elapsed_ms"], 600_001);
assert_eq!(value["active_turn_slow"], true);
assert_eq!(value["active_turn_slow_threshold_ms"], 600_000);
assert_eq!(value["runtime_health"], "unhealthy");
}

#[test]
fn team_send_message_queued_response_serializes_stable_contract() {
let response = TeamSendMessageQueuedResponse {
status: TeamSendMessageStatus::Queued,
delivery: TeamSendMessageDelivery::WakeRecorded,
reason: TeamSendMessageReason::BehindActiveTurn,
team_run_id: "run-1".into(),
targets: vec![TeamSendMessageTargetQueueState {
slot_id: "worker-1".into(),
role: TeamRunTargetRole::Teammate,
queue_state: TeamSendMessageReason::BehindActiveTurn,
pending_wake_count: 1,
starting_child_count: 0,
active_turn_id: Some("turn-worker".into()),
suppressed_wake_count: 0,
}],
};

let value = serde_json::to_value(response).unwrap();

assert_eq!(value["status"], "queued");
assert_eq!(value["delivery"], "wake_recorded");
assert_eq!(value["reason"], "behind_active_turn");
assert_eq!(value["targets"][0]["queue_state"], "behind_active_turn");
}

#[test]
fn team_slot_work_payload_defaults_pause_fields_for_old_payloads() {
let decoded: TeamSlotWorkPayload = serde_json::from_value(serde_json::json!({
Expand Down
2 changes: 2 additions & 0 deletions crates/aionui-team/src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ async fn execute_turn(ctx: &AgentLoopContext, input: &crate::session::WakeInput)
role: started.role,
conversation_id: started.conversation_id,
turn_id: started.turn_id,
started_at_ms: aionui_common::now_ms(),
last_slow_notified_at_ms: None,
};
match team_run_manager
.record_child_started(&reservation_id, child.clone())
Expand Down
58 changes: 54 additions & 4 deletions crates/aionui-team/src/mcp/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::net::SocketAddr;
use std::sync::{Arc, Weak};

use aionui_api_types::{TeamMcpPhase, TeamMcpStatusPayload, WebSocketMessage};
use aionui_api_types::{
TeamMcpPhase, TeamMcpStatusPayload, TeamSendMessageQueuedResponse, TeamSendMessageStatus, WebSocketMessage,
};
use aionui_realtime::EventBroadcaster;
use serde_json::{Value, json};
use tokio::net::{TcpListener, TcpStream};
Expand All @@ -12,7 +14,7 @@ use crate::error::TeamError;
use crate::events::TEAM_MCP_STATUS_EVENT;
use crate::scheduler::TeammateManager;
use crate::service::TeamSessionService;
use crate::session::SpawnAgentRequest;
use crate::session::{AgentMessageQueueResult, SpawnAgentRequest};
use crate::types::{TeammateRole, TeammateStatus};
use crate::wake::TeamWakeSource;

Expand Down Expand Up @@ -562,14 +564,33 @@ async fn exec_send_message(
} else {
vec![resolved_to.clone()]
};
let mut target_results = Vec::with_capacity(targets.len());
for target in &targets {
service
let result = service
.send_agent_message_from_agent(team_id, caller_slot_id, target, &input.message)
.await
.map_err(|e| e.to_string())?;
target_results.push(result);
}

Ok(format!("Message sent to {}", input.to))
let response = build_send_message_queued_response(target_results)?;

serde_json::to_string(&response).map_err(|e| format!("Serialization error: {e}"))
}

fn build_send_message_queued_response(
target_results: Vec<AgentMessageQueueResult>,
) -> Result<TeamSendMessageQueuedResponse, String> {
let first = target_results
.first()
.ok_or_else(|| "No message targets resolved".to_string())?;
Ok(TeamSendMessageQueuedResponse {
status: TeamSendMessageStatus::Queued,
delivery: first.delivery.clone(),
reason: first.target.queue_state.clone(),
team_run_id: first.team_run_id.clone(),
targets: target_results.into_iter().map(|result| result.target).collect(),
})
}

async fn exec_spawn_agent(
Expand Down Expand Up @@ -888,6 +909,35 @@ async fn http_mcp_loop(
#[cfg(test)]
mod tests {
use super::*;
use aionui_api_types::{
TeamRunTargetRole, TeamSendMessageDelivery, TeamSendMessageReason, TeamSendMessageTargetQueueState,
};

#[test]
fn build_send_message_queued_response_serializes_json_contract() {
let response = build_send_message_queued_response(vec![AgentMessageQueueResult {
team_run_id: "run-1".into(),
delivery: TeamSendMessageDelivery::WakeRecorded,
target: TeamSendMessageTargetQueueState {
slot_id: "worker-1".into(),
role: TeamRunTargetRole::Teammate,
queue_state: TeamSendMessageReason::QueuedForIdle,
pending_wake_count: 1,
starting_child_count: 0,
active_turn_id: None,
suppressed_wake_count: 0,
},
}])
.unwrap();

let text = serde_json::to_string(&response).unwrap();
let payload: serde_json::Value = serde_json::from_str(&text).expect("team_send_message result must be JSON");
assert_eq!(payload["status"], "queued");
assert_eq!(payload["delivery"], "wake_recorded");
assert_eq!(payload["reason"], "queued_for_idle");
assert_eq!(payload["targets"][0]["slot_id"], "worker-1");
assert_eq!(payload["targets"][0]["queue_state"], "queued_for_idle");
}

/// Non-Lead callers are rejected at the dispatch layer with the
/// "Only Lead ..." phrasing. Service weak is never upgraded because
Expand Down
8 changes: 7 additions & 1 deletion crates/aionui-team/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl From<TeamError> for ApiError {
TeamError::AgentNotFound(msg) => ApiError::NotFound(msg),
TeamError::TaskNotFound(msg) => ApiError::NotFound(msg),
TeamError::InvalidRequest(msg) => ApiError::BadRequest(msg),
TeamError::SlotBusy(msg) => ApiError::BadRequest(format!("Team slot is busy: {msg}")),
TeamError::SlotBusy(msg) => ApiError::Conflict(format!("Team slot is busy: {msg}")),
TeamError::LeaderOnly(msg) => ApiError::Forbidden(msg),
TeamError::Forbidden(msg) => ApiError::Forbidden(msg),
TeamError::SessionNotFound(msg) => ApiError::NotFound(msg),
Expand Down Expand Up @@ -326,6 +326,12 @@ mod tests {
assert!(matches!(err, ApiError::BadRequest(_)));
}

#[test]
fn slot_busy_maps_to_conflict() {
let api_error: ApiError = TeamError::SlotBusy("lead-1".into()).into();
assert_eq!(api_error.status_code(), StatusCode::CONFLICT);
}

#[test]
fn leader_only_maps_to_forbidden() {
let err: ApiError = TeamError::LeaderOnly("spawn_agent".into()).into();
Expand Down
Loading
Loading