diff --git a/src/server/websocket.ts b/src/server/websocket.ts index 17ffdae..503a0e8 100644 --- a/src/server/websocket.ts +++ b/src/server/websocket.ts @@ -181,6 +181,35 @@ function sendPtyReady(entry: { viewer: WebSocket | null; alive: boolean }): bool return true; } +/** True when `entry` is still the live, current owner of `session` and `ws` + * is still its active viewer. After every `await`, the entry may have been + * torn down, replaced by a takeover, or had its viewer displaced — bail + * if any of that happened. */ +function entryStillCurrent( + entry: { alive: boolean; viewer: WebSocket | null }, + session: string, + ws: WebSocket, +): boolean { + return entry.alive + && (activePtySessions.get(session) as unknown) === entry + && entry.viewer === ws; +} + +/** Try-close a WebSocket and log failures at debug. The close codes used + * here (DISPLACED/SESSION_UNAVAILABLE/PTY_TEARDOWN/SERVER_ERROR/NORMAL) + * always succeed in practice; the try/catch exists to guard against the + * socket already being in a non-OPEN state. */ +function tryWsClose( + target: WebSocket, + code: number, + reason: string, + logMsg: string, + session: string, +): void { + try { target.close(code, reason); } + catch (e: unknown) { log.debug(logMsg, { session, error: errMsg(e) }); } +} + /** Send prefill buffer in 32KB chunks with short delays to avoid stalling mobile connections. * Sends `prefill_done` message at the end so the client exits buffering state. */ async function sendPrefillChunked( @@ -231,11 +260,11 @@ export function teardownPty(session: string): void { entry.unsubscribeLifecycle = null; } if (entry.viewer) { - try { entry.viewer.close(CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN); } catch (e: unknown) { log.debug(`teardownPty: viewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(entry.viewer, CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN, "teardownPty: viewer close failed", session); entry.viewer = null; } if (entry.pendingViewer) { - try { entry.pendingViewer.close(CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN); } catch (e: unknown) { log.debug(`teardownPty: pendingViewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(entry.pendingViewer, CLOSE_CODE_NORMAL, WS_CLOSE_REASONS.PTY_TEARDOWN, "teardownPty: pendingViewer close failed", session); entry.pendingViewer = null; } if (entry.proc) { @@ -271,7 +300,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void const oldViewer = existing.viewer; existing.viewer = null; if (oldViewer) { - try { oldViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`takeover: oldViewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(oldViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "takeover: oldViewer close failed", session); } if (existing.unsubscribe) { existing.unsubscribe(); @@ -289,7 +318,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void try { oldProc.kill(); } catch (e: unknown) { log.debug(`takeover: proc kill failed`, { session, error: errMsg(e) }); } } if (existing.pendingViewer) { - try { existing.pendingViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`displaced pendingViewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(existing.pendingViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "displaced pendingViewer close failed", session); existing.pendingViewer = null; } setupNewPtyEntry(ws, session, dims); @@ -301,7 +330,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void // If there's already a pending viewer, close it if (existing.pendingViewer) { - try { existing.pendingViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`displaced pendingViewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(existing.pendingViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "displaced pendingViewer close failed", session); } existing.pendingViewer = ws; @@ -315,7 +344,7 @@ export function handlePtyWs(ws: WebSocket, session: string, reset = false): void function cleanupPending() { clearInterval(pingTimer); if (existing.pendingViewer && existing.pendingViewer !== ws) { - try { existing.pendingViewer.close(CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED); } catch (e: unknown) { log.debug(`cleanupPending: displaced other pending`, { session, error: errMsg(e) }); } + tryWsClose(existing.pendingViewer, CLOSE_CODE_DISPLACED, WS_CLOSE_REASONS.DISPLACED, "cleanupPending: displaced other pending", session); } ws.removeListener("message", pendingMessage); ws.removeListener("close", cleanup); @@ -396,9 +425,7 @@ function setupNewPtyEntry( if (!streamingBackend) { log.warn("ws attach: streaming backend unavailable", { session }); - try { ws.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { - log.debug("streaming backend missing close failed", { session, error: errMsg(e) }); - } + tryWsClose(ws, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "streaming backend missing close failed", session); activePtySessions.delete(session); return; } @@ -433,14 +460,14 @@ function setupNewPtyEntry( entry.alive = false; activePtySessions.delete(session); if (entry.viewer) { - try { entry.viewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`session unavailable: viewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(entry.viewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "session unavailable: viewer close failed", session); entry.viewer = null; } return; } } - if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return; + if (!entryStillCurrent(entry, session, ws)) return; // Skip settle wait when there's no snapshot to take — the only purpose // of waiting is to capture the SIGWINCH redraw in the snapshot, but @@ -478,7 +505,7 @@ function setupNewPtyEntry( const settleStart = Date.now(); let lastChangeAt = -1; while (true) { - if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return; + if (!entryStillCurrent(entry, session, ws)) return; const elapsedTotal = Date.now() - settleStart; if (elapsedTotal >= PRE_SNAPSHOT_RESIZE_TIMEOUT_MS) break; if (lastChangeAt < 0) { @@ -498,7 +525,7 @@ function setupNewPtyEntry( } } - if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return; + if (!entryStillCurrent(entry, session, ws)) return; // Apply settled dims and wait for the SIGWINCH-triggered redraw to // fully land in the broker output stream BEFORE snapshotting. @@ -539,7 +566,7 @@ function setupNewPtyEntry( let lastResizeAt = Date.now(); const settleStart = lastResizeAt; while (true) { - if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) break; + if (!entryStillCurrent(entry, session, ws)) break; // Dim changed since we last applied? Re-resize and restart the // quiescence clock so we capture this redraw too. if ( @@ -600,7 +627,7 @@ function setupNewPtyEntry( } } - if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return; + if (!entryStillCurrent(entry, session, ws)) return; // Final reconciliation catches resize frames that arrived while the // snapshot bytes were being fetched/sent. The resulting SIGWINCH @@ -613,7 +640,7 @@ function setupNewPtyEntry( ) { appliedSize = latestRequestedSize; await backend.resize(session, appliedSize.cols, appliedSize.rows); - if (!entry.alive || activePtySessions.get(session) !== entry || entry.viewer !== ws) return; + if (!entryStillCurrent(entry, session, ws)) return; } // Subscribe after prefill. sinceSeq: prefillSeq replays any broker output @@ -671,9 +698,7 @@ function setupNewPtyEntry( log.warn("subscribe rpc failed — tearing down viewer", { session, error: errMsg(err) }); if (!entry.alive) return; if (entry.viewer && entry.viewer.readyState === 1) { - try { entry.viewer.close(CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED); } catch (e: unknown) { - log.debug("subscribe-error: viewer close failed", { session, error: errMsg(e) }); - } + tryWsClose(entry.viewer, CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED, "subscribe-error: viewer close failed", session); } teardownPty(session); }, @@ -683,7 +708,7 @@ function setupNewPtyEntry( entry.alive = false; activePtySessions.delete(session); if (entry.viewer) { - try { entry.viewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`onSessionData null: viewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(entry.viewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "onSessionData null: viewer close failed", session); entry.viewer = null; } return; @@ -709,9 +734,7 @@ function setupNewPtyEntry( if (!entry.alive) return; log.warn("replay_truncated — forcing client reconnect for fresh snapshot", { session }); if (entry.viewer && entry.viewer.readyState === 1) { - try { entry.viewer.close(CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED); } catch (e: unknown) { - log.debug(`replay_truncated: viewer close failed`, { session, error: errMsg(e) }); - } + tryWsClose(entry.viewer, CLOSE_CODE_SERVER_ERROR, WS_CLOSE_REASONS.SUBSCRIBE_FAILED, "replay_truncated: viewer close failed", session); } teardownPty(session); return; @@ -730,11 +753,11 @@ function setupNewPtyEntry( // exit anyway, and we're inside the callback. Just null the ref. entry.unsubscribeLifecycle = null; if (entry.viewer) { - try { entry.viewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`lifecycle exit: viewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(entry.viewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "lifecycle exit: viewer close failed", session); entry.viewer = null; } if (entry.pendingViewer) { - try { entry.pendingViewer.close(CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE); } catch (e: unknown) { log.debug(`lifecycle exit: pendingViewer close failed`, { session, error: errMsg(e) }); } + tryWsClose(entry.pendingViewer, CLOSE_CODE_SESSION_UNAVAILABLE, WS_CLOSE_REASONS.SESSION_UNAVAILABLE, "lifecycle exit: pendingViewer close failed", session); entry.pendingViewer = null; } }); @@ -825,7 +848,7 @@ function setupNewPtyEntry( function detach() { clearInterval(pingTimer); - if (entry.alive && entry.viewer === ws && activePtySessions.get(session) === entry) { + if (entryStillCurrent(entry, session, ws)) { entry.viewer = null; teardownPty(session); }