diff --git a/tunnel/build/src/client.js b/tunnel/build/src/client.js index 6b443725..d834595f 100644 --- a/tunnel/build/src/client.js +++ b/tunnel/build/src/client.js @@ -1,6 +1,7 @@ import { EventEmitter } from 'node:events'; import { WebSocket } from './third_party/index.js'; import { parseOriginPatterns } from './allowlist.js'; +import { EventRing } from './history.js'; import { RECONNECT_BASE_MS, RECONNECT_MAX_MS, RESUME_SUBPROTOCOL_PREFIX, STALE_CONNECTION_MS, YAMUX_PING_INTERVAL_MS, } from './types.js'; import { LegacyTransport } from './transport_legacy.js'; import { YamuxTransport } from './transport_yamux.js'; @@ -33,6 +34,12 @@ export class TunnelClient extends EventEmitter { #intentionalDisconnect = false; #resumeToken; #traceId; + /** + * Per-client lifecycle event ring. Read-only from the outside; surfaced via + * the `tunnel-history` MCP tool so callers (esp. agents) can self-diagnose + * why a tunnel went stale or why a resume failed without needing kubectl. + */ + history = new EventRing(); constructor(opts) { super(); this.#relayUrl = opts.relayUrl; @@ -62,9 +69,11 @@ export class TunnelClient extends EventEmitter { } connect() { this.#intentionalDisconnect = false; + this.history.push('connect-start', { resume: false }); this.#doConnect(); } disconnect() { + this.history.push('disconnect-requested'); this.#intentionalDisconnect = true; this.#cleanup(); this.#state = 'disconnected'; @@ -100,10 +109,12 @@ export class TunnelClient extends EventEmitter { // Handle non-101 upgrade responses (e.g. 401 on resume token replay). ws.on('unexpected-response', (_req, res) => { this.#log(`Relay rejected upgrade: ${res.statusCode}`); + this.history.push('unexpected-response', { statusCode: res.statusCode }); if (res.statusCode === 401) { this.#resumeToken = undefined; this.#traceId = undefined; this.#intentionalDisconnect = true; + this.history.push('need-live-tunnel', { reason: '401 on upgrade' }); this.emit('need_live_tunnel'); } res.resume(); // drain so socket can be released @@ -112,6 +123,7 @@ export class TunnelClient extends EventEmitter { this.#state = 'connected'; this.#connectedSince = Date.now(); this.#log('WebSocket open, sending hello'); + this.history.push('ws-open'); const hello = { type: 'hello', protocol: 'yamux', @@ -126,6 +138,10 @@ export class TunnelClient extends EventEmitter { hello.connectionId = this.#initialConnectionId; } ws.send(JSON.stringify(hello)); + this.history.push('hello-sent', { + resume: !!this.#resumeToken, + hasAllowedOrigins: !!hello.allowedOrigins, + }); this.#resetStaleTimer(); }); // Listen for the ready message (always JSON, regardless of protocol). @@ -144,11 +160,13 @@ export class TunnelClient extends EventEmitter { // Token already revoked server-side; skip reconnect and request a // fresh live-tunnel instead. this.#log(`Relay handshake error: ${msg.message}`); + this.history.push('handshake-error', { message: msg.message }); this.#resumeToken = undefined; this.#traceId = undefined; ws.removeListener('message', handshakeHandler); this.#intentionalDisconnect = true; ws.close(); + this.history.push('need-live-tunnel', { reason: 'handshake error' }); this.emit('need_live_tunnel'); return; } @@ -170,6 +188,13 @@ export class TunnelClient extends EventEmitter { this.#state = 'ready'; this.#reconnectAttempts = 0; this.#log(`Tunnel ready: ${msg.tunnelId} (connection ${msg.connectionId})`); + this.history.push('ready', { + tunnelId: msg.tunnelId, + connectionId: msg.connectionId, + gotResumeToken: msg.resumeToken !== undefined, + gotTraceId: msg.traceId !== undefined, + protocol: msg.protocol, + }); // Create the transport based on negotiated protocol. Both transports // wire onActivity to the stale-timer reset: yamux server-initiated // pings alone are not sufficient for liveness, since a silently dropped @@ -184,6 +209,11 @@ export class TunnelClient extends EventEmitter { allowedOrigins: this.#allowedOrigins, onActivity: () => this.#resetStaleTimer(), pingIntervalMs: this.#yamuxPingIntervalMs, + // Diagnostic: record every successful ping enqueue. Lets callers + // verify the keepalive timer is actually firing during quiet idle + // windows; an absence of ping-sent events between connect and + // stale-fired is a strong signal of event-loop starvation. + onPingSent: () => this.history.push('ping-sent'), }); } else { @@ -199,17 +229,21 @@ export class TunnelClient extends EventEmitter { // Catch unexpected rejections so they don't become unhandled and kill // the process -- treat them the same as a connection drop. this.#transport.serve().catch((err) => { - this.#log(`Transport error: ${err instanceof Error ? err.message : String(err)}`); + const message = err instanceof Error ? err.message : String(err); + this.#log(`Transport error: ${message}`); + this.history.push('transport-error', { message }); this.#onDisconnect(); }); }; ws.on('message', handshakeHandler); ws.on('close', (code, reason) => { this.#log(`WebSocket closed: ${code} ${reason.toString()}`); + this.history.push('ws-close', { code, reason: reason.toString() }); this.#onDisconnect(); }); ws.on('error', (err) => { this.#log(`WebSocket error: ${err.message}`); + this.history.push('ws-error', { message: err.message }); // 'close' fires after 'error', so reconnect happens there }); this.#ws = ws; @@ -240,6 +274,11 @@ export class TunnelClient extends EventEmitter { const total = Math.round(delay + jitter); this.#reconnectAttempts++; this.#log(`Reconnecting in ${total}ms (attempt ${this.#reconnectAttempts})`); + this.history.push('reconnect-scheduled', { + delayMs: total, + attempt: this.#reconnectAttempts, + resume: !!this.#resumeToken, + }); this.#reconnectTimer = setTimeout(() => { this.#reconnectTimer = null; this.#doConnect(); @@ -250,6 +289,7 @@ export class TunnelClient extends EventEmitter { this.#clearStaleTimer(); this.#staleTimer = setTimeout(() => { this.#log('Connection stale, reconnecting'); + this.history.push('stale-fired', { timeoutMs: this.#staleTimeoutMs }); this.#ws?.close(); }, this.#staleTimeoutMs); } diff --git a/tunnel/build/src/history.js b/tunnel/build/src/history.js new file mode 100644 index 00000000..81e02662 --- /dev/null +++ b/tunnel/build/src/history.js @@ -0,0 +1,35 @@ +/** + * Per-client event ring for tunnel diagnostics. + * + * Captures the WS lifecycle a tunnel went through (connect, ws-open, ready, + * ping-sent, ws-close, stale-fired, reconnect-scheduled, need-live-tunnel, + * handshake-error, etc.) so callers — especially MCP agents that have no + * other way to inspect the tunnel client process — can answer questions like + * "did the stale timer fire?" or "did pings actually go out during the quiet + * period?" without kubectl access. + * + * Bounded ring (default 64). Events older than the cap are dropped; this is + * a debug aid, not an audit log. + */ +/** + * Fixed-capacity event ring. Push newest at the tail; oldest is dropped when + * full. `snapshot()` returns a chronologically-ordered copy safe to serialize. + */ +export class EventRing { + #cap; + #buf = []; + constructor(cap = 64) { + this.#cap = cap; + } + push(kind, detail) { + this.#buf.push({ ts: Date.now(), kind, detail }); + if (this.#buf.length > this.#cap) + this.#buf.shift(); + } + snapshot() { + return [...this.#buf]; + } + get length() { + return this.#buf.length; + } +} diff --git a/tunnel/build/src/main.js b/tunnel/build/src/main.js index 9fe1365d..dc380911 100644 --- a/tunnel/build/src/main.js +++ b/tunnel/build/src/main.js @@ -27,6 +27,18 @@ const log = (msg) => { }; // Multiple tunnels can be active simultaneously, keyed by tunnelId. const clients = new Map(); +const deadHistories = []; +const MAX_DEAD_HISTORIES = 4; +function recordDead(id, client, reason) { + deadHistories.push({ + tunnelId: id, + closedAt: Date.now(), + reason, + events: client.history.snapshot(), + }); + while (deadHistories.length > MAX_DEAD_HISTORIES) + deadHistories.shift(); +} const server = new McpServer({ name: "subtext_tunnel", version: VERSION, @@ -103,7 +115,12 @@ server.registerTool("tunnel-connect", { // Capture id now: tunnelId is cleared by #onDisconnect() before the // reconnect that triggers need_live_tunnel, so reading client.tunnelId // at event-fire time is always undefined → stale map entry. - client.once('need_live_tunnel', () => clients.delete(id)); + // Record the history before deleting so `tunnel-history` can show + // *why* the tunnel went away after the fact (the most useful case). + client.once('need_live_tunnel', () => { + recordDead(id, client, 'need_live_tunnel'); + clients.delete(id); + }); } if (needsLiveTunnel) { return { @@ -197,6 +214,76 @@ server.registerTool("tunnel-status", { ], }; }); +server.registerTool("tunnel-history", { + description: "Returns recent lifecycle events for active and recently-closed tunnels. " + + "Use this to diagnose why a tunnel went away or stopped working — events " + + "include connect/reconnect attempts, ws-open/close, ready, ping-sent, " + + "stale-fired, handshake-error, and need-live-tunnel. Each event has a " + + "timestamp (ms since epoch) and optional structured detail. Read events " + + "chronologically as a story: e.g. ws-open → ready → ping-sent ×N → " + + "stale-fired → ws-close → reconnect-scheduled → unexpected-response 401 → " + + "need-live-tunnel tells you the WS went idle, the stale timer fired, and " + + "the resume reconnect was rejected. The absence of ping-sent events " + + "during a long quiet window suggests the keepalive timer wasn't firing " + + "(e.g. event-loop starvation in this MCP child process).", + inputSchema: z.object({ + tunnelId: z + .string() + .optional() + .describe("Optional tunnelId to filter to. Omit to receive history for all " + + "currently-active tunnels plus the last few recently-dead ones."), + }), +}, async ({ tunnelId }) => { + const now = Date.now(); + const formatLive = (id, client) => ({ + tunnelId: id, + state: client.state, + traceId: client.traceId ?? null, + events: client.history.snapshot(), + }); + if (tunnelId) { + const live = clients.get(tunnelId); + if (live) { + return { + content: [ + { + type: "text", + text: JSON.stringify({ now, tunnel: formatLive(tunnelId, live) }, null, 2), + }, + ], + }; + } + const dead = deadHistories.find((d) => d.tunnelId === tunnelId); + if (dead) { + return { + content: [ + { + type: "text", + text: JSON.stringify({ now, dead }, null, 2), + }, + ], + }; + } + return { + content: [ + { + type: "text", + text: JSON.stringify({ error: `No tunnel (alive or recently-dead) with id ${tunnelId}` }, null, 2), + }, + ], + isError: true, + }; + } + const live = [...clients.entries()].map(([id, c]) => formatLive(id, c)); + return { + content: [ + { + type: "text", + text: JSON.stringify({ now, live, dead: deadHistories }, null, 2), + }, + ], + }; +}); const transport = new StdioServerTransport(); await server.connect(transport); log("MCP server started"); diff --git a/tunnel/build/src/transport_yamux.js b/tunnel/build/src/transport_yamux.js index 0b107c15..a38c1e41 100644 --- a/tunnel/build/src/transport_yamux.js +++ b/tunnel/build/src/transport_yamux.js @@ -26,6 +26,7 @@ export class YamuxTransport { this.#session = new YamuxSession(opts.ws, { onActivity: opts.onActivity, pingIntervalMs: opts.pingIntervalMs, + onPingSent: opts.onPingSent, }); this.#streaming = opts.streaming ?? false; this.#allowedOrigins = opts.allowedOrigins ?? []; diff --git a/tunnel/build/src/yamux.js b/tunnel/build/src/yamux.js index 626165b0..ee8aab6b 100644 --- a/tunnel/build/src/yamux.js +++ b/tunnel/build/src/yamux.js @@ -208,6 +208,7 @@ export class YamuxSession { #ws; #streams = new Map(); #onActivity; + #onPingSent; #acceptQueue = []; #acceptWaiters = []; #closed = false; @@ -218,6 +219,7 @@ export class YamuxSession { constructor(ws, opts = {}) { this.#ws = ws; this.#onActivity = opts.onActivity; + this.#onPingSent = opts.onPingSent; ws.on('message', (data) => { // Any WS message — yamux frame, ping ack, anything — counts as the // peer being alive. Reset the upstream stale timer before parsing so @@ -254,6 +256,10 @@ export class YamuxSession { const nonce = (this.#pingNonce = (this.#pingNonce + 1) >>> 0); try { this.#ws.send(makeHeader(TYPE_PING, FLAG_SYN, 0, nonce)); + // Notify the diagnostic hook only on successful enqueue so the absence + // of a 'ping-sent' event in the history reliably means "the timer + // fired but the send threw" rather than "the timer didn't fire." + this.#onPingSent?.(); } catch { // WS may have torn down between ticks; close handler will reconcile. diff --git a/tunnel/package.json b/tunnel/package.json index de5fa219..6795ff0d 100644 --- a/tunnel/package.json +++ b/tunnel/package.json @@ -1,6 +1,6 @@ { "name": "@fullstory/subtext-tunnel", - "version": "0.1.14", + "version": "0.1.15", "description": "MCP server for reverse tunneling to localhost — connects hosted Subtext browser tools to local dev servers.", "type": "module", "bin": { diff --git a/tunnel/scripts/probe.mjs b/tunnel/scripts/probe.mjs new file mode 100644 index 00000000..f8483ff1 --- /dev/null +++ b/tunnel/scripts/probe.mjs @@ -0,0 +1,259 @@ +#!/usr/bin/env node +/** + * Tunnel diagnostics probe. + * + * Mints a relayUrl via the live-tunnel MCP tool, then drives a + * TunnelClient directly (no MCP wrapper, no Remy) so we can watch the + * full WS lifecycle in one process. Use this to reproduce intermittent + * disconnect / reconnect failures against staging without round-tripping + * through the agent harness. + * + * By default the probe also spins up a tiny local HTTP server and tells + * lidar to point chromium at it via live-view-new. That makes the WS + * carry actual proxied traffic and starts the screencast — closer to + * what real Remy sessions look like, which seems to be required to + * trigger the 30s death (see SUBTEXT-344). Pass --no-view to skip and + * only exercise the bare WS lifecycle. + * + * Usage: + * SUBTEXT_API_KEY=... node scripts/probe.mjs \ + * [--mcp-url https://api.staging.fullstory.com/mcp/subtext] \ + * [--allow https://*.fullstory.test:8043] \ + * [--ping-ms 30000] \ + * [--snapshot-every 10000] \ + * [--no-view] + * + * Requires `npm run build` first so build/src/client.js is up to date. + * + * Picks the API key by inspecting --mcp-url: + * *.staging.fullstory.com → $SUBTEXT_STAGING_API_KEY + * *.eu1.staging.fullstory.com → $SUBTEXT_EU1_STAGING_API_KEY + * api.onfire.fyi → $SUBTEXT_PLAYPEN_API_KEY + * anything else → $SUBTEXT_API_KEY + * Override the auto-pick with --api-key-env VAR if needed. + */ + +import {TunnelClient} from '../build/src/client.js'; + +// ----- args ----- + +function arg(name, fallback) { + const i = process.argv.indexOf(name); + if (i > 0 && i + 1 < process.argv.length) return process.argv[i + 1]; + return fallback; +} +function flag(name) { + return process.argv.includes(name); +} + +const mcpUrl = arg('--mcp-url', 'https://api.staging.fullstory.com/mcp/subtext'); +const allowedOrigins = (arg('--allow', 'https://*.fullstory.test:8043')) + .split(',') + .map(s => s.trim()) + .filter(Boolean); +const pingIntervalMs = Number(arg('--ping-ms', '30000')); +const snapshotEveryMs = Number(arg('--snapshot-every', '10000')); +const viewUrl = arg('--url', 'https://app.fullstory.test:8043/ui'); +const skipView = flag('--no-view'); + +function pickApiKeyEnv(url) { + const explicit = arg('--api-key-env', null); + if (explicit) return explicit; + if (url.includes('eu1.staging.fullstory.com')) return 'SUBTEXT_EU1_STAGING_API_KEY'; + if (url.includes('staging.fullstory.com')) return 'SUBTEXT_STAGING_API_KEY'; + if (url.includes('onfire.fyi')) return 'SUBTEXT_PLAYPEN_API_KEY'; + return 'SUBTEXT_API_KEY'; +} +const apiKeyEnv = pickApiKeyEnv(mcpUrl); +const apiKey = process.env[apiKeyEnv] || ''; +if (!apiKey) { + console.error(`error: set $${apiKeyEnv} (auto-picked from --mcp-url; override with --api-key-env)`); + process.exit(1); +} + +const ts = () => new Date().toISOString(); +const log = msg => console.error(`${ts()} ${msg}`); + +// ----- MCP call helper ----- +// +// Stateless one-shot: a single tools/call POST is enough. We skip the +// initialize handshake because: +// 1. It would create a server-side session that can only be used from +// the same pod, and our subsequent calls have no affinity hint to +// land on the same pod again. +// 2. live-tunnel and live-view-new don't need session state — they're +// both routed by connection_id (or for tunnel-first mint, an +// affinity-extractor minted UUID), so all calls for the same +// connection land on the same pod regardless of session id. +// +// Raw fetch instead of @modelcontextprotocol/sdk's HTTP transport because +// the SDK's requestInit.headers handling drops the Authorization header +// in some versions, which makes auth against staging fail confusingly. + +let mcpCallSeq = 1; +async function mcpCall(toolName, args) { + const res = await fetch(mcpUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json, text/event-stream', + 'Authorization': `Bearer ${apiKey}`, + }, + body: JSON.stringify({ + jsonrpc: '2.0', id: mcpCallSeq++, method: 'tools/call', + params: {name: toolName, arguments: args}, + }), + }); + if (!res.ok) { + const errText = await res.text().catch(() => ''); + throw new Error(`mcp: ${toolName} → ${res.status}: ${errText || res.statusText}`); + } + const body = await res.json(); + if (body.error) { + throw new Error(`mcp: ${toolName} returned error: ${JSON.stringify(body.error)}`); + } + // Tool responses come back as `result.content[0].text`. Tools registered + // with sessionDataPolicy=true wrap that text in + // as a prompt-injection-defense fence; pull out the JSON object that lives + // inside (live-tunnel returns JSON; live-view-new returns plain text with + // labelled lines and we keep the raw text for those cases). + const text = body?.result?.content?.[0]?.text; + if (typeof text !== 'string') { + throw new Error(`mcp: ${toolName}: unexpected response shape: ${JSON.stringify(body)}`); + } + return text; +} + +function extractJSON(text) { + const jsonStart = text.indexOf('{'); + const jsonEnd = text.lastIndexOf('}'); + if (jsonStart < 0 || jsonEnd < jsonStart) return null; + try { + return JSON.parse(text.slice(jsonStart, jsonEnd + 1)); + } catch { + return null; + } +} + +// ----- 1. mint a relayUrl via the live-tunnel MCP tool ----- + +log(`mcp: calling live-tunnel @ ${mcpUrl}`); +const tunnelText = await mcpCall('live-tunnel', {}).catch(err => { + console.error(err.message); + process.exit(1); +}); +const tunnelInfo = extractJSON(tunnelText); +if (!tunnelInfo?.relayUrl || !tunnelInfo?.connectionId) { + console.error('mcp: live-tunnel response has no relayUrl/connectionId:', tunnelText); + process.exit(1); +} +const {relayUrl, connectionId, traceId} = tunnelInfo; +log(`mcp: got relayUrl, connectionId=${connectionId}${traceId ? ` traceId=${traceId}` : ''}`); + +// ----- 2. drive a TunnelClient directly ----- + +const client = new TunnelClient({ + relayUrl, + connectionId, + allowedOrigins, + log: msg => log(`tunnel: ${msg}`), + yamuxPingIntervalMs: pingIntervalMs, +}); + +client.on('need_live_tunnel', () => { + log('!! need_live_tunnel emitted (resume token rejected — tunnel cannot recover)'); +}); + +log(`tunnel: connecting (allowedOrigins=${JSON.stringify(allowedOrigins)}, pingIntervalMs=${pingIntervalMs})`); +client.connect(); + +// ----- 3. attach chromium via live-view-new ----- +// +// The minimal probe (just a registered tunnel, no chromium) survives idle +// past the 30s mark on staging. The failing case in SUBTEXT-344 had a +// chromium browser + view + screencast attached. This step fires +// live-view-new so the WS carries actual proxied traffic and the +// screencast publisher comes online — matching the failing-case profile. +// +// live-view-new's URL path goes through Goto with a 30s timeout; we wait +// for the tunnel to be `ready` first so the chromium navigation can +// actually flow through. + +async function waitForReady(timeoutMs = 30000) { + const deadline = Date.now() + timeoutMs; + while (client.state !== 'ready') { + if (Date.now() > deadline) { + throw new Error(`tunnel did not reach 'ready' within ${timeoutMs}ms (state=${client.state})`); + } + await new Promise(r => setTimeout(r, 50)); + } +} + +if (!skipView) { + try { + await waitForReady(); + log(`mcp: calling live-view-new (url=${viewUrl})`); + const viewText = await mcpCall('live-view-new', { + connection_id: connectionId, + url: viewUrl, + }); + // live-view-new returns labelled-line plain text on success: view_id, + // current_view, trace_url, capture_status, plus a screenshot URL. + // Print the small fields, skip any URL longer than ~200 chars (the + // signed screenshot URL would dominate otherwise). + const lines = viewText.split('\n').filter(l => { + const trimmed = l.trim(); + if (!trimmed) return false; + if (trimmed.startsWith('<') || trimmed.startsWith(' process.exit(0), 250); +} +process.on('SIGINT', () => shutdown('SIGINT')); +process.on('SIGTERM', () => shutdown('SIGTERM')); + +// If need_live_tunnel fires we can't recover (the resume token is gone). Dump +// history and exit so the operator sees the failure mode immediately rather +// than after they Ctrl-C the script. Wait a beat so any trailing events make +// it into the snapshot before we tear down. +client.once('need_live_tunnel', () => { + setTimeout(() => shutdown('need_live_tunnel — tunnel cannot recover'), 1000); +}); diff --git a/tunnel/src/client.ts b/tunnel/src/client.ts index 788afb7c..1ee4ae4b 100644 --- a/tunnel/src/client.ts +++ b/tunnel/src/client.ts @@ -3,6 +3,7 @@ import type {IncomingMessage} from 'node:http'; import {WebSocket, type RawData} from './third_party/index.js'; import type {OriginPattern} from './allowlist.js'; import {parseOriginPatterns} from './allowlist.js'; +import {EventRing} from './history.js'; import type {HelloMessage, RelayMessage, TunnelState} from './types.js'; import { RECONNECT_BASE_MS, @@ -73,6 +74,13 @@ export class TunnelClient extends EventEmitter { #resumeToken: string | undefined; #traceId: string | undefined; + /** + * Per-client lifecycle event ring. Read-only from the outside; surfaced via + * the `tunnel-history` MCP tool so callers (esp. agents) can self-diagnose + * why a tunnel went stale or why a resume failed without needing kubectl. + */ + readonly history: EventRing = new EventRing(); + constructor(opts: TunnelClientOptions) { super(); this.#relayUrl = opts.relayUrl; @@ -107,10 +115,12 @@ export class TunnelClient extends EventEmitter { connect(): void { this.#intentionalDisconnect = false; + this.history.push('connect-start', {resume: false}); this.#doConnect(); } disconnect(): void { + this.history.push('disconnect-requested'); this.#intentionalDisconnect = true; this.#cleanup(); this.#state = 'disconnected'; @@ -151,10 +161,12 @@ export class TunnelClient extends EventEmitter { // Handle non-101 upgrade responses (e.g. 401 on resume token replay). ws.on('unexpected-response', (_req: unknown, res: IncomingMessage) => { this.#log(`Relay rejected upgrade: ${res.statusCode}`); + this.history.push('unexpected-response', {statusCode: res.statusCode}); if (res.statusCode === 401) { this.#resumeToken = undefined; this.#traceId = undefined; this.#intentionalDisconnect = true; + this.history.push('need-live-tunnel', {reason: '401 on upgrade'}); this.emit('need_live_tunnel'); } res.resume(); // drain so socket can be released @@ -164,6 +176,7 @@ export class TunnelClient extends EventEmitter { this.#state = 'connected'; this.#connectedSince = Date.now(); this.#log('WebSocket open, sending hello'); + this.history.push('ws-open'); const hello: HelloMessage = { type: 'hello', protocol: 'yamux', @@ -178,6 +191,10 @@ export class TunnelClient extends EventEmitter { hello.connectionId = this.#initialConnectionId; } ws.send(JSON.stringify(hello)); + this.history.push('hello-sent', { + resume: !!this.#resumeToken, + hasAllowedOrigins: !!hello.allowedOrigins, + }); this.#resetStaleTimer(); }); @@ -197,11 +214,13 @@ export class TunnelClient extends EventEmitter { // Token already revoked server-side; skip reconnect and request a // fresh live-tunnel instead. this.#log(`Relay handshake error: ${msg.message}`); + this.history.push('handshake-error', {message: msg.message}); this.#resumeToken = undefined; this.#traceId = undefined; ws.removeListener('message', handshakeHandler); this.#intentionalDisconnect = true; ws.close(); + this.history.push('need-live-tunnel', {reason: 'handshake error'}); this.emit('need_live_tunnel'); return; } @@ -223,6 +242,13 @@ export class TunnelClient extends EventEmitter { this.#state = 'ready'; this.#reconnectAttempts = 0; this.#log(`Tunnel ready: ${msg.tunnelId} (connection ${msg.connectionId})`); + this.history.push('ready', { + tunnelId: msg.tunnelId, + connectionId: msg.connectionId, + gotResumeToken: msg.resumeToken !== undefined, + gotTraceId: msg.traceId !== undefined, + protocol: msg.protocol, + }); // Create the transport based on negotiated protocol. Both transports // wire onActivity to the stale-timer reset: yamux server-initiated @@ -238,6 +264,11 @@ export class TunnelClient extends EventEmitter { allowedOrigins: this.#allowedOrigins, onActivity: () => this.#resetStaleTimer(), pingIntervalMs: this.#yamuxPingIntervalMs, + // Diagnostic: record every successful ping enqueue. Lets callers + // verify the keepalive timer is actually firing during quiet idle + // windows; an absence of ping-sent events between connect and + // stale-fired is a strong signal of event-loop starvation. + onPingSent: () => this.history.push('ping-sent'), }); } else { this.#transport = new LegacyTransport({ @@ -253,7 +284,9 @@ export class TunnelClient extends EventEmitter { // Catch unexpected rejections so they don't become unhandled and kill // the process -- treat them the same as a connection drop. this.#transport.serve().catch((err: unknown) => { - this.#log(`Transport error: ${err instanceof Error ? err.message : String(err)}`); + const message = err instanceof Error ? err.message : String(err); + this.#log(`Transport error: ${message}`); + this.history.push('transport-error', {message}); this.#onDisconnect(); }); }; @@ -261,11 +294,13 @@ export class TunnelClient extends EventEmitter { ws.on('close', (code: number, reason: Buffer) => { this.#log(`WebSocket closed: ${code} ${reason.toString()}`); + this.history.push('ws-close', {code, reason: reason.toString()}); this.#onDisconnect(); }); ws.on('error', (err: Error) => { this.#log(`WebSocket error: ${err.message}`); + this.history.push('ws-error', {message: err.message}); // 'close' fires after 'error', so reconnect happens there }); @@ -308,6 +343,11 @@ export class TunnelClient extends EventEmitter { const total = Math.round(delay + jitter); this.#reconnectAttempts++; this.#log(`Reconnecting in ${total}ms (attempt ${this.#reconnectAttempts})`); + this.history.push('reconnect-scheduled', { + delayMs: total, + attempt: this.#reconnectAttempts, + resume: !!this.#resumeToken, + }); this.#reconnectTimer = setTimeout(() => { this.#reconnectTimer = null; this.#doConnect(); @@ -320,6 +360,7 @@ export class TunnelClient extends EventEmitter { this.#clearStaleTimer(); this.#staleTimer = setTimeout(() => { this.#log('Connection stale, reconnecting'); + this.history.push('stale-fired', {timeoutMs: this.#staleTimeoutMs}); this.#ws?.close(); }, this.#staleTimeoutMs); } diff --git a/tunnel/src/history.ts b/tunnel/src/history.ts new file mode 100644 index 00000000..f6fa334c --- /dev/null +++ b/tunnel/src/history.ts @@ -0,0 +1,68 @@ +/** + * Per-client event ring for tunnel diagnostics. + * + * Captures the WS lifecycle a tunnel went through (connect, ws-open, ready, + * ping-sent, ws-close, stale-fired, reconnect-scheduled, need-live-tunnel, + * handshake-error, etc.) so callers — especially MCP agents that have no + * other way to inspect the tunnel client process — can answer questions like + * "did the stale timer fire?" or "did pings actually go out during the quiet + * period?" without kubectl access. + * + * Bounded ring (default 64). Events older than the cap are dropped; this is + * a debug aid, not an audit log. + */ + +export type TunnelEventKind = + // Client-driven lifecycle. + | 'connect-start' + | 'disconnect-requested' + | 'reconnect-scheduled' + // WebSocket transport. + | 'ws-open' + | 'hello-sent' + | 'ws-close' + | 'ws-error' + | 'unexpected-response' + // Handshake outcomes. + | 'ready' + | 'handshake-error' + // Liveness. + | 'ping-sent' + | 'stale-fired' + // Terminal signals out. + | 'transport-error' + | 'need-live-tunnel'; + +export interface TunnelEvent { + /** Wall-clock ms since epoch — easier to correlate with server logs than relative ts. */ + ts: number; + kind: TunnelEventKind; + /** Optional structured detail. Keep small — this lives in a bounded ring. */ + detail?: Record; +} + +/** + * Fixed-capacity event ring. Push newest at the tail; oldest is dropped when + * full. `snapshot()` returns a chronologically-ordered copy safe to serialize. + */ +export class EventRing { + readonly #cap: number; + readonly #buf: TunnelEvent[] = []; + + constructor(cap = 64) { + this.#cap = cap; + } + + push(kind: TunnelEventKind, detail?: Record): void { + this.#buf.push({ts: Date.now(), kind, detail}); + if (this.#buf.length > this.#cap) this.#buf.shift(); + } + + snapshot(): TunnelEvent[] { + return [...this.#buf]; + } + + get length(): number { + return this.#buf.length; + } +} diff --git a/tunnel/src/main.ts b/tunnel/src/main.ts index 67caaf84..fbda566a 100644 --- a/tunnel/src/main.ts +++ b/tunnel/src/main.ts @@ -11,6 +11,7 @@ import { hideBin, } from "./third_party/index.js"; import { TunnelClient } from "./client.js"; +import type { TunnelEvent } from "./history.js"; // Single source of truth: read version from package.json at runtime so it // can't drift from what npm publishes. From build/src/main.js, package.json @@ -43,6 +44,28 @@ const log = (msg: string) => { // Multiple tunnels can be active simultaneously, keyed by tunnelId. const clients = new Map(); +// History of recently-dead tunnels. Captured when a tunnel emits +// need_live_tunnel (terminal) or is intentionally disconnected, so +// `tunnel-history` can still surface why a tunnel went away after the +// `clients` map entry has been removed. Bounded — debug aid, not audit. +interface DeadTunnelHistory { + tunnelId: string; + closedAt: number; + reason: string; + events: TunnelEvent[]; +} +const deadHistories: DeadTunnelHistory[] = []; +const MAX_DEAD_HISTORIES = 4; +function recordDead(id: string, client: TunnelClient, reason: string): void { + deadHistories.push({ + tunnelId: id, + closedAt: Date.now(), + reason, + events: client.history.snapshot(), + }); + while (deadHistories.length > MAX_DEAD_HISTORIES) deadHistories.shift(); +} + const server = new McpServer( { name: "subtext_tunnel", @@ -139,7 +162,12 @@ server.registerTool( // Capture id now: tunnelId is cleared by #onDisconnect() before the // reconnect that triggers need_live_tunnel, so reading client.tunnelId // at event-fire time is always undefined → stale map entry. - client.once('need_live_tunnel', () => clients.delete(id)); + // Record the history before deleting so `tunnel-history` can show + // *why* the tunnel went away after the fact (the most useful case). + client.once('need_live_tunnel', () => { + recordDead(id, client, 'need_live_tunnel'); + clients.delete(id); + }); } if (needsLiveTunnel) { @@ -263,6 +291,96 @@ server.registerTool( }, ); +server.registerTool( + "tunnel-history", + { + description: + "Returns recent lifecycle events for active and recently-closed tunnels. " + + "Use this to diagnose why a tunnel went away or stopped working — events " + + "include connect/reconnect attempts, ws-open/close, ready, ping-sent, " + + "stale-fired, handshake-error, and need-live-tunnel. Each event has a " + + "timestamp (ms since epoch) and optional structured detail. Read events " + + "chronologically as a story: e.g. ws-open → ready → ping-sent ×N → " + + "stale-fired → ws-close → reconnect-scheduled → unexpected-response 401 → " + + "need-live-tunnel tells you the WS went idle, the stale timer fired, and " + + "the resume reconnect was rejected. The absence of ping-sent events " + + "during a long quiet window suggests the keepalive timer wasn't firing " + + "(e.g. event-loop starvation in this MCP child process).", + inputSchema: z.object({ + tunnelId: z + .string() + .optional() + .describe( + "Optional tunnelId to filter to. Omit to receive history for all " + + "currently-active tunnels plus the last few recently-dead ones.", + ), + }), + }, + async ({ tunnelId }) => { + const now = Date.now(); + const formatLive = (id: string, client: TunnelClient) => ({ + tunnelId: id, + state: client.state, + traceId: client.traceId ?? null, + events: client.history.snapshot(), + }); + if (tunnelId) { + const live = clients.get(tunnelId); + if (live) { + return { + content: [ + { + type: "text" as const, + text: JSON.stringify( + { now, tunnel: formatLive(tunnelId, live) }, + null, + 2, + ), + }, + ], + }; + } + const dead = deadHistories.find((d) => d.tunnelId === tunnelId); + if (dead) { + return { + content: [ + { + type: "text" as const, + text: JSON.stringify({ now, dead }, null, 2), + }, + ], + }; + } + return { + content: [ + { + type: "text" as const, + text: JSON.stringify( + { error: `No tunnel (alive or recently-dead) with id ${tunnelId}` }, + null, + 2, + ), + }, + ], + isError: true, + }; + } + const live = [...clients.entries()].map(([id, c]) => formatLive(id, c)); + return { + content: [ + { + type: "text" as const, + text: JSON.stringify( + { now, live, dead: deadHistories }, + null, + 2, + ), + }, + ], + }; + }, +); + const transport = new StdioServerTransport(); await server.connect(transport); log("MCP server started"); diff --git a/tunnel/src/transport.ts b/tunnel/src/transport.ts index 533784a9..5ee50c79 100644 --- a/tunnel/src/transport.ts +++ b/tunnel/src/transport.ts @@ -45,6 +45,12 @@ export interface TransportOptions { * WS alive through stateful intermediaries. Defaults to YAMUX_PING_INTERVAL_MS. */ pingIntervalMs?: number; + /** + * yamux-only: diagnostic hook called whenever a client-initiated PING is + * successfully enqueued on the WS. Used by the client to record an event + * in its history ring so callers can verify the keepalive timer is firing. + */ + onPingSent?: () => void; } /** Convert wire headers (Record) to fetch Headers. */ diff --git a/tunnel/src/transport_yamux.ts b/tunnel/src/transport_yamux.ts index a071cee0..6017abc3 100644 --- a/tunnel/src/transport_yamux.ts +++ b/tunnel/src/transport_yamux.ts @@ -38,6 +38,7 @@ export class YamuxTransport implements TunnelTransport { this.#session = new YamuxSession(opts.ws, { onActivity: opts.onActivity, pingIntervalMs: opts.pingIntervalMs, + onPingSent: opts.onPingSent, }); this.#streaming = opts.streaming ?? false; this.#allowedOrigins = opts.allowedOrigins ?? []; diff --git a/tunnel/src/yamux.ts b/tunnel/src/yamux.ts index d6962b4f..4d74ffb3 100644 --- a/tunnel/src/yamux.ts +++ b/tunnel/src/yamux.ts @@ -246,12 +246,20 @@ export interface YamuxSessionOptions { * concrete value (typically YAMUX_PING_INTERVAL_MS). */ pingIntervalMs?: number; + /** + * Called every time a client-initiated PING frame is enqueued on the WS. + * Diagnostic hook — TunnelClient records these in its event ring so callers + * can verify the keepalive timer is actually firing during quiet periods. + * The callback is fire-and-forget; failures are swallowed. + */ + onPingSent?: () => void; } export class YamuxSession { readonly #ws: WSAdapter; readonly #streams = new Map(); readonly #onActivity: (() => void) | undefined; + readonly #onPingSent: (() => void) | undefined; #acceptQueue: YamuxStream[] = []; #acceptWaiters: Array<(stream: YamuxStream | null) => void> = []; @@ -265,6 +273,7 @@ export class YamuxSession { constructor(ws: WSAdapter, opts: YamuxSessionOptions = {}) { this.#ws = ws; this.#onActivity = opts.onActivity; + this.#onPingSent = opts.onPingSent; ws.on('message', (data: RawData) => { // Any WS message — yamux frame, ping ack, anything — counts as the // peer being alive. Reset the upstream stale timer before parsing so @@ -302,6 +311,10 @@ export class YamuxSession { const nonce = (this.#pingNonce = (this.#pingNonce + 1) >>> 0); try { this.#ws.send(makeHeader(TYPE_PING, FLAG_SYN, 0, nonce)); + // Notify the diagnostic hook only on successful enqueue so the absence + // of a 'ping-sent' event in the history reliably means "the timer + // fired but the send threw" rather than "the timer didn't fire." + this.#onPingSent?.(); } catch { // WS may have torn down between ticks; close handler will reconcile. } diff --git a/tunnel/tests/client.test.ts b/tunnel/tests/client.test.ts index 67b6f5ef..27c6897a 100644 --- a/tunnel/tests/client.test.ts +++ b/tunnel/tests/client.test.ts @@ -747,4 +747,45 @@ describe('TunnelClient', () => { client.disconnect(); }); + + it('history ring records connect, ws-open, hello-sent, ready, ws-close', async () => { + // Smoke test for the diagnostic ring used by the tunnel-history MCP tool. + // We verify the kinds appear in the expected chronological order on a + // single connect/ready/disconnect cycle. Detail payloads are intentionally + // not asserted in detail — they're for human reading and may evolve. + const client = createClient(); + const connPromise = new Promise(resolve => + wss.once('connection', resolve), + ); + client.connect(); + const ws = await connPromise; + await nextMessage(ws); + ws.send(JSON.stringify({ + type: 'ready', tunnelId: 't_hist', connectionId: 'cid', resumeToken: 'r', + })); + await waitFor(() => client.state === 'ready'); + + // Snapshot before disconnect so we don't race the close event. + const events = client.history.snapshot(); + const kinds = events.map(e => e.kind); + assert.deepEqual( + kinds.slice(0, 4), + ['connect-start', 'ws-open', 'hello-sent', 'ready'], + `unexpected event order: ${kinds.join(', ')}`, + ); + // Timestamps must be monotonically non-decreasing. + for (let i = 1; i < events.length; i++) { + assert.ok(events[i].ts >= events[i - 1].ts, 'event timestamps decreased'); + } + // Detail spot-check: the ready event carries the negotiated identifiers. + const ready = events.find(e => e.kind === 'ready'); + assert.ok(ready, 'expected a ready event'); + assert.equal((ready!.detail as Record)?.tunnelId, 't_hist'); + assert.equal( + (ready!.detail as Record)?.gotResumeToken, + true, + ); + + client.disconnect(); + }); });