diff --git a/crates/aionui-ai-agent/src/agent_task.rs b/crates/aionui-ai-agent/src/agent_task.rs index eb869c6f2..230369f01 100644 --- a/crates/aionui-ai-agent/src/agent_task.rs +++ b/crates/aionui-ai-agent/src/agent_task.rs @@ -240,6 +240,26 @@ impl AgentInstance { } } + /// Whether the connected ACP CLI advertised `session/fork`. + pub async fn acp_supports_session_fork(&self) -> bool { + match self { + Self::Acp(m) => m.supports_session_fork().await, + _ => false, + } + } + + /// Warm the parent session and return its ACP session id (for side fork). + pub async fn acp_ensure_warm_session_id(&self) -> Result { + match self { + Self::Acp(m) => { + m.warmup_session().await?; + let sid = m.current_session_id().await; + sid.ok_or_else(|| AgentError::internal("ACP session id missing after warmup")) + } + _ => Err(AgentError::bad_request("Side fork requires an ACP parent")), + } + } + // ── Cross-variant semi-specific helpers ────────────────────────── // // These fan out to inherent methods on concrete managers. Variants diff --git a/crates/aionui-ai-agent/src/factory/acp_assembler.rs b/crates/aionui-ai-agent/src/factory/acp_assembler.rs index 9fee2c5b2..949dd8497 100644 --- a/crates/aionui-ai-agent/src/factory/acp_assembler.rs +++ b/crates/aionui-ai-agent/src/factory/acp_assembler.rs @@ -236,6 +236,8 @@ mod tests { mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let servers = resolve_mcp_servers(&config, "conv-1", Vec::new()); assert_eq!(servers.len(), 1); @@ -268,6 +270,8 @@ mod tests { mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let servers = resolve_mcp_servers(&config, "conv-1", Vec::new()); assert_eq!(servers.len(), 1); @@ -300,6 +304,8 @@ mod tests { mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let servers = resolve_mcp_servers(&config, "conv-1", Vec::new()); assert!(servers.is_empty()); @@ -329,6 +335,8 @@ mod tests { mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let user = vec![user_stdio("ctx7"), user_stdio("playwright")]; let servers = resolve_mcp_servers(&config, "conv-1", user); @@ -370,6 +378,8 @@ mod tests { mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let user = vec![user_stdio("ctx7")]; let servers = resolve_mcp_servers(&config, "conv-1", user); @@ -408,6 +418,8 @@ mod tests { mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let user = vec![user_stdio("ctx7")]; let servers = resolve_mcp_servers(&config, "conv-1", user); @@ -444,6 +456,8 @@ mod tests { mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let servers = resolve_mcp_servers(&config, "conv-1", Vec::new()); assert!(servers.is_empty()); diff --git a/crates/aionui-ai-agent/src/manager/acp/agent.rs b/crates/aionui-ai-agent/src/manager/acp/agent.rs index 9a41a0dbc..d4cd940ff 100644 --- a/crates/aionui-ai-agent/src/manager/acp/agent.rs +++ b/crates/aionui-ai-agent/src/manager/acp/agent.rs @@ -672,8 +672,21 @@ impl AcpAgentManager { (s.session_id().map(ToOwned::to_owned), s.is_opened()) }; + let fork_parent = self + .params + .config + .fork_parent_session_id + .as_deref() + .filter(|s| !s.is_empty()); + let sid = match (session_id, opened) { - (None, _) => self.open_session_new().await?, + (None, _) => { + if let Some(parent_sid) = fork_parent { + self.open_session_fork(parent_sid).await? + } else { + self.open_session_new().await? + } + } (Some(sid), false) => self.open_session_resume(&sid).await?, (Some(sid), true) => sid, }; @@ -724,6 +737,11 @@ impl AcpAgentManager { .await } + /// Current ACP session id if the CLI has assigned one. + pub async fn current_session_id(&self) -> Option { + self.session.read().await.session_id().map(|id| id.to_string()) + } + /// Pre-open the ACP session without sending a prompt. Called by the /// factory after `AcpAgentManager::build` so `POST /warmup` returns /// only after the session is ready to accept `set_mode` / `set_model` diff --git a/crates/aionui-ai-agent/src/manager/acp/agent_session_flow.rs b/crates/aionui-ai-agent/src/manager/acp/agent_session_flow.rs index 44c7386fc..8262ebb89 100644 --- a/crates/aionui-ai-agent/src/manager/acp/agent_session_flow.rs +++ b/crates/aionui-ai-agent/src/manager/acp/agent_session_flow.rs @@ -9,7 +9,9 @@ use crate::protocol::events::{ use crate::protocol::send_error::AgentSendError; use crate::shared_kernel::SessionId as DomainSessionId; use crate::types::SendMessageData; -use agent_client_protocol::schema::{ContentBlock, LoadSessionRequest, PromptRequest, SessionId, StopReason}; +use agent_client_protocol::schema::{ + ContentBlock, ForkSessionRequest, LoadSessionRequest, PromptRequest, SessionId, StopReason, +}; use aionui_api_types::SlashCommandItem; use serde_json::Value; use tokio::sync::broadcast::error::TryRecvError; @@ -73,6 +75,51 @@ impl AcpAgentManager { Ok(sid) } + /// Whether the connected CLI advertised `session/fork` during initialize. + pub(crate) async fn supports_session_fork(&self) -> bool { + self.session + .read() + .await + .agent_capabilities() + .and_then(|c| c.session_capabilities.fork.as_ref()) + .is_some() + } + + /// Fork an existing session into a new session id (ACP `session/fork`). + pub(super) async fn open_session_fork(&self, parent_session_id: &str) -> Result { + use std::path::PathBuf; + + let req = ForkSessionRequest::new( + SessionId::new(parent_session_id.to_owned()), + PathBuf::from(&self.params.workspace.path), + ); + let resp = self.protocol.fork_session(req).await?; + let sid = resp.session_id.to_string(); + + { + let mut session = self.session.write().await; + if let Some(models) = resp.models { + session.apply_advertised_models(models); + } + if let Some(modes) = resp.modes { + session.apply_advertised_modes(modes); + } + if let Some(config_options) = resp.config_options { + session.apply_advertised_config_options(config_options); + } + session.set_session_id(DomainSessionId::new(sid.clone())); + session.mark_pending_session_new_prelude(); + self.commit_session_changes(&mut session).await; + } + self.emit_snapshot_events().await; + self.runtime + .emit(AgentStreamEvent::SessionAssigned(SessionAssignedEventData { + session_id: sid.clone(), + })); + self.reconcile_session(&sid).await?; + Ok(sid) + } + /// Drop the in-aggregate session id and re-run `open_session_new`. /// Used as the rescue path when resume helpers see `SessionNotFound`. /// Emits a `warn!` so ops can still see the original failure that diff --git a/crates/aionui-ai-agent/tests/acp_agent_integration.rs b/crates/aionui-ai-agent/tests/acp_agent_integration.rs index 61f7c11ed..a67ca0532 100644 --- a/crates/aionui-ai-agent/tests/acp_agent_integration.rs +++ b/crates/aionui-ai-agent/tests/acp_agent_integration.rs @@ -75,6 +75,8 @@ async fn make_mock_agent(script: &str, backend: &str) -> (Arc, mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; let tmp_skills = tempfile::TempDir::new().unwrap(); diff --git a/crates/aionui-ai-agent/tests/prompt_pipeline_integration.rs b/crates/aionui-ai-agent/tests/prompt_pipeline_integration.rs index 8756f8315..046f596fd 100644 --- a/crates/aionui-ai-agent/tests/prompt_pipeline_integration.rs +++ b/crates/aionui-ai-agent/tests/prompt_pipeline_integration.rs @@ -51,6 +51,8 @@ async fn fixture_params( mcp_server_ids: None, session_mcp_servers: vec![], user_id: None, + fork_parent_session_id: None, + fork_mode: None, }; Arc::new( diff --git a/crates/aionui-api-types/src/agent_build_extra.rs b/crates/aionui-api-types/src/agent_build_extra.rs index aaf937ab1..8b2920924 100644 --- a/crates/aionui-api-types/src/agent_build_extra.rs +++ b/crates/aionui-api-types/src/agent_build_extra.rs @@ -73,6 +73,12 @@ pub struct AcpBuildExtra { pub session_mcp_servers: Vec, #[serde(default)] pub user_id: Option, + /// Parent ACP session id for side conversations using `session/fork`. + #[serde(default)] + pub fork_parent_session_id: Option, + /// `agent_fork` | `text_snapshot` — set on side child rows. + #[serde(default)] + pub fork_mode: Option, } /// Aionrs-specific fields extracted from `extra` in build task options. diff --git a/crates/aionui-api-types/src/conversation.rs b/crates/aionui-api-types/src/conversation.rs index b3cd85592..56a7599c3 100644 --- a/crates/aionui-api-types/src/conversation.rs +++ b/crates/aionui-api-types/src/conversation.rs @@ -48,6 +48,31 @@ pub struct UpdateConversationRequest { pub extra: Option, } +/// Body for `POST /api/conversations/:id/side`. +#[derive(Debug, Deserialize)] +pub struct CreateSideConversationRequest { + pub guardrail: Option, + pub initial_prompt: Option, + pub forked_at_msg_id: Option, +} + +/// Response for `POST /api/conversations/:id/side`. +#[derive(Debug, Serialize)] +pub struct CreateSideConversationResponse { + pub conversation_id: String, + /// Always `true` in v0.2 — each open creates a new side tab. + pub created: bool, + pub fork_mode: SideForkMode, +} + +/// How the side child session inherited parent context. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum SideForkMode { + AgentFork, + TextSnapshot, +} + /// Body for `POST /api/conversations/clone`. /// /// Despite the name, this endpoint no longer supports cloning from an diff --git a/crates/aionui-api-types/src/lib.rs b/crates/aionui-api-types/src/lib.rs index 4c05c4eee..e27a76ea0 100644 --- a/crates/aionui-api-types/src/lib.rs +++ b/crates/aionui-api-types/src/lib.rs @@ -68,9 +68,10 @@ pub use conversation::{ ConversationArtifactKind, ConversationArtifactListResponse, ConversationArtifactResponse, ConversationArtifactStatus, ConversationListResponse, ConversationMcpStatus, ConversationMcpStatusKind, ConversationResponse, ConversationRuntimeStateKind, ConversationRuntimeSummary, CreateConversationRequest, - ListConversationsQuery, ListMessagesQuery, MessageListResponse, MessageResponse, MessageSearchItem, - MessageSearchResponse, SearchMessagesQuery, SendMessageRequest, SendMessageResponse, - UpdateConversationArtifactRequest, UpdateConversationRequest, + CreateSideConversationRequest, CreateSideConversationResponse, ListConversationsQuery, ListMessagesQuery, + MessageListResponse, MessageResponse, MessageSearchItem, MessageSearchResponse, SearchMessagesQuery, + SendMessageRequest, SendMessageResponse, SideForkMode, UpdateConversationArtifactRequest, + UpdateConversationRequest, }; pub use cron::{ CreateCronJobRequest, CronAgentConfigDto, CronJobExecutedEvent, CronJobMetadataDto, CronJobPayloadDto, diff --git a/crates/aionui-conversation/src/convert.rs b/crates/aionui-conversation/src/convert.rs index 7e8e8519e..5119e9b07 100644 --- a/crates/aionui-conversation/src/convert.rs +++ b/crates/aionui-conversation/src/convert.rs @@ -84,7 +84,7 @@ pub fn row_to_response_with_extra( /// field that can be an array of model objects. The backend only needs /// `provider_id`, `model` (the selected model name), and `use_model`. /// Accepts both snake_case and legacy camelCase key names for backward compatibility. -fn parse_provider_with_model(s: &str) -> Result { +pub(crate) fn parse_provider_with_model(s: &str) -> Result { let v: serde_json::Value = serde_json::from_str(s).map_err(|e| ConversationError::internal(format!("Invalid model JSON: {e}")))?; diff --git a/crates/aionui-conversation/src/lib.rs b/crates/aionui-conversation/src/lib.rs index a126fb198..6155206a7 100644 --- a/crates/aionui-conversation/src/lib.rs +++ b/crates/aionui-conversation/src/lib.rs @@ -14,6 +14,7 @@ mod runtime_persistence; pub mod runtime_state; pub mod service; mod service_ops; +mod service_side; pub(crate) mod session_context; pub mod skill_resolver; pub mod skill_snapshot; diff --git a/crates/aionui-conversation/src/routes_aux.rs b/crates/aionui-conversation/src/routes_aux.rs index 1a9db053d..6f928bb42 100644 --- a/crates/aionui-conversation/src/routes_aux.rs +++ b/crates/aionui-conversation/src/routes_aux.rs @@ -2,7 +2,8 @@ use crate::state::ConversationRouterState; use aionui_api_types::{ - AgentModeResponse, ApiResponse, GetModelInfoResponse, SetModeRequest, SetModelRequest, SideQuestionRequest, + AgentModeResponse, ApiResponse, ConversationResponse, CreateSideConversationRequest, + CreateSideConversationResponse, GetModelInfoResponse, SetModeRequest, SetModelRequest, SideQuestionRequest, SideQuestionResponse, SlashCommandItem, WorkspaceBrowseQuery, WorkspaceEntry, }; use aionui_auth::CurrentUser; @@ -10,6 +11,7 @@ use aionui_common::ApiError; use axum::Router; use axum::extract::rejection::JsonRejection; use axum::extract::{Extension, Json, Path, Query, State}; +use axum::http::StatusCode; use axum::routing::{get, post}; /// Build the conversation-ops router (no auth layer applied — the caller is @@ -17,6 +19,7 @@ use axum::routing::{get, post}; pub fn conversation_ops_routes(state: ConversationRouterState) -> Router { Router::new() .route("/api/conversations/{id}/side-question", post(side_question)) + .route("/api/conversations/{id}/side", get(list_side).post(create_side)) .route("/api/conversations/{id}/slash-commands", get(get_slash_commands)) .route("/api/conversations/{id}/usage", get(get_usage)) .route("/api/conversations/{id}/mode", get(get_mode).put(set_mode)) @@ -96,6 +99,34 @@ async fn side_question( ))) } +async fn create_side( + State(state): State, + Extension(user): Extension, + Path(id): Path, + Json(req): Json, +) -> Result<(StatusCode, Json>), ApiError> { + let resp = state + .service + .create_side_conversation(&user.id, &id, req, &state.task_manager) + .await + .map_err(ApiError::from)?; + Ok((StatusCode::CREATED, Json(ApiResponse::ok(resp)))) +} + +async fn list_side( + State(state): State, + Extension(user): Extension, + Path(id): Path, +) -> Result>>, ApiError> { + Ok(Json(ApiResponse::ok( + state + .service + .list_side_conversations(&user.id, &id) + .await + .map_err(ApiError::from)?, + ))) +} + async fn get_slash_commands( State(state): State, Extension(_user): Extension, diff --git a/crates/aionui-conversation/src/service.rs b/crates/aionui-conversation/src/service.rs index d4060ddbd..9a40bcc14 100644 --- a/crates/aionui-conversation/src/service.rs +++ b/crates/aionui-conversation/src/service.rs @@ -942,6 +942,8 @@ impl ConversationService { .filter(|r| r.user_id == user_id) .ok_or_else(|| ConversationError::NotFound { id: id.to_owned() })?; + self.delete_ephemeral_side_children(user_id, id).await?; + let source: Option = existing .source .as_deref() @@ -1458,6 +1460,9 @@ impl ConversationService { info!(msg_id = %user_msg_id, "User message persisted"); + let child_extra: serde_json::Value = serde_json::from_str(&row.extra).unwrap_or_else(|_| serde_json::json!({})); + let agent_content = self.enrich_side_agent_content(&child_extra, &req.content).await?; + self.broadcaster.broadcast(WebSocketMessage::new( "message.userCreated", serde_json::json!({ @@ -1499,11 +1504,17 @@ impl ConversationService { self.ensure_auto_workspace_skill_links(&row, &build_opts).await; let stored_workspace = build_opts.context.workspace.stored_path.clone(); + let agent_request = SendMessageRequest { + content: agent_content, + files: req.files, + inject_skills: req.inject_skills, + hidden: req.hidden, + }; let user_msg_id_ret = user_msg_id.clone(); ConversationTurnOrchestrator::new(self.clone(), Arc::clone(task_manager)).spawn_user_turn(TurnStartInput { user_id: user_id.to_owned(), conversation: row, - request: req, + request: agent_request, build_options: build_opts, stored_workspace, turn_id: turn_id.clone(), diff --git a/crates/aionui-conversation/src/service_side.rs b/crates/aionui-conversation/src/service_side.rs new file mode 100644 index 000000000..8a217b854 --- /dev/null +++ b/crates/aionui-conversation/src/service_side.rs @@ -0,0 +1,598 @@ +//! Side-conversation fork primitive (`POST /api/conversations/:id/side`). +//! +//! v0.2: multi-tab sides, dual fork paths (`agent_fork` | `text_snapshot`). + +use std::sync::Arc; + +use aionui_ai_agent::IWorkerTaskManager; +use aionui_api_types::{ + ConversationResponse, CreateConversationRequest, CreateSideConversationRequest, CreateSideConversationResponse, + SendMessageRequest, SideForkMode, +}; +use aionui_common::{AgentType, TimestampMs, now_ms}; +use aionui_db::SortOrder; +use serde_json::{Value, json}; +use tracing::{info, warn}; + +use crate::ConversationError; +use crate::service::ConversationService; + +/// Safety cap when building a one-time parent transcript snapshot (path B). +const PARENT_SNAPSHOT_PAGE_SIZE: u32 = 100; + +impl ConversationService { + /// Restore side children for a parent row. + #[tracing::instrument(skip_all, fields(parent_id = %parent_id))] + pub async fn list_side_conversations( + &self, + user_id: &str, + parent_id: &str, + ) -> Result, ConversationError> { + self.conversation_repo() + .get(parent_id) + .await? + .filter(|r| r.user_id == user_id) + .ok_or_else(|| ConversationError::NotFound { + id: parent_id.to_owned(), + })?; + + let children = self.conversation_repo().list_side_children(user_id, parent_id).await?; + let mut responses = Vec::with_capacity(children.len()); + for child in children { + match self.get(user_id, &child.id).await { + Ok(resp) => responses.push(resp), + Err(err) => warn!(%err, child_id = %child.id, "Failed to restore side child"), + } + } + Ok(responses) + } + + /// Fork a new side conversation from a parent row (always creates a new child). + #[tracing::instrument(skip_all, fields(parent_id = %parent_id))] + pub async fn create_side_conversation( + &self, + user_id: &str, + parent_id: &str, + req: CreateSideConversationRequest, + task_manager: &Arc, + ) -> Result { + let parent = self + .conversation_repo() + .get(parent_id) + .await? + .filter(|r| r.user_id == user_id) + .ok_or_else(|| ConversationError::NotFound { + id: parent_id.to_owned(), + })?; + + let parent_extra: Value = serde_json::from_str(&parent.extra).unwrap_or_else(|_| json!({})); + let parent_type: AgentType = crate::convert::string_to_enum(&parent.r#type)?; + + if !is_side_supported_parent_type(parent_type) { + return Err(ConversationError::BadRequest { + reason: "Side conversation is not supported for this agent type".into(), + }); + } + + let (fork_mode, fork_parent_session_id) = self + .resolve_fork_strategy(parent_type, &parent, &parent_extra, task_manager) + .await?; + + let bootstrap = match fork_mode { + SideForkMode::AgentFork => build_side_fork_boundary_message(&parent, &parent_extra, &req), + SideForkMode::TextSnapshot => { + let transcript = self.build_parent_reference_transcript(parent_id).await?; + build_side_snapshot_bootstrap_message(&parent, &parent_extra, &req, &transcript) + } + }; + let create_req = build_child_create_request( + &parent, + &parent_extra, + parent_type, + &req, + fork_mode, + fork_parent_session_id.as_deref(), + &bootstrap, + )?; + let child = self.create(user_id, create_req).await?; + let child_id = child.id.clone(); + + self.insert_hidden_context_message(&child_id, &bootstrap, now_ms()) + .await?; + + if let Some(prompt) = req.initial_prompt.as_ref().map(|s| s.trim()).filter(|s| !s.is_empty()) { + self.send_message( + user_id, + &child_id, + SendMessageRequest { + content: prompt.to_owned(), + files: Vec::new(), + inject_skills: Vec::new(), + hidden: false, + }, + task_manager, + ) + .await?; + } + + info!(parent_id, child_id = %child.id, ?fork_mode, "Side conversation created"); + Ok(CreateSideConversationResponse { + conversation_id: child_id, + created: true, + fork_mode, + }) + } + + /// Prefix agent input with parent snapshot — **only** for legacy rows without `fork_mode`. + pub(super) async fn enrich_side_agent_content( + &self, + child_extra: &Value, + user_content: &str, + ) -> Result { + if !is_side_conversation_extra(child_extra) { + return Ok(user_content.to_owned()); + } + // v0.2 rows carry `fork_mode`; snapshot is one-time at create — no per-turn enrich. + if child_extra.get("fork_mode").is_some() { + return Ok(user_content.to_owned()); + } + + let Some(parent_id) = child_extra + .get("parent_conversation_id") + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + else { + return Ok(user_content.to_owned()); + }; + + let transcript = self.build_parent_reference_transcript(parent_id).await?; + let workspace = child_extra + .get("workspace") + .and_then(|v| v.as_str()) + .unwrap_or("(shared with parent)"); + + let reference = if transcript.trim().is_empty() { + format!( + "[主线程参考 · 只读 · 本回合自动刷新]\n\ + 主会话 ID: {parent_id}\n\ + 工作区: {workspace}\n\ + (主会话暂无可引用的文本消息;请结合工作区与侧边栏左侧主线程 UI 判断进展。)" + ) + } else { + format!( + "[主线程参考 · 只读 · 本回合自动刷新]\n\ + 主会话 ID: {parent_id}\n\ + 工作区: {workspace}\n\n\ + {transcript}" + ) + }; + + Ok(format!("{reference}\n\n---\n\n{user_content}")) + } + + async fn build_parent_reference_transcript(&self, parent_id: &str) -> Result { + let mut page = 1u32; + let mut lines = Vec::new(); + loop { + let batch = self + .conversation_repo() + .get_messages(parent_id, page, PARENT_SNAPSHOT_PAGE_SIZE, SortOrder::Asc) + .await?; + for row in batch.items { + if row.hidden { + continue; + } + if !is_reference_snapshot_message_type(&row.r#type) { + continue; + } + let content = extract_message_text(&row.content); + if content.trim().is_empty() { + continue; + } + let role = match row.position.as_deref() { + Some("right") => "用户", + _ => "助手", + }; + lines.push(format!("{role}: {content}")); + } + if !batch.has_more { + break; + } + page += 1; + } + Ok(lines.join("\n")) + } + + async fn insert_hidden_context_message( + &self, + conversation_id: &str, + body: &str, + created_at: TimestampMs, + ) -> Result<(), ConversationError> { + let msg_id = Self::mint_msg_id(); + let row = aionui_db::models::MessageRow { + id: msg_id.clone(), + conversation_id: conversation_id.to_owned(), + msg_id: Some(msg_id), + r#type: "text".into(), + content: json!({ "content": body }).to_string(), + position: Some("left".into()), + status: Some("finish".into()), + hidden: true, + created_at, + }; + self.conversation_repo().insert_message(&row).await?; + Ok(()) + } + + /// When deleting a parent, cascade-delete ephemeral side children. + pub(super) async fn delete_ephemeral_side_children( + &self, + user_id: &str, + parent_id: &str, + ) -> Result<(), ConversationError> { + let children = self.conversation_repo().list_side_children(user_id, parent_id).await?; + for child in children { + let child_extra: Value = serde_json::from_str(&child.extra).unwrap_or_else(|_| json!({})); + if child_extra.get("side_mode").and_then(|v| v.as_bool()) == Some(true) + && child_extra.get("ephemeral").and_then(|v| v.as_bool()) != Some(false) + && child.user_id == user_id + && let Err(err) = self.conversation_repo().delete(&child.id).await + { + warn!(%err, child_id = %child.id, "Failed to delete ephemeral side child"); + } + } + Ok(()) + } + + async fn resolve_fork_strategy( + &self, + parent_type: AgentType, + parent: &aionui_db::models::ConversationRow, + parent_extra: &Value, + task_manager: &Arc, + ) -> Result<(SideForkMode, Option), ConversationError> { + match parent_type { + AgentType::Aionrs => Ok((SideForkMode::TextSnapshot, None)), + AgentType::Acp => { + let backend = parent_extra.get("backend").and_then(|v| v.as_str()).unwrap_or(""); + // Snapshot backends never call session/fork — skip warming the parent CLI. + if !acp_backend_has_spec_session_fork(backend) { + return Ok((SideForkMode::TextSnapshot, None)); + } + let opts = self.build_task_options(parent).await?; + let parent_id = parent.id.as_str(); + let instance = task_manager.get_or_build_task(parent_id, opts).await?; + match instance.acp_ensure_warm_session_id().await { + Ok(parent_sid) => Ok((SideForkMode::AgentFork, Some(parent_sid))), + Err(err) => Err(ConversationError::BadRequest { + reason: format!("Side session fork requires a ready parent ACP session: {err}"), + }), + } + } + _ => Err(ConversationError::BadRequest { + reason: "Side conversation is not supported for this agent type".into(), + }), + } + } +} + +pub(super) fn is_side_conversation_extra(extra: &Value) -> bool { + extra.get("side_mode").and_then(|v| v.as_bool()) == Some(true) +} + +fn is_side_supported_parent_type(parent_type: AgentType) -> bool { + matches!(parent_type, AgentType::Acp | AgentType::Aionrs) +} + +fn is_reference_snapshot_message_type(message_type: &str) -> bool { + message_type == "text" +} + +/// ACP backends audited in side-conversation spec §3.5 as implementing `session/fork`. +/// For these, path A is the product default whenever the parent session is warm — +/// not gated on a flaky `sessionCapabilities.fork` field in every adapter build. +fn acp_backend_has_spec_session_fork(backend: &str) -> bool { + matches!(backend, "claude" | "opencode" | "vibe") +} + +#[cfg(test)] +mod fork_policy_tests { + use super::{acp_backend_has_spec_session_fork, is_reference_snapshot_message_type}; + + #[test] + fn fork_backends_match_spec_section_3_5() { + assert!(acp_backend_has_spec_session_fork("claude")); + assert!(acp_backend_has_spec_session_fork("opencode")); + assert!(acp_backend_has_spec_session_fork("vibe")); + assert!(!acp_backend_has_spec_session_fork("codex")); + assert!(!acp_backend_has_spec_session_fork("gemini")); + } + + #[test] + fn reference_snapshot_only_uses_visible_text_messages() { + assert!(is_reference_snapshot_message_type("text")); + assert!(!is_reference_snapshot_message_type("thinking")); + assert!(!is_reference_snapshot_message_type("tool")); + } +} + +fn build_child_create_request( + parent: &aionui_db::models::ConversationRow, + parent_extra: &Value, + parent_type: AgentType, + req: &CreateSideConversationRequest, + fork_mode: SideForkMode, + fork_parent_session_id: Option<&str>, + side_context: &str, +) -> Result { + let child_extra = sanitize_child_extra( + parent_extra, + &parent.id, + parent_type, + req, + fork_mode, + fork_parent_session_id, + side_context, + ); + + let display_name = if parent.name.trim().is_empty() { + "Side".to_owned() + } else { + format!("↳ {}", parent.name) + }; + + let model = parent + .model + .as_deref() + .and_then(|raw| crate::convert::parse_provider_with_model(raw).ok()); + + Ok(CreateConversationRequest { + r#type: parent_type, + name: Some(display_name), + model, + source: parent + .source + .as_deref() + .and_then(|s| crate::convert::string_to_enum(s).ok()), + channel_chat_id: parent.channel_chat_id.clone(), + extra: child_extra, + }) +} + +/// Copy only fork-safe fields. Do not clone immutable post-create snapshots (`skills`, MCP_*). +fn sanitize_child_extra( + parent_extra: &Value, + parent_id: &str, + parent_type: AgentType, + req: &CreateSideConversationRequest, + fork_mode: SideForkMode, + fork_parent_session_id: Option<&str>, + side_context: &str, +) -> Value { + let mut obj = serde_json::Map::new(); + if let Some(parent) = parent_extra.as_object() { + for key in [ + "workspace", + "backend", + "agent_name", + "agent_id", + "cli_path", + "session_mode", + "current_model_id", + "preset_context", + "system_prompt", + "preset_rules", + "max_tokens", + "max_turns", + "gateway", + "remote_agent_id", + "remoteAgentId", + ] { + if let Some(value) = parent.get(key) { + obj.insert(key.to_owned(), value.clone()); + } + } + if let Some(skills) = parent.get("skills").and_then(|v| v.as_array()) { + obj.insert("preset_enabled_skills".to_owned(), Value::Array(skills.clone())); + } + } + + obj.insert("parent_conversation_id".into(), json!(parent_id)); + obj.insert("side_mode".into(), json!(true)); + obj.insert("ephemeral".into(), json!(true)); + let guardrail = req.guardrail.as_deref().unwrap_or("reference_readonly"); + obj.insert("side_guardrail".into(), json!(guardrail)); + obj.insert( + "fork_mode".into(), + json!(match fork_mode { + SideForkMode::AgentFork => "agent_fork", + SideForkMode::TextSnapshot => "text_snapshot", + }), + ); + if let Some(parent_sid) = fork_parent_session_id.filter(|s| !s.is_empty()) { + obj.insert("fork_parent_session_id".into(), json!(parent_sid)); + } + if let Some(fork_id) = &req.forked_at_msg_id + && !fork_id.is_empty() + { + obj.insert("forked_at_msg_id".into(), json!(fork_id)); + } + + merge_side_context_for_agent(&mut obj, parent_type, side_context); + + Value::Object(obj) +} + +fn merge_side_context_for_agent(obj: &mut serde_json::Map, parent_type: AgentType, side_context: &str) { + let context = side_context.trim(); + if context.is_empty() { + return; + } + let key = match parent_type { + AgentType::Acp => "preset_context", + AgentType::Aionrs => "system_prompt", + _ => return, + }; + let merged = match obj + .get(key) + .and_then(|v| v.as_str()) + .map(str::trim) + .filter(|s| !s.is_empty()) + { + Some(existing) => format!("{existing}\n\n{context}"), + None => context.to_owned(), + }; + obj.insert(key.to_owned(), json!(merged)); +} + +fn build_side_fork_boundary_message( + parent: &aionui_db::models::ConversationRow, + parent_extra: &Value, + req: &CreateSideConversationRequest, +) -> String { + let parent_id = parent.id.as_str(); + let title = if parent.name.trim().is_empty() { + "(untitled)" + } else { + parent.name.trim() + }; + let status = parent.status.as_deref().unwrap_or("unknown"); + let mode = req.guardrail.as_deref().unwrap_or("reference_readonly"); + let workspace = parent_extra + .get("workspace") + .and_then(|v| v.as_str()) + .unwrap_or("(same as parent)"); + let fork_note = req + .forked_at_msg_id + .as_deref() + .filter(|s| !s.is_empty()) + .map(|id| format!("\n分叉锚点消息: {id}")) + .unwrap_or_default(); + + format!( + "【侧边会话 · agent fork】你从主会话 {parent_id} 通过 ACP session/fork 分叉。\n\ + 主会话标题: {title}\n\ + 主会话状态: {status}\n\ + 主会话仍在左侧继续;本侧边栏只展示侧边自己的对话。\n\ + 工作区: {workspace}{fork_note}\n\ + 护栏: {mode} — 默认只读参考主线程,不要擅自改工作区或执行有副作用命令。\n\ + 用户在侧边里说“进度”“刚才”“主线”“现在做到哪了”时,默认是在问分叉时继承到的父主会话。\n\ + 分叉之后主线程的新 turn **不会** 自动同步到本 tab;需要更新认知请新开 tab。\n\ + 不要要求用户再说明“主会话进度”,除非问题确实无法从继承上下文回答。" + ) +} + +fn build_side_snapshot_bootstrap_message( + parent: &aionui_db::models::ConversationRow, + parent_extra: &Value, + req: &CreateSideConversationRequest, + transcript: &str, +) -> String { + let parent_id = parent.id.as_str(); + let title = if parent.name.trim().is_empty() { + "(untitled)" + } else { + parent.name.trim() + }; + let status = parent.status.as_deref().unwrap_or("unknown"); + let mode = req.guardrail.as_deref().unwrap_or("reference_readonly"); + let workspace = parent_extra + .get("workspace") + .and_then(|v| v.as_str()) + .unwrap_or("(same as parent)"); + let fork_note = req + .forked_at_msg_id + .as_deref() + .filter(|s| !s.is_empty()) + .map(|id| format!("\n分叉锚点消息: {id}")) + .unwrap_or_default(); + + let snapshot_block = if transcript.trim().is_empty() { + "(主会话暂无可引用的文本消息;请结合工作区与左侧主线程 UI 判断进展。)".to_owned() + } else { + format!("[主线程快照 · 只读 · 创建时固定]\n{transcript}") + }; + + format!( + "【侧边会话 · 摘要模式】你从主会话 {parent_id} 分叉(text snapshot)。\n\ + 主会话标题: {title}\n\ + 主会话状态: {status}\n\ + 主会话仍在左侧继续;本侧边栏只展示侧边自己的对话。\n\ + 工作区: {workspace}{fork_note}\n\ + 护栏: {mode} — 默认只读参考主线程,不要擅自改工作区或执行有副作用命令。\n\ + 用户在侧边里说“进度”“刚才”“主线”“现在做到哪了”时,默认是在问下方主线程快照。\n\ + 以下为主线程在创建本 tab 时的快照(之后主线新 turn 不会自动写入):\n\n\ + {snapshot_block}" + ) +} + +fn extract_message_text(content_json: &str) -> String { + let Ok(value) = serde_json::from_str::(content_json) else { + return String::new(); + }; + value.get("content").and_then(|v| v.as_str()).unwrap_or("").to_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sanitize_child_extra_drops_immutable_snapshots() { + let parent_extra = json!({ + "workspace": "/w", + "backend": "codex", + "skills": ["a"], + "mcp_server_ids": ["m1"], + "mcp_statuses": [], + "side_conversation_id": "old-child" + }); + let req = CreateSideConversationRequest { + guardrail: None, + initial_prompt: None, + forked_at_msg_id: Some("msg-1".into()), + }; + let child = sanitize_child_extra( + &parent_extra, + "parent-1", + AgentType::Acp, + &req, + SideForkMode::TextSnapshot, + None, + "side context", + ); + let obj = child.as_object().unwrap(); + assert_eq!(obj.get("workspace").unwrap(), "/w"); + assert_eq!(obj.get("preset_enabled_skills").unwrap(), &json!(["a"])); + assert!(obj.get("skills").is_none()); + assert!(obj.get("mcp_server_ids").is_none()); + assert!(obj.get("side_conversation_id").is_none()); + assert_eq!(obj.get("parent_conversation_id").unwrap(), "parent-1"); + assert_eq!(obj.get("fork_mode").unwrap(), "text_snapshot"); + assert_eq!(obj.get("preset_context").unwrap(), "side context"); + } + + #[test] + fn sanitize_child_extra_merges_side_context_into_existing_agent_context() { + let parent_extra = json!({ + "system_prompt": "base system", + }); + let req = CreateSideConversationRequest { + guardrail: None, + initial_prompt: None, + forked_at_msg_id: None, + }; + let child = sanitize_child_extra( + &parent_extra, + "parent-1", + AgentType::Aionrs, + &req, + SideForkMode::TextSnapshot, + None, + "side context", + ); + let obj = child.as_object().unwrap(); + assert_eq!(obj.get("system_prompt").unwrap(), "base system\n\nside context"); + } +} diff --git a/crates/aionui-conversation/src/service_test.rs b/crates/aionui-conversation/src/service_test.rs index 582692a06..60b1d2739 100644 --- a/crates/aionui-conversation/src/service_test.rs +++ b/crates/aionui-conversation/src/service_test.rs @@ -249,6 +249,30 @@ impl IConversationRepository for MockRepo { Ok(vec![]) } + async fn list_side_children( + &self, + user_id: &str, + parent_conversation_id: &str, + ) -> Result, aionui_db::DbError> { + let rows = self.rows.lock().unwrap(); + Ok(rows + .iter() + .filter(|row| row.user_id == user_id) + .filter(|row| { + serde_json::from_str::(&row.extra) + .ok() + .and_then(|extra| { + let is_side = extra.get("side_mode").and_then(|value| value.as_bool()) == Some(true); + let parent_matches = extra.get("parent_conversation_id").and_then(|value| value.as_str()) + == Some(parent_conversation_id); + (is_side && parent_matches).then_some(()) + }) + .is_some() + }) + .cloned() + .collect()) + } + async fn get_messages( &self, conv_id: &str, diff --git a/crates/aionui-conversation/src/stream_relay.rs b/crates/aionui-conversation/src/stream_relay.rs index ff353babe..ef1b2dc77 100644 --- a/crates/aionui-conversation/src/stream_relay.rs +++ b/crates/aionui-conversation/src/stream_relay.rs @@ -1770,6 +1770,13 @@ mod tests { ) -> Result, DbError> { Ok(vec![]) } + async fn list_side_children( + &self, + _user_id: &str, + _parent_conversation_id: &str, + ) -> Result, DbError> { + Ok(vec![]) + } async fn get_messages( &self, _conv_id: &str, diff --git a/crates/aionui-db/src/repository/conversation.rs b/crates/aionui-db/src/repository/conversation.rs index 901f2b85b..901d7823f 100644 --- a/crates/aionui-db/src/repository/conversation.rs +++ b/crates/aionui-db/src/repository/conversation.rs @@ -53,6 +53,13 @@ pub trait IConversationRepository: Send + Sync { /// The conversation identified by `conversation_id` is excluded. async fn list_associated(&self, user_id: &str, conversation_id: &str) -> Result, DbError>; + /// Ephemeral side children forked from `parent_conversation_id` in `extra`. + async fn list_side_children( + &self, + user_id: &str, + parent_conversation_id: &str, + ) -> Result, DbError>; + // ── Message operations ────────────────────────────────────────── /// Returns paginated messages for a conversation, ordered by `created_at`. diff --git a/crates/aionui-db/src/repository/sqlite_conversation.rs b/crates/aionui-db/src/repository/sqlite_conversation.rs index 21bc49156..3ae9b283c 100644 --- a/crates/aionui-db/src/repository/sqlite_conversation.rs +++ b/crates/aionui-db/src/repository/sqlite_conversation.rs @@ -229,6 +229,26 @@ impl IConversationRepository for SqliteConversationRepository { Ok(rows) } + async fn list_side_children( + &self, + user_id: &str, + parent_conversation_id: &str, + ) -> Result, DbError> { + let rows = sqlx::query_as::<_, ConversationRow>( + "SELECT * FROM conversations \ + WHERE user_id = ? \ + AND json_extract(extra, '$.parent_conversation_id') = ? \ + AND json_extract(extra, '$.side_mode') = true \ + ORDER BY updated_at DESC", + ) + .bind(user_id) + .bind(parent_conversation_id) + .fetch_all(&self.pool) + .await?; + + Ok(rows) + } + async fn list_associated(&self, user_id: &str, conversation_id: &str) -> Result, DbError> { // First get the target conversation's workspace let target = sqlx::query_as::<_, ConversationRow>("SELECT * FROM conversations WHERE id = ? AND user_id = ?")