Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion tunnel/build/src/client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'node:events';
import { WebSocket } from './third_party/index.js';
import { parseOriginPatterns } from './allowlist.js';
import { EventRing } from './history.js';
import { RECONNECT_BASE_MS, RECONNECT_MAX_MS, RESUME_SUBPROTOCOL_PREFIX, STALE_CONNECTION_MS, YAMUX_PING_INTERVAL_MS, } from './types.js';
import { LegacyTransport } from './transport_legacy.js';
import { YamuxTransport } from './transport_yamux.js';
Expand Down Expand Up @@ -33,6 +34,12 @@ export class TunnelClient extends EventEmitter {
#intentionalDisconnect = false;
#resumeToken;
#traceId;
/**
* Per-client lifecycle event ring. Read-only from the outside; surfaced via
* the `tunnel-history` MCP tool so callers (esp. agents) can self-diagnose
* why a tunnel went stale or why a resume failed without needing kubectl.
*/
history = new EventRing();
constructor(opts) {
super();
this.#relayUrl = opts.relayUrl;
Expand Down Expand Up @@ -62,9 +69,11 @@ export class TunnelClient extends EventEmitter {
}
/**
 * Begins a new connection attempt.
 *
 * Clears the intentional-disconnect flag, records a 'connect-start' event
 * (tagged `resume: false`) in `history`, then delegates to `#doConnect()`.
 */
connect() {
// Clear the flag first: disconnect() and the 401 / handshake-error paths
// set it to true, and this call starts over from a clean state.
this.#intentionalDisconnect = false;
// Record the attempt before dialing so the history shows it even if the
// connection never opens.
this.history.push('connect-start', { resume: false });
this.#doConnect();
}
disconnect() {
this.history.push('disconnect-requested');
this.#intentionalDisconnect = true;
this.#cleanup();
this.#state = 'disconnected';
Expand Down Expand Up @@ -100,10 +109,12 @@ export class TunnelClient extends EventEmitter {
// Handle non-101 upgrade responses (e.g. 401 on resume token replay).
ws.on('unexpected-response', (_req, res) => {
this.#log(`Relay rejected upgrade: ${res.statusCode}`);
this.history.push('unexpected-response', { statusCode: res.statusCode });
if (res.statusCode === 401) {
this.#resumeToken = undefined;
this.#traceId = undefined;
this.#intentionalDisconnect = true;
this.history.push('need-live-tunnel', { reason: '401 on upgrade' });
this.emit('need_live_tunnel');
}
res.resume(); // drain so socket can be released
Expand All @@ -112,6 +123,7 @@ export class TunnelClient extends EventEmitter {
this.#state = 'connected';
this.#connectedSince = Date.now();
this.#log('WebSocket open, sending hello');
this.history.push('ws-open');
const hello = {
type: 'hello',
protocol: 'yamux',
Expand All @@ -126,6 +138,10 @@ export class TunnelClient extends EventEmitter {
hello.connectionId = this.#initialConnectionId;
}
ws.send(JSON.stringify(hello));
this.history.push('hello-sent', {
resume: !!this.#resumeToken,
hasAllowedOrigins: !!hello.allowedOrigins,
});
this.#resetStaleTimer();
});
// Listen for the ready message (always JSON, regardless of protocol).
Expand All @@ -144,11 +160,13 @@ export class TunnelClient extends EventEmitter {
// Token already revoked server-side; skip reconnect and request a
// fresh live-tunnel instead.
this.#log(`Relay handshake error: ${msg.message}`);
this.history.push('handshake-error', { message: msg.message });
this.#resumeToken = undefined;
this.#traceId = undefined;
ws.removeListener('message', handshakeHandler);
this.#intentionalDisconnect = true;
ws.close();
this.history.push('need-live-tunnel', { reason: 'handshake error' });
this.emit('need_live_tunnel');
return;
}
Expand All @@ -170,6 +188,13 @@ export class TunnelClient extends EventEmitter {
this.#state = 'ready';
this.#reconnectAttempts = 0;
this.#log(`Tunnel ready: ${msg.tunnelId} (connection ${msg.connectionId})`);
this.history.push('ready', {
tunnelId: msg.tunnelId,
connectionId: msg.connectionId,
gotResumeToken: msg.resumeToken !== undefined,
gotTraceId: msg.traceId !== undefined,
protocol: msg.protocol,
});
// Create the transport based on negotiated protocol. Both transports
// wire onActivity to the stale-timer reset: yamux server-initiated
// pings alone are not sufficient for liveness, since a silently dropped
Expand All @@ -184,6 +209,11 @@ export class TunnelClient extends EventEmitter {
allowedOrigins: this.#allowedOrigins,
onActivity: () => this.#resetStaleTimer(),
pingIntervalMs: this.#yamuxPingIntervalMs,
// Diagnostic: record every successful ping enqueue. Lets callers
// verify the keepalive timer is actually firing during quiet idle
// windows; an absence of ping-sent events between connect and
// stale-fired is a strong signal of event-loop starvation.
onPingSent: () => this.history.push('ping-sent'),
});
}
else {
Expand All @@ -199,17 +229,21 @@ export class TunnelClient extends EventEmitter {
// Catch unexpected rejections so they don't become unhandled and kill
// the process -- treat them the same as a connection drop.
this.#transport.serve().catch((err) => {
this.#log(`Transport error: ${err instanceof Error ? err.message : String(err)}`);
const message = err instanceof Error ? err.message : String(err);
this.#log(`Transport error: ${message}`);
this.history.push('transport-error', { message });
this.#onDisconnect();
});
};
ws.on('message', handshakeHandler);
ws.on('close', (code, reason) => {
this.#log(`WebSocket closed: ${code} ${reason.toString()}`);
this.history.push('ws-close', { code, reason: reason.toString() });
this.#onDisconnect();
});
ws.on('error', (err) => {
this.#log(`WebSocket error: ${err.message}`);
this.history.push('ws-error', { message: err.message });
// 'close' fires after 'error', so reconnect happens there
});
this.#ws = ws;
Expand Down Expand Up @@ -240,6 +274,11 @@ export class TunnelClient extends EventEmitter {
const total = Math.round(delay + jitter);
this.#reconnectAttempts++;
this.#log(`Reconnecting in ${total}ms (attempt ${this.#reconnectAttempts})`);
this.history.push('reconnect-scheduled', {
delayMs: total,
attempt: this.#reconnectAttempts,
resume: !!this.#resumeToken,
});
this.#reconnectTimer = setTimeout(() => {
this.#reconnectTimer = null;
this.#doConnect();
Expand All @@ -250,6 +289,7 @@ export class TunnelClient extends EventEmitter {
this.#clearStaleTimer();
this.#staleTimer = setTimeout(() => {
this.#log('Connection stale, reconnecting');
this.history.push('stale-fired', { timeoutMs: this.#staleTimeoutMs });
this.#ws?.close();
}, this.#staleTimeoutMs);
}
Expand Down
35 changes: 35 additions & 0 deletions tunnel/build/src/history.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Per-client event ring for tunnel diagnostics.
*
* Captures the WS lifecycle a tunnel went through (connect, ws-open, ready,
* ping-sent, ws-close, stale-fired, reconnect-scheduled, need-live-tunnel,
* handshake-error, etc.) so callers — especially MCP agents that have no
* other way to inspect the tunnel client process — can answer questions like
* "did the stale timer fire?" or "did pings actually go out during the quiet
* period?" without kubectl access.
*
* Bounded ring (default 64). Events older than the cap are dropped; this is
* a debug aid, not an audit log.
*/
/**
 * Fixed-capacity event ring. New events are appended at the tail; once the
 * ring holds more than `cap` events the oldest one is dropped.
 *
 * Each stored event has the shape `{ ts, kind, detail }`, where `ts` is
 * `Date.now()` at the time of the push.
 */
export class EventRing {
    /** Maximum number of events retained. */
    #cap;
    /** Retained events, oldest first. */
    #buf = [];
    /**
     * @param {number} [cap=64] Maximum number of events to retain.
     * @throws {RangeError} If `cap` coerces to NaN. A NaN capacity would make
     *   every `length > cap` comparison false, silently turning the ring into
     *   an unbounded buffer.
     */
    constructor(cap = 64) {
        const limit = Number(cap);
        if (Number.isNaN(limit)) {
            throw new RangeError(`EventRing capacity must be a number, got ${String(cap)}`);
        }
        this.#cap = limit;
    }
    /**
     * Appends an event stamped with the current time.
     *
     * @param {string} kind Event name (e.g. 'ws-open', 'ready').
     * @param {object} [detail] Optional structured detail stored as given.
     */
    push(kind, detail) {
        this.#buf.push({ ts: Date.now(), kind, detail });
        // Trim from the head so the newest events always survive.
        while (this.#buf.length > this.#cap) {
            this.#buf.shift();
        }
    }
    /**
     * Returns the retained events in chronological order.
     *
     * Both the array and each event entry are fresh copies, so callers can
     * mutate the result without corrupting the ring. `detail` objects are
     * copied by reference only.
     *
     * @returns {Array<{ts: number, kind: string, detail: (object|undefined)}>}
     */
    snapshot() {
        return this.#buf.map((event) => ({ ...event }));
    }
    /** Number of events currently retained. */
    get length() {
        return this.#buf.length;
    }
}
89 changes: 88 additions & 1 deletion tunnel/build/src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ const log = (msg) => {
};
// Multiple tunnels can be active simultaneously, keyed by tunnelId.
const clients = new Map();
/** Histories of tunnels that have gone away, oldest first. */
const deadHistories = [];
/** Upper bound on how many dead-tunnel histories are kept. */
const MAX_DEAD_HISTORIES = 4;
/**
 * Stores a final snapshot of a tunnel's event history so it can still be
 * inspected after the client is removed from the live map.
 *
 * @param {string} id Tunnel id the record is filed under.
 * @param {{history: {snapshot: () => Array}}} client Client whose history is captured.
 * @param {string} reason Why the tunnel was retired.
 */
function recordDead(id, client, reason) {
    const closedAt = Date.now();
    const events = client.history.snapshot();
    deadHistories.push({ tunnelId: id, closedAt, reason, events });
    // Evict from the front so only the most recent records remain.
    const surplus = deadHistories.length - MAX_DEAD_HISTORIES;
    if (surplus > 0) {
        deadHistories.splice(0, surplus);
    }
}
const server = new McpServer({
name: "subtext_tunnel",
version: VERSION,
Expand Down Expand Up @@ -103,7 +115,12 @@ server.registerTool("tunnel-connect", {
// Capture id now: tunnelId is cleared by #onDisconnect() before the
// reconnect that triggers need_live_tunnel, so reading client.tunnelId
// at event-fire time is always undefined → stale map entry.
client.once('need_live_tunnel', () => clients.delete(id));
// Record the history before deleting so `tunnel-history` can show
// *why* the tunnel went away after the fact (the most useful case).
client.once('need_live_tunnel', () => {
recordDead(id, client, 'need_live_tunnel');
clients.delete(id);
});
}
if (needsLiveTunnel) {
return {
Expand Down Expand Up @@ -197,6 +214,76 @@ server.registerTool("tunnel-status", {
],
};
});
// Registers the `tunnel-history` MCP tool: a read-only view of the lifecycle
// event rings of live tunnels and of the last few retired ones.
server.registerTool("tunnel-history", {
    description: "Returns recent lifecycle events for active and recently-closed tunnels. " +
        "Use this to diagnose why a tunnel went away or stopped working — events " +
        "include connect/reconnect attempts, ws-open/close, ready, ping-sent, " +
        "stale-fired, handshake-error, and need-live-tunnel. Each event has a " +
        "timestamp (ms since epoch) and optional structured detail. Read events " +
        "chronologically as a story: e.g. ws-open → ready → ping-sent ×N → " +
        "stale-fired → ws-close → reconnect-scheduled → unexpected-response 401 → " +
        "need-live-tunnel tells you the WS went idle, the stale timer fired, and " +
        "the resume reconnect was rejected. The absence of ping-sent events " +
        "during a long quiet window suggests the keepalive timer wasn't firing " +
        "(e.g. event-loop starvation in this MCP child process).",
    inputSchema: z.object({
        tunnelId: z
            .string()
            .optional()
            .describe("Optional tunnelId to filter to. Omit to receive history for all " +
            "currently-active tunnels plus the last few recently-dead ones."),
    }),
}, async ({ tunnelId }) => {
    // `now` is included in successful payloads so event timestamps can be
    // read relative to the time of the query.
    const now = Date.now();
    // Wraps a JSON-serializable payload in the text-content result envelope.
    const textResult = (payload) => ({
        content: [
            {
                type: "text",
                text: JSON.stringify(payload, null, 2),
            },
        ],
    });
    const formatLive = (id, client) => ({
        tunnelId: id,
        state: client.state,
        traceId: client.traceId ?? null,
        events: client.history.snapshot(),
    });
    if (tunnelId) {
        // A live tunnel takes precedence over any retired record with the same id.
        const live = clients.get(tunnelId);
        if (live) {
            return textResult({ now, tunnel: formatLive(tunnelId, live) });
        }
        // deadHistories is ordered oldest-first; search from the end so that an
        // id which has been retired more than once reports its latest history.
        const dead = deadHistories
            .slice()
            .reverse()
            .find((d) => d.tunnelId === tunnelId);
        if (dead) {
            return textResult({ now, dead });
        }
        return {
            ...textResult({ error: `No tunnel (alive or recently-dead) with id ${tunnelId}` }),
            isError: true,
        };
    }
    // No filter: report every live tunnel plus all retained dead histories.
    const live = [...clients.entries()].map(([id, c]) => formatLive(id, c));
    return textResult({ now, live, dead: deadHistories });
});
const transport = new StdioServerTransport();
await server.connect(transport);
log("MCP server started");
Expand Down
1 change: 1 addition & 0 deletions tunnel/build/src/transport_yamux.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export class YamuxTransport {
this.#session = new YamuxSession(opts.ws, {
onActivity: opts.onActivity,
pingIntervalMs: opts.pingIntervalMs,
onPingSent: opts.onPingSent,
});
this.#streaming = opts.streaming ?? false;
this.#allowedOrigins = opts.allowedOrigins ?? [];
Expand Down
6 changes: 6 additions & 0 deletions tunnel/build/src/yamux.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ export class YamuxSession {
#ws;
#streams = new Map();
#onActivity;
#onPingSent;
#acceptQueue = [];
#acceptWaiters = [];
#closed = false;
Expand All @@ -218,6 +219,7 @@ export class YamuxSession {
constructor(ws, opts = {}) {
this.#ws = ws;
this.#onActivity = opts.onActivity;
this.#onPingSent = opts.onPingSent;
ws.on('message', (data) => {
// Any WS message — yamux frame, ping ack, anything — counts as the
// peer being alive. Reset the upstream stale timer before parsing so
Expand Down Expand Up @@ -254,6 +256,10 @@ export class YamuxSession {
const nonce = (this.#pingNonce = (this.#pingNonce + 1) >>> 0);
try {
this.#ws.send(makeHeader(TYPE_PING, FLAG_SYN, 0, nonce));
// Notify the diagnostic hook only after a successful enqueue, so every
// 'ping-sent' event in the history corresponds to a ping that was actually
// handed to the WebSocket. A missing 'ping-sent' therefore means either
// the timer did not fire or the send threw.
this.#onPingSent?.();
}
catch {
// WS may have torn down between ticks; close handler will reconcile.
Expand Down
2 changes: 1 addition & 1 deletion tunnel/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@fullstory/subtext-tunnel",
"version": "0.1.14",
"version": "0.1.15",
"description": "MCP server for reverse tunneling to localhost — connects hosted Subtext browser tools to local dev servers.",
"type": "module",
"bin": {
Expand Down
Loading