Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"exports": "./mod.ts",
"imports": {
"@david/console-static-text": "jsr:@david/console-static-text@^0.3.4",
"@david/shell": "jsr:@david/shell@^0.5.0",
"@david/shell": "jsr:@david/shell@^0.5.1",
"@deno/dnt": "jsr:@deno/dnt@^0.42.3",
"@david/path": "jsr:@david/path@^0.3.2",
"@std/assert": "jsr:@std/assert@1",
Expand Down
14 changes: 5 additions & 9 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 47 additions & 41 deletions src/console/ttyFallback.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as fs from "node:fs";
import { Readable } from "node:stream";
import * as tty from "node:tty";
import process from "node:process";

Expand All @@ -17,13 +16,10 @@ export interface TtyStdin {
interface OpenedTty {
fd: number;
stream: tty.ReadStream;
reader: ReadableStreamDefaultReader<Uint8Array>;
}

// undefined = not yet attempted; null = attempted and failed
let cached: OpenedTty | null | undefined;
let pendingTtyRead: Promise<ReadableStreamReadResult<Uint8Array>> | undefined;
let ttyLeftover: Uint8Array | undefined;

function tryOpen(): OpenedTty | null {
if (cached !== undefined) return cached;
Expand All @@ -33,12 +29,7 @@ function tryOpen(): OpenedTty | null {
// and on some platforms that's only granted when opened for writing too.
const fd = fs.openSync(path, "r+");
const stream = new tty.ReadStream(fd);
// wrapping the stream locks it to one consumer; cache so an aborted
// read doesn't lose bytes — the in-flight read and leftover buffer
// are tied to this reader, same pattern as @david/shell's stdin.
const readable = Readable.toWeb(stream) as ReadableStream<Uint8Array>;
const reader = readable.getReader();
cached = { fd, stream, reader };
cached = { fd, stream };
} catch {
cached = null;
}
Expand All @@ -54,26 +45,12 @@ export function hasFallbackTty(): boolean {
* use if no controlling terminal is available — guard with
* {@link hasFallbackTty} first. */
export const ttyStdin: TtyStdin = {
async read(p: Uint8Array, options?: { signal?: AbortSignal }): Promise<number | null> {
read(p: Uint8Array, options?: { signal?: AbortSignal }): Promise<number | null> {
const signal = options?.signal;
signal?.throwIfAborted();
const handle = tryOpen();
if (handle == null) throw new Error("No controlling terminal available.");

if (ttyLeftover === undefined) {
// share a single in-flight read across callers so an aborted read
// doesn't drop bytes — the next call awaits the same promise.
pendingTtyRead ??= handle.reader.read();
const result = await (signal ? raceAbort(pendingTtyRead, signal) : pendingTtyRead);
pendingTtyRead = undefined;
if (result.done || result.value === undefined) return null;
ttyLeftover = result.value;
}

const len = Math.min(ttyLeftover.length, p.length);
p.set(ttyLeftover.subarray(0, len));
ttyLeftover = ttyLeftover.length > len ? ttyLeftover.subarray(len) : undefined;
return len;
if (handle == null) return Promise.reject(new Error("No controlling terminal available."));
return readTty(handle.stream, p, signal);
},
setRaw(mode: boolean): void {
const handle = tryOpen();
Expand All @@ -82,19 +59,48 @@ export const ttyStdin: TtyStdin = {
},
};

function raceAbort<T>(promise: Promise<T>, signal: AbortSignal): Promise<T> {
return new Promise<T>((resolve, reject) => {
const onAbort = () => reject(signal.reason);
signal.addEventListener("abort", onAbort, { once: true });
promise.then(
(v) => {
signal.removeEventListener("abort", onAbort);
resolve(v);
},
(e) => {
signal.removeEventListener("abort", onAbort);
reject(e);
},
);
// reads directly from the tty.ReadStream in paused mode by listening for a
// single `readable` event and detaching afterwards. Same pattern @david/shell
// uses for stdin: leaves the stream in a clean, handoff-able state, and is
// cleanly abortable — aborting just removes the listener, with no read syscall
// left outstanding to steal a byte from the next reader.
function readTty(stream: tty.ReadStream, p: Uint8Array, signal?: AbortSignal): Promise<number | null> {
return new Promise<number | null>((resolve, reject) => {
const onReadable = () => {
const chunk = stream.read() as Uint8Array | null;
if (chunk === null) return; // nothing buffered yet; wait for the next event
cleanup();
const len = Math.min(chunk.length, p.length);
p.set(chunk.subarray(0, len));
// put any bytes we didn't consume back into the stream rather than a
// private buffer, so the next reader still sees them.
if (chunk.length > len) stream.unshift(chunk.subarray(len));
resolve(len);
};
const onEnd = () => {
cleanup();
resolve(null);
};
const onError = (err: Error) => {
cleanup();
reject(err);
};
const onAbort = () => {
cleanup();
reject(signal!.reason);
};
const cleanup = () => {
stream.off("readable", onReadable);
stream.off("end", onEnd);
stream.off("error", onError);
signal?.removeEventListener("abort", onAbort);
};

stream.on("readable", onReadable);
stream.on("end", onEnd);
stream.on("error", onError);
signal?.addEventListener("abort", onAbort, { once: true });
// data may already be buffered, so attempt a read right away.
onReadable();
});
}
59 changes: 59 additions & 0 deletions src/test/aborted-prompts-visual-inspection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import process from "node:process";
import $ from "../../mod.ts";

// reproduces the bug where aborted maybePrompts left dangling stdin reads
// that would silently swallow keystrokes from the next prompt.
//
// before the fix: the visible prompt at the end took N+1 enters to accept
// (one per aborted prompt that had a pending read, plus the real one).
// after the fix: a single enter accepts.

const ABORTED_COUNT = 3;

for (let i = 0; i < ABORTED_COUNT; i++) {
const ac = new AbortController();
// give the prompt a moment to display and call reader.read, then abort.
setTimeout(() => ac.abort(), 500);
const result = await $.maybePrompt({
message: `prompt #${i + 1} (will be aborted)`,
signal: ac.signal,
noClear: true,
});
$.log(` -> aborted prompt #${i + 1} resolved to:`, result);
}

$.log(
`\nnow type something and press enter ONCE.\n`
+ `pre-fix: needed ~${ABORTED_COUNT + 1} enters before this prompt accepted input.\n`,
);

const answer = await $.prompt("final prompt");
$.log("got:", answer);

// after the prompts are done, stdin should be in a clean handoff-able
// state — read another line directly via process.stdin to confirm we
// haven't left listeners or a held lock that would block another reader.
$.log("\nnow type another line and press enter — read directly from process.stdin:");
const line = await new Promise<string>((resolve, reject) => {
let buf = "";
const onData = (chunk: Buffer) => {
buf += chunk.toString("utf8");
const newline = buf.indexOf("\n");
if (newline === -1) return;
cleanup();
resolve(buf.slice(0, newline).replace(/\r$/, ""));
};
const onError = (err: Error) => {
cleanup();
reject(err);
};
const cleanup = () => {
process.stdin.off("data", onData);
process.stdin.off("error", onError);
process.stdin.pause();
};
process.stdin.on("data", onData);
process.stdin.on("error", onError);
process.stdin.resume();
});
$.log("read via process.stdin:", line);
Loading