Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/aionui-ai-agent/src/agent_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,26 @@ impl AgentInstance {
}
}

/// Whether the connected ACP CLI advertised `session/fork`.
pub async fn acp_supports_session_fork(&self) -> bool {
match self {
Self::Acp(m) => m.supports_session_fork().await,
_ => false,
}
}

/// Warm the parent session and return its ACP session id (for side fork).
pub async fn acp_ensure_warm_session_id(&self) -> Result<String, AgentError> {
match self {
Self::Acp(m) => {
m.warmup_session().await?;
let sid = m.current_session_id().await;
sid.ok_or_else(|| AgentError::internal("ACP session id missing after warmup"))
}
_ => Err(AgentError::bad_request("Side fork requires an ACP parent")),
}
}

// ── Cross-variant semi-specific helpers ──────────────────────────
//
// These fan out to inherent methods on concrete managers. Variants
Expand Down
14 changes: 14 additions & 0 deletions crates/aionui-ai-agent/src/factory/acp_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ mod tests {
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};
let servers = resolve_mcp_servers(&config, "conv-1", Vec::new());
assert_eq!(servers.len(), 1);
Expand Down Expand Up @@ -268,6 +270,8 @@ mod tests {
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};
let servers = resolve_mcp_servers(&config, "conv-1", Vec::new());
assert_eq!(servers.len(), 1);
Expand Down Expand Up @@ -300,6 +304,8 @@ mod tests {
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};
let servers = resolve_mcp_servers(&config, "conv-1", Vec::new());
assert!(servers.is_empty());
Expand Down Expand Up @@ -329,6 +335,8 @@ mod tests {
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};
let user = vec![user_stdio("ctx7"), user_stdio("playwright")];
let servers = resolve_mcp_servers(&config, "conv-1", user);
Expand Down Expand Up @@ -370,6 +378,8 @@ mod tests {
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};
let user = vec![user_stdio("ctx7")];
let servers = resolve_mcp_servers(&config, "conv-1", user);
Expand Down Expand Up @@ -408,6 +418,8 @@ mod tests {
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};
let user = vec![user_stdio("ctx7")];
let servers = resolve_mcp_servers(&config, "conv-1", user);
Expand Down Expand Up @@ -444,6 +456,8 @@ mod tests {
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};
let servers = resolve_mcp_servers(&config, "conv-1", Vec::new());
assert!(servers.is_empty());
Expand Down
20 changes: 19 additions & 1 deletion crates/aionui-ai-agent/src/manager/acp/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,21 @@ impl AcpAgentManager {
(s.session_id().map(ToOwned::to_owned), s.is_opened())
};

let fork_parent = self
.params
.config
.fork_parent_session_id
.as_deref()
.filter(|s| !s.is_empty());

let sid = match (session_id, opened) {
(None, _) => self.open_session_new().await?,
(None, _) => {
if let Some(parent_sid) = fork_parent {
self.open_session_fork(parent_sid).await?
} else {
self.open_session_new().await?
}
}
(Some(sid), false) => self.open_session_resume(&sid).await?,
(Some(sid), true) => sid,
};
Expand Down Expand Up @@ -724,6 +737,11 @@ impl AcpAgentManager {
.await
}

/// Current ACP session id if the CLI has assigned one.
pub async fn current_session_id(&self) -> Option<String> {
self.session.read().await.session_id().map(|id| id.to_string())
}

/// Pre-open the ACP session without sending a prompt. Called by the
/// factory after `AcpAgentManager::build` so `POST /warmup` returns
/// only after the session is ready to accept `set_mode` / `set_model`
Expand Down
49 changes: 48 additions & 1 deletion crates/aionui-ai-agent/src/manager/acp/agent_session_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::protocol::events::{
use crate::protocol::send_error::AgentSendError;
use crate::shared_kernel::SessionId as DomainSessionId;
use crate::types::SendMessageData;
use agent_client_protocol::schema::{ContentBlock, LoadSessionRequest, PromptRequest, SessionId, StopReason};
use agent_client_protocol::schema::{
ContentBlock, ForkSessionRequest, LoadSessionRequest, PromptRequest, SessionId, StopReason,
};
use aionui_api_types::SlashCommandItem;
use serde_json::Value;
use tokio::sync::broadcast::error::TryRecvError;
Expand Down Expand Up @@ -73,6 +75,51 @@ impl AcpAgentManager {
Ok(sid)
}

/// Whether the connected CLI advertised `session/fork` during initialize.
pub(crate) async fn supports_session_fork(&self) -> bool {
self.session
.read()
.await
.agent_capabilities()
.and_then(|c| c.session_capabilities.fork.as_ref())
.is_some()
}

/// Fork an existing session into a new session id (ACP `session/fork`).
pub(super) async fn open_session_fork(&self, parent_session_id: &str) -> Result<String, AgentError> {
use std::path::PathBuf;

let req = ForkSessionRequest::new(
SessionId::new(parent_session_id.to_owned()),
PathBuf::from(&self.params.workspace.path),
);
let resp = self.protocol.fork_session(req).await?;
let sid = resp.session_id.to_string();

{
let mut session = self.session.write().await;
if let Some(models) = resp.models {
session.apply_advertised_models(models);
}
if let Some(modes) = resp.modes {
session.apply_advertised_modes(modes);
}
if let Some(config_options) = resp.config_options {
session.apply_advertised_config_options(config_options);
}
session.set_session_id(DomainSessionId::new(sid.clone()));
session.mark_pending_session_new_prelude();
self.commit_session_changes(&mut session).await;
}
self.emit_snapshot_events().await;
self.runtime
.emit(AgentStreamEvent::SessionAssigned(SessionAssignedEventData {
session_id: sid.clone(),
}));
self.reconcile_session(&sid).await?;
Ok(sid)
}

/// Drop the in-aggregate session id and re-run `open_session_new`.
/// Used as the rescue path when resume helpers see `SessionNotFound`.
/// Emits a `warn!` so ops can still see the original failure that
Expand Down
2 changes: 2 additions & 0 deletions crates/aionui-ai-agent/tests/acp_agent_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ async fn make_mock_agent(script: &str, backend: &str) -> (Arc<AcpAgentManager>,
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};

let tmp_skills = tempfile::TempDir::new().unwrap();
Expand Down
2 changes: 2 additions & 0 deletions crates/aionui-ai-agent/tests/prompt_pipeline_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ async fn fixture_params(
mcp_server_ids: None,
session_mcp_servers: vec![],
user_id: None,
fork_parent_session_id: None,
fork_mode: None,
};

Arc::new(
Expand Down
6 changes: 6 additions & 0 deletions crates/aionui-api-types/src/agent_build_extra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ pub struct AcpBuildExtra {
pub session_mcp_servers: Vec<SessionMcpServer>,
#[serde(default)]
pub user_id: Option<String>,
/// Parent ACP session id for side conversations using `session/fork`.
#[serde(default)]
pub fork_parent_session_id: Option<String>,
/// `agent_fork` | `text_snapshot` — set on side child rows.
#[serde(default)]
pub fork_mode: Option<String>,
}

/// Aionrs-specific fields extracted from `extra` in build task options.
Expand Down
25 changes: 25 additions & 0 deletions crates/aionui-api-types/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ pub struct UpdateConversationRequest {
pub extra: Option<serde_json::Value>,
}

/// Body for `POST /api/conversations/:id/side`.
#[derive(Debug, Deserialize)]
pub struct CreateSideConversationRequest {
pub guardrail: Option<String>,
pub initial_prompt: Option<String>,
pub forked_at_msg_id: Option<String>,
}

/// Response for `POST /api/conversations/:id/side`.
#[derive(Debug, Serialize)]
pub struct CreateSideConversationResponse {
pub conversation_id: String,
/// Always `true` in v0.2 — each open creates a new side tab.
pub created: bool,
pub fork_mode: SideForkMode,
}

/// How the side child session inherited parent context.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SideForkMode {
AgentFork,
TextSnapshot,
}

/// Body for `POST /api/conversations/clone`.
///
/// Despite the name, this endpoint no longer supports cloning from an
Expand Down
7 changes: 4 additions & 3 deletions crates/aionui-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ pub use conversation::{
ConversationArtifactKind, ConversationArtifactListResponse, ConversationArtifactResponse,
ConversationArtifactStatus, ConversationListResponse, ConversationMcpStatus, ConversationMcpStatusKind,
ConversationResponse, ConversationRuntimeStateKind, ConversationRuntimeSummary, CreateConversationRequest,
ListConversationsQuery, ListMessagesQuery, MessageListResponse, MessageResponse, MessageSearchItem,
MessageSearchResponse, SearchMessagesQuery, SendMessageRequest, SendMessageResponse,
UpdateConversationArtifactRequest, UpdateConversationRequest,
CreateSideConversationRequest, CreateSideConversationResponse, ListConversationsQuery, ListMessagesQuery,
MessageListResponse, MessageResponse, MessageSearchItem, MessageSearchResponse, SearchMessagesQuery,
SendMessageRequest, SendMessageResponse, SideForkMode, UpdateConversationArtifactRequest,
UpdateConversationRequest,
};
pub use cron::{
CreateCronJobRequest, CronAgentConfigDto, CronJobExecutedEvent, CronJobMetadataDto, CronJobPayloadDto,
Expand Down
2 changes: 1 addition & 1 deletion crates/aionui-conversation/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub fn row_to_response_with_extra(
/// field that can be an array of model objects. The backend only needs
/// `provider_id`, `model` (the selected model name), and `use_model`.
/// Accepts both snake_case and legacy camelCase key names for backward compatibility.
fn parse_provider_with_model(s: &str) -> Result<ProviderWithModel, ConversationError> {
pub(crate) fn parse_provider_with_model(s: &str) -> Result<ProviderWithModel, ConversationError> {
let v: serde_json::Value =
serde_json::from_str(s).map_err(|e| ConversationError::internal(format!("Invalid model JSON: {e}")))?;

Expand Down
1 change: 1 addition & 0 deletions crates/aionui-conversation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod runtime_persistence;
pub mod runtime_state;
pub mod service;
mod service_ops;
mod service_side;
pub(crate) mod session_context;
pub mod skill_resolver;
pub mod skill_snapshot;
Expand Down
33 changes: 32 additions & 1 deletion crates/aionui-conversation/src/routes_aux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@

use crate::state::ConversationRouterState;
use aionui_api_types::{
AgentModeResponse, ApiResponse, GetModelInfoResponse, SetModeRequest, SetModelRequest, SideQuestionRequest,
AgentModeResponse, ApiResponse, ConversationResponse, CreateSideConversationRequest,
CreateSideConversationResponse, GetModelInfoResponse, SetModeRequest, SetModelRequest, SideQuestionRequest,
SideQuestionResponse, SlashCommandItem, WorkspaceBrowseQuery, WorkspaceEntry,
};
use aionui_auth::CurrentUser;
use aionui_common::ApiError;
use axum::Router;
use axum::extract::rejection::JsonRejection;
use axum::extract::{Extension, Json, Path, Query, State};
use axum::http::StatusCode;
use axum::routing::{get, post};

/// Build the conversation-ops router (no auth layer applied — the caller is
/// responsible for wrapping this with the auth middleware).
pub fn conversation_ops_routes(state: ConversationRouterState) -> Router {
Router::new()
.route("/api/conversations/{id}/side-question", post(side_question))
.route("/api/conversations/{id}/side", get(list_side).post(create_side))
.route("/api/conversations/{id}/slash-commands", get(get_slash_commands))
.route("/api/conversations/{id}/usage", get(get_usage))
.route("/api/conversations/{id}/mode", get(get_mode).put(set_mode))
Expand Down Expand Up @@ -96,6 +99,34 @@ async fn side_question(
)))
}

async fn create_side(
State(state): State<ConversationRouterState>,
Extension(user): Extension<CurrentUser>,
Path(id): Path<String>,
Json(req): Json<CreateSideConversationRequest>,
) -> Result<(StatusCode, Json<ApiResponse<CreateSideConversationResponse>>), ApiError> {
let resp = state
.service
.create_side_conversation(&user.id, &id, req, &state.task_manager)
.await
.map_err(ApiError::from)?;
Ok((StatusCode::CREATED, Json(ApiResponse::ok(resp))))
}

async fn list_side(
State(state): State<ConversationRouterState>,
Extension(user): Extension<CurrentUser>,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<Vec<ConversationResponse>>>, ApiError> {
Ok(Json(ApiResponse::ok(
state
.service
.list_side_conversations(&user.id, &id)
.await
.map_err(ApiError::from)?,
)))
}

async fn get_slash_commands(
State(state): State<ConversationRouterState>,
Extension(_user): Extension<CurrentUser>,
Expand Down
13 changes: 12 additions & 1 deletion crates/aionui-conversation/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,8 @@ impl ConversationService {
.filter(|r| r.user_id == user_id)
.ok_or_else(|| ConversationError::NotFound { id: id.to_owned() })?;

self.delete_ephemeral_side_children(user_id, id).await?;

let source: Option<ConversationSource> = existing
.source
.as_deref()
Expand Down Expand Up @@ -1458,6 +1460,9 @@ impl ConversationService {

info!(msg_id = %user_msg_id, "User message persisted");

let child_extra: serde_json::Value = serde_json::from_str(&row.extra).unwrap_or_else(|_| serde_json::json!({}));
let agent_content = self.enrich_side_agent_content(&child_extra, &req.content).await?;

self.broadcaster.broadcast(WebSocketMessage::new(
"message.userCreated",
serde_json::json!({
Expand Down Expand Up @@ -1499,11 +1504,17 @@ impl ConversationService {
self.ensure_auto_workspace_skill_links(&row, &build_opts).await;
let stored_workspace = build_opts.context.workspace.stored_path.clone();

let agent_request = SendMessageRequest {
content: agent_content,
files: req.files,
inject_skills: req.inject_skills,
hidden: req.hidden,
};
let user_msg_id_ret = user_msg_id.clone();
ConversationTurnOrchestrator::new(self.clone(), Arc::clone(task_manager)).spawn_user_turn(TurnStartInput {
user_id: user_id.to_owned(),
conversation: row,
request: req,
request: agent_request,
build_options: build_opts,
stored_workspace,
turn_id: turn_id.clone(),
Expand Down
Loading