diff --git a/.github/workflows/pkg-pr-new.yml b/.github/workflows/pkg-pr-new.yml index dbe1d35..ada908e 100644 --- a/.github/workflows/pkg-pr-new.yml +++ b/.github/workflows/pkg-pr-new.yml @@ -15,4 +15,4 @@ jobs: - run: corepack enable - run: pnpm install --frozen-lockfile - run: pnpm run build - - run: pnpm dlx pkg-pr-new publish --pnpm + - run: pnpm dlx pkg-pr-new publish --pnpm --bin diff --git a/src/cli/bin.ts b/src/cli/bin.ts index 4450353..adff41d 100755 --- a/src/cli/bin.ts +++ b/src/cli/bin.ts @@ -1,4 +1,5 @@ #!/usr/bin/env node +import { realpathSync } from "node:fs"; import { mkdir, readFile, writeFile } from "node:fs/promises"; import { createServer, type Server } from "node:http"; import { homedir } from "node:os"; @@ -12,7 +13,7 @@ import { createCli, yamlTableConsoleLogger } from "trpc-cli"; import { z } from "zod/v4"; import { color } from "./ansi.js"; import { CliFriendlyError } from "./cli-error.js"; -import { createCaptunTunnel } from "../index.js"; +import { CaptunTunnelConnectError, createCaptunTunnel, randomOwnershipToken } from "../index.js"; import { assertLocalTargetAcceptingConnections } from "./local-target.js"; import { withSpinner } from "./spinner.js"; import { @@ -59,6 +60,7 @@ export type CaptunCliRouterOptions = { writeConfig?: (config: Config) => Promise; waitForShutdown?: () => Promise; onTunnelReady?: (ready: TunnelReady) => void | Promise; + tunnelRetries?: number; }; const adjectives = @@ -110,6 +112,7 @@ export function createCaptunCliRouter(options: CaptunCliRouterOptions = {}) { const tunnel = resolveTunnel(input, config); printTunnelOpening(tunnel); await runTunnelSession(tunnel, { + retries: options.tunnelRetries, waitForShutdown: options.waitForShutdown, onReady: options.onTunnelReady, }); @@ -505,6 +508,7 @@ async function connectTunnelWithRetry( ) { const url = `${tunnel.tunnel}/__captun-connect`; const headers = tunnel.secret ? { authorization: `Bearer ${tunnel.secret}` } : undefined; + const ownerToken = randomOwnershipToken(); const fetcher = makeTunnelFetcher(tunnel, advertisedUrl); const maxAttempts = retries + 1; @@ -515,7 +519,9 @@ async function connectTunnelWithRetry( ? `Connecting to ${tunnel.tunnel}` : `Connecting to ${tunnel.tunnel} (retry ${attempt - 1}/${retries})`; try { - return await withSpinner(label, () => createCaptunTunnel({ url, headers, fetch: fetcher })); + return await withSpinner(label, () => + createCaptunTunnel({ url, headers, ownerToken, fetch: fetcher }), + ); } catch (error) { if (attempt === maxAttempts) { throw tunnelConnectError(tunnel, error); @@ -570,6 +576,14 @@ function tunnelConnectError(tunnel: ResolvedTunnel, cause: unknown) { const hostname = new URL(tunnel.tunnel).hostname; const message = cause instanceof Error ? cause.message : String(cause); const lines = [`Could not connect tunnel to ${color.cyan(tunnel.tunnel)} (${message}).`]; + if (isActiveTunnelConflict(cause)) { + lines.push( + ``, + `The tunnel name appears to be in use by another active anonymous client.`, + `Pick a different ${color.cyan("--name")} or stop the existing tunnel and retry.`, + ); + return new CliFriendlyError(lines.join("\n")); + } if (!hostname.endsWith(".workers.dev")) { // Dropping the leftmost label gives the zone-side wildcard parent — // `tunnel.mispwoso.com` -> `mispwoso.com`, `t.captun.example.com` -> `captun.example.com`. @@ -588,6 +602,18 @@ function tunnelConnectError(tunnel: ResolvedTunnel, cause: unknown) { return new CliFriendlyError(lines.join("\n")); } +function isActiveTunnelConflict(cause: unknown) { + if (cause instanceof CaptunTunnelConnectError && cause.response) { + return cause.response.status === 409 && isActiveTunnelConflictMessage(cause.response.body); + } + const message = cause instanceof Error ? cause.message : String(cause); + return isActiveTunnelConflictMessage(message); +} + +function isActiveTunnelConflictMessage(message: string) { + return /tunnel name is already connected|tunnel name .*in use/i.test(message); +} + function sleep(ms: number) { return new Promise((resolveSleep) => setTimeout(resolveSleep, ms)); } @@ -648,5 +674,10 @@ if (isMainModule()) { function isMainModule() { const entry = process.argv[1]; - return Boolean(entry && resolve(entry) === fileURLToPath(import.meta.url)); + if (!entry) return false; + try { + return realpathSync(entry) === fileURLToPath(import.meta.url); + } catch { + return resolve(entry) === fileURLToPath(import.meta.url); + } } diff --git a/src/hosted-admission.ts b/src/hosted-admission.ts new file mode 100644 index 0000000..cf1341c --- /dev/null +++ b/src/hosted-admission.ts @@ -0,0 +1,74 @@ +import { + HOSTED_CAPTUN_HOSTNAME, + TUNNEL_OWNER_TOKEN_HEADER, + TUNNEL_OWNER_TOKEN_QUERY_PARAM, +} from "./routing.js"; + +export type HostedAdmissionEnv = { + CAPTUN_SECRET?: string; + CUSTOM_HOSTNAME?: string; +}; + +export type TunnelAdmission = + | { ok: true; ownerToken: string | undefined } + | { ok: false; response: Response }; + +export function decideTunnelAdmission(input: { + request: Request; + env: HostedAdmissionEnv; + activeOwnerToken: string | undefined; +}): TunnelAdmission { + const expected = input.env.CAPTUN_SECRET ? `Bearer ${input.env.CAPTUN_SECRET}` : undefined; + if (expected && !constantTimeEqual(input.request.headers.get("authorization") || "", expected)) { + return { ok: false, response: new Response("Unauthorized\n", { status: 401 }) }; + } + + const ownerToken = hostedAnonymousOwnerToken(input.request, input.env); + if (ownerToken instanceof Response) return { ok: false, response: ownerToken }; + + if (ownerToken !== undefined && input.activeOwnerToken && input.activeOwnerToken !== ownerToken) { + return { ok: false, response: reject("Tunnel name is already connected\n", 409) }; + } + + return { ok: true, ownerToken }; +} + +function hostedAnonymousOwnerToken( + request: Request, + env: HostedAdmissionEnv, +): string | Response | undefined { + if (env.CUSTOM_HOSTNAME !== HOSTED_CAPTUN_HOSTNAME) return undefined; + if (env.CAPTUN_SECRET) return undefined; + + const token = + request.headers.get(TUNNEL_OWNER_TOKEN_HEADER) || + new URL(request.url).searchParams.get(TUNNEL_OWNER_TOKEN_QUERY_PARAM) || + ""; + if (!token) return reject("Missing tunnel ownership token\n", 400); + if (!/^[a-zA-Z0-9._~-]{1,128}$/.test(token)) { + return reject("Invalid tunnel ownership token\n", 400); + } + + return token; +} + +function reject(body: string, status: number) { + return new Response(body, { + status, + headers: { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + }, + }); +} + +function constantTimeEqual(actual: string, expected: string) { + const actualBytes = new TextEncoder().encode(actual); + const expectedBytes = new TextEncoder().encode(expected); + let diff = actualBytes.length ^ expectedBytes.length; + const length = Math.max(actualBytes.length, expectedBytes.length); + for (let index = 0; index < length; index++) { + diff |= (actualBytes[index] || 0) ^ (expectedBytes[index] || 0); + } + return diff === 0; +} diff --git a/src/index.ts b/src/index.ts index d376754..b8162f5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,11 @@ import { newWebSocketRpcSession, RpcTarget } from "capnweb"; -import { getTunnelUrlFromServerUrl, HOSTED_CAPTUN_SERVER_URL } from "./routing.js"; +import { + getTunnelUrlFromServerUrl, + HOSTED_CAPTUN_SERVER_URL, + TUNNEL_CONNECT_DIAGNOSTIC_HEADER, + TUNNEL_OWNER_TOKEN_HEADER, + TUNNEL_OWNER_TOKEN_QUERY_PARAM, +} from "./routing.js"; /** Fetch is all you need! * @@ -28,26 +34,54 @@ export interface Fetcher { */ export type CaptunTunnel = Disposable & { url: string; + ownerToken: string; }; +export class CaptunTunnelConnectError extends Error { + response: { status: number; statusText: string; body: string } | undefined; + + constructor( + message: string, + response: { status: number; statusText: string; body: string } | undefined, + ) { + super(message); + this.name = "CaptunTunnelConnectError"; + this.response = response; + } +} + +const WEBSOCKET_REJECTION_PROBE_TIMEOUT_MS = 500; + export async function createCaptunTunnel( options: Fetcher & { url?: string | URL; serverUrl?: string; name?: string; headers?: Record; + ownerToken?: string; }, ): Promise { const endpoint = resolveTunnelEndpoint(options); - const socket = createWebSocket({ url: endpoint.connectUrl, headers: options.headers }); + const ownership = withAnonymousOwnershipToken({ + connectUrl: endpoint.connectUrl, + headers: options.headers, + ownerToken: options.ownerToken, + }); + const socket = createWebSocket({ url: ownership.connectUrl, headers: options.headers }); // tunnelTargetFetcher is the "main object" that comes out on the other side in acceptCaptunTunnel // as a capnweb rpc stub that the server can just call fetch on const tunnelTargetFetcher = new TunnelTargetFetcher({ fetch: options.fetch }); const session = newWebSocketRpcSession(socket, tunnelTargetFetcher); - await waitUntilOpen(socket); + try { + await waitUntilOpen(socket, { connectUrl: ownership.connectUrl, headers: options.headers }); + } catch (error) { + session[Symbol.dispose](); + throw error; + } return { url: endpoint.publicUrl, + ownerToken: ownership.ownerToken, [Symbol.dispose]: () => session[Symbol.dispose](), }; } @@ -71,15 +105,47 @@ function resolveTunnelEndpoint(options: { function publicUrlFromConnectUrl(connectUrl: URL) { const publicUrl = new URL(connectUrl); publicUrl.pathname = publicUrl.pathname.replace(/\/__captun-connect\/?$/, "") || "/"; + publicUrl.search = ""; + publicUrl.hash = ""; return publicUrl.toString().replace(/\/$/, ""); } +function withAnonymousOwnershipToken(options: { + connectUrl: string; + headers: Record | undefined; + ownerToken: string | undefined; +}) { + const headerToken = getHeader(options.headers, TUNNEL_OWNER_TOKEN_HEADER); + if (headerToken) return { connectUrl: options.connectUrl, ownerToken: headerToken }; + + const connectUrl = new URL(options.connectUrl); + const ownerToken = + options.ownerToken || + connectUrl.searchParams.get(TUNNEL_OWNER_TOKEN_QUERY_PARAM) || + randomOwnershipToken(); + connectUrl.searchParams.set(TUNNEL_OWNER_TOKEN_QUERY_PARAM, ownerToken); + return { connectUrl: connectUrl.toString(), ownerToken }; +} + +function getHeader(headers: Record | undefined, name: string) { + if (!headers) return undefined; + const lowerName = name.toLowerCase(); + const key = Object.keys(headers).find((candidate) => candidate.toLowerCase() === lowerName); + return key ? headers[key] : undefined; +} + function randomTunnelName() { const bytes = new Uint8Array(8); crypto.getRandomValues(bytes); return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); } +export function randomOwnershipToken() { + const bytes = new Uint8Array(16); + crypto.getRandomValues(bytes); + return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); +} + class TunnelTargetFetcher extends RpcTarget implements Fetcher { private fetcher: Fetcher; @@ -109,7 +175,10 @@ function createWebSocket(options: { url: string | URL; headers?: Record | undefined }, +) { if (socket.readyState === WebSocket.OPEN) return; if (socket.readyState !== WebSocket.CONNECTING) { throw new Error("WebSocket closed before opening"); @@ -124,7 +193,10 @@ async function waitUntilOpen(socket: WebSocket) { socket.addEventListener("open", () => settle(resolve), { signal: listeners.signal }); socket.addEventListener( "error", - () => settle(() => reject(new Error("WebSocket connection failed"))), + () => + settle(() => { + void webSocketConnectionFailedError(options).then(reject); + }), { signal: listeners.signal }, ); socket.addEventListener( @@ -138,6 +210,49 @@ async function waitUntilOpen(socket: WebSocket) { }); } +async function webSocketConnectionFailedError(options: { + connectUrl: string; + headers: Record | undefined; +}) { + const response = await readWebSocketRejection(options); + if (!response) return new CaptunTunnelConnectError("WebSocket connection failed", undefined); + return new CaptunTunnelConnectError( + `WebSocket connection failed: ${response.status} ${response.statusText}: ${response.body}`.trim(), + response, + ); +} + +async function readWebSocketRejection(options: { + connectUrl: string; + headers: Record | undefined; +}) { + const abort = new AbortController(); + const timeout = setTimeout(() => abort.abort(), WEBSOCKET_REJECTION_PROBE_TIMEOUT_MS); + try { + const response = await fetch(options.connectUrl, { + headers: diagnosticHeaders(options.headers), + signal: abort.signal, + }); + if (response.ok) return undefined; + const body = (await response.text()).trim(); + return { + status: response.status, + statusText: response.statusText || "Rejected", + body: body || "No response body", + }; + } catch { + return undefined; + } finally { + clearTimeout(timeout); + } +} + +function diagnosticHeaders(headers: Record | undefined) { + const diagnostic = new Headers(headers); + diagnostic.set(TUNNEL_CONNECT_DIAGNOSTIC_HEADER, "1"); + return diagnostic; +} + // --------------------------------------------------------------------------- // Tunnel server (formerly src/server.ts) // --------------------------------------------------------------------------- diff --git a/src/routing.ts b/src/routing.ts index 475e51e..095d3a6 100644 --- a/src/routing.ts +++ b/src/routing.ts @@ -113,6 +113,13 @@ export function getTunnelUrlFromServerUrl(serverUrl: string, tunnelName: string) /** Header used by the Worker to advertise a tunnel's canonical URL to its client. */ export const TUNNEL_URL_HEADER = "x-captun-tunnel-url"; +/** Header used by clients to ask for read-only connect rejection details. */ +export const TUNNEL_CONNECT_DIAGNOSTIC_HEADER = "x-captun-connect-diagnostic"; + +/** Anonymous hosted clients use this token to prove they own an active tunnel name. */ +export const TUNNEL_OWNER_TOKEN_QUERY_PARAM = "captun-owner-token"; +export const TUNNEL_OWNER_TOKEN_HEADER = "x-captun-owner-token"; + /** Reserved path used by tunnel clients to open the WebSocket; not a tunnel name. */ const CONNECT_PATH_SEGMENT = "__captun-connect"; diff --git a/src/worker.ts b/src/worker.ts index c315be7..a1b703c 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,4 +1,5 @@ import { DurableObject } from "cloudflare:workers"; +import { decideTunnelAdmission } from "./hosted-admission.js"; import { acceptCaptunTunnel, type Fetcher } from "./index.js"; import { captunShardName, @@ -6,19 +7,49 @@ import { getTunnelNameFromUrl, getTunnelUrl, RESERVED_HOSTED_SUBDOMAINS, + TUNNEL_CONNECT_DIAGNOSTIC_HEADER, TUNNEL_URL_HEADER, } from "./routing.js"; type CaptunEnv = { CaptunServerShard: DurableObjectNamespace; + HostedRateLimiter?: DurableObjectNamespace; CAPTUN_SECRET?: string; SHARD_COUNT?: string; CUSTOM_HOSTNAME?: string; + HOSTED_RATE_LIMIT_WINDOW_SECONDS?: string; + HOSTED_CONNECTS_PER_IP_PER_WINDOW?: string; + HOSTED_REQUESTS_PER_IP_PER_WINDOW?: string; + HOSTED_REQUESTS_PER_TUNNEL_PER_WINDOW?: string; + HOSTED_RATE_LIMIT_DISABLED?: string; }; /** Set by the top-level Worker on the WebSocket-upgrade request so the DO knows the tunnel. */ const TUNNEL_NAME_HEADER = "x-captun-tunnel-name"; +const DEFAULT_HOSTED_RATE_LIMIT_WINDOW_SECONDS = 60; +const DEFAULT_HOSTED_CONNECTS_PER_IP_PER_WINDOW = 30; +const DEFAULT_HOSTED_REQUESTS_PER_IP_PER_WINDOW = 600; +const DEFAULT_HOSTED_REQUESTS_PER_TUNNEL_PER_WINDOW = 1200; +const HOSTED_RATE_LIMIT_DIAGNOSTIC_WINDOW_MS = 2_000; + +type HostedRateLimitKind = "connect" | "request"; + +type HostedRateLimitInput = { limit: number; windowSeconds: number }; + +type HostedRateLimitResult = { ok: true } | { ok: false; limit: number; retryAfterSeconds: number }; + +type HostedRateLimitBucket = { + count: number; + resetAt: number; + lastRejectedAt?: number; +}; + +type ActiveTunnel = { + fetcher: Fetcher & Disposable; + ownerToken?: string; +}; + /** * A shard Durable Object owns many named tunnels. * @@ -28,7 +59,7 @@ const TUNNEL_NAME_HEADER = "x-captun-tunnel-name"; * aggregate throughput for lots of concurrent large responses. */ export class CaptunServerShard extends DurableObject { - private readonly tunnels = new Map(); + private readonly tunnels = new Map(); // The DO's `fetch` only handles the WebSocket upgrade. The upgrade hand-off // is special-cased by the Workers runtime around `stub.fetch(...)` — a 101 @@ -40,28 +71,39 @@ export class CaptunServerShard extends DurableObject { const tunnelName = request.headers.get(TUNNEL_NAME_HEADER); if (!tunnelName) return new Response("Missing tunnel name\n", { status: 404 }); - const expected = this.env.CAPTUN_SECRET ? `Bearer ${this.env.CAPTUN_SECRET}` : undefined; - if (expected) { - // Constant-time comparison to avoid leaking the secret via timing. - const actual = new TextEncoder().encode(request.headers.get("authorization") ?? ""); - const want = new TextEncoder().encode(expected); - if (actual.length !== want.length || !crypto.subtle.timingSafeEqual(actual, want)) { - return new Response("Unauthorized\n", { status: 401 }); - } - } + const activeTunnel = this.tunnels.get(tunnelName); + const admission = decideTunnelAdmission({ + request, + env: this.env, + activeOwnerToken: activeTunnel?.ownerToken, + }); + if (!admission.ok) return admission.response; - this.tunnels.get(tunnelName)?.[Symbol.dispose](); + activeTunnel?.fetcher[Symbol.dispose](); const { response, tunnel } = acceptCaptunTunnel({ onDisconnect: () => { - if (this.tunnels.get(tunnelName) === tunnel) this.tunnels.delete(tunnelName); + if (this.tunnels.get(tunnelName)?.fetcher === tunnel) this.tunnels.delete(tunnelName); }, }); - this.tunnels.set(tunnelName, tunnel); + this.tunnels.set(tunnelName, { fetcher: tunnel, ownerToken: admission.ownerToken }); return response; } + diagnoseConnect(tunnelName: string, request: Request): Response { + const admission = decideTunnelAdmission({ + request, + env: this.env, + activeOwnerToken: this.tunnels.get(tunnelName)?.ownerToken, + }); + if (!admission.ok) return admission.response; + return new Response(null, { + status: 204, + headers: { "cache-control": "no-store" }, + }); + } + async forward(tunnelName: string, request: Request): Promise { - const tunnel = this.tunnels.get(tunnelName); + const tunnel = this.tunnels.get(tunnelName)?.fetcher; if (!tunnel) return new Response("No tunnel client connected\n", { status: 503 }); try { return await tunnel.fetch(request); @@ -71,8 +113,55 @@ export class CaptunServerShard extends DurableObject { } } +export class HostedRateLimiter extends DurableObject { + private bucket: HostedRateLimitBucket | undefined; + + check(input: HostedRateLimitInput): HostedRateLimitResult { + const now = Date.now(); + const bucket = this.activeBucket(now, now + input.windowSeconds * 1000); + if (bucket.count >= input.limit) { + bucket.lastRejectedAt = now; + return { + ok: false, + limit: input.limit, + retryAfterSeconds: Math.max(1, Math.ceil((bucket.resetAt - now) / 1000)), + }; + } + + bucket.count++; + return { ok: true }; + } + + diagnose(input: HostedRateLimitInput): HostedRateLimitResult { + const now = Date.now(); + const bucket = this.bucket; + if ( + bucket && + bucket.count >= input.limit && + bucket.resetAt > now && + bucket.lastRejectedAt && + now - bucket.lastRejectedAt <= HOSTED_RATE_LIMIT_DIAGNOSTIC_WINDOW_MS + ) { + return { + ok: false, + limit: input.limit, + retryAfterSeconds: Math.max(1, Math.ceil((bucket.resetAt - now) / 1000)), + }; + } + + return { ok: true }; + } + + private activeBucket(now: number, resetAt: number): HostedRateLimitBucket { + if (this.bucket && this.bucket.resetAt > now) return this.bucket; + const bucket: HostedRateLimitBucket = { count: 0, resetAt }; + this.bucket = bucket; + return bucket; + } +} + export default { - fetch(request: Request, env: CaptunEnv): Response | Promise { + async fetch(request: Request, env: CaptunEnv): Promise { const hostedResponse = hostedCaptunResponse(request, env); if (hostedResponse) return hostedResponse; @@ -100,9 +189,37 @@ export default { if (forwardedPath === "/__captun-connect") { const headers = new Headers(forwarded.headers); headers.set(TUNNEL_NAME_HEADER, tunnelName); - return shard.fetch(new Request(forwarded, { headers })); + const connectRequest = new Request(forwarded, { headers }); + if (isConnectDiagnostic(connectRequest)) { + const rateLimited = await hostedRateLimitDiagnosticResponse({ + env, + request, + tunnelName, + kind: "connect", + }); + if (rateLimited) return rateLimited; + return shard.diagnoseConnect(tunnelName, connectRequest); + } + + const rateLimited = await hostedRateLimitResponse({ + env, + request, + tunnelName, + kind: "connect", + }); + if (rateLimited) return rateLimited; + + return shard.fetch(connectRequest); } + const rateLimited = await hostedRateLimitResponse({ + env, + request, + tunnelName, + kind: "request", + }); + if (rateLimited) return rateLimited; + // Advertise the canonical tunnel URL back to the tunnel client. The CLI // reads this so it doesn't have to mirror the Worker's routing convention. const tunnelUrl = getTunnelUrl({ @@ -116,6 +233,150 @@ export default { }, } satisfies ExportedHandler; +function isConnectDiagnostic(request: Request) { + if (request.headers.get("upgrade") === "websocket") return false; + return request.headers.get(TUNNEL_CONNECT_DIAGNOSTIC_HEADER) === "1"; +} + +async function hostedRateLimitResponse(input: { + env: CaptunEnv; + request: Request; + tunnelName: string; + kind: HostedRateLimitKind; +}): Promise { + if (input.env.CUSTOM_HOSTNAME !== HOSTED_CAPTUN_HOSTNAME) return undefined; + if (!input.env.HostedRateLimiter) { + return hostedRateLimiterMissingResponse(input.env); + } + + const config = hostedRateLimitConfig(input.env); + const checks = hostedRateLimitChecks({ + kind: input.kind, + clientKey: hostedClientKey(input.request), + tunnelName: input.tunnelName, + config, + }); + for (const check of checks) { + const limiter = input.env.HostedRateLimiter.getByName(hostedRateLimiterName(check.key)); + const result = await limiter.check({ + limit: check.limit, + windowSeconds: config.windowSeconds, + }); + if (!result.ok) return hostedRateLimitedResponse(result); + } + + return undefined; +} + +async function hostedRateLimitDiagnosticResponse(input: { + env: CaptunEnv; + request: Request; + tunnelName: string; + kind: HostedRateLimitKind; +}): Promise { + if (input.env.CUSTOM_HOSTNAME !== HOSTED_CAPTUN_HOSTNAME) return undefined; + if (!input.env.HostedRateLimiter) { + return hostedRateLimiterMissingResponse(input.env); + } + + const config = hostedRateLimitConfig(input.env); + const checks = hostedRateLimitChecks({ + kind: input.kind, + clientKey: hostedClientKey(input.request), + tunnelName: input.tunnelName, + config, + }); + for (const check of checks) { + const limiter = input.env.HostedRateLimiter.getByName(hostedRateLimiterName(check.key)); + const result = await limiter.diagnose({ + limit: check.limit, + windowSeconds: config.windowSeconds, + }); + if (!result.ok) return hostedRateLimitedResponse(result); + } + + return undefined; +} + +function hostedRateLimiterMissingResponse(env: CaptunEnv) { + if (env.HOSTED_RATE_LIMIT_DISABLED === "1") return undefined; + return new Response("Hosted rate limiter is not configured\n", { + status: 503, + headers: { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + }, + }); +} + +function hostedRateLimitedResponse(result: Extract) { + return new Response(`Rate limit exceeded. Try again in ${result.retryAfterSeconds}s.\n`, { + status: 429, + headers: { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + "retry-after": String(result.retryAfterSeconds), + "x-captun-rate-limit": String(result.limit), + }, + }); +} + +function hostedClientKey(request: Request) { + return request.headers.get("cf-connecting-ip") || "unknown"; +} + +function hostedRateLimitChecks(input: { + kind: HostedRateLimitKind; + clientKey: string; + tunnelName: string; + config: ReturnType; +}) { + if (input.kind === "connect") { + return [{ key: `connect:ip:${input.clientKey}`, limit: input.config.connectsPerIp }]; + } + + return [ + { key: `request:ip:${input.clientKey}`, limit: input.config.requestsPerIp }, + { key: `request:tunnel:${input.tunnelName}`, limit: input.config.requestsPerTunnel }, + ]; +} + +function hostedRateLimiterName(key: string) { + let hash = 2166136261; + for (let index = 0; index < key.length; index++) { + hash ^= key.charCodeAt(index); + hash = Math.imul(hash, 16777619); + } + return `bucket-${(hash >>> 0).toString(36)}`; +} + +function hostedRateLimitConfig(env: CaptunEnv) { + return { + windowSeconds: positiveInteger( + env.HOSTED_RATE_LIMIT_WINDOW_SECONDS, + DEFAULT_HOSTED_RATE_LIMIT_WINDOW_SECONDS, + ), + connectsPerIp: positiveInteger( + env.HOSTED_CONNECTS_PER_IP_PER_WINDOW, + DEFAULT_HOSTED_CONNECTS_PER_IP_PER_WINDOW, + ), + requestsPerIp: positiveInteger( + env.HOSTED_REQUESTS_PER_IP_PER_WINDOW, + DEFAULT_HOSTED_REQUESTS_PER_IP_PER_WINDOW, + ), + requestsPerTunnel: positiveInteger( + env.HOSTED_REQUESTS_PER_TUNNEL_PER_WINDOW, + DEFAULT_HOSTED_REQUESTS_PER_TUNNEL_PER_WINDOW, + ), + }; +} + +function positiveInteger(value: string | undefined, fallback: number) { + const parsed = Number(value); + if (!Number.isInteger(parsed) || parsed < 1) return fallback; + return parsed; +} + function hostedCaptunResponse(request: Request, env: CaptunEnv): Response | undefined { if (env.CUSTOM_HOSTNAME !== HOSTED_CAPTUN_HOSTNAME) return undefined; @@ -367,13 +628,15 @@ const WWW_BROWSER_MODULE = `import { newWebSocketRpcSession, RpcTarget } from "h export async function createCaptunTunnel(options) { const tunnelName = options.name || randomTunnelName(); + const ownerToken = options.ownerToken || randomOwnershipToken(); const publicUrl = "https://" + tunnelName + ".captun.sh"; - const socket = new WebSocket("wss://" + tunnelName + ".captun.sh/__captun-connect"); + const socket = new WebSocket("wss://" + tunnelName + ".captun.sh/__captun-connect?captun-owner-token=" + ownerToken); const tunnelTargetFetcher = new TunnelTargetFetcher(options.fetch); const session = newWebSocketRpcSession(socket, tunnelTargetFetcher); await waitUntilOpen(socket); return { url: publicUrl, + ownerToken, close: () => disposeSession(session), }; } @@ -413,6 +676,12 @@ function randomTunnelName() { return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); } +function randomOwnershipToken() { + const bytes = new Uint8Array(16); + crypto.getRandomValues(bytes); + return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); +} + function disposeSession(session) { const disposeSymbol = Symbol.dispose; if (disposeSymbol && typeof session[disposeSymbol] === "function") session[disposeSymbol](); diff --git a/tasks/complete/2026-05-24-hosted-admission-module.md b/tasks/complete/2026-05-24-hosted-admission-module.md new file mode 100644 index 0000000..c4b0321 --- /dev/null +++ b/tasks/complete/2026-05-24-hosted-admission-module.md @@ -0,0 +1,26 @@ +status: complete +size: small + +# Hosted Admission Module + +Status summary: Complete and locally verified. Hosted anonymous tunnel admission now lives in a small direct-testable module, while `CaptunServerShard` only supplies active tunnel state and performs WebSocket acceptance. + +## Checklist + +- [x] Extract a hosted tunnel admission module. _`src/hosted-admission.ts` now owns secret auth, hosted anonymous owner-token parsing, token validation, and active-owner conflict decisions._ +- [x] Keep the Durable Object focused on active tunnel state and WebSocket acceptance. _`CaptunServerShard.fetch` now calls `decideTunnelAdmission` with the current active owner token, then disposes/replaces the tunnel when admitted._ +- [x] Add direct tests for the admission decision. _`test/hosted-admission.test.ts` covers self-hosted bypass, secret auth, missing/invalid hosted tokens, same-owner replace, and different-owner conflict._ +- [x] Keep integration coverage passing. _Existing Worker ownership tests still cover Durable Object wiring; the conflict test now reads the rejection body before the follow-up forwarded request to avoid Miniflare response-body flakiness._ +- [x] Run focused and full verification. _Verified with focused Vitest, `pnpm run check`, `pnpm test`, and `pnpm run build`._ + +## Assumptions + +- This is stacked on `mmkal/26/05/24/hosted-connect-conflict-message`. +- This is an architecture-only change; hosted behavior, response status codes, and response bodies should stay compatible. +- The module interface should be small enough that future hosted safety checks can be added there without growing `CaptunServerShard.fetch`. + +## Implementation Notes + +- 2026-05-24: Nightly architecture pass recommended this because ownership-token safety policy was embedded in the Durable Object implementation, forcing integration setup for pure admission-policy cases. +- 2026-05-24: Replaced the Worker-specific `crypto.subtle.timingSafeEqual` call with a local constant-time string comparison so the admission module can be tested directly in Node while retaining fixed-work secret comparison behavior. +- 2026-05-24: Bugbot follow-up made secret-auth hosted admission ignore stale anonymous owner tokens, since setting `CAPTUN_SECRET` disables anonymous ownership policy. diff --git a/tasks/complete/2026-05-24-hosted-connect-conflict-message.md b/tasks/complete/2026-05-24-hosted-connect-conflict-message.md new file mode 100644 index 0000000..08c00c1 --- /dev/null +++ b/tasks/complete/2026-05-24-hosted-connect-conflict-message.md @@ -0,0 +1,28 @@ +status: complete +size: small + +# Hosted Connect Conflict Messages + +Status summary: Complete and locally verified. `createCaptunTunnel` now surfaces deterministic HTTP rejection details when available without hanging or mutating tunnel state during the diagnostic probe, and the CLI treats only Captun active-owner conflicts as name-in-use errors instead of DNS setup failures. + +## Checklist + +- [x] Add a regression test for a rejected WebSocket upgrade body. _`test/worker.test.ts` covers a 409 connect rejection and asserts `createCaptunTunnel` reports `Tunnel name is already connected`._ +- [x] Improve library connect errors for pre-open WebSocket failures. _`src/index.ts` now probes the connect URL after a pre-open WebSocket error with a read-only diagnostic header, aborts the probe after a short timeout, and throws `CaptunTunnelConnectError` with status/body details when the server exposes them._ +- [x] Improve CLI tunnel connect messaging. _`src/cli/bin.ts` detects the known Captun 409/name-in-use body and prints an active anonymous client explanation instead of DNS guidance._ +- [x] Add review regression coverage. _`test/worker.test.ts` covers an unresponsive diagnostic probe; `test/cli.test.ts` covers unrelated 409 responses retaining DNS guidance._ +- [x] Run focused and full verification. _Verified with focused Vitest files, `pnpm run check`, `pnpm test`, and `pnpm run build`._ + +## Notes + +- This is stacked on `mmkal/26/05/24/hosted-ownership-tokens`. +- The target conflict body from the Worker is `Tunnel name is already connected`. +- Keep self-hosted DNS/certificate guidance for ordinary connection failures. + +## Implementation Notes + +- 2026-05-24: Created as a follow-up to the hosted ownership-token PR after review found anonymous active-owner conflicts were being reported as generic WebSocket/DNS failures. +- 2026-05-24: Node's WebSocket `ErrorEvent` does not expose the rejected upgrade status/body directly, so the library performs a follow-up `fetch` to the same connect URL. This is deterministic for the hosted Worker conflict because the Worker returns `409` before creating the WebSocket upgrade response. +- 2026-05-24: Review follow-up added a timeout around the diagnostic HTTP probe and tightened CLI conflict classification so arbitrary `409` responses keep the generic troubleshooting path. +- 2026-05-24: Bugbot follow-up made the diagnostic probe explicitly read-only with `x-captun-connect-diagnostic`, so it does not accept/replace tunnel sessions and does not charge the hosted connect-attempt rate-limit bucket a second time. +- 2026-05-24: Second Bugbot follow-up taught diagnostics to report fail-closed limiter config and recent real connect rate-limit rejections without incrementing the bucket. diff --git a/tasks/complete/2026-05-24-hosted-ownership-tokens.md b/tasks/complete/2026-05-24-hosted-ownership-tokens.md new file mode 100644 index 0000000..94995db --- /dev/null +++ b/tasks/complete/2026-05-24-hosted-ownership-tokens.md @@ -0,0 +1,31 @@ +status: complete +size: medium + +# Hosted Anonymous Tunnel Ownership Tokens + +Status summary: Implementation is complete and locally verified. Hosted anonymous tunnel connections now require an ownership token, conflicting active owners get `409`, same-owner reconnects and CLI retries reuse one owner token, and self-hosted replacement behavior remains compatible. + +## Checklist + +- [x] Add hosted-only ownership-token parsing to the tunnel connect path. _`CaptunServerShard.fetch` now reads `captun-owner-token` or `x-captun-owner-token` only for anonymous `CUSTOM_HOSTNAME=captun.sh` connects._ +- [x] Let the first successful hosted connection for a tunnel name claim its token while active. _Active shard entries now store the accepted owner token beside the Cap'n Web fetcher._ +- [x] Let a reconnect with the same token replace its own active connection. _Same-token hosted connects still dispose and replace the previous active fetcher._ +- [x] Reject a different token with `409 Conflict` while the tunnel is already active. _Different-token hosted connects return `Tunnel name is already connected` without touching the active tunnel._ +- [x] Keep self-hosted and secret-protected tunnel behavior compatible. _Ownership enforcement is skipped outside hosted `captun.sh` and when `CAPTUN_SECRET` is configured; tests cover self-hosted replacement._ +- [x] Generate and send a client-side anonymous token for hosted CLI/API/browser clients. _`createCaptunTunnel` appends a generated query token to the WebSocket URL, returns it on `tunnel.ownerToken`, accepts it back as `ownerToken`, CLI retry loops reuse one generated token, and `/captun.browser.js` follows the same shape._ +- [x] Cover the hosted ownership behavior with integration-style tests. _`test/worker.test.ts` covers conflict, same-token replacement, header token parsing, missing-token rejection, client token generation/reuse, and self-hosted compatibility._ +- [x] Run the focused tests and full project checks. _Verified with focused worker tests, full `pnpm test`, `pnpm run check`, and `pnpm run build`._ + +## Notes + +- Scope is intentionally narrower than authenticated accounts: this is only an eviction guard for anonymous hosted tunnels. +- Tokens do not identify users and do not grant paid/custom subdomain rights. They only prove that a reconnect is from the same anonymous client instance. +- The hosted path is `captun.sh`; self-hosted Workers should keep the existing "last connection wins" behavior unless they opt into equivalent behavior later. + +## Implementation Notes + +- 2026-05-24: Tokens are intentionally passed on the connect request only, using a browser-compatible query parameter by default. The Worker also accepts an equivalent header for non-browser clients and tests. +- 2026-05-24: Review follow-up exposed `ownerToken` on the returned tunnel and added `ownerToken` as an explicit create option so exported API callers can exercise same-owner replacement without manually editing query strings. +- 2026-05-24: This does not persist ownership after disconnect. Once the active Cap'n Web session breaks, the tunnel name is free for another anonymous token to claim. +- 2026-05-24: Bugbot follow-up moved CLI retries to one owner token per tunnel session so a partially successful retry cannot conflict with its own previous attempt. +- 2026-05-24: Second Bugbot follow-up reused the library's exported `randomOwnershipToken` in the CLI instead of maintaining a duplicate token generator. diff --git a/tasks/hosted-captun-sh.md b/tasks/hosted-captun-sh.md index 7754802..36e2f40 100644 --- a/tasks/hosted-captun-sh.md +++ b/tasks/hosted-captun-sh.md @@ -5,7 +5,7 @@ size: medium # Hosted captun.sh -Status summary: Initial hosted deployment is live on `captun.sh`. The CLI, library, and browser landing-page demo can create hosted random tunnels; the main missing work is a proper free/paid control plane, tunnel ownership, and throttling. +Status summary: Initial hosted deployment is live on `captun.sh`. The CLI, library, and browser landing-page demo can create hosted random tunnels; the main missing work is a proper free/paid control plane, deeper resource caps, and observability. ## Initial public-hosted slice @@ -21,7 +21,7 @@ Status summary: Initial hosted deployment is live on `captun.sh`. The CLI, libra ## Safety and product follow-up - [ ] Use cryptographic random names for free hosted tunnels and keep friendly/custom subdomains behind auth or a paid reservation model. -- [ ] Add per-session tunnel ownership: first client claims a tunnel name, the same token can reconnect, and a different token gets `409` instead of evicting the active tunnel. +- [x] Add per-session tunnel ownership: first client claims a tunnel name, the same token can reconnect, and a different token gets `409` instead of evicting the active tunnel. _The stacked hosted-ownership-tokens PR stores anonymous active-owner tokens in `CaptunServerShard` and sends generated connect tokens from CLI/API/browser clients._ - [ ] Add Cloudflare Rate Limiting bindings for cheap edge throttles on connect attempts and forwarded requests. - [ ] Add Durable Object backed global-ish limits for active tunnels, concurrent tunnels per IP/account, and suspicious reconnect churn. - [ ] Add basic resource caps: max tunnel lifetime, idle timeout, in-flight request cap, request body size limit, and response streaming guardrails. diff --git a/tasks/hosted-rate-limits.md b/tasks/hosted-rate-limits.md new file mode 100644 index 0000000..831042d --- /dev/null +++ b/tasks/hosted-rate-limits.md @@ -0,0 +1,33 @@ +--- +status: review +size: medium +--- + +# Hosted captun.sh rate limits + +Status summary: First hosted throttling slice is implemented and locally verified. It adds hosted-only connect and forwarded-request limits with configurable Worker vars; this stacked branch also covers anonymous active-tunnel ownership, while paid/custom names and deeper abuse controls remain follow-up work. + +## First hosted throttling slice + +- [x] Add a hosted-only rate-limiter Durable Object. _`HostedRateLimiter` is bound in `wrangler.jsonc` and only consulted when `CUSTOM_HOSTNAME=captun.sh`._ +- [x] Limit tunnel connect attempts per client IP. _`__captun-connect` requests check `connect:ip:` before shard dispatch._ +- [x] Limit forwarded HTTP requests per client IP and per tunnel name. _Forwarded hosted requests check both `request:ip:` and `request:tunnel:` buckets._ +- [x] Return useful `429` responses. _Hosted throttles return plain text with `Retry-After`, `cache-control: no-store`, and `x-captun-rate-limit`._ +- [x] Make limits configurable by Worker vars. _Window and connect/IP/tunnel limits are controlled by `HOSTED_\*_PER_WINDOW` vars with public-service defaults._ +- [x] Cover limits in Miniflare tests. _`test/worker.test.ts` covers connect, per-IP request, per-tunnel request, and self-hosted bypass behavior._ +- [ ] Deploy to `captun-public` after merge-ready checks. _Not deployed yet; this stacked PR should deploy after review or on explicit request._ + +## Follow-up safety work + +- [x] Add tunnel ownership tokens so a different anonymous client cannot evict an active tunnel. _The stacked hosted-ownership-tokens PR returns `409` for a conflicting active token while allowing same-token replacement._ +- [ ] Add active tunnel caps and reconnect-churn limits. _Likely needs a global-ish Durable Object keyed separately from the shard count._ +- [ ] Add request body, response, and in-flight request caps. _Protect against tunnels used for bulk transfer or resource exhaustion._ +- [ ] Add Cloudflare-native Rate Limiting bindings where available. _Use edge throttles for cheaper rejection before Durable Objects wake up._ +- [ ] Add observability for 429s, high-volume IPs, high-volume tunnel names, and emergency shutdowns. _Needed before the public hosted service is advertised._ + +## Implementation Notes + +- 2026-05-23: Initial unsafe hosted service is intentionally live but obscure. This task starts the first throttling layer before publicising `captun.sh`. +- 2026-05-24: Implemented fixed-window in-memory buckets in hosted rate-limiter Durable Objects named by hashed bucket key. This is intentionally a first abuse guardrail, not billing-grade quota accounting. +- 2026-05-24: Review follow-up changed the limiter to fail closed when the binding is missing, added an explicit `HOSTED_RATE_LIMIT_DISABLED=1` escape hatch, and stopped trusting spoofable forwarded-IP headers. +- 2026-05-24: Verified with `pnpm run check`, `pnpm test`, `pnpm run build`, and `CAPTUN_PUBLIC_E2E=1 pnpm vitest run test/public-hosted.test.ts` after retrying one transient live WebSocket-open failure. diff --git a/test/cli.test.ts b/test/cli.test.ts new file mode 100644 index 0000000..eada5a6 --- /dev/null +++ b/test/cli.test.ts @@ -0,0 +1,226 @@ +import { createHash } from "node:crypto"; +import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; + +import { createRouterClient } from "@orpc/server"; +import { expect, test } from "vitest"; + +import { createCaptunCliRouter } from "../src/cli/bin.js"; + +test("CLI tunnel connect errors do not blame DNS for active-owner conflicts", async () => { + await using target = await createTestServer((_request, response) => { + response.end("ok\n"); + }); + await using rejection = await createRejectedTunnelServer("Tunnel name is already connected\n"); + + const router = createCaptunCliRouter({ readConfig: async () => undefined }); + const client = createRouterClient(router); + + let caught: unknown; + try { + await client.tunnel({ + target: String(target.port), + serverUrl: rejection.origin, + name: "demo", + requestLogs: false, + }); + } catch (error) { + caught = error; + } + + expect(caught).toMatchObject({ + message: expect.stringContaining("Tunnel name is already connected"), + }); + expect(caught).not.toMatchObject({ + message: expect.stringContaining("DNS for"), + }); +}); + +test("CLI tunnel connect errors keep DNS guidance for unrelated 409 responses", async () => { + await using target = await createTestServer((_request, response) => { + response.end("ok\n"); + }); + await using rejection = await createRejectedTunnelServer("Some other conflict\n"); + + const router = createCaptunCliRouter({ readConfig: async () => undefined }); + const client = createRouterClient(router); + + let caught: unknown; + try { + await client.tunnel({ + target: String(target.port), + serverUrl: rejection.origin, + name: "demo", + requestLogs: false, + }); + } catch (error) { + caught = error; + } + + expect(caught).toMatchObject({ + message: expect.stringContaining("Some other conflict"), + }); + expect(caught).toMatchObject({ + message: expect.stringContaining("DNS for"), + }); + expect(caught).not.toMatchObject({ + message: expect.stringContaining("active anonymous client"), + }); +}); + +test("CLI tunnel retries reuse the same anonymous owner token", async () => { + await using target = await createTestServer((_request, response) => { + response.end("ok\n"); + }); + await using tunnelServer = await createFlakyTunnelServer(); + + const router = createCaptunCliRouter({ + readConfig: async () => undefined, + waitForShutdown: async () => {}, + tunnelRetries: 1, + }); + const client = createRouterClient(router); + + await client.tunnel({ + target: String(target.port), + serverUrl: tunnelServer.origin, + name: "demo", + requestLogs: false, + }); + + const upgradeTokens = tunnelServer.upgradeUrls.map((url) => + new URL(url, tunnelServer.origin).searchParams.get("captun-owner-token"), + ); + expect(upgradeTokens).toEqual([upgradeTokens[0], upgradeTokens[0]]); +}); + +async function createTestServer( + handler: (req: IncomingMessage, res: ServerResponse) => void | Promise, +) { + const server = createServer((req, res) => { + void Promise.resolve(handler(req, res)).catch((error: unknown) => { + res.writeHead(500, { "content-type": "text/plain" }); + res.end(error instanceof Error ? error.stack : String(error)); + }); + }); + await new Promise((resolveListen, rejectListen) => { + server.once("error", rejectListen); + server.listen(0, "127.0.0.1", resolveListen); + }); + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("Could not determine test-server port."); + } + return { + port: address.port, + async [Symbol.asyncDispose]() { + await new Promise((resolveClose) => server.close(() => resolveClose())); + }, + }; +} + +async function createFlakyTunnelServer() { + const upgradeUrls: string[] = []; + const sockets = new Set<{ destroy: () => void }>(); + const server = createServer((request, response) => { + if (request.url === "/demo/__captun/health") { + response.end("ok\n"); + return; + } + response.writeHead(503, { "content-type": "text/plain; charset=utf-8" }); + response.end("try again\n"); + }); + server.on("upgrade", (request, socket) => { + sockets.add(socket); + socket.once("close", () => sockets.delete(socket)); + upgradeUrls.push(request.url || ""); + if (upgradeUrls.length === 1) { + socket.write( + [ + "HTTP/1.1 503 Service Unavailable", + "Content-Type: text/plain; charset=utf-8", + "Content-Length: 10", + "Connection: close", + "", + "try again\n", + ].join("\r\n"), + ); + socket.destroy(); + return; + } + + const key = request.headers["sec-websocket-key"]; + if (typeof key !== "string") { + socket.destroy(); + return; + } + const accept = createHash("sha1") + .update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`) + .digest("base64"); + socket.write( + [ + "HTTP/1.1 101 Switching Protocols", + "Upgrade: websocket", + "Connection: Upgrade", + `Sec-WebSocket-Accept: ${accept}`, + "", + "", + ].join("\r\n"), + ); + }); + await new Promise((resolveListen, rejectListen) => { + server.once("error", rejectListen); + server.listen(0, "127.0.0.1", resolveListen); + }); + const address = server.address(); + if (!address || typeof address === "string") throw new Error("Could not start test server"); + + return { + origin: `http://127.0.0.1:${address.port}`, + upgradeUrls, + async [Symbol.asyncDispose]() { + for (const socket of sockets) socket.destroy(); + await new Promise((resolveClose) => server.close(() => resolveClose())); + }, + }; +} + +async function createRejectedTunnelServer(body: string) { + const sockets = new Set<{ destroy: () => void }>(); + const server = createServer((_request, response) => { + response.writeHead(409, { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + }); + response.end(body); + }); + server.on("upgrade", (_request, socket) => { + sockets.add(socket); + socket.once("close", () => sockets.delete(socket)); + socket.write( + [ + "HTTP/1.1 409 Conflict", + "Content-Type: text/plain; charset=utf-8", + "Cache-Control: no-store", + `Content-Length: ${Buffer.byteLength(body)}`, + "Connection: close", + "", + body, + ].join("\r\n"), + ); + socket.destroy(); + }); + await new Promise((resolveListen, rejectListen) => { + server.once("error", rejectListen); + server.listen(0, "127.0.0.1", resolveListen); + }); + const address = server.address(); + if (!address || typeof address === "string") throw new Error("Could not start test server"); + + return { + origin: `http://127.0.0.1:${address.port}`, + async [Symbol.asyncDispose]() { + for (const socket of sockets) socket.destroy(); + await new Promise((resolveClose) => server.close(() => resolveClose())); + }, + }; +} diff --git a/test/hosted-admission.test.ts b/test/hosted-admission.test.ts new file mode 100644 index 0000000..3e5ebe9 --- /dev/null +++ b/test/hosted-admission.test.ts @@ -0,0 +1,105 @@ +import { expect, test } from "vitest"; + +import { decideTunnelAdmission, type HostedAdmissionEnv } from "../src/hosted-admission.js"; + +test("hosted tunnel admission allows self-hosted connects without owner tokens", () => { + const admission = decideTunnelAdmission({ + request: new Request("https://captun.example.com/demo/__captun-connect"), + env: { CUSTOM_HOSTNAME: "captun.example.com" }, + activeOwnerToken: undefined, + }); + + expect(admission).toMatchObject({ ok: true, ownerToken: undefined }); +}); + +test("hosted tunnel admission checks configured secrets before owner-token policy", async () => { + const rejected = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect"), + env: { CUSTOM_HOSTNAME: "captun.sh", CAPTUN_SECRET: "secret" }, + activeOwnerToken: undefined, + }); + const accepted = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect", { + headers: { authorization: "Bearer secret" }, + }), + env: { CUSTOM_HOSTNAME: "captun.sh", CAPTUN_SECRET: "secret" }, + activeOwnerToken: undefined, + }); + + expect(rejected).toMatchObject({ ok: false }); + if (rejected.ok) throw new Error("expected secret rejection"); + expect(rejected.response).toMatchObject({ status: 401 }); + expect(await rejected.response.text()).toBe("Unauthorized\n"); + expect(accepted).toMatchObject({ ok: true, ownerToken: undefined }); +}); + +test("hosted tunnel admission ignores active anonymous owners when secret auth is configured", () => { + const admission = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect", { + headers: { authorization: "Bearer secret" }, + }), + env: { CUSTOM_HOSTNAME: "captun.sh", CAPTUN_SECRET: "secret" }, + activeOwnerToken: "owner-a", + }); + + expect(admission).toMatchObject({ ok: true, ownerToken: undefined }); +}); + +test("hosted tunnel admission requires anonymous owner tokens on captun.sh", async () => { + const missing = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect"), + env: hostedEnv(), + activeOwnerToken: undefined, + }); + const invalid = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect?captun-owner-token=no spaces"), + env: hostedEnv(), + activeOwnerToken: undefined, + }); + + expect(missing).toMatchObject({ ok: false }); + if (missing.ok) throw new Error("expected missing token rejection"); + expect(missing.response).toMatchObject({ status: 400 }); + expect(await missing.response.text()).toBe("Missing tunnel ownership token\n"); + + expect(invalid).toMatchObject({ ok: false }); + if (invalid.ok) throw new Error("expected invalid token rejection"); + expect(invalid.response).toMatchObject({ status: 400 }); + expect(await invalid.response.text()).toBe("Invalid tunnel ownership token\n"); +}); + +test("hosted tunnel admission allows first and same-owner anonymous connects", () => { + const first = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a"), + env: hostedEnv(), + activeOwnerToken: undefined, + }); + const sameOwner = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect", { + headers: { "x-captun-owner-token": "owner-a" }, + }), + env: hostedEnv(), + activeOwnerToken: "owner-a", + }); + + expect(first).toMatchObject({ ok: true, ownerToken: "owner-a" }); + expect(sameOwner).toMatchObject({ ok: true, ownerToken: "owner-a" }); +}); + +test("hosted tunnel admission rejects different active anonymous owners", async () => { + const admission = decideTunnelAdmission({ + request: new Request("https://demo.captun.sh/__captun-connect?captun-owner-token=owner-b"), + env: hostedEnv(), + activeOwnerToken: "owner-a", + }); + + expect(admission).toMatchObject({ ok: false }); + if (admission.ok) throw new Error("expected active owner rejection"); + expect(admission.response).toMatchObject({ status: 409 }); + expect(admission.response.headers.get("cache-control")).toBe("no-store"); + expect(await admission.response.text()).toBe("Tunnel name is already connected\n"); +}); + +function hostedEnv(): HostedAdmissionEnv { + return { CUSTOM_HOSTNAME: "captun.sh" }; +} diff --git a/test/miniflare.ts b/test/miniflare.ts index cfaabe9..6e2edce 100644 --- a/test/miniflare.ts +++ b/test/miniflare.ts @@ -52,6 +52,7 @@ export function createCaptunWorkerFixture(bindings: Record) { entryPoint: "src/worker.ts", durableObjects: { CaptunServerShard: { className: "CaptunServerShard" }, + HostedRateLimiter: { className: "HostedRateLimiter" }, }, bindings, }); diff --git a/test/worker.test.ts b/test/worker.test.ts index a5a541f..744c7eb 100644 --- a/test/worker.test.ts +++ b/test/worker.test.ts @@ -1,13 +1,17 @@ import { expect, test } from "vitest"; +import { createHash } from "node:crypto"; +import { createServer } from "node:http"; +import { newWebSocketRpcSession, RpcTarget } from "capnweb"; import { createCaptunTunnel } from "../src/index.js"; import { captunHealthResponse, isCaptunHealthRequest } from "../src/cli/tunnel-health.js"; import { captunShardName, getTunnelNameFromUrl, getTunnelUrl, + TUNNEL_CONNECT_DIAGNOSTIC_HEADER, TUNNEL_URL_HEADER, } from "../src/routing.js"; -import { createCaptunWorkerFixture } from "./miniflare.js"; +import { createCaptunWorkerFixture, createMiniflareWorkerFixture } from "./miniflare.js"; const tunnelNameCases: Array< [url: string, customHostname: string | undefined, name: string | null] @@ -139,6 +143,23 @@ test("Captun Worker forwards requests through a real Durable Object tunnel", asy }); }); +test("Captun Worker still lets self-hosted tunnels replace a name without ownership", async () => { + await using fixture = await createCaptunWorkerFixture({}); + using _firstTunnel = await createCaptunTunnel({ + url: `${fixture.origin}/demo/__captun-connect`, + fetch: () => new Response("first\n"), + }); + using _secondTunnel = await createCaptunTunnel({ + url: `${fixture.origin}/demo/__captun-connect`, + fetch: () => new Response("second\n"), + }); + + const response = await fetch(`${fixture.origin}/demo/hello`); + + expect(response).toMatchObject({ status: 200 }); + expect(await response.text()).toBe("second\n"); +}); + test("Captun Worker verifies health through a connected tunnel client", async () => { await using fixture = await createCaptunWorkerFixture({}); using _tunnel = await createCaptunTunnel({ @@ -231,7 +252,10 @@ test("Hosted Captun serves the browser demo module on www", async () => { expect(response).toMatchObject({ status: 200 }); expect(response.headers.get("content-type")).toContain("application/javascript"); - expect(await response.text()).toEqual(expect.stringContaining("createCaptunTunnel")); + const source = await response.text(); + + expect(source).toEqual(expect.stringContaining("createCaptunTunnel")); + expect(source).toEqual(expect.stringContaining("captun-owner-token")); }); test("Hosted Captun landing page includes an in-browser tunnel demo", async () => { @@ -325,6 +349,419 @@ test.each([ expect(await response.text()).toBe("Reserved captun.sh subdomain\n"); }); +test("Hosted Captun rate limits tunnel connect attempts per client IP", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + CAPTUN_SECRET: "secret", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "1", + }); + const headers = { "cf-connecting-ip": "203.0.113.10" }; + + const first = await fixture.worker.fetch("https://one.captun.sh/__captun-connect", { + headers, + }); + const second = await fixture.worker.fetch("https://two.captun.sh/__captun-connect", { + headers, + }); + + expect(first).toMatchObject({ status: 401 }); + expect(second).toMatchObject({ status: 429 }); + expect(second.headers.get("retry-after")).toBe("60"); + expect(second.headers.get("cache-control")).toBe("no-store"); + expect(await second.text()).toBe("Rate limit exceeded. Try again in 60s.\n"); +}); + +test("Hosted Captun rate limits forwarded requests per client IP", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_REQUESTS_PER_IP_PER_WINDOW: "2", + HOSTED_REQUESTS_PER_TUNNEL_PER_WINDOW: "100", + }); + const headers = { "cf-connecting-ip": "203.0.113.20" }; + + const first = await fixture.worker.fetch("https://one.captun.sh/hello", { headers }); + const second = await fixture.worker.fetch("https://two.captun.sh/hello", { headers }); + const third = await fixture.worker.fetch("https://three.captun.sh/hello", { headers }); + + expect(first).toMatchObject({ status: 503 }); + expect(second).toMatchObject({ status: 503 }); + expect(third).toMatchObject({ status: 429 }); + expect(third.headers.get("x-captun-rate-limit")).toBe("2"); +}); + +test("Hosted Captun rate limits forwarded requests per tunnel name", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_REQUESTS_PER_IP_PER_WINDOW: "100", + HOSTED_REQUESTS_PER_TUNNEL_PER_WINDOW: "1", + }); + + const first = await fixture.worker.fetch("https://one.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.30" }, + }); + const second = await fixture.worker.fetch("https://one.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.31" }, + }); + const otherTunnel = await fixture.worker.fetch("https://two.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.31" }, + }); + + expect(first).toMatchObject({ status: 503 }); + expect(second).toMatchObject({ status: 429 }); + expect(otherTunnel).toMatchObject({ status: 503 }); +}); + +test("Hosted Captun rate limits do not affect self-hosted folder routing", async () => { + await using fixture = await createCaptunWorkerFixture({ + HOSTED_REQUESTS_PER_IP_PER_WINDOW: "1", + }); + const headers = { "cf-connecting-ip": "203.0.113.40" }; + + const first = await fixture.worker.fetch(`${fixture.origin}/one/hello`, { headers }); + const second = await fixture.worker.fetch(`${fixture.origin}/two/hello`, { headers }); + + expect(first).toMatchObject({ status: 503 }); + expect(second).toMatchObject({ status: 503 }); +}); + +test("Hosted Captun fails closed when the rate limiter binding is missing", async () => { + await using fixture = await createMiniflareWorkerFixture({ + entryPoint: "src/worker.ts", + durableObjects: { + CaptunServerShard: { className: "CaptunServerShard" }, + }, + bindings: { CUSTOM_HOSTNAME: "captun.sh" }, + }); + + const response = await fixture.worker.fetch("https://one.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.50" }, + }); + + expect(response).toMatchObject({ status: 503 }); + expect(response.headers.get("cache-control")).toBe("no-store"); + expect(await response.text()).toBe("Hosted rate limiter is not configured\n"); +}); + +test("Hosted Captun only bypasses a missing rate limiter binding when explicitly disabled", async () => { + await using fixture = await createMiniflareWorkerFixture({ + entryPoint: "src/worker.ts", + durableObjects: { + CaptunServerShard: { className: "CaptunServerShard" }, + }, + bindings: { + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_RATE_LIMIT_DISABLED: "1", + HOSTED_REQUESTS_PER_IP_PER_WINDOW: "1", + }, + }); + + const first = await fixture.worker.fetch("https://one.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.51" }, + }); + const second = await fixture.worker.fetch("https://two.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.51" }, + }); + + expect(first).toMatchObject({ status: 503 }); + expect(second).toMatchObject({ status: 503 }); +}); + +test("Hosted Captun does not trust spoofable forwarded IP headers for rate limiting", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_REQUESTS_PER_IP_PER_WINDOW: "1", + }); + + const first = await fixture.worker.fetch("https://one.captun.sh/hello", { + headers: { "x-forwarded-for": "203.0.113.60" }, + }); + const second = await fixture.worker.fetch("https://two.captun.sh/hello", { + headers: { "x-forwarded-for": "203.0.113.61" }, + }); + + expect(first).toMatchObject({ status: 503 }); + expect(second).toMatchObject({ status: 429 }); +}); + +test("Hosted Captun rejects a different ownership token while a tunnel is active", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + using _ownerTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "owner a\n", + clientIp: "203.0.113.70", + }); + + const conflict = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-b", + { headers: { "cf-connecting-ip": "203.0.113.71" } }, + ); + expect({ status: conflict.status }).toMatchObject({ status: 409 }); + expect(await conflict.text()).toBe("Tunnel name is already connected\n"); + + const stillOwned = await fixture.worker.fetch("https://demo.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.72" }, + }); + + expect({ status: stillOwned.status }).toMatchObject({ status: 200 }); + expect(await stillOwned.text()).toBe("owner a\n"); +}); + +test("Hosted Captun connect diagnostics do not replace active tunnels", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + using _ownerTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "owner a\n", + clientIp: "203.0.113.74", + }); + + const diagnostic = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + { + headers: { + "cf-connecting-ip": "203.0.113.75", + [TUNNEL_CONNECT_DIAGNOSTIC_HEADER]: "1", + }, + }, + ); + const stillOwned = await fixture.worker.fetch("https://demo.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.76" }, + }); + + expect({ status: diagnostic.status }).toMatchObject({ status: 204 }); + expect({ status: stillOwned.status }).toMatchObject({ status: 200 }); + expect(await stillOwned.text()).toBe("owner a\n"); +}); + +test("Hosted Captun connect diagnostics do not spend connect rate-limit slots", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "2", + }); + using _ownerTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "owner a\n", + clientIp: "203.0.113.77", + }); + + const firstConflict = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-b", + { headers: { "cf-connecting-ip": "203.0.113.78" } }, + ); + const diagnostic = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-b", + { + headers: { + "cf-connecting-ip": "203.0.113.78", + [TUNNEL_CONNECT_DIAGNOSTIC_HEADER]: "1", + }, + }, + ); + const secondConflict = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-c", + { headers: { "cf-connecting-ip": "203.0.113.78" } }, + ); + + expect({ status: firstConflict.status }).toMatchObject({ status: 409 }); + expect({ status: diagnostic.status }).toMatchObject({ status: 409 }); + expect(await diagnostic.text()).toBe("Tunnel name is already connected\n"); + expect({ status: secondConflict.status }).toMatchObject({ status: 409 }); +}); + +test("Hosted Captun connect diagnostics surface recent connect rate limits", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "1", + }); + using _ownerTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "owner a\n", + clientIp: "203.0.113.79", + }); + + const rateLimited = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-b", + { headers: { "cf-connecting-ip": "203.0.113.79" } }, + ); + const diagnostic = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-b", + { + headers: { + "cf-connecting-ip": "203.0.113.79", + [TUNNEL_CONNECT_DIAGNOSTIC_HEADER]: "1", + }, + }, + ); + + expect({ status: rateLimited.status }).toMatchObject({ status: 429 }); + expect({ status: diagnostic.status }).toMatchObject({ status: 429 }); + expect(await diagnostic.text()).toMatch(/^Rate limit exceeded\. Try again in \d+s\.\n$/); +}); + +test("Hosted Captun connect diagnostics fail closed when rate limiter binding is missing", async () => { + await using fixture = await createMiniflareWorkerFixture({ + entryPoint: "src/worker.ts", + durableObjects: { CaptunServerShard: { className: "CaptunServerShard" } }, + bindings: { CUSTOM_HOSTNAME: "captun.sh" }, + }); + + const diagnostic = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + { + headers: { + "cf-connecting-ip": "203.0.113.84", + [TUNNEL_CONNECT_DIAGNOSTIC_HEADER]: "1", + }, + }, + ); + + expect({ status: diagnostic.status }).toMatchObject({ status: 503 }); + expect(await diagnostic.text()).toBe("Hosted rate limiter is not configured\n"); +}); + +test("Hosted Captun lets the same ownership token replace its active tunnel", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + using _firstTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "first\n", + clientIp: "203.0.113.80", + }); + using _secondTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "second\n", + clientIp: "203.0.113.81", + }); + + const response = await fixture.worker.fetch("https://demo.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.82" }, + }); + + expect(response).toMatchObject({ status: 200 }); + expect(await response.text()).toBe("second\n"); +}); + +test("Hosted Captun accepts an ownership token header", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + using _tunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://header.captun.sh/__captun-connect", + responseText: "header token\n", + clientIp: "203.0.113.85", + headers: { "x-captun-owner-token": "owner-from-header" }, + }); + + const response = await fixture.worker.fetch("https://header.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.86" }, + }); + + expect(response).toMatchObject({ status: 200 }); + expect(await response.text()).toBe("header token\n"); +}); + +test("Hosted Captun requires anonymous ownership tokens for public hosted connections", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + + const response = await fixture.worker.fetch("https://demo.captun.sh/__captun-connect", { + headers: { "cf-connecting-ip": "203.0.113.90" }, + }); + + expect(response).toMatchObject({ status: 400 }); + expect(await response.text()).toBe("Missing tunnel ownership token\n"); +}); + +test("Captun clients send an anonymous ownership token on the tunnel WebSocket URL", async () => { + await using recorder = await createWebSocketUpgradeRecorder(); + + using _tunnel = await createCaptunTunnel({ + url: `${recorder.origin}/demo/__captun-connect`, + fetch: () => new Response("ok\n"), + }); + + const upgradeUrl = new URL(recorder.upgradeUrl.current || "", recorder.origin); + + expect(upgradeUrl).toMatchObject({ pathname: "/demo/__captun-connect" }); + expect(upgradeUrl.searchParams.get("captun-owner-token")).toMatch(/^[a-f0-9]{32}$/); + expect(_tunnel).toMatchObject({ + ownerToken: upgradeUrl.searchParams.get("captun-owner-token"), + }); +}); + +test("Captun clients can reuse a returned anonymous ownership token", async () => { + await using recorder = await createWebSocketUpgradeRecorder(); + + using firstTunnel = await createCaptunTunnel({ + url: `${recorder.origin}/demo/__captun-connect`, + fetch: () => new Response("first\n"), + }); + using secondTunnel = await createCaptunTunnel({ + url: `${recorder.origin}/demo/__captun-connect`, + ownerToken: firstTunnel.ownerToken, + fetch: () => new Response("second\n"), + }); + + const upgradeUrls = recorder.upgradeUrls.map((url) => new URL(url, recorder.origin)); + + expect(secondTunnel).toMatchObject({ ownerToken: firstTunnel.ownerToken }); + expect(upgradeUrls.map((url) => url.searchParams.get("captun-owner-token"))).toEqual([ + firstTunnel.ownerToken, + firstTunnel.ownerToken, + ]); +}); + +test("createCaptunTunnel surfaces rejected WebSocket upgrade response details", async () => { + await using rejection = await createRejectedWebSocketUpgradeServer({ + status: 409, + body: "Tunnel name is already connected\n", + }); + + await expect( + createCaptunTunnel({ + url: `${rejection.origin}/demo/__captun-connect`, + fetch: () => new Response("unused\n"), + }), + ).rejects.toThrow(/409 Conflict: Tunnel name is already connected/); +}); + +test("createCaptunTunnel falls back when the rejected upgrade probe does not respond", async () => { + await using rejection = await createRejectedWebSocketUpgradeServer({ + status: 409, + body: "Tunnel name is already connected\n", + neverRespondToHttp: true, + }); + + let caught: unknown; + try { + await createCaptunTunnel({ + url: `${rejection.origin}/demo/__captun-connect`, + fetch: () => new Response("unused\n"), + }); + } catch (error) { + caught = error; + } + + expect(caught).toMatchObject({ message: "WebSocket connection failed" }); +}); + test("Captun Worker rejects missing tunnel names before Durable Object dispatch", async () => { await using fixture = await createCaptunWorkerFixture({}); @@ -351,3 +788,139 @@ test("Captun Worker requires the configured secret before accepting a tunnel cli expect(response).toMatchObject({ status: 401 }); expect(await response.text()).toBe("Unauthorized\n"); }); + +async function createWebSocketUpgradeRecorder() { + const upgradeUrl = { current: "" }; + const upgradeUrls: string[] = []; + const sockets = new Set<{ destroy: () => void }>(); + const server = createServer(); + server.on("upgrade", (request, socket) => { + sockets.add(socket); + socket.once("close", () => sockets.delete(socket)); + upgradeUrl.current = request.url || ""; + upgradeUrls.push(upgradeUrl.current); + const key = request.headers["sec-websocket-key"]; + if (typeof key !== "string") { + socket.destroy(); + return; + } + const accept = createHash("sha1") + .update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`) + .digest("base64"); + socket.write( + [ + "HTTP/1.1 101 Switching Protocols", + "Upgrade: websocket", + "Connection: Upgrade", + `Sec-WebSocket-Accept: ${accept}`, + "", + "", + ].join("\r\n"), + ); + }); + await new Promise((resolveListen, rejectListen) => { + server.once("error", rejectListen); + server.listen(0, "127.0.0.1", resolveListen); + }); + const address = server.address(); + if (!address || typeof address === "string") throw new Error("Could not start test server"); + + return { + origin: `http://127.0.0.1:${address.port}`, + upgradeUrl, + upgradeUrls, + async [Symbol.asyncDispose]() { + for (const socket of sockets) socket.destroy(); + await new Promise((resolveClose) => server.close(() => resolveClose())); + }, + }; +} + +async function createRejectedWebSocketUpgradeServer(options: { + status: number; + body: string; + neverRespondToHttp?: boolean; +}) { + const sockets = new Set<{ destroy: () => void }>(); + const statusText = options.status === 409 ? "Conflict" : "Rejected"; + const server = createServer((_request, response) => { + if (options.neverRespondToHttp) return; + response.writeHead(options.status, { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + }); + response.end(options.body); + }); + server.on("connection", (socket) => { + sockets.add(socket); + socket.once("close", () => sockets.delete(socket)); + }); + server.on("upgrade", (_request, socket) => { + socket.write( + [ + `HTTP/1.1 ${options.status} ${statusText}`, + "Content-Type: text/plain; charset=utf-8", + "Cache-Control: no-store", + `Content-Length: ${Buffer.byteLength(options.body)}`, + "Connection: close", + "", + options.body, + ].join("\r\n"), + ); + socket.destroy(); + }); + await new Promise((resolveListen, rejectListen) => { + server.once("error", rejectListen); + server.listen(0, "127.0.0.1", resolveListen); + }); + const address = server.address(); + if (!address || typeof address === "string") throw new Error("Could not start test server"); + + return { + origin: `http://127.0.0.1:${address.port}`, + async [Symbol.asyncDispose]() { + for (const socket of sockets) socket.destroy(); + await new Promise((resolveClose) => server.close(() => resolveClose())); + }, + }; +} + +async function createDirectWorkerTunnel(options: { + fixture: any; + url: string; + responseText: string; + clientIp: string; + headers?: Record; +}) { + const response = await options.fixture.worker.fetch(options.url, { + headers: { + upgrade: "websocket", + "cf-connecting-ip": options.clientIp, + ...options.headers, + }, + }); + expect(response).toMatchObject({ status: 101 }); + + const socket = response.webSocket; + socket.accept(); + const session = newWebSocketRpcSession(socket, new TestTunnelFetcher(options.responseText)); + + return { + [Symbol.dispose]() { + session[Symbol.dispose](); + }, + }; +} + +class TestTunnelFetcher extends RpcTarget { + private responseText: string; + + constructor(responseText: string) { + super(); + this.responseText = responseText; + } + + fetch() { + return new Response(this.responseText); + } +} diff --git a/wrangler.jsonc b/wrangler.jsonc index 5ca49da..b67f507 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -10,7 +10,13 @@ "head_sampling_rate": 1, }, "durable_objects": { - "bindings": [{ "name": "CaptunServerShard", "class_name": "CaptunServerShard" }], + "bindings": [ + { "name": "CaptunServerShard", "class_name": "CaptunServerShard" }, + { "name": "HostedRateLimiter", "class_name": "HostedRateLimiter" }, + ], }, - "migrations": [{ "tag": "v1", "new_sqlite_classes": ["CaptunServerShard"] }], + "migrations": [ + { "tag": "v1", "new_sqlite_classes": ["CaptunServerShard"] }, + { "tag": "v2", "new_sqlite_classes": ["HostedRateLimiter"] }, + ], }