diff --git a/crates/aionui-ai-agent/src/agent_task.rs b/crates/aionui-ai-agent/src/agent_task.rs index 10464dc1e..670583477 100644 --- a/crates/aionui-ai-agent/src/agent_task.rs +++ b/crates/aionui-ai-agent/src/agent_task.rs @@ -110,21 +110,9 @@ pub trait IMockAgent: IAgentTask { initialized: false, }) } - async fn set_mode(&self, _mode: &str) -> Result<(), AgentError> { - Err(AgentError::bad_request("Mode switching is not supported for this mock")) - } async fn get_model(&self) -> Result { Ok(GetModelInfoResponse { model_info: None }) } - async fn set_model(&self, _model_id: &str) -> Result<(), AgentError> { - Err(AgentError::bad_request( - "Model switching is not supported for this mock", - )) - } - async fn set_model_confirmed(&self, model_id: &str) -> Result { - self.set_model(model_id).await?; - self.get_model().await - } async fn get_config_options(&self) -> Result { Ok(GetConfigOptionsResponse { config_options: Vec::new(), @@ -316,18 +304,6 @@ impl AgentInstance { } } - /// Set the session mode. Unsupported for variants other than ACP / - /// Aionrs — returns a `BadRequest` so the caller can surface an - /// actionable error rather than silently no-op. - pub async fn set_mode(&self, mode: &str) -> Result<(), AgentError> { - match self { - Self::Acp(m) => m.set_mode(mode).await, - Self::Aionrs(m) => m.set_mode(mode).await, - #[cfg(any(test, feature = "test-support"))] - Self::Mock(m) => m.set_mode(mode).await, - } - } - /// Get the current session model info. Only ACP exposes a model /// catalog; other variants report `model_info = None` so the UI can /// hide the model picker without an error. @@ -350,42 +326,6 @@ impl AgentInstance { } } - /// Switch the active model. Unsupported for variants other than ACP — - /// returns a `BadRequest` so the caller can surface an actionable - /// error rather than silently no-op. - pub async fn set_model(&self, model_id: &str) -> Result<(), AgentError> { - if model_id.trim().is_empty() { - return Err(AgentError::bad_request("model_id must not be empty")); - } - match self { - Self::Acp(m) => m.set_model(model_id).await, - Self::Aionrs(_) => Err(AgentError::bad_request( - "Model switching is not supported for this agent type", - )), - #[cfg(any(test, feature = "test-support"))] - Self::Mock(m) => m.set_model(model_id).await, - } - } - - /// Switch the active model and return the confirmed model payload for - /// this specific mutation, rather than re-reading potentially stale - /// cached state. - pub async fn set_model_confirmed(&self, model_id: &str) -> Result { - if model_id.trim().is_empty() { - return Err(AgentError::bad_request("model_id must not be empty")); - } - match self { - Self::Acp(m) => Ok(GetModelInfoResponse { - model_info: Some(map_sdk_model_to_payload(m.set_model_confirmed(model_id).await?)), - }), - Self::Aionrs(_) => Err(AgentError::bad_request( - "Model switching is not supported for this agent type", - )), - #[cfg(any(test, feature = "test-support"))] - Self::Mock(m) => m.set_model_confirmed(model_id).await, - } - } - pub async fn get_config_options(&self) -> Result { match self { Self::Acp(m) => m.config_options().await, diff --git a/crates/aionui-ai-agent/src/manager/acp/agent.rs b/crates/aionui-ai-agent/src/manager/acp/agent.rs index f3e48fa22..a5519b48b 100644 --- a/crates/aionui-ai-agent/src/manager/acp/agent.rs +++ b/crates/aionui-ai-agent/src/manager/acp/agent.rs @@ -245,6 +245,25 @@ fn matched_slash_command(raw_user_input: &str, commands: &[AvailableCommand]) -> /// (Claude, Qwen, CodeBuddy, Codex, etc.). Communication now happens via /// the `agent-client-protocol` SDK's JSON-RPC transport, replacing the /// previous hand-crafted JSON-over-stdin/stdout approach. +fn mark_session_opened_after_protocol_ready( + session: &mut AcpSession, + sid: String, + protocol_connected: bool, + conversation_id: &str, + backend: Option<&str>, +) -> Result { + if !protocol_connected { + warn!( + conversation_id = %conversation_id, + backend = backend.unwrap_or("-"), + "ACP session open returned after protocol disconnected; rejecting opened transition" + ); + return Err(AcpError::NotConnected.into()); + } + session.mark_opened(); + Ok(sid) +} + pub struct AcpAgentManager { /// Pre-computed, immutable session parameters assembled by the factory. pub(super) params: Arc, @@ -465,6 +484,19 @@ impl AcpAgentManager { runtime.bump_activity(); } + fn ensure_protocol_connected_for_operation(&self, operation: &'static str) -> Result<(), AgentError> { + if self.protocol.is_connected() { + return Ok(()); + } + warn!( + conversation_id = %self.params.conversation_id, + agent_backend = ?self.params.metadata.backend, + operation, + "ACP operation rejected because protocol is disconnected" + ); + Err(AcpError::NotConnected.into()) + } + pub(crate) async fn mode(&self) -> Result { let desired = self .session @@ -551,6 +583,8 @@ impl AcpAgentManager { option_id: &str, value: &str, ) -> Result { + self.ensure_protocol_connected_for_operation("set_config_option")?; + let (session_id, set_path, is_mode_option) = { let session = self.session.read().await; let snapshot = session.config_snapshot(); @@ -761,176 +795,6 @@ impl AcpAgentManager { } } - /// Set the mode for the current session. - pub(crate) async fn set_mode(&self, mode: &str) -> Result<(), AgentError> { - let normalized_mode = normalize_requested_mode(&self.params.metadata, mode); - if normalized_mode.is_empty() { - return Err(AgentError::bad_request("mode must not be empty")); - } - - let session_id = { - let session = self.session.read().await; - if !session.can_select_mode(&normalized_mode) { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_mode_id = %normalized_mode, - "acp_set_mode_rejected_unavailable" - ); - return Err(AgentError::bad_request(format!( - "Mode '{normalized_mode}' is not available for this ACP session" - ))); - } - session.session_id().map(ToOwned::to_owned) - } - .ok_or_else(|| { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_mode_id = %normalized_mode, - "acp_set_command_missing_session" - ); - AgentError::bad_request("No active session") - })?; - - info!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_mode_id = %normalized_mode, - "acp_set_mode_requested" - ); - codex_sandbox::sync_for_agent(&self.params.metadata, Some(&normalized_mode)).await; - - if let Err(e) = self - .protocol - .set_mode(SetSessionModeRequest::new( - SessionId::new(session_id.clone()), - normalized_mode.clone(), - )) - .await - { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_mode_id = %normalized_mode, - error = %e, - "acp_set_mode_failed" - ); - return Err(AgentError::from(e)); - } - - let mut session = self.session.write().await; - if session.session_id() != Some(session_id.as_str()) { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_mode_id = %normalized_mode, - confirmed_session_id = %session_id, - active_session_id = ?session.session_id(), - "acp_set_mode_session_changed" - ); - return Err(AgentError::conflict("Active ACP session changed while applying mode")); - } - session.confirm_mode(ModeId::new(&normalized_mode)); - self.commit_session_changes(&mut session).await; - info!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - confirmed_mode_id = %normalized_mode, - "acp_set_mode_confirmed" - ); - Ok(()) - } - - async fn apply_confirmed_model_selection(&self, model_id: &str) -> Result { - let session_id = { - let session = self.session.read().await; - if !session.can_select_model(model_id) { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_model_id = %model_id, - "acp_set_model_rejected_unavailable" - ); - return Err(AgentError::bad_request(format!( - "Model '{model_id}' is not available for this ACP session" - ))); - } - session.session_id().map(ToOwned::to_owned) - } - .ok_or_else(|| { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_model_id = %model_id, - "acp_set_command_missing_session" - ); - AgentError::bad_request("No active session") - })?; - - info!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_model_id = %model_id, - "acp_set_model_requested" - ); - if let Err(e) = self - .protocol - .set_model(SetSessionModelRequest::new( - SessionId::new(session_id.clone()), - model_id.to_owned(), - )) - .await - { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_model_id = %model_id, - error = %e, - "acp_set_model_failed" - ); - return Err(AgentError::from(e)); - } - - let mut session = self.session.write().await; - if session.session_id() != Some(session_id.as_str()) { - warn!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - requested_model_id = %model_id, - confirmed_session_id = %session_id, - active_session_id = ?session.session_id(), - "acp_set_model_session_changed" - ); - return Err(AgentError::conflict("Active ACP session changed while applying model")); - } - session.confirm_model(ModelId::new(model_id)); - let confirmed_model = session - .model_info() - .cloned() - .unwrap_or_else(|| SessionModelState::new(model_id.to_owned(), Vec::new())); - self.commit_session_changes(&mut session).await; - info!( - conversation_id = %self.params.conversation_id, - agent_backend = ?self.params.metadata.backend, - confirmed_model_id = %model_id, - "acp_set_model_confirmed" - ); - Ok(confirmed_model) - } - - /// Set the model for the current session. - pub(crate) async fn set_model(&self, model_id: &str) -> Result<(), AgentError> { - self.apply_confirmed_model_selection(model_id).await?; - Ok(()) - } - - /// Set the model and return the confirmed model state from this write, - /// without re-reading the asynchronously mutable session cache. - pub(crate) async fn set_model_confirmed(&self, model_id: &str) -> Result { - self.apply_confirmed_model_selection(model_id).await - } - /// Return available slash commands from the session aggregate. pub(crate) async fn load_slash_commands(&self) -> Result, AgentError> { let session = self.session.read().await; @@ -990,6 +854,7 @@ impl AcpAgentManager { async fn ensure_session_opened(&self) -> Result { debug!("Ensuring ACP session is opened"); let _lock = self.session_lock.lock().await; + self.ensure_protocol_connected_for_operation("ensure_session_opened")?; let (session_id, opened) = { let s = self.session.read().await; @@ -1004,10 +869,16 @@ impl AcpAgentManager { { let mut s = self.session.write().await; - s.mark_opened(); + let sid = mark_session_opened_after_protocol_ready( + &mut s, + sid, + self.protocol.is_connected(), + &self.params.conversation_id, + self.backend(), + )?; self.commit_session_changes(&mut s).await; + Ok(sid) } - Ok(sid) } /// Initialize or resume a session, then send the user message. @@ -1312,8 +1183,8 @@ mod tests { use crate::agent_runtime::AgentRuntime; use crate::error::AgentError; use crate::manager::acp::{AcpAgentManager, AcpSession}; - use crate::protocol::error::CloseReason; - use crate::shared_kernel::{ConfigKey, ConfigValue}; + use crate::protocol::error::{AcpError, CloseReason}; + use crate::shared_kernel::{ConfigKey, ConfigValue, SessionId as DomainSessionId}; use agent_client_protocol::schema::{AvailableCommand, SessionConfigOptionCategory}; use serde_json::json; use std::collections::HashMap; @@ -1354,6 +1225,31 @@ mod tests { assert_eq!(user_facing_message(&err), "Rate limited"); } + #[test] + fn warmup_does_not_mark_opened_when_protocol_disconnected_after_open() { + let mut session = AcpSession::new(None, None, Default::default()); + session.set_session_id(DomainSessionId::new("sess-disconnected")); + + let err = super::mark_session_opened_after_protocol_ready( + &mut session, + "sess-disconnected".to_owned(), + false, + "conv-test", + Some("codex"), + ) + .expect_err("disconnected protocol must reject the opened transition"); + + assert!( + matches!(err, AgentError::Acp(AcpError::NotConnected)), + "expected AcpError::NotConnected, got {err:?}" + ); + assert_eq!(session.session_id(), Some("sess-disconnected")); + assert!( + !session.is_opened(), + "warmup must not mark the aggregate opened when the protocol is already disconnected" + ); + } + #[test] fn nested_colons_only_strip_first() { // "Bad gateway: Internal error: API Error: ..." → keep everything after the first ": " diff --git a/crates/aionui-ai-agent/src/protocol/acp.rs b/crates/aionui-ai-agent/src/protocol/acp.rs index 444165684..4d966e756 100644 --- a/crates/aionui-ai-agent/src/protocol/acp.rs +++ b/crates/aionui-ai-agent/src/protocol/acp.rs @@ -23,7 +23,7 @@ //! shared connection, each awaited in its own caller task. use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use agent_client_protocol::schema::{ AGENT_METHOD_NAMES, AuthenticateResponse, ClientNotification, ClientRequest, CloseSessionResponse, ExtResponse, @@ -62,6 +62,14 @@ const INIT_TIMEOUT_SECS: u64 = 30; const ACP_CLIENT_NAME: &str = "AionUi"; const ACP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION"); +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AcpConnectionPhase { + Starting, + Initializing, + Ready, + ShuttingDown, +} + /// Build the ACP `initialize` request, always populating `clientInfo` with a /// non-empty name and version so downstream agents that require client metadata /// (e.g. Mistral Vibe) accept the request. See issue #3326. @@ -394,6 +402,8 @@ async fn run_sdk_background( let mut init_tx = Some(init_tx); let mut ready_tx = Some(ready_tx); let mut shutdown_rx = Some(shutdown_rx); + let phase = Arc::new(Mutex::new(AcpConnectionPhase::Starting)); + let phase_for_main = Arc::clone(&phase); let result = Client .builder() @@ -439,6 +449,7 @@ async fn run_sdk_background( let init_result = { let req = build_initialize_request(); log_client_request("initialize", &json_str(&req)); + *phase_for_main.lock().unwrap() = AcpConnectionPhase::Initializing; let raw = connection.send_request(req).block_task().await; log_agent_response("initialize", &json_or_err(&raw)); raw.map_err(|e| AcpError::from_sdk(e, "initialize")) @@ -466,11 +477,13 @@ async fn run_sdk_background( // Owner dropped before we became ready — nothing more to do. return Ok(()); } + *phase_for_main.lock().unwrap() = AcpConnectionPhase::Ready; // Step 3 — keep the connection alive until AcpProtocol::drop // releases the shutdown oneshot. if let Some(rx) = shutdown_rx.take() { let _ = rx.await; + *phase_for_main.lock().unwrap() = AcpConnectionPhase::ShuttingDown; } Ok(()) }) @@ -478,9 +491,10 @@ async fn run_sdk_background( alive.store(false, Ordering::Release); + let close_phase = *phase.lock().unwrap(); match result { - Ok(_) => debug!("ACP SDK connection closed normally"), - Err(e) => warn!(error = %ErrorChain(&e), "ACP SDK connection closed with error"), + Ok(_) => debug!(?close_phase, "ACP SDK connection closed normally"), + Err(e) => warn!(?close_phase, error = %ErrorChain(&e), "ACP SDK connection closed with error"), } } diff --git a/crates/aionui-conversation/src/service.rs b/crates/aionui-conversation/src/service.rs index 03af64321..feb025642 100644 --- a/crates/aionui-conversation/src/service.rs +++ b/crates/aionui-conversation/src/service.rs @@ -466,6 +466,10 @@ impl ConversationService { }) } + pub(crate) fn task_manager(&self) -> &Arc { + &self.task_manager + } + pub async fn runtime_summary_for(&self, conversation_id: &str) -> ConversationRuntimeSummary { let agent = self.task_manager.get_task(conversation_id); let has_task = agent.is_some(); diff --git a/crates/aionui-conversation/src/service_ops.rs b/crates/aionui-conversation/src/service_ops.rs index 339ad36c1..a6ad36174 100644 --- a/crates/aionui-conversation/src/service_ops.rs +++ b/crates/aionui-conversation/src/service_ops.rs @@ -9,11 +9,12 @@ use std::path::Component; +use aionui_ai_agent::{AcpError, AgentError}; use aionui_api_types::{ ConfigOptionConfirmation, GetConfigOptionsResponse, SetConfigOptionRequest, SetConfigOptionResponse, SideQuestionRequest, SideQuestionResponse, SlashCommandItem, WorkspaceBrowseQuery, WorkspaceEntry, }; -use aionui_common::ErrorChain; +use aionui_common::{AgentKillReason, ErrorChain}; use tracing::warn; use crate::ConversationError; @@ -50,11 +51,24 @@ impl ConversationService { reason: "value must not be empty".into(), }); } - let response = self - .task(conversation_id)? - .set_config_option(option_id, &req.value) - .await - .map_err(ConversationError::from)?; + let agent = self.task(conversation_id)?; + let response = match agent.set_config_option(option_id, &req.value).await { + Ok(response) => response, + Err(err @ AgentError::Acp(AcpError::NotConnected)) => { + warn!( + conversation_id, + option_id, + reason = ?AgentKillReason::AgentErrorRecovery, + error = %ErrorChain(&err), + "ACP config option failed because protocol is disconnected; evicting task" + ); + self.task_manager() + .kill_and_wait(conversation_id, Some(AgentKillReason::AgentErrorRecovery)) + .await; + return Err(ConversationError::from(err)); + } + Err(err) => return Err(ConversationError::from(err)), + }; // Mirror runtime model/mode switches into the persisted assistant // snapshot + preference so the next conversation seeded from this diff --git a/crates/aionui-conversation/src/service_test.rs b/crates/aionui-conversation/src/service_test.rs index 1d5b26452..2198f9297 100644 --- a/crates/aionui-conversation/src/service_test.rs +++ b/crates/aionui-conversation/src/service_test.rs @@ -9,7 +9,7 @@ use std::time::Duration; use aionui_ai_agent::agent_task::{AgentInstance, IAgentTask, IMockAgent}; use aionui_ai_agent::protocol::events::{AgentStreamEvent, ErrorEventData, FinishEventData, TextEventData}; use aionui_ai_agent::types::{BuildTaskOptions, SendMessageData}; -use aionui_ai_agent::{AgentError, AgentSendError, IWorkerTaskManager}; +use aionui_ai_agent::{AcpError, AgentError, AgentSendError, IWorkerTaskManager}; use crate::response_middleware::{CronCommandResult, CronCreateParams, CronUpdateParams, ICronService}; use aionui_api_types::{ @@ -1571,8 +1571,8 @@ struct MockAgent { model_id: Mutex, config_options: Arc>>, set_config_option_calls: Arc>>, + set_config_option_error: Arc>>, set_config_option_response: Arc>>, - keep_reported_model_on_set: bool, confirmations: Mutex>, approval_memory: Mutex>, allow_direct_confirm: bool, @@ -1610,8 +1610,8 @@ impl MockAgent { model_id: Mutex::new("model-a".to_owned()), config_options: Arc::new(Mutex::new(Vec::new())), set_config_option_calls: Arc::new(Mutex::new(Vec::new())), + set_config_option_error: Arc::new(Mutex::new(None)), set_config_option_response: Arc::new(Mutex::new(None)), - keep_reported_model_on_set: false, confirmations: Mutex::new(vec![]), approval_memory: Mutex::new(std::collections::HashMap::new()), allow_direct_confirm: false, @@ -1629,8 +1629,8 @@ impl MockAgent { model_id: Mutex::new("model-a".to_owned()), config_options: Arc::new(Mutex::new(Vec::new())), set_config_option_calls: Arc::new(Mutex::new(Vec::new())), + set_config_option_error: Arc::new(Mutex::new(None)), set_config_option_response: Arc::new(Mutex::new(None)), - keep_reported_model_on_set: false, confirmations: Mutex::new(confirmations), approval_memory: Mutex::new(std::collections::HashMap::new()), allow_direct_confirm: false, @@ -1648,8 +1648,8 @@ impl MockAgent { model_id: Mutex::new("model-a".to_owned()), config_options: Arc::new(Mutex::new(Vec::new())), set_config_option_calls: Arc::new(Mutex::new(Vec::new())), + set_config_option_error: Arc::new(Mutex::new(None)), set_config_option_response: Arc::new(Mutex::new(None)), - keep_reported_model_on_set: false, confirmations: Mutex::new(vec![]), approval_memory: Mutex::new(std::collections::HashMap::new()), allow_direct_confirm: true, @@ -1666,6 +1666,11 @@ impl MockAgent { *self.set_config_option_response.lock().unwrap() = Some(response); self } + + fn with_set_config_option_error(self, error: AgentError) -> Self { + *self.set_config_option_error.lock().unwrap() = Some(error); + self + } } #[async_trait::async_trait] @@ -1747,28 +1752,11 @@ impl IMockAgent for MockAgent { }) } - async fn set_mode(&self, mode: &str) -> Result<(), AgentError> { - *self.mode.lock().unwrap() = mode.to_owned(); - Ok(()) - } - async fn get_model(&self) -> Result { let current = self.model_id.lock().unwrap().clone(); Ok(Self::build_model_response(¤t)) } - async fn set_model(&self, model_id: &str) -> Result<(), AgentError> { - if !self.keep_reported_model_on_set { - *self.model_id.lock().unwrap() = model_id.to_owned(); - } - Ok(()) - } - - async fn set_model_confirmed(&self, model_id: &str) -> Result { - self.set_model(model_id).await?; - Ok(Self::build_model_response(model_id)) - } - async fn get_config_options(&self) -> Result { Ok(GetConfigOptionsResponse { config_options: self.config_options.lock().unwrap().clone(), @@ -1780,6 +1768,9 @@ impl IMockAgent for MockAgent { .lock() .unwrap() .push((option_id.to_owned(), value.to_owned())); + if let Some(error) = self.set_config_option_error.lock().unwrap().take() { + return Err(error); + } if let Some(response) = self.set_config_option_response.lock().unwrap().clone() { return Ok(response); } @@ -2476,6 +2467,36 @@ async fn set_config_option_returns_observed_confirmation() { ); } +#[tokio::test] +async fn set_config_option_evicts_task_when_acp_protocol_is_not_connected() { + let task_mgr = Arc::new(MockTaskManager::new()); + let (svc, _broadcaster, _repo) = make_service_with_mock_task_manager(task_mgr.clone()); + let conv = svc.create("user_1", make_create_req()).await.unwrap(); + let agent = + Arc::new(MockAgent::new(&conv.id).with_set_config_option_error(AgentError::Acp(AcpError::NotConnected))); + task_mgr.insert_agent(&conv.id, AgentInstance::Mock(agent)); + + let err = svc + .set_config_option( + &conv.id, + "model", + SetConfigOptionRequest { + value: "gpt-5".to_owned(), + }, + ) + .await + .expect_err("set_config_option must surface ACP NotConnected"); + + assert!( + matches!(err, ConversationError::Acp(AcpError::NotConnected)), + "expected ACP NotConnected, got {err:?}" + ); + assert_eq!( + task_mgr.kill_records(), + vec![(conv.id.clone(), Some(AgentKillReason::AgentErrorRecovery))] + ); +} + #[tokio::test] async fn command_ack_does_not_persist_assistant_preference_in_core_service() { let task_mgr = Arc::new(MockTaskManager::new()); diff --git a/crates/aionui-cron/src/executor.rs b/crates/aionui-cron/src/executor.rs index 34e0594c8..2aa1109be 100644 --- a/crates/aionui-cron/src/executor.rs +++ b/crates/aionui-cron/src/executor.rs @@ -885,7 +885,7 @@ impl JobExecutor { } agent - .set_mode(desired_mode) + .set_config_option("mode", desired_mode) .await .map_err(|e| CronError::Scheduler(format!("set session mode to {desired_mode}: {e}")))?; @@ -1224,7 +1224,7 @@ mod tests { use aionui_ai_agent::agent_task::{AgentInstance, IAgentTask, IMockAgent}; use aionui_ai_agent::protocol::events::FinishEventData; use aionui_ai_agent::types::BuildTaskOptions; - use aionui_api_types::{AgentModeResponse, WebSocketMessage}; + use aionui_api_types::{AgentModeResponse, ConfigOptionConfirmation, SetConfigOptionResponse, WebSocketMessage}; use aionui_common::{AgentKillReason, ConversationStatus, PaginatedResult, TimestampMs}; use aionui_db::{ ConversationArtifactRow, ConversationFilters, ConversationRowUpdate, MessageRowUpdate, MessageSearchRow, @@ -2394,11 +2394,23 @@ mod tests { }) } - async fn set_mode(&self, mode: &str) -> Result<(), aionui_ai_agent::AgentError> { + async fn set_config_option( + &self, + option_id: &str, + value: &str, + ) -> Result { + if option_id != "mode" { + return Err(aionui_ai_agent::AgentError::bad_request(format!( + "unsupported config option: {option_id}" + ))); + } self.set_mode_calls.fetch_add(1, Ordering::Relaxed); let mut guard = self.mode.write().await; - *guard = mode.to_owned(); - Ok(()) + *guard = value.to_owned(); + Ok(SetConfigOptionResponse { + confirmation: ConfigOptionConfirmation::Observed, + config_options: None, + }) } } diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index 986309948..2ad61abe6 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -4,7 +4,7 @@ pub(crate) mod spawn_support; use std::path::PathBuf; use std::sync::{Arc, Weak}; -use aionui_ai_agent::IWorkerTaskManager; +use aionui_ai_agent::{AgentError, AgentInstance, IWorkerTaskManager}; use aionui_api_types::{ AddAgentRequest, CreateTeamRequest, GuideMcpConfig, TeamAgentResponse, TeamMcpPhase, TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunTargetRole, WebSocketMessage, @@ -869,7 +869,7 @@ impl TeamSessionService { for agent in &team.agents { if let Some(instance) = self.task_manager.get_task(&agent.conversation_id) - && let Err(e) = instance.set_mode(mode).await + && let Err(e) = set_active_agent_session_mode(&instance, mode).await { warn!( team_id, @@ -1002,6 +1002,15 @@ impl TeamSessionService { } } +async fn set_active_agent_session_mode(instance: &AgentInstance, mode: &str) -> Result<(), AgentError> { + #[allow(unreachable_patterns)] + match instance { + AgentInstance::Acp(_) => instance.set_config_option("mode", mode).await.map(|_| ()), + AgentInstance::Aionrs(manager) => manager.set_mode(mode).await, + _ => instance.set_config_option("mode", mode).await.map(|_| ()), + } +} + #[cfg(test)] mod tests { use crate::test_utils::workspace_harness::{