-
Notifications
You must be signed in to change notification settings - Fork 29
fix(assistants): reconcile MCP server set on every turn #3013
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "server": patch | ||
| --- | ||
|
|
||
| Assistants now pick up MCP server additions and removals on the next turn instead of only on a fresh runtime bootstrap. The per-turn dispatch sends the current MCP set to the runner, which reconciles its live connections without recycling the VM. Previously a newly attached integration (e.g. GitHub MCP) stayed invisible to the running assistant until the runtime was restarted, leaving the model unable to use it or to invoke `mcp_force_reconnect` for it. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| use std::collections::BTreeSet; | ||
| use std::panic::AssertUnwindSafe; | ||
| use std::path::PathBuf; | ||
| use std::sync::{Arc, Mutex}; | ||
|
|
@@ -88,6 +89,11 @@ pub enum McpCmd { | |
| server_id: McpServerId, | ||
| reply: oneshot::Sender<Result<(), String>>, | ||
| }, | ||
| /// Sent by `/threads/{id}/turn` when the server-side toolset has | ||
| /// drifted from the snapshot the runner bootstrapped with. The actor | ||
| /// diffs `desired` against currently-registered servers, connects | ||
| /// any new ones, and disconnects ones that no longer apply. | ||
| Reconcile { desired: Vec<McpServer> }, | ||
| } | ||
|
|
||
| impl ConfiguredThread { | ||
|
|
@@ -278,8 +284,16 @@ async fn spawn_thread( | |
| bootstrap: ThreadBootstrap, | ||
| tokens: TokenRegistry, | ||
| ) -> Result<Arc<ConfiguredThread>, RunnerError> { | ||
| let (mcp_cmd_tx, mcp_catalog, mcp_auth_notices) = | ||
| build_thread_mcp(host, &thread_id, &bootstrap.mcp_servers, &tokens).await?; | ||
| let (inbox_tx, inbox_rx) = mpsc::unbounded_channel::<String>(); | ||
|
|
||
| let (mcp_cmd_tx, mcp_catalog, mcp_auth_notices) = build_thread_mcp( | ||
| host, | ||
| &thread_id, | ||
| &bootstrap.mcp_servers, | ||
| &tokens, | ||
| inbox_tx.clone(), | ||
| ) | ||
| .await?; | ||
|
|
||
| let chat_id = bootstrap.chat_id.clone(); | ||
|
|
||
|
|
@@ -360,9 +374,8 @@ async fn spawn_thread( | |
| let fs_resources = FileSystemToolResources::new() | ||
| .with_policy(FileSystemToolPolicy::new().require_read_before_write(true)); | ||
|
|
||
| let mcp_server_ids: Vec<String> = bootstrap.mcp_servers.iter().map(|s| s.id.clone()).collect(); | ||
| let native_tools = ToolRegistry::new().with(tools::bun_run::bun_run).with( | ||
| tools::mcp_force_reconnect::McpForceReconnectTool::new(Arc::clone(host), mcp_server_ids), | ||
| tools::mcp_force_reconnect::McpForceReconnectTool::new(Arc::clone(host)), | ||
| ); | ||
|
|
||
| let mcp_source = ClippedToolSource::new(mcp_catalog, host.spill_root.clone()); | ||
|
|
@@ -392,7 +405,6 @@ async fn spawn_thread( | |
| .map_err(|e| RunnerError::AgentStart(e.to_string()))?; | ||
|
|
||
| let idle_since = Arc::new(Mutex::new(Some(Instant::now()))); | ||
| let (inbox_tx, inbox_rx) = mpsc::unbounded_channel::<String>(); | ||
| let loop_idle = Arc::clone(&idle_since); | ||
| let log_thread_id = thread_id.clone(); | ||
| let host_for_eviction = Arc::clone(host); | ||
|
|
@@ -440,43 +452,157 @@ async fn build_thread_mcp( | |
| thread_id: &str, | ||
| servers: &[McpServer], | ||
| tokens: &TokenRegistry, | ||
| inbox_tx: UnboundedSender<String>, | ||
| ) -> Result<(mpsc::Sender<McpCmd>, CatalogReader, Vec<String>), RunnerError> { | ||
| let mut manager = McpServerManager::new(); | ||
| let catalog = manager.source(); | ||
| let mut auth_notices = Vec::new(); | ||
| let mut known = BTreeSet::new(); | ||
|
|
||
| for server in servers { | ||
| let config = build_mcp_server_config(server, &host.http_client, tokens)?; | ||
| let server_id = McpServerId::new(server.id.clone()); | ||
| manager.register_server(config); | ||
| if let Err(err) = connect_and_log(&mut manager, &server_id, "register").await | ||
| && err.auth_required | ||
| { | ||
| match host | ||
| .gram_client | ||
| .create_mcp_auth_flow(thread_id, &server.id, &server.url, tokens) | ||
| .await | ||
| { | ||
| Ok(flow) => auth_notices.push(format!( | ||
| "<message-context>\nEventType: assistant_mcp_auth_required\nMCPServerID: {server_id}\nMCPSlug: {mcp_slug}\nAuthURL: {auth_url}\n</message-context>", | ||
| server_id = flow.server_id, | ||
| mcp_slug = flow.mcp_slug, | ||
| auth_url = flow.auth_url, | ||
| )), | ||
| Err(flow_err) => tracing::warn!( | ||
| server_id = %server_id, | ||
| error = %flow_err, | ||
| "failed to create assistant mcp auth flow" | ||
| ), | ||
| match connect_and_log(&mut manager, &server_id, "register").await { | ||
| Ok(()) => { | ||
| known.insert(server.id.clone()); | ||
| } | ||
| Err(err) if err.auth_required => { | ||
| known.insert(server.id.clone()); | ||
| if let Some(notice) = | ||
| create_auth_notice(host, thread_id, &server.id, &server.url, tokens).await | ||
| { | ||
| auth_notices.push(notice); | ||
| } | ||
| } | ||
| Err(_) => { | ||
| // Transient transport failure: leave out of `known` so the | ||
| // next /turn reconcile retries instead of silently dropping | ||
| // the integration for the rest of the thread. | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let (cmd_tx, cmd_rx) = mpsc::channel(MCP_CMD_CAPACITY); | ||
| tokio::spawn(run_mcp_actor(manager, cmd_rx)); | ||
| let actor_ctx = McpActorContext { | ||
| host: Arc::clone(host), | ||
| thread_id: thread_id.to_string(), | ||
| tokens: tokens.clone(), | ||
| inbox_tx, | ||
| known, | ||
| }; | ||
| tokio::spawn(run_mcp_actor(manager, cmd_rx, actor_ctx)); | ||
| Ok((cmd_tx, catalog, auth_notices)) | ||
| } | ||
|
|
||
| struct McpActorContext { | ||
| host: Arc<RuntimeHost>, | ||
| thread_id: String, | ||
| tokens: TokenRegistry, | ||
| inbox_tx: UnboundedSender<String>, | ||
| known: BTreeSet<String>, | ||
| } | ||
|
|
||
| async fn create_auth_notice( | ||
| host: &Arc<RuntimeHost>, | ||
| thread_id: &str, | ||
| server_id: &str, | ||
| server_url: &str, | ||
| tokens: &TokenRegistry, | ||
| ) -> Option<String> { | ||
| match host | ||
| .gram_client | ||
| .create_mcp_auth_flow(thread_id, server_id, server_url, tokens) | ||
| .await | ||
| { | ||
| Ok(flow) => Some(format!( | ||
| "<message-context>\nEventType: assistant_mcp_auth_required\nMCPServerID: {server_id}\nMCPSlug: {mcp_slug}\nAuthURL: {auth_url}\n</message-context>", | ||
| server_id = flow.server_id, | ||
| mcp_slug = flow.mcp_slug, | ||
| auth_url = flow.auth_url, | ||
| )), | ||
| Err(flow_err) => { | ||
| tracing::warn!( | ||
| server_id, | ||
| error = %flow_err, | ||
| "failed to create assistant mcp auth flow" | ||
| ); | ||
| None | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async fn reconcile_servers( | ||
| manager: &mut McpServerManager, | ||
| ctx: &mut McpActorContext, | ||
| desired: Vec<McpServer>, | ||
| ) { | ||
| let desired_ids: BTreeSet<String> = desired.iter().map(|s| s.id.clone()).collect(); | ||
|
|
||
| for server in &desired { | ||
| if ctx.known.contains(&server.id) { | ||
| continue; | ||
| } | ||
| let config = match build_mcp_server_config(server, &ctx.host.http_client, &ctx.tokens) { | ||
| Ok(cfg) => cfg, | ||
| Err(err) => { | ||
| tracing::warn!( | ||
| server_id = %server.id, | ||
| error = %err, | ||
| "skip reconcile-added mcp server: config build failed" | ||
| ); | ||
| continue; | ||
| } | ||
| }; | ||
| manager.register_server(config); | ||
| let server_uid = McpServerId::new(server.id.clone()); | ||
| match connect_and_log(manager, &server_uid, "reconcile_add").await { | ||
| Ok(()) => { | ||
| ctx.known.insert(server.id.clone()); | ||
| } | ||
| Err(err) if err.auth_required => { | ||
| ctx.known.insert(server.id.clone()); | ||
| if let Some(notice) = create_auth_notice( | ||
| &ctx.host, | ||
| &ctx.thread_id, | ||
| &server.id, | ||
| &server.url, | ||
| &ctx.tokens, | ||
| ) | ||
| .await | ||
| && ctx.inbox_tx.send(notice).is_err() | ||
| { | ||
| tracing::warn!( | ||
| server_id = %server.id, | ||
| "drop reconcile auth notice: thread inbox closed" | ||
| ); | ||
| } | ||
| } | ||
| Err(_) => { | ||
| // Transient transport failure on connect: leave out of | ||
| // `known` so a later reconcile re-attempts the connect. | ||
| // The config is still registered, which means a manual | ||
| // `mcp_force_reconnect` will not bypass this path — see | ||
| // the `known` guard in the ForceReconnect arm. | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let removed: Vec<String> = ctx | ||
| .known | ||
| .iter() | ||
| .filter(|id| !desired_ids.contains(*id)) | ||
| .cloned() | ||
| .collect(); | ||
| for id in removed { | ||
| let server_uid = McpServerId::new(id.clone()); | ||
| if let Err(err) = manager.disconnect_server(&server_uid).await { | ||
| tracing::debug!(server_id = %id, error = %err, "reconcile disconnect (ignored)"); | ||
| } | ||
| ctx.known.remove(&id); | ||
|
Comment on lines
+599
to
+602
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When a server is removed during reconcile, this code only calls Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in a83ee4f:
Comment on lines
+599
to
+602
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The removal loop drops the server ID from Useful? React with 👍 / 👎. |
||
| } | ||
| } | ||
|
|
||
| struct McpConnectFailure { | ||
| message: String, | ||
| auth_required: bool, | ||
|
|
@@ -542,10 +668,26 @@ fn build_mcp_server_config( | |
| )) | ||
| } | ||
|
|
||
| async fn run_mcp_actor(mut manager: McpServerManager, mut cmd_rx: mpsc::Receiver<McpCmd>) { | ||
| async fn run_mcp_actor( | ||
| mut manager: McpServerManager, | ||
| mut cmd_rx: mpsc::Receiver<McpCmd>, | ||
| mut ctx: McpActorContext, | ||
| ) { | ||
| while let Some(cmd) = cmd_rx.recv().await { | ||
| match cmd { | ||
| McpCmd::ForceReconnect { server_id, reply } => { | ||
| // Reject ids that aren't part of the assistant's current | ||
| // catalog. `disconnect_server` only clears `connections`; | ||
| // the underlying config lingers in the manager for the | ||
| // thread's lifetime (agentkit-mcp exposes no unregister | ||
| // path). Without this guard a detached integration could | ||
| // be resurrected via force_reconnect. | ||
| if !ctx.known.contains(server_id.0.as_str()) { | ||
| let _ = reply.send(Err(format!( | ||
| "mcp server {server_id} is not part of this assistant's current configuration" | ||
| ))); | ||
| continue; | ||
| } | ||
| if let Err(e) = manager.disconnect_server(&server_id).await { | ||
| tracing::debug!(server_id = %server_id, error = %e, "disconnect during force reconnect"); | ||
| } | ||
|
|
@@ -554,6 +696,9 @@ async fn run_mcp_actor(mut manager: McpServerManager, mut cmd_rx: mpsc::Receiver | |
| .map_err(|err| err.message); | ||
| let _ = reply.send(result); | ||
| } | ||
| McpCmd::Reconcile { desired } => { | ||
| reconcile_servers(&mut manager, &mut ctx, desired).await; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In
`reconcile_servers`, the auth-required path marks the server as known before the auth notice is successfully created and delivered. If `create_mcp_auth_flow` fails transiently (or the inbox send fails), subsequent reconciles will skip this server because of the `ctx.known` guard, so no later turn can re-emit `assistant_mcp_auth_required` and the integration can remain unusable for the thread. Useful? React with 👍 / 👎.