Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/age-2516-assistant-mcp-server-reconcile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"server": patch
---

Assistants now pick up MCP server additions and removals on the next turn instead of only on a fresh runtime bootstrap. The per-turn dispatch sends the current MCP set to the runner, which reconciles its live connections without recycling the VM. Previously a newly attached integration (e.g. GitHub MCP) stayed invisible to the running assistant until the runtime was restarted, leaving the model unable to use it or to invoke `mcp_force_reconnect` for it.
197 changes: 171 additions & 26 deletions agents/runner/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::panic::AssertUnwindSafe;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -88,6 +89,11 @@ pub enum McpCmd {
server_id: McpServerId,
reply: oneshot::Sender<Result<(), String>>,
},
/// Sent by `/threads/{id}/turn` when the server-side toolset has
/// drifted from the snapshot the runner bootstrapped with. The actor
/// diffs `desired` against currently-registered servers, connects
/// any new ones, and disconnects ones that no longer apply.
Reconcile { desired: Vec<McpServer> },
}

impl ConfiguredThread {
Expand Down Expand Up @@ -278,8 +284,16 @@ async fn spawn_thread(
bootstrap: ThreadBootstrap,
tokens: TokenRegistry,
) -> Result<Arc<ConfiguredThread>, RunnerError> {
let (mcp_cmd_tx, mcp_catalog, mcp_auth_notices) =
build_thread_mcp(host, &thread_id, &bootstrap.mcp_servers, &tokens).await?;
let (inbox_tx, inbox_rx) = mpsc::unbounded_channel::<String>();

let (mcp_cmd_tx, mcp_catalog, mcp_auth_notices) = build_thread_mcp(
host,
&thread_id,
&bootstrap.mcp_servers,
&tokens,
inbox_tx.clone(),
)
.await?;

let chat_id = bootstrap.chat_id.clone();

Expand Down Expand Up @@ -360,9 +374,8 @@ async fn spawn_thread(
let fs_resources = FileSystemToolResources::new()
.with_policy(FileSystemToolPolicy::new().require_read_before_write(true));

let mcp_server_ids: Vec<String> = bootstrap.mcp_servers.iter().map(|s| s.id.clone()).collect();
let native_tools = ToolRegistry::new().with(tools::bun_run::bun_run).with(
tools::mcp_force_reconnect::McpForceReconnectTool::new(Arc::clone(host), mcp_server_ids),
tools::mcp_force_reconnect::McpForceReconnectTool::new(Arc::clone(host)),
);

let mcp_source = ClippedToolSource::new(mcp_catalog, host.spill_root.clone());
Expand Down Expand Up @@ -392,7 +405,6 @@ async fn spawn_thread(
.map_err(|e| RunnerError::AgentStart(e.to_string()))?;

let idle_since = Arc::new(Mutex::new(Some(Instant::now())));
let (inbox_tx, inbox_rx) = mpsc::unbounded_channel::<String>();
let loop_idle = Arc::clone(&idle_since);
let log_thread_id = thread_id.clone();
let host_for_eviction = Arc::clone(host);
Expand Down Expand Up @@ -440,43 +452,157 @@ async fn build_thread_mcp(
thread_id: &str,
servers: &[McpServer],
tokens: &TokenRegistry,
inbox_tx: UnboundedSender<String>,
) -> Result<(mpsc::Sender<McpCmd>, CatalogReader, Vec<String>), RunnerError> {
let mut manager = McpServerManager::new();
let catalog = manager.source();
let mut auth_notices = Vec::new();
let mut known = BTreeSet::new();

for server in servers {
let config = build_mcp_server_config(server, &host.http_client, tokens)?;
let server_id = McpServerId::new(server.id.clone());
manager.register_server(config);
if let Err(err) = connect_and_log(&mut manager, &server_id, "register").await
&& err.auth_required
{
match host
.gram_client
.create_mcp_auth_flow(thread_id, &server.id, &server.url, tokens)
.await
{
Ok(flow) => auth_notices.push(format!(
"<message-context>\nEventType: assistant_mcp_auth_required\nMCPServerID: {server_id}\nMCPSlug: {mcp_slug}\nAuthURL: {auth_url}\n</message-context>",
server_id = flow.server_id,
mcp_slug = flow.mcp_slug,
auth_url = flow.auth_url,
)),
Err(flow_err) => tracing::warn!(
server_id = %server_id,
error = %flow_err,
"failed to create assistant mcp auth flow"
),
match connect_and_log(&mut manager, &server_id, "register").await {
Ok(()) => {
known.insert(server.id.clone());
}
Err(err) if err.auth_required => {
known.insert(server.id.clone());
if let Some(notice) =
create_auth_notice(host, thread_id, &server.id, &server.url, tokens).await
{
auth_notices.push(notice);
}
}
Err(_) => {
// Transient transport failure: leave out of `known` so the
// next /turn reconcile retries instead of silently dropping
// the integration for the rest of the thread.
}
}
}

let (cmd_tx, cmd_rx) = mpsc::channel(MCP_CMD_CAPACITY);
tokio::spawn(run_mcp_actor(manager, cmd_rx));
let actor_ctx = McpActorContext {
host: Arc::clone(host),
thread_id: thread_id.to_string(),
tokens: tokens.clone(),
inbox_tx,
known,
};
tokio::spawn(run_mcp_actor(manager, cmd_rx, actor_ctx));
Ok((cmd_tx, catalog, auth_notices))
}

/// Per-thread state carried by the MCP actor task.
///
/// Built in `build_thread_mcp` after the initial connect pass and moved into
/// `run_mcp_actor`, which passes it mutably to `reconcile_servers`.
struct McpActorContext {
/// Runtime host. Supplies the HTTP client used when building server configs
/// and is forwarded to `create_auth_notice`.
host: Arc<RuntimeHost>,
/// Thread id forwarded to `create_auth_notice` when a reconcile-added
/// server reports that auth is required.
thread_id: String,
/// Token registry forwarded to `build_mcp_server_config` and
/// `create_auth_notice`.
tokens: TokenRegistry,
/// Sender half of the thread inbox. `reconcile_servers` pushes auth notices
/// onto it; a failed send is logged and the notice is dropped.
inbox_tx: UnboundedSender<String>,
/// Ids of servers whose connect either succeeded or failed with
/// `auth_required`. Servers that failed for any other reason are left out.
/// `reconcile_servers` skips desired ids already in this set and removes
/// ids that are no longer desired; the `ForceReconnect` arm of
/// `run_mcp_actor` rejects ids that are not in it.
known: BTreeSet<String>,
}

/// Requests an MCP auth flow for `server_id` and renders it as a
/// `<message-context>` notice.
///
/// Returns `Some(notice)` when `create_mcp_auth_flow` succeeds. When it fails,
/// the error is logged at `warn` level and `None` is returned, so callers can
/// treat a missing notice as "nothing to deliver".
async fn create_auth_notice(
    host: &Arc<RuntimeHost>,
    thread_id: &str,
    server_id: &str,
    server_url: &str,
    tokens: &TokenRegistry,
) -> Option<String> {
    let outcome = host
        .gram_client
        .create_mcp_auth_flow(thread_id, server_id, server_url, tokens)
        .await;

    // Bail out early on failure; the happy path below only formats the notice.
    let flow = match outcome {
        Ok(flow) => flow,
        Err(flow_err) => {
            tracing::warn!(
                server_id,
                error = %flow_err,
                "failed to create assistant mcp auth flow"
            );
            return None;
        }
    };

    // The notice is filled from the values reported by the flow itself, not
    // from the `server_id` / `server_url` arguments passed in.
    Some(format!(
        "<message-context>\nEventType: assistant_mcp_auth_required\nMCPServerID: {server_id}\nMCPSlug: {mcp_slug}\nAuthURL: {auth_url}\n</message-context>",
        server_id = flow.server_id,
        mcp_slug = flow.mcp_slug,
        auth_url = flow.auth_url,
    ))
}

async fn reconcile_servers(
manager: &mut McpServerManager,
ctx: &mut McpActorContext,
desired: Vec<McpServer>,
) {
let desired_ids: BTreeSet<String> = desired.iter().map(|s| s.id.clone()).collect();

for server in &desired {
if ctx.known.contains(&server.id) {
continue;
}
let config = match build_mcp_server_config(server, &ctx.host.http_client, &ctx.tokens) {
Ok(cfg) => cfg,
Err(err) => {
tracing::warn!(
server_id = %server.id,
error = %err,
"skip reconcile-added mcp server: config build failed"
);
continue;
}
};
manager.register_server(config);
let server_uid = McpServerId::new(server.id.clone());
match connect_and_log(manager, &server_uid, "reconcile_add").await {
Ok(()) => {
ctx.known.insert(server.id.clone());
}
Err(err) if err.auth_required => {
ctx.known.insert(server.id.clone());
if let Some(notice) = create_auth_notice(
Comment on lines +564 to +565
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Retry auth-required servers when auth notice creation fails

In reconcile_servers, the auth-required path marks the server as known before the auth notice is successfully created and delivered. If create_mcp_auth_flow fails transiently (or the inbox send fails), subsequent reconciles will skip this server because of the ctx.known guard, so no later turn can re-emit assistant_mcp_auth_required and the integration can remain unusable for the thread.

Useful? React with 👍 / 👎.

&ctx.host,
&ctx.thread_id,
&server.id,
&server.url,
&ctx.tokens,
)
.await
&& ctx.inbox_tx.send(notice).is_err()
{
tracing::warn!(
server_id = %server.id,
"drop reconcile auth notice: thread inbox closed"
);
}
}
Err(_) => {
// Transient transport failure on connect: leave out of
// `known` so a later reconcile re-attempts the connect.
// The config stays registered in the manager, but a manual
// `mcp_force_reconnect` cannot bypass this path: the `known`
// guard in the ForceReconnect arm rejects ids not in `known`.
}
}
}

let removed: Vec<String> = ctx
.known
.iter()
.filter(|id| !desired_ids.contains(*id))
.cloned()
.collect();
for id in removed {
let server_uid = McpServerId::new(id.clone());
if let Err(err) = manager.disconnect_server(&server_uid).await {
tracing::debug!(server_id = %id, error = %err, "reconcile disconnect (ignored)");
}
ctx.known.remove(&id);
Comment on lines +599 to +602
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Remove detached MCP servers from reconnectable state

When a server is removed during reconcile, this code only calls disconnect_server and then deletes the ID from ctx.known, but it never removes the server config from McpServerManager. Because ForceReconnect later calls connect_server by ID against manager configs, a previously detached server can be reconnected and exposed again if the model/user reuses that stale ID. Since that ID is no longer in ctx.known, future reconciles also won’t clean it up, so detached integrations can persist unexpectedly.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in a83ee4f: known now records only servers that connected (or that surfaced an auth notice). Transient transport failures stay out of known and the next /turn reconcile retries the connect.

Comment on lines +599 to +602
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve detach retry when MCP disconnect fails

The removal loop drops the server ID from ctx.known even when disconnect_server returns an error. In failure cases this prevents future reconciles from retrying the detach (the ID is no longer tracked in known), so a server that failed to disconnect can linger in stale state for the lifetime of the thread.

Useful? React with 👍 / 👎.

}
}

struct McpConnectFailure {
message: String,
auth_required: bool,
Expand Down Expand Up @@ -542,10 +668,26 @@ fn build_mcp_server_config(
))
}

async fn run_mcp_actor(mut manager: McpServerManager, mut cmd_rx: mpsc::Receiver<McpCmd>) {
async fn run_mcp_actor(
mut manager: McpServerManager,
mut cmd_rx: mpsc::Receiver<McpCmd>,
mut ctx: McpActorContext,
) {
while let Some(cmd) = cmd_rx.recv().await {
match cmd {
McpCmd::ForceReconnect { server_id, reply } => {
// Reject ids that aren't part of the assistant's current
// catalog. `disconnect_server` only clears `connections`;
// the underlying config lingers in the manager for the
// thread's lifetime (agentkit-mcp exposes no unregister
// path). Without this guard a detached integration could
// be resurrected via force_reconnect.
if !ctx.known.contains(server_id.0.as_str()) {
let _ = reply.send(Err(format!(
"mcp server {server_id} is not part of this assistant's current configuration"
)));
continue;
}
if let Err(e) = manager.disconnect_server(&server_id).await {
tracing::debug!(server_id = %server_id, error = %e, "disconnect during force reconnect");
}
Expand All @@ -554,6 +696,9 @@ async fn run_mcp_actor(mut manager: McpServerManager, mut cmd_rx: mpsc::Receiver
.map_err(|err| err.message);
let _ = reply.send(result);
}
McpCmd::Reconcile { desired } => {
reconcile_servers(&mut manager, &mut ctx, desired).await;
}
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion agents/runner/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::net::TcpListener;
use tokio::sync::{Mutex, Notify};

use crate::runtime::{
AppState, DEFAULT_THREAD_IDLE_TTL, build_host, ensure_thread, snapshot_threads,
AppState, DEFAULT_THREAD_IDLE_TTL, McpCmd, build_host, ensure_thread, snapshot_threads,
};

const IDEMPOTENCY_HEADER: &str = "x-idempotency-key";
Expand Down Expand Up @@ -113,6 +113,21 @@ async fn thread_turn(
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;

// Hand reconcile to the actor and proceed to enqueue. The actor runs
// concurrently with the agent loop, so a server added by this /turn
// may surface on the very next model step or on the one after,
// depending on whether the connect finishes before the tool catalog
// is sampled. Either way it lands before the user notices.
if let Some(desired) = request.mcp_servers
&& thread
.mcp_cmd_tx
.send(McpCmd::Reconcile { desired })
.await
.is_err()
{
tracing::warn!(thread_id = %thread_id, "drop mcp reconcile: actor channel closed");
}

thread
.enqueue(request.input)
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?;
Expand Down
54 changes: 21 additions & 33 deletions agents/runner/src/tools/mcp_force_reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use agentkit_tools_core::{
use async_trait::async_trait;
use futures::future::join_all;
use serde::Deserialize;
use serde_json::{Value, json};
use serde_json::json;
use tokio::sync::oneshot;

use crate::runtime::{McpCmd, RuntimeHost};
Expand All @@ -21,9 +21,11 @@ pub struct McpForceReconnectTool {
}

impl McpForceReconnectTool {
pub fn new(host: Arc<RuntimeHost>, server_ids: Vec<String>) -> Self {
let spec = build_spec(&server_ids);
Self { host, spec }
pub fn new(host: Arc<RuntimeHost>) -> Self {
Self {
host,
spec: build_spec(),
}
}
}

Expand All @@ -32,43 +34,29 @@ struct McpForceReconnectInput {
server_id: String,
}

fn build_spec(server_ids: &[String]) -> ToolSpec {
// The model only sees tools advertised in /tools/list. Without the enum,
// the assistant has to guess server_id from the `mcp_<id>_<tool>` prefix
// — which fails the moment the server is disconnected (no prefixed tools
// exposed). Baking the configured server IDs into the schema keeps the
// tool callable even when MCP is fully offline.
let server_ids_json: Vec<Value> = server_ids.iter().cloned().map(Value::String).collect();

let server_id_property = if server_ids_json.is_empty() {
json!({
"type": "string",
"description": "ID of the MCP server to reconnect.",
})
} else {
json!({
"type": "string",
"description": "ID of the MCP server to reconnect.",
"enum": server_ids_json,
})
};

fn build_spec() -> ToolSpec {
// server_id is intentionally not enumerated. The set of registered MCP
// servers can drift mid-thread (assistant toolset edits flow in via
// /turn reconcile), so any frozen enum becomes stale as soon as the
// user attaches a new integration. The model discovers live server
// ids from the `mcp_<id>_<tool>` prefix of catalog entries and from
// the `assistant_mcp_auth_required` event context.
let input_schema = json!({
"type": "object",
"properties": { "server_id": server_id_property },
"properties": {
"server_id": {
"type": "string",
"description": "ID of the MCP server to reconnect.",
},
},
"required": ["server_id"],
"additionalProperties": false,
});

let description = if server_ids.is_empty() {
"Disconnect and reconnect a registered MCP server. No MCP servers are \
configured for this assistant; calling this tool will fail."
} else {
"Disconnect and reconnect a registered MCP server. Use this when an \
let description = "Disconnect and reconnect a registered MCP server. Use this when an \
MCP-backed tool returns a connection-related error (timeout, transport closed, \
auth failure that the backend has since refreshed) or when no MCP-backed tools \
appear in the catalog."
};
appear in the catalog.";

ToolSpec::new(TOOL_NAME, description, input_schema)
.with_annotations(ToolAnnotations::default().with_idempotent(true))
Expand Down
Loading
Loading