Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions packages/vinext/src/server/app-page-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,19 @@ export function deferUntilStreamConsumed(
const reader = piped.getReader();
return new ReadableStream<Uint8Array>({
pull(controller) {
return reader.read().then(({ done, value }) => {
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
});
return reader.read().then(
({ done, value }) => {
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
(error) => {
once();
controller.error(error);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: after controller.error(error), the reader from the piped TransformStream is left in a locked state without being explicitly cancelled. In practice this is fine because the outer ReadableStream is now errored and will never call pull() or cancel() again, so the reader (and the underlying pipe) will be GC'd. But if you wanted to be defensive about releasing the lock eagerly, you could add reader.cancel() after controller.error(error). Not blocking — the current behavior is correct.

},
);
},
cancel(reason) {
// Stream cancelled before fully consumed (e.g. client disconnected).
Expand Down
55 changes: 55 additions & 0 deletions tests/app-page-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from "vite-plus/test";
import {
createAppPageFontData,
createAppPageRscErrorTracker,
deferUntilStreamConsumed,
renderAppPageHtmlResponse,
renderAppPageHtmlStream,
renderAppPageHtmlStreamWithRecovery,
Expand Down Expand Up @@ -100,6 +101,60 @@ describe("app page stream helpers", () => {
expect(contextCleared).toHaveLength(1);
});

it("calls onFlush when the upstream stream errors mid-consumption", async () => {
const onFlush = vi.fn();
const streamError = new Error("component threw during streaming");

// Emit one chunk, then error on the next pull — simulates a component
// throwing partway through RSC/SSR streaming.
let pullCount = 0;
const source = new ReadableStream<Uint8Array>({
pull(controller) {
pullCount++;
if (pullCount === 1) {
controller.enqueue(new TextEncoder().encode("partial"));
} else {
controller.error(streamError);
}
},
});

const wrapped = deferUntilStreamConsumed(source, onFlush);
const reader = wrapped.getReader();

// First read succeeds with the enqueued chunk.
const { value } = await reader.read();
expect(new TextDecoder().decode(value)).toBe("partial");

// Second read should surface the upstream error.
await expect(reader.read()).rejects.toThrow("component threw during streaming");

// onFlush must have been called despite the error — this is the bug fix.
expect(onFlush).toHaveBeenCalledTimes(1);
});

it("calls onFlush only once when the stream errors then is cancelled", async () => {
const onFlush = vi.fn();
const streamError = new Error("stream error");

// Error on the very first pull — simulates immediate failure.
const source = new ReadableStream<Uint8Array>({
pull(controller) {
controller.error(streamError);
},
});

const wrapped = deferUntilStreamConsumed(source, onFlush);
const reader = wrapped.getReader();

// Reading the errored stream triggers the error handler.
await expect(reader.read()).rejects.toThrow("stream error");

// The idempotent once() guard prevents double invocation — even if
// some code path triggered cleanup again, onFlush fires exactly once.
expect(onFlush).toHaveBeenCalledTimes(1);
});
Comment on lines +136 to +156
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test name says "errors then is cancelled" but the test doesn't actually exercise a cancel-after-error sequence — it only tests an immediate error on first pull and verifies onFlush is called once. The test is still valuable (it covers the immediate-error path and the once() guard), but the name is slightly misleading. Consider renaming to something like "calls onFlush exactly once on immediate stream error" to match what's actually being asserted.


it("builds an HTML response, including link headers, and defers clearing request context until after body is consumed", async () => {
const clearRequestContext = vi.fn();

Expand Down
Loading