Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 49 additions & 26 deletions src/server/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,35 @@ function sendPtyReady(entry: { viewer: WebSocket | null; alive: boolean }): bool
return true;
}

/** True when `entry` is still the live, current owner of `session` and `ws`
* is still its active viewer. After every `await`, the entry may have been
* torn down, replaced by a takeover, or had its viewer displaced — bail
* if any of that happened. */
function entryStillCurrent(
entry: { alive: boolean; viewer: WebSocket | null },
session: string,
ws: WebSocket,
): boolean {
return entry.alive
&& (activePtySessions.get(session) as unknown) === entry
&& entry.viewer === ws;
}

/** Try-close a WebSocket and log failures at debug. The close codes used
* here (DISPLACED/SESSION_UNAVAILABLE/PTY_TEARDOWN/SERVER_ERROR/NORMAL)
* always succeed in practice; the try/catch exists to guard against the
* socket already being in a non-OPEN state. */
function tryWsClose(
target: WebSocket,
code: number,
reason: string,
logMsg: string,
session: string,
): void {
try { target.close(code, reason); }
catch (e: unknown) { log.debug(logMsg, { session, error: errMsg(e) }); }
}

/** Send prefill buffer in 32KB chunks with short delays to avoid stalling mobile connections.
* Sends `prefill_done` message at the end so the client exits buffering state. */
async function sendPrefillChunked(
Expand Down Expand Up @@ -231,11 +260,11 @@ export function teardownPty(session: string): void {
entry.unsubscribeLifecycle = null;
}
if (entry.viewer) {
try { entry.viewer.close(CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN); } catch (e: unknown) { log.debug(`teardownPty: viewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(entry.viewer, CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN, "teardownPty: viewer close failed", session);
entry.viewer = null;
}
if (entry.pendingViewer) {
try { entry.pendingViewer.close(CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN); } catch (e: unknown) { log.debug(`teardownPty: pendingViewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(entry.pendingViewer, CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN, "teardownPty: pendingViewer close failed", session);
entry.pendingViewer = null;
}
if (entry.proc) {
Expand Down Expand Up @@ -271,7 +300,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void
const oldViewer = existing.viewer;
existing.viewer = null;
if (oldViewer) {
try { oldViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`takeover: oldViewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(oldViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "takeover: oldViewer close failed", session);
}
if (existing.unsubscribe) {
existing.unsubscribe();
Expand All @@ -289,7 +318,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void
try { oldProc.kill(); } catch (e: unknown) { log.debug(`takeover: proc kill failed`, { session, error: errMsg(e) }); }
}
if (existing.pendingViewer) {
try { existing.pendingViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`displaced pendingViewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(existing.pendingViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "displaced pendingViewer close failed", session);
existing.pendingViewer = null;
}
setupNewPtyEntry(ws, session, dims);
Expand All @@ -301,7 +330,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void

// If there's already a pending viewer, close it
if (existing.pendingViewer) {
try { existing.pendingViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`displaced pendingViewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(existing.pendingViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "displaced pendingViewer close failed", session);
}
existing.pendingViewer = ws;

Expand All @@ -315,7 +344,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void
function cleanupPending() {
clearInterval(pingTimer);
if (existing.pendingViewer && existing.pendingViewer !== ws) {
try { existing.pendingViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`cleanupPending: displaced other pending`, { session, error: errMsg(e) }); }
tryWsClose(existing.pendingViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "cleanupPending: displaced other pending", session);
}
ws.removeListener("message", pendingMessage);
ws.removeListener("close", cleanup);
Expand Down Expand Up @@ -396,9 +425,7 @@ function setupNewPtyEntry(

if (!streamingBackend) {
log.warn("ws attach: streaming backend unavailable", { session });
try { ws.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) {
log.debug("streaming backend missing close failed", { session, error: errMsg(e) });
}
tryWsClose(ws, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "streaming backend missing close failed", session);
activePtySessions.delete(session);
return;
}
Expand Down Expand Up @@ -433,14 +460,14 @@ function setupNewPtyEntry(
entry.alive = false;
activePtySessions.delete(session);
if (entry.viewer) {
try { entry.viewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`session unavailable: viewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(entry.viewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "session unavailable: viewer close failed", session);
entry.viewer = null;
}
return;
}
}

if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return;
if (!entryStillCurrent(entry, session, ws)) return;

// Skip settle wait when there's no snapshot to take — the only purpose
// of waiting is to capture the SIGWINCH redraw in the snapshot, but
Expand Down Expand Up @@ -478,7 +505,7 @@ function setupNewPtyEntry(
const settleStart = Date.now();
let lastChangeAt = -1;
while (true) {
if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return;
if (!entryStillCurrent(entry, session, ws)) return;
const elapsedTotal = Date.now() - settleStart;
if (elapsedTotal >= PRE_SNAPSHOT_RESIZE_TIMEOUT_MS) break;
if (lastChangeAt < 0) {
Expand All @@ -498,7 +525,7 @@ function setupNewPtyEntry(
}
}

if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return;
if (!entryStillCurrent(entry, session, ws)) return;

// Apply settled dims and wait for the SIGWINCH-triggered redraw to
// fully land in the broker output stream BEFORE snapshotting.
Expand Down Expand Up @@ -539,7 +566,7 @@ function setupNewPtyEntry(
let lastResizeAt = Date.now();
const settleStart = lastResizeAt;
while (true) {
if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) break;
if (!entryStillCurrent(entry, session, ws)) break;
// Dim changed since we last applied? Re-resize and restart the
// quiescence clock so we capture this redraw too.
if (
Expand Down Expand Up @@ -600,7 +627,7 @@ function setupNewPtyEntry(
}
}

if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return;
if (!entryStillCurrent(entry, session, ws)) return;

// Final reconciliation catches resize frames that arrived while the
// snapshot bytes were being fetched/sent. The resulting SIGWINCH
Expand All @@ -613,7 +640,7 @@ function setupNewPtyEntry(
) {
appliedSize = latestRequestedSize;
await backend.resize(session, appliedSize.cols, appliedSize.rows);
if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return;
if (!entryStillCurrent(entry, session, ws)) return;
}

// Subscribe after prefill. sinceSeq: prefillSeq replays any broker output
Expand Down Expand Up @@ -671,9 +698,7 @@ function setupNewPtyEntry(
log.warn("subscribe rpc failed — tearing down viewer", { session, error: errMsg(err) });
if (!entry.alive) return;
if (entry.viewer && entry.viewer.readyState === 1) {
try { entry.viewer.close(CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED); } catch (e: unknown) {
log.debug("subscribe-error: viewer close failed", { session, error: errMsg(e) });
}
tryWsClose(entry.viewer, CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED, "subscribe-error: viewer close failed", session);
}
teardownPty(session);
},
Expand All @@ -683,7 +708,7 @@ function setupNewPtyEntry(
entry.alive = false;
activePtySessions.delete(session);
if (entry.viewer) {
try { entry.viewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`onSessionData null: viewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(entry.viewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "onSessionData null: viewer close failed", session);
entry.viewer = null;
}
return;
Expand All @@ -709,9 +734,7 @@ function setupNewPtyEntry(
if (!entry.alive) return;
log.warn("replay_truncated — forcing client reconnect for fresh snapshot", { session });
if (entry.viewer && entry.viewer.readyState === 1) {
try { entry.viewer.close(CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED); } catch (e: unknown) {
log.debug(`replay_truncated: viewer close failed`, { session, error: errMsg(e) });
}
tryWsClose(entry.viewer, CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED, "replay_truncated: viewer close failed", session);
}
teardownPty(session);
return;
Expand All @@ -730,11 +753,11 @@ function setupNewPtyEntry(
// exit anyway, and we're inside the callback. Just null the ref.
entry.unsubscribeLifecycle = null;
if (entry.viewer) {
try { entry.viewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`lifecycle exit: viewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(entry.viewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "lifecycle exit: viewer close failed", session);
entry.viewer = null;
}
if (entry.pendingViewer) {
try { entry.pendingViewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`lifecycle exit: pendingViewer close failed`, { session, error: errMsg(e) }); }
tryWsClose(entry.pendingViewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "lifecycle exit: pendingViewer close failed", session);
entry.pendingViewer = null;
}
});
Expand Down Expand Up @@ -825,7 +848,7 @@ function setupNewPtyEntry(

function detach() {
clearInterval(pingTimer);
if (entry.alive && entry.viewer === ws && activePtySessions.get(session) === entry) {
if (entryStillCurrent(entry, session, ws)) {
entry.viewer = null;
teardownPty(session);
}
Expand Down
Loading