diff --git a/src/index.ts b/src/index.ts index d376754..8e1c512 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,10 @@ import { newWebSocketRpcSession, RpcTarget } from "capnweb"; -import { getTunnelUrlFromServerUrl, HOSTED_CAPTUN_SERVER_URL } from "./routing.js"; +import { + getTunnelUrlFromServerUrl, + HOSTED_CAPTUN_SERVER_URL, + TUNNEL_OWNER_TOKEN_HEADER, + TUNNEL_OWNER_TOKEN_QUERY_PARAM, +} from "./routing.js"; /** Fetch is all you need! * @@ -28,6 +33,7 @@ export interface Fetcher { */ export type CaptunTunnel = Disposable & { url: string; + ownerToken: string; }; export async function createCaptunTunnel( @@ -36,10 +42,16 @@ export async function createCaptunTunnel( serverUrl?: string; name?: string; headers?: Record; + ownerToken?: string; }, ): Promise { const endpoint = resolveTunnelEndpoint(options); - const socket = createWebSocket({ url: endpoint.connectUrl, headers: options.headers }); + const ownership = withAnonymousOwnershipToken({ + connectUrl: endpoint.connectUrl, + headers: options.headers, + ownerToken: options.ownerToken, + }); + const socket = createWebSocket({ url: ownership.connectUrl, headers: options.headers }); // tunnelTargetFetcher is the "main object" that comes out on the other side in acceptCaptunTunnel // as a capnweb rpc stub that the server can just call fetch on const tunnelTargetFetcher = new TunnelTargetFetcher({ fetch: options.fetch }); @@ -48,6 +60,7 @@ export async function createCaptunTunnel( return { url: endpoint.publicUrl, + ownerToken: ownership.ownerToken, [Symbol.dispose]: () => session[Symbol.dispose](), }; } @@ -71,15 +84,47 @@ function resolveTunnelEndpoint(options: { function publicUrlFromConnectUrl(connectUrl: URL) { const publicUrl = new URL(connectUrl); publicUrl.pathname = publicUrl.pathname.replace(/\/__captun-connect\/?$/, "") || "/"; + publicUrl.search = ""; + publicUrl.hash = ""; return publicUrl.toString().replace(/\/$/, ""); } +function withAnonymousOwnershipToken(options: { + connectUrl: string; + headers: Record | undefined; + ownerToken: string | undefined; +}) { + const headerToken = getHeader(options.headers, TUNNEL_OWNER_TOKEN_HEADER); + if (headerToken) return { connectUrl: options.connectUrl, ownerToken: headerToken }; + + const connectUrl = new URL(options.connectUrl); + const ownerToken = + options.ownerToken || + connectUrl.searchParams.get(TUNNEL_OWNER_TOKEN_QUERY_PARAM) || + randomOwnershipToken(); + connectUrl.searchParams.set(TUNNEL_OWNER_TOKEN_QUERY_PARAM, ownerToken); + return { connectUrl: connectUrl.toString(), ownerToken }; +} + +function getHeader(headers: Record | undefined, name: string) { + if (!headers) return undefined; + const lowerName = name.toLowerCase(); + const key = Object.keys(headers).find((candidate) => candidate.toLowerCase() === lowerName); + return key ? headers[key] : undefined; +} + function randomTunnelName() { const bytes = new Uint8Array(8); crypto.getRandomValues(bytes); return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); } +function randomOwnershipToken() { + const bytes = new Uint8Array(16); + crypto.getRandomValues(bytes); + return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); +} + class TunnelTargetFetcher extends RpcTarget implements Fetcher { private fetcher: Fetcher; diff --git a/src/routing.ts b/src/routing.ts index 475e51e..d739bdd 100644 --- a/src/routing.ts +++ b/src/routing.ts @@ -113,6 +113,10 @@ export function getTunnelUrlFromServerUrl(serverUrl: string, tunnelName: string) /** Header used by the Worker to advertise a tunnel's canonical URL to its client. */ export const TUNNEL_URL_HEADER = "x-captun-tunnel-url"; +/** Anonymous hosted clients use this token to prove they own an active tunnel name. */ +export const TUNNEL_OWNER_TOKEN_QUERY_PARAM = "captun-owner-token"; +export const TUNNEL_OWNER_TOKEN_HEADER = "x-captun-owner-token"; + /** Reserved path used by tunnel clients to open the WebSocket; not a tunnel name. */ const CONNECT_PATH_SEGMENT = "__captun-connect"; diff --git a/src/worker.ts b/src/worker.ts index 32c7223..f830b7f 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -6,6 +6,8 @@ import { getTunnelNameFromUrl, getTunnelUrl, RESERVED_HOSTED_SUBDOMAINS, + TUNNEL_OWNER_TOKEN_HEADER, + TUNNEL_OWNER_TOKEN_QUERY_PARAM, TUNNEL_URL_HEADER, } from "./routing.js"; @@ -41,6 +43,11 @@ type HostedRateLimitBucket = { resetAt: number; }; +type ActiveTunnel = { + fetcher: Fetcher & Disposable; + ownerToken?: string; +}; + /** * A shard Durable Object owns many named tunnels. * @@ -50,7 +57,7 @@ type HostedRateLimitBucket = { * aggregate throughput for lots of concurrent large responses. */ export class CaptunServerShard extends DurableObject { - private readonly tunnels = new Map(); + private readonly tunnels = new Map(); // The DO's `fetch` only handles the WebSocket upgrade. The upgrade hand-off // is special-cased by the Workers runtime around `stub.fetch(...)` — a 101 @@ -72,18 +79,32 @@ export class CaptunServerShard extends DurableObject { } } - this.tunnels.get(tunnelName)?.[Symbol.dispose](); + const ownerToken = hostedAnonymousOwnerToken(request, this.env); + if (ownerToken instanceof Response) return ownerToken; + + const activeTunnel = this.tunnels.get(tunnelName); + if (activeTunnel?.ownerToken && activeTunnel.ownerToken !== ownerToken) { + return new Response("Tunnel name is already connected\n", { + status: 409, + headers: { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + }, + }); + } + + activeTunnel?.fetcher[Symbol.dispose](); const { response, tunnel } = acceptCaptunTunnel({ onDisconnect: () => { - if (this.tunnels.get(tunnelName) === tunnel) this.tunnels.delete(tunnelName); + if (this.tunnels.get(tunnelName)?.fetcher === tunnel) this.tunnels.delete(tunnelName); }, }); - this.tunnels.set(tunnelName, tunnel); + this.tunnels.set(tunnelName, { fetcher: tunnel, ownerToken }); return response; } async forward(tunnelName: string, request: Request): Promise { - const tunnel = this.tunnels.get(tunnelName); + const tunnel = this.tunnels.get(tunnelName)?.fetcher; if (!tunnel) return new Response("No tunnel client connected\n", { status: 503 }); try { return await tunnel.fetch(request); @@ -217,6 +238,39 @@ async function hostedRateLimitResponse(input: { return undefined; } +function hostedAnonymousOwnerToken( + request: Request, + env: CaptunEnv, +): string | Response | undefined { + if (env.CUSTOM_HOSTNAME !== HOSTED_CAPTUN_HOSTNAME) return undefined; + if (env.CAPTUN_SECRET) return undefined; + + const token = + request.headers.get(TUNNEL_OWNER_TOKEN_HEADER) || + new URL(request.url).searchParams.get(TUNNEL_OWNER_TOKEN_QUERY_PARAM) || + ""; + if (!token) { + return new Response("Missing tunnel ownership token\n", { + status: 400, + headers: { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + }, + }); + } + if (!/^[a-zA-Z0-9._~-]{1,128}$/.test(token)) { + return new Response("Invalid tunnel ownership token\n", { + status: 400, + headers: { + "content-type": "text/plain; charset=utf-8", + "cache-control": "no-store", + }, + }); + } + + return token; +} + function hostedRateLimitedResponse(result: Extract) { return new Response(`Rate limit exceeded. Try again in ${result.retryAfterSeconds}s.\n`, { status: 429, @@ -536,13 +590,15 @@ const WWW_BROWSER_MODULE = `import { newWebSocketRpcSession, RpcTarget } from "h export async function createCaptunTunnel(options) { const tunnelName = options.name || randomTunnelName(); + const ownerToken = options.ownerToken || randomOwnershipToken(); const publicUrl = "https://" + tunnelName + ".captun.sh"; - const socket = new WebSocket("wss://" + tunnelName + ".captun.sh/__captun-connect"); + const socket = new WebSocket("wss://" + tunnelName + ".captun.sh/__captun-connect?captun-owner-token=" + ownerToken); const tunnelTargetFetcher = new TunnelTargetFetcher(options.fetch); const session = newWebSocketRpcSession(socket, tunnelTargetFetcher); await waitUntilOpen(socket); return { url: publicUrl, + ownerToken, close: () => disposeSession(session), }; } @@ -582,6 +638,12 @@ function randomTunnelName() { return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); } +function randomOwnershipToken() { + const bytes = new Uint8Array(16); + crypto.getRandomValues(bytes); + return Array.from(bytes, (byte) => byte.toString(16).padStart(2, "0")).join(""); +} + function disposeSession(session) { const disposeSymbol = Symbol.dispose; if (disposeSymbol && typeof session[disposeSymbol] === "function") session[disposeSymbol](); diff --git a/tasks/complete/2026-05-24-hosted-ownership-tokens.md b/tasks/complete/2026-05-24-hosted-ownership-tokens.md new file mode 100644 index 0000000..3a223e5 --- /dev/null +++ b/tasks/complete/2026-05-24-hosted-ownership-tokens.md @@ -0,0 +1,29 @@ +status: complete +size: medium + +# Hosted Anonymous Tunnel Ownership Tokens + +Status summary: Implementation is complete and locally verified. Hosted anonymous tunnel connections now require an ownership token, conflicting active owners get `409`, same-owner reconnects can replace themselves through the public API, and self-hosted replacement behavior remains compatible. + +## Checklist + +- [x] Add hosted-only ownership-token parsing to the tunnel connect path. _`CaptunServerShard.fetch` now reads `captun-owner-token` or `x-captun-owner-token` only for anonymous `CUSTOM_HOSTNAME=captun.sh` connects._ +- [x] Let the first successful hosted connection for a tunnel name claim its token while active. _Active shard entries now store the accepted owner token beside the Cap'n Web fetcher._ +- [x] Let a reconnect with the same token replace its own active connection. _Same-token hosted connects still dispose and replace the previous active fetcher._ +- [x] Reject a different token with `409 Conflict` while the tunnel is already active. _Different-token hosted connects return `Tunnel name is already connected` without touching the active tunnel._ +- [x] Keep self-hosted and secret-protected tunnel behavior compatible. _Ownership enforcement is skipped outside hosted `captun.sh` and when `CAPTUN_SECRET` is configured; tests cover self-hosted replacement._ +- [x] Generate and send a client-side anonymous token for hosted CLI/API/browser clients. _`createCaptunTunnel` appends a generated query token to the WebSocket URL, returns it on `tunnel.ownerToken`, accepts it back as `ownerToken`, and `/captun.browser.js` follows the same shape._ +- [x] Cover the hosted ownership behavior with integration-style tests. _`test/worker.test.ts` covers conflict, same-token replacement, header token parsing, missing-token rejection, client token generation/reuse, and self-hosted compatibility._ +- [x] Run the focused tests and full project checks. _Verified with focused worker tests, full `pnpm test`, `pnpm run check`, and `pnpm run build`._ + +## Notes + +- Scope is intentionally narrower than authenticated accounts: this is only an eviction guard for anonymous hosted tunnels. +- Tokens do not identify users and do not grant paid/custom subdomain rights. They only prove that a reconnect is from the same anonymous client instance. +- The hosted path is `captun.sh`; self-hosted Workers should keep the existing "last connection wins" behavior unless they opt into equivalent behavior later. + +## Implementation Notes + +- 2026-05-24: Tokens are intentionally passed on the connect request only, using a browser-compatible query parameter by default. The Worker also accepts an equivalent header for non-browser clients and tests. +- 2026-05-24: Review follow-up exposed `ownerToken` on the returned tunnel and added `ownerToken` as an explicit create option so exported API callers can exercise same-owner replacement without manually editing query strings. +- 2026-05-24: This does not persist ownership after disconnect. Once the active Cap'n Web session breaks, the tunnel name is free for another anonymous token to claim. diff --git a/tasks/hosted-captun-sh.md b/tasks/hosted-captun-sh.md index 7754802..36e2f40 100644 --- a/tasks/hosted-captun-sh.md +++ b/tasks/hosted-captun-sh.md @@ -5,7 +5,7 @@ size: medium # Hosted captun.sh -Status summary: Initial hosted deployment is live on `captun.sh`. The CLI, library, and browser landing-page demo can create hosted random tunnels; the main missing work is a proper free/paid control plane, tunnel ownership, and throttling. +Status summary: Initial hosted deployment is live on `captun.sh`. The CLI, library, and browser landing-page demo can create hosted random tunnels; the main missing work is a proper free/paid control plane, deeper resource caps, and observability. ## Initial public-hosted slice @@ -21,7 +21,7 @@ Status summary: Initial hosted deployment is live on `captun.sh`. The CLI, libra ## Safety and product follow-up - [ ] Use cryptographic random names for free hosted tunnels and keep friendly/custom subdomains behind auth or a paid reservation model. -- [ ] Add per-session tunnel ownership: first client claims a tunnel name, the same token can reconnect, and a different token gets `409` instead of evicting the active tunnel. +- [x] Add per-session tunnel ownership: first client claims a tunnel name, the same token can reconnect, and a different token gets `409` instead of evicting the active tunnel. _The stacked hosted-ownership-tokens PR stores anonymous active-owner tokens in `CaptunServerShard` and sends generated connect tokens from CLI/API/browser clients._ - [ ] Add Cloudflare Rate Limiting bindings for cheap edge throttles on connect attempts and forwarded requests. - [ ] Add Durable Object backed global-ish limits for active tunnels, concurrent tunnels per IP/account, and suspicious reconnect churn. - [ ] Add basic resource caps: max tunnel lifetime, idle timeout, in-flight request cap, request body size limit, and response streaming guardrails. diff --git a/tasks/hosted-rate-limits.md b/tasks/hosted-rate-limits.md index a66850a..831042d 100644 --- a/tasks/hosted-rate-limits.md +++ b/tasks/hosted-rate-limits.md @@ -5,7 +5,7 @@ size: medium # Hosted captun.sh rate limits -Status summary: First hosted throttling slice is implemented and locally verified. It adds hosted-only connect and forwarded-request limits with configurable Worker vars; ownership, paid/custom names, and deeper abuse controls remain follow-up work. +Status summary: First hosted throttling slice is implemented and locally verified. It adds hosted-only connect and forwarded-request limits with configurable Worker vars; this stacked branch also covers anonymous active-tunnel ownership, while paid/custom names and deeper abuse controls remain follow-up work. ## First hosted throttling slice @@ -19,7 +19,7 @@ Status summary: First hosted throttling slice is implemented and locally verifie ## Follow-up safety work -- [ ] Add tunnel ownership tokens so a different anonymous client cannot evict an active tunnel. _This should return `409` for conflicting reconnects rather than silently replacing the active client._ +- [x] Add tunnel ownership tokens so a different anonymous client cannot evict an active tunnel. _The stacked hosted-ownership-tokens PR returns `409` for a conflicting active token while allowing same-token replacement._ - [ ] Add active tunnel caps and reconnect-churn limits. _Likely needs a global-ish Durable Object keyed separately from the shard count._ - [ ] Add request body, response, and in-flight request caps. _Protect against tunnels used for bulk transfer or resource exhaustion._ - [ ] Add Cloudflare-native Rate Limiting bindings where available. _Use edge throttles for cheaper rejection before Durable Objects wake up._ diff --git a/test/worker.test.ts b/test/worker.test.ts index 127afa8..0c067b6 100644 --- a/test/worker.test.ts +++ b/test/worker.test.ts @@ -1,4 +1,7 @@ import { expect, test } from "vitest"; +import { createHash } from "node:crypto"; +import { createServer } from "node:http"; +import { newWebSocketRpcSession, RpcTarget } from "capnweb"; import { createCaptunTunnel } from "../src/index.js"; import { captunHealthResponse, isCaptunHealthRequest } from "../src/cli/tunnel-health.js"; import { @@ -139,6 +142,23 @@ test("Captun Worker forwards requests through a real Durable Object tunnel", asy }); }); +test("Captun Worker still lets self-hosted tunnels replace a name without ownership", async () => { + await using fixture = await createCaptunWorkerFixture({}); + using _firstTunnel = await createCaptunTunnel({ + url: `${fixture.origin}/demo/__captun-connect`, + fetch: () => new Response("first\n"), + }); + using _secondTunnel = await createCaptunTunnel({ + url: `${fixture.origin}/demo/__captun-connect`, + fetch: () => new Response("second\n"), + }); + + const response = await fetch(`${fixture.origin}/demo/hello`); + + expect(response).toMatchObject({ status: 200 }); + expect(await response.text()).toBe("second\n"); +}); + test("Captun Worker verifies health through a connected tunnel client", async () => { await using fixture = await createCaptunWorkerFixture({}); using _tunnel = await createCaptunTunnel({ @@ -231,7 +251,10 @@ test("Hosted Captun serves the browser demo module on www", async () => { expect(response).toMatchObject({ status: 200 }); expect(response.headers.get("content-type")).toContain("application/javascript"); - expect(await response.text()).toEqual(expect.stringContaining("createCaptunTunnel")); + const source = await response.text(); + + expect(source).toEqual(expect.stringContaining("createCaptunTunnel")); + expect(source).toEqual(expect.stringContaining("captun-owner-token")); }); test("Hosted Captun landing page includes an in-browser tunnel demo", async () => { @@ -459,6 +482,132 @@ test("Hosted Captun does not trust spoofable forwarded IP headers for rate limit expect(second).toMatchObject({ status: 429 }); }); +test("Hosted Captun rejects a different ownership token while a tunnel is active", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + using _ownerTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "owner a\n", + clientIp: "203.0.113.70", + }); + + const conflict = await fixture.worker.fetch( + "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-b", + { headers: { "cf-connecting-ip": "203.0.113.71" } }, + ); + const stillOwned = await fixture.worker.fetch("https://demo.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.72" }, + }); + + expect(conflict).toMatchObject({ status: 409 }); + expect(await conflict.text()).toBe("Tunnel name is already connected\n"); + expect(stillOwned).toMatchObject({ status: 200 }); + expect(await stillOwned.text()).toBe("owner a\n"); +}); + +test("Hosted Captun lets the same ownership token replace its active tunnel", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + using _firstTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "first\n", + clientIp: "203.0.113.80", + }); + using _secondTunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://demo.captun.sh/__captun-connect?captun-owner-token=owner-a", + responseText: "second\n", + clientIp: "203.0.113.81", + }); + + const response = await fixture.worker.fetch("https://demo.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.82" }, + }); + + expect(response).toMatchObject({ status: 200 }); + expect(await response.text()).toBe("second\n"); +}); + +test("Hosted Captun accepts an ownership token header", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + using _tunnel = await createDirectWorkerTunnel({ + fixture, + url: "https://header.captun.sh/__captun-connect", + responseText: "header token\n", + clientIp: "203.0.113.85", + headers: { "x-captun-owner-token": "owner-from-header" }, + }); + + const response = await fixture.worker.fetch("https://header.captun.sh/hello", { + headers: { "cf-connecting-ip": "203.0.113.86" }, + }); + + expect(response).toMatchObject({ status: 200 }); + expect(await response.text()).toBe("header token\n"); +}); + +test("Hosted Captun requires anonymous ownership tokens for public hosted connections", async () => { + await using fixture = await createCaptunWorkerFixture({ + CUSTOM_HOSTNAME: "captun.sh", + HOSTED_CONNECTS_PER_IP_PER_WINDOW: "100", + }); + + const response = await fixture.worker.fetch("https://demo.captun.sh/__captun-connect", { + headers: { "cf-connecting-ip": "203.0.113.90" }, + }); + + expect(response).toMatchObject({ status: 400 }); + expect(await response.text()).toBe("Missing tunnel ownership token\n"); +}); + +test("Captun clients send an anonymous ownership token on the tunnel WebSocket URL", async () => { + await using recorder = await createWebSocketUpgradeRecorder(); + + using _tunnel = await createCaptunTunnel({ + url: `${recorder.origin}/demo/__captun-connect`, + fetch: () => new Response("ok\n"), + }); + + const upgradeUrl = new URL(recorder.upgradeUrl.current || "", recorder.origin); + + expect(upgradeUrl).toMatchObject({ pathname: "/demo/__captun-connect" }); + expect(upgradeUrl.searchParams.get("captun-owner-token")).toMatch(/^[a-f0-9]{32}$/); + expect(_tunnel).toMatchObject({ + ownerToken: upgradeUrl.searchParams.get("captun-owner-token"), + }); +}); + +test("Captun clients can reuse a returned anonymous ownership token", async () => { + await using recorder = await createWebSocketUpgradeRecorder(); + + using firstTunnel = await createCaptunTunnel({ + url: `${recorder.origin}/demo/__captun-connect`, + fetch: () => new Response("first\n"), + }); + using secondTunnel = await createCaptunTunnel({ + url: `${recorder.origin}/demo/__captun-connect`, + ownerToken: firstTunnel.ownerToken, + fetch: () => new Response("second\n"), + }); + + const upgradeUrls = recorder.upgradeUrls.map((url) => new URL(url, recorder.origin)); + + expect(secondTunnel).toMatchObject({ ownerToken: firstTunnel.ownerToken }); + expect(upgradeUrls.map((url) => url.searchParams.get("captun-owner-token"))).toEqual([ + firstTunnel.ownerToken, + firstTunnel.ownerToken, + ]); +}); + test("Captun Worker rejects missing tunnel names before Durable Object dispatch", async () => { await using fixture = await createCaptunWorkerFixture({}); @@ -485,3 +634,90 @@ test("Captun Worker requires the configured secret before accepting a tunnel cli expect(response).toMatchObject({ status: 401 }); expect(await response.text()).toBe("Unauthorized\n"); }); + +async function createWebSocketUpgradeRecorder() { + const upgradeUrl = { current: "" }; + const upgradeUrls: string[] = []; + const sockets = new Set<{ destroy: () => void }>(); + const server = createServer(); + server.on("upgrade", (request, socket) => { + sockets.add(socket); + socket.once("close", () => sockets.delete(socket)); + upgradeUrl.current = request.url || ""; + upgradeUrls.push(upgradeUrl.current); + const key = request.headers["sec-websocket-key"]; + if (typeof key !== "string") { + socket.destroy(); + return; + } + const accept = createHash("sha1") + .update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`) + .digest("base64"); + socket.write( + [ + "HTTP/1.1 101 Switching Protocols", + "Upgrade: websocket", + "Connection: Upgrade", + `Sec-WebSocket-Accept: ${accept}`, + "", + "", + ].join("\r\n"), + ); + }); + await new Promise((resolveListen, rejectListen) => { + server.once("error", rejectListen); + server.listen(0, "127.0.0.1", resolveListen); + }); + const address = server.address(); + if (!address || typeof address === "string") throw new Error("Could not start test server"); + + return { + origin: `http://127.0.0.1:${address.port}`, + upgradeUrl, + upgradeUrls, + async [Symbol.asyncDispose]() { + for (const socket of sockets) socket.destroy(); + await new Promise((resolveClose) => server.close(() => resolveClose())); + }, + }; +} + +async function createDirectWorkerTunnel(options: { + fixture: any; + url: string; + responseText: string; + clientIp: string; + headers?: Record; +}) { + const response = await options.fixture.worker.fetch(options.url, { + headers: { + upgrade: "websocket", + "cf-connecting-ip": options.clientIp, + ...options.headers, + }, + }); + expect(response).toMatchObject({ status: 101 }); + + const socket = response.webSocket; + socket.accept(); + const session = newWebSocketRpcSession(socket, new TestTunnelFetcher(options.responseText)); + + return { + [Symbol.dispose]() { + session[Symbol.dispose](); + }, + }; +} + +class TestTunnelFetcher extends RpcTarget { + private responseText: string; + + constructor(responseText: string) { + super(); + this.responseText = responseText; + } + + fetch() { + return new Response(this.responseText); + } +}