Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"dependencies": {
"@clack/prompts": "^1.0.0",
"@mariozechner/pi-ai": "^0.54.0",
"@modelcontextprotocol/sdk": "^1.29.0",
Comment thread
vu1n marked this conversation as resolved.
"picocolors": "^1.1.1",
"ws": "^8.19.0"
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/src/agents/tools/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export const gatewayNativeToolHandlers: NativeToolHandlerMap = {

const path = typeof args.path === "string" ? args.path.trim() : undefined;
if (path) {
const value = await context.gateway.getConfigPath(path);
const value = await context.gateway.getSafeConfigPath(path);
return {
ok: true,
result: { path, value },
Expand Down
3 changes: 3 additions & 0 deletions gateway/src/config/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,8 @@ export const DEFAULT_CONFIG: GsvConfig = {
maxRunsPerJobHistory: 200,
maxConcurrentRuns: 4,
},
mcp: {
servers: {},
},
userTimezone: "UTC",
};
22 changes: 22 additions & 0 deletions gateway/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,22 @@ export interface CronConfig {
maxConcurrentRuns: number;
}

export interface McpServerConfig {
// MCP server URL. Must be HTTPS except for localhost/127.0.0.1.
url: string;
// Bearer token for authentication.
token?: string;
// Tool list cache TTL in milliseconds (default: 300000 / 5 min).
cacheTtlMs?: number;
// Request timeout in milliseconds (default: 30000).
timeoutMs?: number;
}

export interface McpConfig {
// Map of server ID → server configuration.
servers: Record<string, McpServerConfig>;
}

export interface GsvConfig {
// Model settings
model: {
Expand Down Expand Up @@ -237,6 +253,9 @@ export interface GsvConfig {
// Cron job scheduler configuration
cron: CronConfig;

// Remote MCP server connections
mcp: McpConfig;

// User timezone (IANA string, e.g. "America/Chicago"). Defaults to "UTC".
// Used in message envelopes, cron scheduling context, and system prompt.
userTimezone: string;
Expand Down Expand Up @@ -268,6 +287,9 @@ export type GsvConfigInput = {
};
compaction?: Partial<CompactionConfig>;
cron?: Partial<CronConfig>;
mcp?: {
servers?: Record<string, Partial<McpServerConfig>>;
};
userTimezone?: string;
};

Expand Down
2 changes: 1 addition & 1 deletion gateway/src/gateway/async-exec/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ export class GatewayAsyncExecStateService implements GatewayAsyncExecStateBridge
}

listTools(): ToolDefinition[] {
return this.#gateway.nodeService.listTools(this.#gateway.nodes.keys());
return this.#gateway.getAllTools();
}

getNodeInventory(): RuntimeNodeInventory {
Expand Down
2 changes: 1 addition & 1 deletion gateway/src/gateway/channel-inbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ export async function handleChannelInboundRpc(
const result = await sessionStub.chatSend(
envelopedMessage,
runId,
JSON.parse(JSON.stringify(gw.nodeService.listTools(gw.nodes.keys()))),
JSON.parse(JSON.stringify(gw.getAllTools())),
JSON.parse(
JSON.stringify(gw.nodeService.getRuntimeNodeInventory(gw.nodes.keys())),
),
Expand Down
2 changes: 1 addition & 1 deletion gateway/src/gateway/cron-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export async function executeCronJob(
await session.chatSend(
cronMessage,
runId,
JSON.parse(JSON.stringify(gw.nodeService.listTools(gw.nodes.keys()))),
JSON.parse(JSON.stringify(gw.getAllTools())),
JSON.parse(
JSON.stringify(gw.nodeService.getRuntimeNodeInventory(gw.nodes.keys())),
),
Expand Down
185 changes: 171 additions & 14 deletions gateway/src/gateway/do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
import { isWebSocketRequest, validateFrame } from "../shared/utils";
import { DEFAULT_CONFIG } from "../config/defaults";
import { GsvConfig, GsvConfigInput, mergeConfig, PendingPair } from "../config";
import { McpService, isValidMcpServerId, type ResolvedMcpTool } from "./mcp-service";
import { getDefaultAgentId } from "../config/parsing";
import {
HeartbeatState,
Expand Down Expand Up @@ -90,6 +91,18 @@ type GatewayAlarmParticipant = {
run: (params: { now: number }) => Promise<void> | void;
};

function traverseDotPath(root: unknown, path: string): unknown {
let current: unknown = root;
for (const part of path.split(".")) {
if (current && typeof current === "object" && part in current) {
current = (current as Record<string, unknown>)[part];
} else {
return undefined;
}
}
return current;
}

export class Gateway extends DurableObject<Env> {
clients: Map<string, WebSocket> = new Map();
nodes: Map<string, WebSocket> = new Map();
Expand All @@ -104,6 +117,7 @@ export class Gateway extends DurableObject<Env> {
readonly nodeService = new GatewayNodeService(
this.ctx.storage.kv,
);
readonly mcpService = new McpService();

readonly configStore = PersistedObject<Record<string, unknown>>(
this.ctx.storage.kv,
Expand Down Expand Up @@ -243,6 +257,9 @@ export class Gateway extends DurableObject<Env> {
`[Gateway] Preserving ${detachedRuntimeNodeIds.length} detached runtime entries for known hosts`,
);
}

// Like nodes, MCP tools only appear in the catalog after the cache warms.
this.warmMcpCacheIfConfigured();
}

async fetch(request: Request): Promise<Response> {
Expand Down Expand Up @@ -372,12 +389,123 @@ export class Gateway extends DurableObject<Env> {
}
}

private resolveMcp(toolName: string): ResolvedMcpTool | null {
return this.mcpService.resolve(toolName, this.getFullConfig().mcp);
}

async toolRequest(
params: ToolRequestParams,
): Promise<{ ok: boolean; error?: string }> {
// MCP first — always-available remote servers take priority over nodes.
const resolved = this.resolveMcp(params.tool);
if (resolved) {
this.ctx.waitUntil(this.executeMcpToolCall(params, resolved));
return { ok: true };
}

// Fall through to node routing
return this.nodeService.requestToolForSession(params, this.nodes);
}

private async executeMcpToolCall(
params: ToolRequestParams,
resolved: ResolvedMcpTool,
): Promise<void> {
const sessionStub = this.env.SESSION.getByName(params.sessionKey);
const mcpResult = await this.mcpService.callTool(
resolved.serverConfig,
resolved.toolName,
params.args,
resolved.serverId,
);
await sessionStub.toolResult({
callId: params.callId,
result: mcpResult.ok ? mcpResult.result : undefined,
error: mcpResult.ok ? undefined : mcpResult.error,
});
}

/** @internal Called by handleToolInvoke RPC handler — needs ctx.waitUntil. */
executeMcpToolInvoke(
ws: WebSocket,
frameId: string,
resolved: ResolvedMcpTool,
args: Record<string, unknown>,
): void {
this.ctx.waitUntil(
this.mcpService
.callTool(resolved.serverConfig, resolved.toolName, args, resolved.serverId)
.then((mcpResult) => {
if (!mcpResult.ok) {
this.sendError(ws, frameId, 500, mcpResult.error || "MCP tool failed");
} else {
this.sendOk(ws, frameId, { result: mcpResult.result });
}
})
.catch((err) => {
console.error("[Gateway] MCP tool invoke error:", err);
}),
);
}

/** Validate MCP server IDs in a config write at any depth. */
private validateMcpConfig(parts: string[], value: unknown): void {
// mcp.servers.{serverId}[.*] — validate the specific server ID
if (parts.length >= 3 && parts[0] === "mcp" && parts[1] === "servers") {
const validation = isValidMcpServerId(parts[2]);
if (!validation.valid) {
throw new Error(`Invalid MCP server ID "${parts[2]}": ${validation.reason}`);
}
return;
}

// mcp.servers = { id: {...}, ... } — validate all server IDs in the object
if (parts.length === 2 && parts[0] === "mcp" && parts[1] === "servers" && value && typeof value === "object") {
this.validateMcpServerIds(value as Record<string, unknown>);
return;
}

// mcp = { servers: { id: {...}, ... } } — validate server IDs in nested object
if (parts.length === 1 && parts[0] === "mcp" && value && typeof value === "object") {
const servers = (value as Record<string, unknown>).servers;
if (servers && typeof servers === "object") {
this.validateMcpServerIds(servers as Record<string, unknown>);
}
}
}

private validateMcpServerIds(servers: Record<string, unknown>): void {
for (const serverId of Object.keys(servers)) {
const validation = isValidMcpServerId(serverId);
if (!validation.valid) {
throw new Error(`Invalid MCP server ID "${serverId}": ${validation.reason}`);
}
}
}

/** Invalidate MCP cache and warm after config change. */
private onMcpConfigChanged(parts: string[]): void {
if (parts.length >= 3 && parts[0] === "mcp" && parts[1] === "servers") {
if (this.nodes.has(parts[2])) {
console.warn(
`[Gateway] MCP server "${parts[2]}" collides with connected node ID. MCP tools will take priority.`,
);
}
}
this.mcpService.invalidateCache();
this.warmMcpCacheIfConfigured();
}

private warmMcpCacheIfConfigured(): void {
const config = this.getFullConfig();
if (Object.keys(config.mcp.servers).length === 0) return;
this.ctx.waitUntil(
this.mcpService.refreshCache(config.mcp).then(() =>
this.scheduleGatewayAlarm(),
),
);
}

sendOk(ws: WebSocket, id: string, payload?: unknown) {
const res: ResponseFrame = { type: "res", id, ok: true, payload };
ws.send(JSON.stringify(res));
Expand Down Expand Up @@ -566,13 +694,19 @@ export class Gateway extends DurableObject<Env> {
`[Gateway] toolRegistry keys: [${this.nodeService.listToolRegistryNodeIds().join(", ")}]`,
);

const tools = this.nodeService.listTools(this.nodes.keys());
// Unified tool list: native + node + MCP
const config = this.getFullConfig();
const mcpTools = this.mcpService.listToolsCached(config.mcp);
const tools = [
...this.nodeService.listTools(this.nodes.keys()),
...mcpTools,
];
const nodeTools = tools.filter(
(tool) => tool.name.includes("__") && !tool.name.startsWith("gsv__"),
);
const nativeTools = tools.length - nodeTools.length;
console.log(
`[Gateway] returning ${tools.length} tools (${nativeTools} native + ${nodeTools.length} node): [${tools.map((t) => t.name).join(", ")}]`,
`[Gateway] returning ${tools.length} tools (${nativeTools} native + ${nodeTools.length} remote [${mcpTools.length} mcp]): [${tools.map((t) => t.name).join(", ")}]`,
);
return tools;
}
Expand Down Expand Up @@ -743,25 +877,21 @@ export class Gateway extends DurableObject<Env> {
}

getConfigPath(path: string): unknown {
const parts = path.split(".");
let current: unknown = this.getFullConfig();

for (const part of parts) {
if (current && typeof current === "object" && part in current) {
current = (current as Record<string, unknown>)[part];
} else {
return undefined;
}
}

return current;
return traverseDotPath(this.getFullConfig(), path);
}

setConfigPath(path: string, value: unknown): void {
const parts = path.split(".");

// Validate MCP server IDs before writing — handles both nested and top-level
this.validateMcpConfig(parts, value);

if (parts.length === 1) {
this.configStore[path] = value;
// Run post-write hooks for top-level MCP config replacement
if (path === "mcp") {
this.onMcpConfigChanged(parts);
}
return;
}

Expand Down Expand Up @@ -796,6 +926,10 @@ export class Gateway extends DurableObject<Env> {

// Clean up any flat key that might exist
delete this.configStore[path];

if (path === "mcp.servers" || path.startsWith("mcp.servers.")) {
this.onMcpConfigChanged(parts);
}
}

getFullConfig(): GsvConfig {
Expand All @@ -817,13 +951,24 @@ export class Gateway extends DurableObject<Env> {
...full.auth,
token: full.auth.token ? "***" : undefined,
};
const mcpServers = Object.fromEntries(
Object.entries(full.mcp.servers).map(([id, server]) => [
id,
{ ...server, token: server.token ? "***" : undefined },
]),
);
return {
...full,
apiKeys,
auth,
mcp: { ...full.mcp, servers: mcpServers },
};
}

getSafeConfigPath(path: string): unknown {
return traverseDotPath(this.getSafeConfig(), path);
}

getConfig(): GsvConfig {
return this.getFullConfig();
}
Expand Down Expand Up @@ -969,6 +1114,18 @@ export class Gateway extends DurableObject<Env> {
);
},
},
{
name: "mcpCacheRefresh",
nextDueMs: this.mcpService.nextCacheExpiryMs(
this.getFullConfig().mcp,
),
run: async () => {
const config = this.getFullConfig();
if (Object.keys(config.mcp.servers).length > 0) {
await this.mcpService.refreshCache(config.mcp);
}
},
},
];
}

Expand Down
2 changes: 1 addition & 1 deletion gateway/src/gateway/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ export async function runHeartbeat(
};
}
const prompt = config.prompt;
const tools = JSON.parse(JSON.stringify(gw.nodeService.listTools(gw.nodes.keys())));
const tools = JSON.parse(JSON.stringify(gw.getAllTools()));
const runtimeNodes = JSON.parse(
JSON.stringify(gw.nodeService.getRuntimeNodeInventory(gw.nodes.keys())),
);
Expand Down
Loading