diff --git a/.changeset/age-2516-assistant-mcp-server-reconcile.md b/.changeset/age-2516-assistant-mcp-server-reconcile.md new file mode 100644 index 0000000000..5762ee4e5b --- /dev/null +++ b/.changeset/age-2516-assistant-mcp-server-reconcile.md @@ -0,0 +1,5 @@ +--- +"server": patch +--- + +Assistants now pick up MCP server additions and removals on the next turn instead of only on a fresh runtime bootstrap. The per-turn dispatch sends the current MCP set to the runner, which reconciles its live connections without recycling the VM. Previously a newly attached integration (e.g. GitHub MCP) stayed invisible to the running assistant until the runtime was restarted, leaving the model unable to use it or to invoke `mcp_force_reconnect` for it. diff --git a/agents/runner/src/runtime.rs b/agents/runner/src/runtime.rs index 85730cefdf..801e69cdb8 100644 --- a/agents/runner/src/runtime.rs +++ b/agents/runner/src/runtime.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::panic::AssertUnwindSafe; use std::path::PathBuf; use std::sync::{Arc, Mutex}; @@ -88,6 +89,11 @@ pub enum McpCmd { server_id: McpServerId, reply: oneshot::Sender>, }, + /// Sent by `/threads/{id}/turn` whenever the request carries an + /// `mcp_servers` set, whether or not it has drifted. The actor diffs + /// `desired` against the servers it currently tracks, connects any + /// new ones, and disconnects ones that no longer apply. 
+ Reconcile { desired: Vec }, } impl ConfiguredThread { @@ -278,8 +284,16 @@ async fn spawn_thread( bootstrap: ThreadBootstrap, tokens: TokenRegistry, ) -> Result, RunnerError> { - let (mcp_cmd_tx, mcp_catalog, mcp_auth_notices) = - build_thread_mcp(host, &thread_id, &bootstrap.mcp_servers, &tokens).await?; + let (inbox_tx, inbox_rx) = mpsc::unbounded_channel::(); + + let (mcp_cmd_tx, mcp_catalog, mcp_auth_notices) = build_thread_mcp( + host, + &thread_id, + &bootstrap.mcp_servers, + &tokens, + inbox_tx.clone(), + ) + .await?; let chat_id = bootstrap.chat_id.clone(); @@ -360,9 +374,8 @@ async fn spawn_thread( let fs_resources = FileSystemToolResources::new() .with_policy(FileSystemToolPolicy::new().require_read_before_write(true)); - let mcp_server_ids: Vec = bootstrap.mcp_servers.iter().map(|s| s.id.clone()).collect(); let native_tools = ToolRegistry::new().with(tools::bun_run::bun_run).with( - tools::mcp_force_reconnect::McpForceReconnectTool::new(Arc::clone(host), mcp_server_ids), + tools::mcp_force_reconnect::McpForceReconnectTool::new(Arc::clone(host)), ); let mcp_source = ClippedToolSource::new(mcp_catalog, host.spill_root.clone()); @@ -392,7 +405,6 @@ async fn spawn_thread( .map_err(|e| RunnerError::AgentStart(e.to_string()))?; let idle_since = Arc::new(Mutex::new(Some(Instant::now()))); - let (inbox_tx, inbox_rx) = mpsc::unbounded_channel::(); let loop_idle = Arc::clone(&idle_since); let log_thread_id = thread_id.clone(); let host_for_eviction = Arc::clone(host); @@ -440,43 +452,157 @@ async fn build_thread_mcp( thread_id: &str, servers: &[McpServer], tokens: &TokenRegistry, + inbox_tx: UnboundedSender, ) -> Result<(mpsc::Sender, CatalogReader, Vec), RunnerError> { let mut manager = McpServerManager::new(); let catalog = manager.source(); let mut auth_notices = Vec::new(); + let mut known = BTreeSet::new(); for server in servers { let config = build_mcp_server_config(server, &host.http_client, tokens)?; let server_id = McpServerId::new(server.id.clone()); 
manager.register_server(config); - if let Err(err) = connect_and_log(&mut manager, &server_id, "register").await - && err.auth_required - { - match host - .gram_client - .create_mcp_auth_flow(thread_id, &server.id, &server.url, tokens) - .await - { - Ok(flow) => auth_notices.push(format!( - "\nEventType: assistant_mcp_auth_required\nMCPServerID: {server_id}\nMCPSlug: {mcp_slug}\nAuthURL: {auth_url}\n", - server_id = flow.server_id, - mcp_slug = flow.mcp_slug, - auth_url = flow.auth_url, - )), - Err(flow_err) => tracing::warn!( - server_id = %server_id, - error = %flow_err, - "failed to create assistant mcp auth flow" - ), + match connect_and_log(&mut manager, &server_id, "register").await { + Ok(()) => { + known.insert(server.id.clone()); + } + Err(err) if err.auth_required => { + known.insert(server.id.clone()); + if let Some(notice) = + create_auth_notice(host, thread_id, &server.id, &server.url, tokens).await + { + auth_notices.push(notice); + } + } + Err(_) => { + // Transient transport failure: leave out of `known` so the + // next /turn reconcile retries instead of silently dropping + // the integration for the rest of the thread. 
} } } let (cmd_tx, cmd_rx) = mpsc::channel(MCP_CMD_CAPACITY); - tokio::spawn(run_mcp_actor(manager, cmd_rx)); + let actor_ctx = McpActorContext { + host: Arc::clone(host), + thread_id: thread_id.to_string(), + tokens: tokens.clone(), + inbox_tx, + known, + }; + tokio::spawn(run_mcp_actor(manager, cmd_rx, actor_ctx)); Ok((cmd_tx, catalog, auth_notices)) } +struct McpActorContext { + host: Arc, + thread_id: String, + tokens: TokenRegistry, + inbox_tx: UnboundedSender, + known: BTreeSet, +} + +async fn create_auth_notice( + host: &Arc, + thread_id: &str, + server_id: &str, + server_url: &str, + tokens: &TokenRegistry, +) -> Option { + match host + .gram_client + .create_mcp_auth_flow(thread_id, server_id, server_url, tokens) + .await + { + Ok(flow) => Some(format!( + "\nEventType: assistant_mcp_auth_required\nMCPServerID: {server_id}\nMCPSlug: {mcp_slug}\nAuthURL: {auth_url}\n", + server_id = flow.server_id, + mcp_slug = flow.mcp_slug, + auth_url = flow.auth_url, + )), + Err(flow_err) => { + tracing::warn!( + server_id, + error = %flow_err, + "failed to create assistant mcp auth flow" + ); + None + } + } +} + +async fn reconcile_servers( + manager: &mut McpServerManager, + ctx: &mut McpActorContext, + desired: Vec, +) { + let desired_ids: BTreeSet = desired.iter().map(|s| s.id.clone()).collect(); + + for server in &desired { + if ctx.known.contains(&server.id) { + continue; + } + let config = match build_mcp_server_config(server, &ctx.host.http_client, &ctx.tokens) { + Ok(cfg) => cfg, + Err(err) => { + tracing::warn!( + server_id = %server.id, + error = %err, + "skip reconcile-added mcp server: config build failed" + ); + continue; + } + }; + manager.register_server(config); + let server_uid = McpServerId::new(server.id.clone()); + match connect_and_log(manager, &server_uid, "reconcile_add").await { + Ok(()) => { + ctx.known.insert(server.id.clone()); + } + Err(err) if err.auth_required => { + ctx.known.insert(server.id.clone()); + if let Some(notice) = 
create_auth_notice( + &ctx.host, + &ctx.thread_id, + &server.id, + &server.url, + &ctx.tokens, + ) + .await + && ctx.inbox_tx.send(notice).is_err() + { + tracing::warn!( + server_id = %server.id, + "drop reconcile auth notice: thread inbox closed" + ); + } + } + Err(_) => { + // Transient transport failure on connect: leave out of + // `known` so a later reconcile re-attempts the connect. + // The config stays registered, but a manual + // `mcp_force_reconnect` cannot retry it sooner: the `known` + // guard in the ForceReconnect arm rejects untracked ids. + } + } + } + + let removed: Vec = ctx + .known + .iter() + .filter(|id| !desired_ids.contains(*id)) + .cloned() + .collect(); + for id in removed { + let server_uid = McpServerId::new(id.clone()); + if let Err(err) = manager.disconnect_server(&server_uid).await { + tracing::debug!(server_id = %id, error = %err, "reconcile disconnect (ignored)"); + } + ctx.known.remove(&id); + } +} + struct McpConnectFailure { message: String, auth_required: bool, @@ -542,10 +668,26 @@ fn build_mcp_server_config( )) } -async fn run_mcp_actor(mut manager: McpServerManager, mut cmd_rx: mpsc::Receiver) { +async fn run_mcp_actor( + mut manager: McpServerManager, + mut cmd_rx: mpsc::Receiver, + mut ctx: McpActorContext, +) { while let Some(cmd) = cmd_rx.recv().await { match cmd { McpCmd::ForceReconnect { server_id, reply } => { + // Reject ids that aren't part of the assistant's current + // catalog. `disconnect_server` only clears `connections`; + // the underlying config lingers in the manager for the + // thread's lifetime (agentkit-mcp exposes no unregister + // path). Without this guard a detached integration could + // be resurrected via force_reconnect. 
+ if !ctx.known.contains(server_id.0.as_str()) { + let _ = reply.send(Err(format!( + "mcp server {server_id} is not part of this assistant's current configuration" + ))); + continue; + } if let Err(e) = manager.disconnect_server(&server_id).await { tracing::debug!(server_id = %server_id, error = %e, "disconnect during force reconnect"); } @@ -554,6 +696,9 @@ async fn run_mcp_actor(mut manager: McpServerManager, mut cmd_rx: mpsc::Receiver .map_err(|err| err.message); let _ = reply.send(result); } + McpCmd::Reconcile { desired } => { + reconcile_servers(&mut manager, &mut ctx, desired).await; + } } } } diff --git a/agents/runner/src/server.rs b/agents/runner/src/server.rs index 88d18ce423..3d2bed308a 100644 --- a/agents/runner/src/server.rs +++ b/agents/runner/src/server.rs @@ -9,7 +9,7 @@ use tokio::net::TcpListener; use tokio::sync::{Mutex, Notify}; use crate::runtime::{ - AppState, DEFAULT_THREAD_IDLE_TTL, build_host, ensure_thread, snapshot_threads, + AppState, DEFAULT_THREAD_IDLE_TTL, McpCmd, build_host, ensure_thread, snapshot_threads, }; const IDEMPOTENCY_HEADER: &str = "x-idempotency-key"; @@ -113,6 +113,21 @@ async fn thread_turn( .await .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?; + // Hand reconcile to the actor and proceed to enqueue. The actor runs + // concurrently with the agent loop, so a server added by this /turn + // may surface on the very next model step or on the one after, + // depending on whether the connect finishes before tool catalog is + // sampled. Either way it lands before the user notices. 
+ if let Some(desired) = request.mcp_servers + && thread + .mcp_cmd_tx + .send(McpCmd::Reconcile { desired }) + .await + .is_err() + { + tracing::warn!(thread_id = %thread_id, "drop mcp reconcile: actor channel closed"); + } + thread .enqueue(request.input) .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, e.to_string()))?; diff --git a/agents/runner/src/tools/mcp_force_reconnect.rs b/agents/runner/src/tools/mcp_force_reconnect.rs index 79ddcfe71e..2ab0b0fd9e 100644 --- a/agents/runner/src/tools/mcp_force_reconnect.rs +++ b/agents/runner/src/tools/mcp_force_reconnect.rs @@ -8,7 +8,7 @@ use agentkit_tools_core::{ use async_trait::async_trait; use futures::future::join_all; use serde::Deserialize; -use serde_json::{Value, json}; +use serde_json::json; use tokio::sync::oneshot; use crate::runtime::{McpCmd, RuntimeHost}; @@ -21,9 +21,11 @@ pub struct McpForceReconnectTool { } impl McpForceReconnectTool { - pub fn new(host: Arc, server_ids: Vec) -> Self { - let spec = build_spec(&server_ids); - Self { host, spec } + pub fn new(host: Arc) -> Self { + Self { + host, + spec: build_spec(), + } } } @@ -32,43 +34,29 @@ struct McpForceReconnectInput { server_id: String, } -fn build_spec(server_ids: &[String]) -> ToolSpec { - // The model only sees tools advertised in /tools/list. Without the enum, - // the assistant has to guess server_id from the `mcp__` prefix - // — which fails the moment the server is disconnected (no prefixed tools - // exposed). Baking the configured server IDs into the schema keeps the - // tool callable even when MCP is fully offline. 
- let server_ids_json: Vec = server_ids.iter().cloned().map(Value::String).collect(); - - let server_id_property = if server_ids_json.is_empty() { - json!({ - "type": "string", - "description": "ID of the MCP server to reconnect.", - }) - } else { - json!({ - "type": "string", - "description": "ID of the MCP server to reconnect.", - "enum": server_ids_json, - }) - }; - +fn build_spec() -> ToolSpec { + // server_id is intentionally not enumerated. The set of registered MCP + // servers can drift mid-thread (assistant toolset edits flow in via + // /turn reconcile), so any frozen enum becomes stale as soon as the + // user attaches a new integration. The model discovers live server + // ids from the `mcp__` prefix of catalog entries and from + // the `assistant_mcp_auth` event context. let input_schema = json!({ "type": "object", - "properties": { "server_id": server_id_property }, + "properties": { + "server_id": { + "type": "string", + "description": "ID of the MCP server to reconnect.", + }, + }, "required": ["server_id"], "additionalProperties": false, }); - let description = if server_ids.is_empty() { - "Disconnect and reconnect a registered MCP server. No MCP servers are \ -configured for this assistant; calling this tool will fail." - } else { - "Disconnect and reconnect a registered MCP server. Use this when an \ + let description = "Disconnect and reconnect a registered MCP server. Use this when an \ MCP-backed tool returns a connection-related error (timeout, transport closed, \ auth failure that the backend has since refreshed) or when no MCP-backed tools \ -appear in the catalog." 
- }; +appear in the catalog."; ToolSpec::new(TOOL_NAME, description, input_schema) .with_annotations(ToolAnnotations::default().with_idempotent(true)) diff --git a/agents/runner/src/wire.rs b/agents/runner/src/wire.rs index 5cd6d78b09..e8a849194a 100644 --- a/agents/runner/src/wire.rs +++ b/agents/runner/src/wire.rs @@ -13,12 +13,16 @@ pub struct McpServer { /// `/threads/{thread_id}/turn` request body. The runner looks up — or /// bootstraps — a per-thread tokio task on first hit and enqueues `input` /// onto its inbox. `auth_token` rotates the host's shared bearer; an -/// optional `mcp_servers` reconciles the assistant-wide MCP set. +/// optional `mcp_servers` reconciles the assistant-wide MCP set so +/// toolset edits made after bootstrap take effect without recycling the +/// VM. #[derive(Debug, Deserialize)] pub struct ThreadTurnRequest { pub input: String, #[serde(default)] pub auth_token: Option, + #[serde(default)] + pub mcp_servers: Option>, } /// 202-style ack returned by `/threads/{thread_id}/turn`. The actual turn diff --git a/server/internal/assistants/runtime.go b/server/internal/assistants/runtime.go index e225d08b8c..ea7f3ecbf8 100644 --- a/server/internal/assistants/runtime.go +++ b/server/internal/assistants/runtime.go @@ -75,8 +75,9 @@ type runtimeToolCall struct { } type runtimeTurnRequest struct { - Input string `json:"input"` - AuthToken string `json:"auth_token,omitempty"` + Input string `json:"input"` + AuthToken string `json:"auth_token,omitempty"` + MCPServers []runtimeMCPServer `json:"mcp_servers,omitempty"` } type runtimeHTTPRequest struct { diff --git a/server/internal/assistants/runtime_backend.go b/server/internal/assistants/runtime_backend.go index 143582f965..b25d0b297b 100644 --- a/server/internal/assistants/runtime_backend.go +++ b/server/internal/assistants/runtime_backend.go @@ -25,7 +25,10 @@ type RuntimeBackend interface { // RunTurn delivers a turn for `threadID` to the runner backing // `runtime`. 
The call lands on /threads/{threadID}/turn so the // runner can dispatch to the right per-thread tokio task. - RunTurn(ctx context.Context, runtime assistantRuntimeRecord, threadID uuid.UUID, idempotencyKey string, authToken string, prompt string) error + // mcpServers carries the assistant's current MCP set so the runner + // can reconcile newly attached or detached servers into a live + // thread without re-running the full thread bootstrap. + RunTurn(ctx context.Context, runtime assistantRuntimeRecord, threadID uuid.UUID, idempotencyKey string, authToken string, prompt string, mcpServers []runtimeMCPServer) error Status(ctx context.Context, runtime assistantRuntimeRecord) (RuntimeBackendStatus, error) // Stop halts the active runtime so it can be re-admitted later. Backends // may keep persisted state (e.g. Fly app + IP) intact for warm reuse. diff --git a/server/internal/assistants/runtime_fly.go b/server/internal/assistants/runtime_fly.go index bb8fd39695..58d1c445a5 100644 --- a/server/internal/assistants/runtime_fly.go +++ b/server/internal/assistants/runtime_fly.go @@ -759,7 +759,7 @@ func (f *FlyRuntimeBackend) tracedWaitHealth(ctx context.Context, target flyRunt return f.waitForRuntimeHealth(ctx, target) } -func (f *FlyRuntimeBackend) RunTurn(ctx context.Context, runtime assistantRuntimeRecord, threadID uuid.UUID, idempotencyKey string, authToken string, prompt string) error { +func (f *FlyRuntimeBackend) RunTurn(ctx context.Context, runtime assistantRuntimeRecord, threadID uuid.UUID, idempotencyKey string, authToken string, prompt string, mcpServers []runtimeMCPServer) error { if err := validateRuntimeBackend(f, runtime.Backend); err != nil { return err } @@ -772,8 +772,9 @@ func (f *FlyRuntimeBackend) RunTurn(ctx context.Context, runtime assistantRuntim } reqBody, err := json.Marshal(runtimeTurnRequest{ - Input: prompt, - AuthToken: authToken, + Input: prompt, + AuthToken: authToken, + MCPServers: mcpServers, }) if err != nil { return 
fmt.Errorf("marshal assistant fly runtime turn request: %w", err) diff --git a/server/internal/assistants/runtime_fly_test.go b/server/internal/assistants/runtime_fly_test.go index 44d6a8d062..4dff4f64cf 100644 --- a/server/internal/assistants/runtime_fly_test.go +++ b/server/internal/assistants/runtime_fly_test.go @@ -791,6 +791,7 @@ func TestFlyRuntimeBackendRunTurnHitsThreadScopedRoute(t *testing.T) { admittingThreadID := uuid.New() turnThreadID := uuid.New() var observedPath string + var observedBody runtimeTurnRequest mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { @@ -798,6 +799,7 @@ func TestFlyRuntimeBackendRunTurnHitsThreadScopedRoute(t *testing.T) { }) mux.HandleFunc(fmt.Sprintf("/threads/%s/turn", turnThreadID), func(w http.ResponseWriter, r *http.Request) { observedPath = r.URL.Path + _ = json.NewDecoder(r.Body).Decode(&observedBody) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{"finish_reason":"accepted"}`)) }) @@ -820,8 +822,14 @@ func TestFlyRuntimeBackendRunTurnHitsThreadScopedRoute(t *testing.T) { BackendMetadataJSON: rawMetadata, } - require.NoError(t, backend.RunTurn(context.Background(), rec, turnThreadID, "idem-1", "tok", "hi")) + servers := []runtimeMCPServer{ + {ID: "github", URL: "https://example/mcp/github", Headers: map[string]string{"Gram-Environment": "prod"}}, + } + require.NoError(t, backend.RunTurn(context.Background(), rec, turnThreadID, "idem-1", "tok", "hi", servers)) require.Equal(t, fmt.Sprintf("/threads/%s/turn", turnThreadID), observedPath) + require.Equal(t, "hi", observedBody.Input) + require.Equal(t, "tok", observedBody.AuthToken) + require.Equal(t, servers, observedBody.MCPServers) } func TestFlyRuntimeConfigValidateRequiresServerURL(t *testing.T) { diff --git a/server/internal/assistants/runtime_telemetry.go b/server/internal/assistants/runtime_telemetry.go index f89211d0a5..efefa4a87f 100644 --- 
a/server/internal/assistants/runtime_telemetry.go +++ b/server/internal/assistants/runtime_telemetry.go @@ -120,9 +120,10 @@ func (t *telemetryRuntimeBackend) RunTurn( idempotencyKey string, authToken string, prompt string, + mcpServers []runtimeMCPServer, ) error { t.emit(ctx, runtime, "runtime_turn", "runtime turn dispatched", "INFO", nil) - if err := t.inner.RunTurn(ctx, runtime, threadID, idempotencyKey, authToken, prompt); err != nil { + if err := t.inner.RunTurn(ctx, runtime, threadID, idempotencyKey, authToken, prompt, mcpServers); err != nil { t.emit(ctx, runtime, "runtime_turn", "runtime turn errored", "ERROR", err) return fmt.Errorf("runtime run turn: %w", err) } diff --git a/server/internal/assistants/service.go b/server/internal/assistants/service.go index b5c254cff6..46132f402e 100644 --- a/server/internal/assistants/service.go +++ b/server/internal/assistants/service.go @@ -1625,12 +1625,14 @@ func (s *ServiceCore) processEventTurn( runtime assistantRuntimeRecord, event assistantThreadEventRecord, ) error { + mcpServers := s.currentRuntimeMCPServers(ctx, assistant.Toolsets) + if prompt, ok := decodeMCPAuthTurn(ctx, s.logger, event); ok { turnToken, err := s.MintThreadScopedRuntimeToken(assistant, thread.ID) if err != nil { return err } - if err := s.runtime.RunTurn(ctx, runtime, thread.ID, event.ID.String(), turnToken, prompt); err != nil { + if err := s.runtime.RunTurn(ctx, runtime, thread.ID, event.ID.String(), turnToken, prompt, mcpServers); err != nil { return fmt.Errorf("run assistant turn: %w", err) } return nil @@ -1648,12 +1650,27 @@ func (s *ServiceCore) processEventTurn( if err != nil { return err } - if err := s.runtime.RunTurn(ctx, runtime, thread.ID, event.ID.String(), turnToken, prompt); err != nil { + if err := s.runtime.RunTurn(ctx, runtime, thread.ID, event.ID.String(), turnToken, prompt, mcpServers); err != nil { return fmt.Errorf("run assistant turn: %w", err) } return nil } +// currentRuntimeMCPServers builds the MCP server set the 
runner should be +// running with right now. The runner reconciles its live thread state +// against this list on every turn, so toolset edits (added/removed MCP +// servers) take effect on the next event without recycling the VM. Falls +// back to nil when the runtime server URL is not configured — the runner +// already has the bootstrap-time set and we'd rather skip reconcile than +// dispatch a turn with bogus URLs. +func (s *ServiceCore) currentRuntimeMCPServers(ctx context.Context, toolsets []assistantToolsetRow) []runtimeMCPServer { + serverURL := s.runtime.ServerURL() + if serverURL == nil { + return nil + } + return resolveAssistantMCPServers(ctx, s.logger, serverURL, toolsets) +} + func (s *ServiceCore) startProcessingLeaseHeartbeat( ctx context.Context, projectID uuid.UUID, diff --git a/server/internal/assistants/service_test.go b/server/internal/assistants/service_test.go index 73b93b68bb..95ddfd6121 100644 --- a/server/internal/assistants/service_test.go +++ b/server/internal/assistants/service_test.go @@ -20,6 +20,7 @@ import ( "github.com/speakeasy-api/gram/server/internal/auth/assistanttokens" bgtriggers "github.com/speakeasy-api/gram/server/internal/background/triggers" chatrepo "github.com/speakeasy-api/gram/server/internal/chat/repo" + "github.com/speakeasy-api/gram/server/internal/platformtools" projectsrepo "github.com/speakeasy-api/gram/server/internal/projects/repo" "github.com/speakeasy-api/gram/server/internal/telemetry" "github.com/speakeasy-api/gram/server/internal/testenv" @@ -843,7 +844,9 @@ func TestServiceCoreProcessThreadEventsCompletesEvent(t *testing.T) { logger := testenv.NewLogger(t) tokens := assistanttokens.New("test-jwt-secret", conn, nil) - core := NewServiceCore(logger, testenv.NewTracerProvider(t), conn, nil, nil, testRuntimeBackend{backend: runtimeBackendFlyIO, runTurnErr: nil}, nil, tokens, nil, telemetry.NewStub(logger), nil) + runTurnMCP := &atomic.Pointer[[]runtimeMCPServer]{} + backend := testRuntimeBackend{backend: 
runtimeBackendFlyIO, runTurnErr: nil, runTurnMCPServers: runTurnMCP} + core := NewServiceCore(logger, testenv.NewTracerProvider(t), conn, nil, nil, backend, nil, tokens, nil, telemetry.NewStub(logger), nil) admitted, err := core.AdmitPendingThreads(t.Context(), assistantID) require.NoError(t, err) @@ -863,6 +866,16 @@ func TestServiceCoreProcessThreadEventsCompletesEvent(t *testing.T) { runtime, err := assistantsrepo.New(conn).GetActiveAssistantRuntimeByThreadID(t.Context(), assistantsrepo.GetActiveAssistantRuntimeByThreadIDParams{AssistantThreadID: threadID, ProjectID: projectID}) require.NoError(t, err) require.Equal(t, runtimeStateActive, runtime.State) + + // Resolved MCP set must flow through processEventTurn → RunTurn each + // turn — that is the channel through which assistant toolset edits + // reach a live runner without recycling the VM. The bare fixture has + // no user toolsets, so we assert on the always-present implicit + // platform entry. + captured := runTurnMCP.Load() + require.NotNil(t, captured, "RunTurn must receive mcp_servers") + require.NotEmpty(t, *captured) + require.Equal(t, "_platform-"+platformtools.AssistantsPlatformToolsetSlug, (*captured)[0].ID) } func TestServiceCoreProcessThreadEventsRequeuesOnTurnFailure(t *testing.T) { @@ -1295,16 +1308,17 @@ func TestServiceCoreReapInactiveAssistantRuntimesReapsStaleRowsRegardlessOfState } type testRuntimeBackend struct { - backend string - ensureResult RuntimeBackendEnsureResult - ensureErr error - runTurnErr error - statusResult RuntimeBackendStatus - statusErr error - stopErr error - stopCalls *atomic.Int64 - reapCalls *atomic.Int64 - reapErr error + backend string + ensureResult RuntimeBackendEnsureResult + ensureErr error + runTurnErr error + runTurnMCPServers *atomic.Pointer[[]runtimeMCPServer] + statusResult RuntimeBackendStatus + statusErr error + stopErr error + stopCalls *atomic.Int64 + reapCalls *atomic.Int64 + reapErr error } func (t testRuntimeBackend) Backend() string { @@ -1326,7 
+1340,11 @@ func (t testRuntimeBackend) Ensure(context.Context, assistantRuntimeRecord) (Run return t.ensureResult, nil } -func (t testRuntimeBackend) RunTurn(context.Context, assistantRuntimeRecord, uuid.UUID, string, string, string) error { +func (t testRuntimeBackend) RunTurn(_ context.Context, _ assistantRuntimeRecord, _ uuid.UUID, _ string, _ string, _ string, mcpServers []runtimeMCPServer) error { + if t.runTurnMCPServers != nil { + captured := append([]runtimeMCPServer(nil), mcpServers...) + t.runTurnMCPServers.Store(&captured) + } return t.runTurnErr }