Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 63 additions & 6 deletions frontend/src/lib/components/LearnView.svelte
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<script lang="ts">
import {
fetchTOC,
fetchLessonContent,
fetchGuidedLessonCitations,
streamLessonContent,
streamGuidedChat,
type GuidedLesson,
} from "../services/guided";
Expand Down Expand Up @@ -105,6 +105,8 @@

let guidedChatAbortController: AbortController | null = null;
let guidedChatStreamVersion = 0;
let lessonContentAbortController: AbortController | null = null;
let lessonContentStreamVersion = 0;

/**
* Gets or creates a session ID for a specific lesson.
Expand All @@ -129,6 +131,14 @@
activeStreamingMessageId = null;
}

function cancelInFlightLessonContentStream(): void {
lessonContentStreamVersion++;
if (lessonContentAbortController) {
lessonContentAbortController.abort();
lessonContentAbortController = null;
}
}

// Rendered lesson content - SSR-safe parsing without DOM operations
let renderedLesson = $derived(
lessonMarkdown ? parseMarkdown(lessonMarkdown) : "",
Expand Down Expand Up @@ -158,6 +168,10 @@

async function selectLesson(lesson: GuidedLesson): Promise<void> {
const targetSlug = lesson.slug;
cancelInFlightLessonContentStream();
const activeLessonContentStreamVersion = lessonContentStreamVersion;
lessonContentAbortController = new AbortController();
const lessonContentAbortSignal = lessonContentAbortController.signal;

// Save current chat history before switching lessons
if (selectedLesson && messages.length > 0) {
Expand All @@ -178,10 +192,41 @@
messages = chatHistoryByLesson.get(lesson.slug) ?? [];

try {
const response = await fetchLessonContent(lesson.slug);
// Guard against stale response if user switched lessons
let hasReceivedLessonChunk = false;
await streamLessonContent(lesson.slug, {
signal: lessonContentAbortSignal,
onChunk: (lessonChunk) => {
if (selectedLesson?.slug !== targetSlug) return;
if (
lessonContentStreamVersion !==
activeLessonContentStreamVersion
)
return;
if (lessonContentAbortSignal.aborted) return;
lessonMarkdown += lessonChunk;
if (!hasReceivedLessonChunk) {
hasReceivedLessonChunk = true;
loadingLesson = false;
}
},
onError: (streamError) => {
if (selectedLesson?.slug !== targetSlug) return;
if (
lessonContentStreamVersion !==
activeLessonContentStreamVersion
)
return;
if (lessonContentAbortSignal.aborted) return;
lessonError = streamError.message;
},
});

if (selectedLesson?.slug !== targetSlug) return;
lessonMarkdown = response.markdown;
if (
lessonContentStreamVersion !== activeLessonContentStreamVersion
)
return;
if (lessonContentAbortSignal.aborted) return;

// Fetch citations for the lesson topic (Think Java-only, with explicit error tracking)
fetchGuidedLessonCitations(lesson.slug)
Expand Down Expand Up @@ -219,15 +264,26 @@
});
} catch (error) {
if (selectedLesson?.slug !== targetSlug) return;
if (
lessonContentStreamVersion !== activeLessonContentStreamVersion
)
return;
if (lessonContentAbortSignal.aborted) return;
lessonError =
error instanceof Error
? error.message
: "Failed to load lesson";
lessonMarkdown = "";
} finally {
if (selectedLesson?.slug === targetSlug) {
if (
selectedLesson?.slug === targetSlug &&
lessonContentStreamVersion === activeLessonContentStreamVersion
) {
loadingLesson = false;
}
if (lessonContentStreamVersion === activeLessonContentStreamVersion) {
lessonContentAbortController = null;
}
}
}

Expand All @@ -239,6 +295,7 @@

// Cancel any in-flight stream
cancelInFlightGuidedChatStream();
cancelInFlightLessonContentStream();
isChatDrawerOpen = false;
selectedLesson = null;
lessonMarkdown = "";
Expand Down Expand Up @@ -514,7 +571,7 @@
</div>
{:else}
<div class="lessons-grid">
{#each lessons as lesson, index}
{#each lessons as lesson, index (lesson.slug)}
<button
type="button"
class="lesson-card"
Expand Down
14 changes: 9 additions & 5 deletions frontend/src/lib/components/LearnView.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { render, fireEvent } from "@testing-library/svelte";
import { tick } from "svelte";

const fetchTocMock = vi.fn();
const fetchLessonContentMock = vi.fn();
const streamLessonContentMock = vi.fn();
const fetchGuidedLessonCitationsMock = vi.fn();
const streamGuidedChatMock = vi.fn();

Expand All @@ -13,7 +13,7 @@ vi.mock("../services/guided", async () => {
return {
...actualGuidedService,
fetchTOC: fetchTocMock,
fetchLessonContent: fetchLessonContentMock,
streamLessonContent: streamLessonContentMock,
fetchGuidedLessonCitations: fetchGuidedLessonCitationsMock,
streamGuidedChat: streamGuidedChatMock,
};
Expand All @@ -27,7 +27,7 @@ async function renderLearnView() {
describe("LearnView guided chat streaming stability", () => {
beforeEach(() => {
fetchTocMock.mockReset();
fetchLessonContentMock.mockReset();
streamLessonContentMock.mockReset();
fetchGuidedLessonCitationsMock.mockReset();
streamGuidedChatMock.mockReset();
});
Expand All @@ -41,7 +41,9 @@ describe("LearnView guided chat streaming stability", () => {
{ slug: "intro", title: "Test Lesson", summary: "Lesson summary", keywords: [] },
]);

fetchLessonContentMock.mockResolvedValue({ markdown: "# Lesson", cached: false });
streamLessonContentMock.mockImplementation(async (_slug, callbacks) => {
callbacks.onChunk("# Lesson");
});
fetchGuidedLessonCitationsMock.mockResolvedValue({ success: true, citations: [] });

let completeStream: () => void = () => {
Expand Down Expand Up @@ -107,7 +109,9 @@ describe("LearnView guided chat streaming stability", () => {
{ slug: "intro", title: "Test Lesson", summary: "Lesson summary", keywords: [] },
]);

fetchLessonContentMock.mockResolvedValue({ markdown: "# Lesson", cached: false });
streamLessonContentMock.mockImplementation(async (_slug, callbacks) => {
callbacks.onChunk("# Lesson");
});
fetchGuidedLessonCitationsMock.mockResolvedValue({ success: true, citations: [] });

const fetchMock = vi.fn().mockResolvedValue({
Expand Down
39 changes: 35 additions & 4 deletions frontend/src/lib/services/guided.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import { beforeEach, describe, expect, it, vi } from "vitest";

const { streamSseMock } = vi.hoisted(() => {
return { streamSseMock: vi.fn() };
const { streamSseMock, streamSseGetMock } = vi.hoisted(() => {
return { streamSseMock: vi.fn(), streamSseGetMock: vi.fn() };
});

vi.mock("./sse", () => {
return { streamSse: streamSseMock };
return { streamSse: streamSseMock, streamSseGet: streamSseGetMock };
});

import { streamGuidedChat } from "./guided";
import { streamGuidedChat, streamLessonContent } from "./guided";

describe("streamGuidedChat recovery", () => {
beforeEach(() => {
streamSseMock.mockReset();
streamSseGetMock.mockReset();
});

it("retries once for recoverable invalid stream errors before any chunk", async () => {
Expand Down Expand Up @@ -111,4 +112,34 @@ describe("streamGuidedChat recovery", () => {
}),
);
});

it("streams lesson content from the guided content stream endpoint", async () => {
streamSseGetMock.mockImplementationOnce(async (_url, callbacks) => {
callbacks.onText("# Lesson");
});

const onChunk = vi.fn();
const onStatus = vi.fn();
const onError = vi.fn();

await expect(
streamLessonContent("intro", {
onChunk,
onStatus,
onError,
}),
).resolves.toBeUndefined();

expect(streamSseGetMock).toHaveBeenCalledWith(
"/api/guided/content/stream?slug=intro",
expect.objectContaining({
onText: expect.any(Function),
onStatus,
onError,
}),
"guided.lesson-content.ts",
{ signal: undefined },
);
expect(onChunk).toHaveBeenCalledWith("# Lesson");
});
});
29 changes: 29 additions & 0 deletions frontend/src/lib/services/guided.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from "../validation/schemas";
import { validateFetchJson } from "../validation/validate";
import { fetchCitationsByEndpoint, type CitationFetchResult } from "./chat";
import { streamSseGet } from "./sse";
import { streamWithRetry } from "./streamRecovery";

export type { StreamStatus, GuidedLesson, LessonContentResponse };
Expand All @@ -30,6 +31,14 @@ export interface GuidedStreamCallbacks {
signal?: AbortSignal;
}

/** Callbacks for streaming guided lesson markdown. */
export interface GuidedLessonContentCallbacks {
onChunk: (chunk: string) => void;
onStatus?: (status: StreamStatus) => void;
onError?: (error: StreamError) => void;
signal?: AbortSignal;
}

/**
* Fetch the table of contents for guided learning.
* Validates response structure via Zod schema.
Expand Down Expand Up @@ -104,6 +113,26 @@ export async function fetchGuidedLessonCitations(slug: string): Promise<Citation
);
}

/**
* Stream lesson markdown for a guided lesson slug.
* Uses the same SSE parser as chat so server-side dependency failures arrive as structured stream errors.
*/
export async function streamLessonContent(
slug: string,
callbacks: GuidedLessonContentCallbacks,
): Promise<void> {
return streamSseGet(
`/api/guided/content/stream?slug=${encodeURIComponent(slug)}`,
{
onText: callbacks.onChunk,
onStatus: callbacks.onStatus,
onError: callbacks.onError,
},
"guided.lesson-content.ts",
{ signal: callbacks.signal },
);
}

/**
* Stream a chat response within the guided lesson context.
* Uses the same JSON-wrapped SSE format as the main chat for consistent whitespace handling.
Expand Down
33 changes: 33 additions & 0 deletions frontend/src/lib/services/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,39 @@ export async function streamSse(
throw fetchError;
}

await consumeSseResponse(response, callbacks, source, abortSignal);
}

/**
* Streams SSE responses from a GET endpoint with the same parser used for POST streams.
*/
export async function streamSseGet(
url: string,
callbacks: SseCallbacks,
source: string,
options: StreamSseRequestOptions = {},
): Promise<void> {
const abortSignal = options.signal;
let response: Response;

try {
response = await fetch(url, { method: "GET", signal: abortSignal });
} catch (fetchError) {
if (abortSignal?.aborted || isAbortError(fetchError)) {
return;
}
throw fetchError;
}

await consumeSseResponse(response, callbacks, source, abortSignal);
}

async function consumeSseResponse(
response: Response,
callbacks: SseCallbacks,
source: string,
abortSignal?: AbortSignal,
): Promise<void> {
if (!response.ok) {
const apiMessage = await extractApiErrorMessage(response, `streamSse:${source}`);
const errorMessage =
Expand Down
Loading