Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 109 additions & 4 deletions crates/aionui-app/src/commands/cmd_team_guide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,12 +500,95 @@ mod tests {
assert!(!serialized.contains("conv-secret-123"));
}

#[tokio::test]
async fn forward_tool_create_team_object_success_returns_success_text() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/tool"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"teamId": "team-1",
"name": "Dev Team",
"route": "/team/team-1",
"status": "team_created",
"next_step": "Team was created and the Team page is open. End this solo turn now.",
})))
.mount(&mock_server)
.await;

let server = guide_server_for_port(mock_server.address().port());
let result = server
.forward_tool("aion_create_team", &json!({"summary": "redacted"}))
.await;

assert_ne!(result.is_error, Some(true));
let text = first_text(&result);
assert!(text.contains("team_created"));
assert!(text.contains("team-1"));
assert!(text.contains("End this solo turn now"));
}

#[tokio::test]
async fn forward_tool_create_team_malformed_object_remains_unexpected() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/tool"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"name": "Dev Team",
"route": "/team/team-1",
"status": "team_created",
"next_step": "Team was created and the Team page is open. End this solo turn now.",
})))
.mount(&mock_server)
.await;

let server = guide_server_for_port(mock_server.address().port());
let result = server
.forward_tool("aion_create_team", &json!({"summary": "redacted"}))
.await;

assert_eq!(result.is_error, Some(true));
assert_eq!(first_text(&result), "unexpected local guide tool response");
assert_eq!(
result.structured_content.as_ref().unwrap()["code"],
"MCP_TOOL_RESPONSE_UNEXPECTED"
);
}

#[tokio::test]
async fn forward_tool_create_team_legacy_result_body_remains_unexpected() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/tool"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result": "legacy create-team success",
})))
.mount(&mock_server)
.await;

let server = guide_server_for_port(mock_server.address().port());
let result = server
.forward_tool("aion_create_team", &json!({"summary": "redacted"}))
.await;

assert_eq!(result.is_error, Some(true));
assert_eq!(first_text(&result), "unexpected local guide tool response");
assert_eq!(
result.structured_content.as_ref().unwrap()["code"],
"MCP_TOOL_RESPONSE_UNEXPECTED"
);
}

#[tokio::test]
async fn forward_tool_unexpected_2xx_body_returns_unexpected_without_echoing_body() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/tool"))
.respond_with(ResponseTemplate::new(200).set_body_string("unexpected raw body with team-secret-789"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"teamId": "team-secret-789",
"route": "/team/team-secret-789",
"status": "team_created",
"next_step": "Team was created and the Team page is open. End this solo turn now.",
})))
.mount(&mock_server)
.await;

Expand All @@ -520,7 +603,7 @@ mod tests {
);
let serialized = serde_json::to_string(&result).unwrap();
assert!(!serialized.contains("team-secret-789"));
assert!(!serialized.contains("unexpected raw body"));
assert!(!serialized.contains("team_created"));
}

#[tokio::test]
Expand Down Expand Up @@ -599,8 +682,8 @@ impl GuideServer {
match resp.text().await {
Ok(text) => {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
if let Some(result) = v.get("result").and_then(|r| r.as_str()) {
return tool_success(result.to_owned());
if let Some(result) = parse_tool_success_text(tool_name, &v) {
return tool_success(result);
}
if v.get("error").is_some() {
return tool_error(
Expand Down Expand Up @@ -694,3 +777,25 @@ fn extract_nested_code(value: &serde_json::Value, path: &[&str]) -> Option<serde
_ => None,
}
}

fn parse_tool_success_text(tool_name: &str, value: &serde_json::Value) -> Option<String> {
if tool_name == "aion_create_team" {
return is_create_team_success_body(value).then(|| serde_json::to_string(value).ok())?;
}

if let Some(result) = value.get("result").and_then(|result| result.as_str()) {
return Some(result.to_owned());
}

None
}

fn is_create_team_success_body(value: &serde_json::Value) -> bool {
value.get("status").and_then(|status| status.as_str()) == Some("team_created")
&& value.get("teamId").and_then(|team_id| team_id.as_str()).is_some()
&& value.get("route").and_then(|route| route.as_str()).is_some()
&& value
.get("next_step")
.and_then(|next_step| next_step.as_str())
.is_some()
}
53 changes: 41 additions & 12 deletions crates/aionui-app/src/router/team_conversation_adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use aionui_team::{
TeamConversationProvisioningPort, TeamError, TeamProjectionMessageStore,
};
use async_trait::async_trait;
use tracing::info;

pub struct TeamConversationAdapters {
conversation_service: ConversationService,
Expand Down Expand Up @@ -73,18 +74,42 @@ impl AgentTurnExecutionPort for TeamConversationAdapters {
>
});

let outcome = self
.conversation_service
.run_agent_turn(ConversationAgentTurnRequest {
user_id: request.user_id,
conversation_id: request.conversation_id,
content: request.content,
files: request.files,
inject_skills: Vec::new(),
on_started,
})
.await
.map_err(map_conversation_turn_error)?;
let conversation_id = request.conversation_id.clone();
let outcome = loop {
match self
.conversation_service
.run_agent_turn(ConversationAgentTurnRequest {
user_id: request.user_id.clone(),
conversation_id: conversation_id.clone(),
content: request.content.clone(),
files: request.files.clone(),
inject_skills: Vec::new(),
on_started: on_started.clone(),
})
.await
{
Ok(outcome) => break outcome,
Err(error) if is_retryable_conversation_busy(&error) => {
info!(
conversation_id = %conversation_id,
team_run_id = ?request.team_run_id,
slot_id = %request.slot_id,
"team conversation turn waiting for active conversation turn to release"
);
self.conversation_service
.runtime_state()
.wait_until_unclaimed(&conversation_id)
.await;
info!(
conversation_id = %conversation_id,
team_run_id = ?request.team_run_id,
slot_id = %request.slot_id,
"team conversation turn retrying after active conversation turn released"
);
}
Err(error) => return Err(map_conversation_turn_error(error)),
}
};

Ok(AgentTurnOutcome {
conversation_id: outcome.conversation_id,
Expand Down Expand Up @@ -275,6 +300,10 @@ impl TeamConversationLookupPort for TeamConversationAdapters {
}
}

fn is_retryable_conversation_busy(error: &ConversationError) -> bool {
matches!(error, ConversationError::Busy { reason } if reason.contains("already running"))
}

fn map_conversation_create_error(error: ConversationError) -> TeamError {
match error {
ConversationError::WorkspacePathUnavailable { path } => TeamError::WorkspacePathUnavailable(path),
Expand Down
47 changes: 47 additions & 0 deletions crates/aionui-conversation/src/runtime_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use std::{

use aionui_api_types::{ConversationRuntimeStateKind, ConversationRuntimeSummary};
use aionui_common::ConversationStatus;
use tokio::sync::Notify;
use tracing::{info, warn};

use crate::ConversationError;

#[derive(Debug, Default)]
pub struct ConversationRuntimeStateService {
state: Mutex<ConversationRuntimeState>,
release_notify: Notify,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -112,6 +114,16 @@ impl ConversationRuntimeStateService {
.and_then(|state| state.active_turns.get(conversation_id).cloned())
}

pub async fn wait_until_unclaimed(&self, conversation_id: &str) {
loop {
let notified = self.release_notify.notified();
if !self.is_claimed(conversation_id) {
return;
}
notified.await;
}
}

pub fn mark_deleting(&self, conversation_id: &str) -> bool {
match self.state.lock() {
Ok(mut state) => {
Expand Down Expand Up @@ -305,6 +317,8 @@ impl ConversationRuntimeStateService {
deleting = was_deleting,
"conversation runtime turn claim released"
);
drop(state);
self.release_notify.notify_waiters();
was_deleting
}
Err(_) => {
Expand Down Expand Up @@ -417,6 +431,39 @@ mod tests {
assert!(state.try_claim_turn("conv-1", "turn-2").is_ok());
}

#[tokio::test]
async fn wait_until_unclaimed_completes_after_active_claim_releases() {
let state = Arc::new(ConversationRuntimeStateService::default());
let mut claim = state
.try_claim_turn("conv-1", "turn-1")
.expect("claim should be created");

let waiter = {
let state = state.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
state.wait_until_unclaimed("conv-1").await;
let _ = tx.send(());
});
rx
};
tokio::pin!(waiter);

assert!(
tokio::time::timeout(std::time::Duration::from_millis(20), &mut waiter)
.await
.is_err(),
"waiter must stay pending while the claim is active"
);

let _ = claim.release();
assert!(!state.is_claimed("conv-1"));
tokio::time::timeout(std::time::Duration::from_secs(1), &mut waiter)
.await
.expect("waiter should finish after release")
.expect("waiter task should send completion");
}

#[test]
fn deleting_rejects_new_turn_claims() {
let state = Arc::new(ConversationRuntimeStateService::default());
Expand Down
16 changes: 14 additions & 2 deletions crates/aionui-team/src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use tracing::{debug, info, warn};

use crate::mailbox::Mailbox;
use crate::ports::{
AgentTurnExecutionPort, AgentTurnRequest, AgentTurnSource, AgentTurnStarted, AgentTurnStartedCallback,
AgentTurnExecutionError, AgentTurnExecutionPort, AgentTurnRequest, AgentTurnSource, AgentTurnStarted,
AgentTurnStartedCallback,
};
use crate::scheduler::TeammateManager;
use crate::session::TeamSession;
Expand Down Expand Up @@ -118,6 +119,10 @@ struct TurnExecution {
turn_id: Option<String>,
}

fn is_retryable_start_skip(error: &AgentTurnExecutionError) -> bool {
matches!(error, AgentTurnExecutionError::Skipped { reason } if reason.contains("already running"))
}

/// The event loop for one agent slot. Spawned as a tokio task.
///
/// Flow:
Expand Down Expand Up @@ -337,6 +342,13 @@ async fn execute_turn(ctx: &AgentLoopContext, input: &crate::session::WakeInput)
{
if started_seen.load(Ordering::SeqCst) {
ctx.session.team_run_manager().complete_failed().await;
} else if is_retryable_start_skip(&e) {
ctx.session
.team_run_manager()
.retry_child_start_later(&reservation.reservation_id, &e.to_string())
.await;
let _ = ctx.scheduler.set_status(&ctx.slot_id, TeammateStatus::Idle).await;
return None;
} else {
ctx.session
.team_run_manager()
Expand Down Expand Up @@ -392,7 +404,7 @@ async fn finalize_turn(ctx: &AgentLoopContext, turn: TurnExecution, input: &crat
if wake_target != ctx.slot_id
&& let Err(e) = ctx
.session
.wake_agent_for_team_work(&wake_target, TeamWakeSource::IdleNotification, None)
.scheduler_wake_agent_for_team_work(&wake_target, TeamWakeSource::IdleNotification)
.await
{
warn!(
Expand Down
Loading
Loading