From f3045cc153ed58daaa215fd49e31280bda4cf7ed Mon Sep 17 00:00:00 2001 From: Adrian Rocke Date: Thu, 23 Apr 2026 12:14:13 -0500 Subject: [PATCH 1/4] chore: remove old copy stream function --- src/db.test.ts | 10 +++++----- src/db.ts | 16 +--------------- .../data-access/machineGlossRepository.ts | 4 ++-- 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/src/db.test.ts b/src/db.test.ts index 9682f5c3..c3c5da3e 100644 --- a/src/db.test.ts +++ b/src/db.test.ts @@ -1,11 +1,11 @@ import { initializeDatabase } from "@/tests/vitest/dbUtils"; import { beforeEach, describe, expect, test } from "vitest"; -import { copyStreamV2, query } from "./db"; +import { copyStream, query } from "./db"; import { Readable } from "stream"; initializeDatabase(); -describe("copyStreamV2", () => { +describe("copyStream", () => { const sourceData: Array<{ id: number; flag: boolean; @@ -39,7 +39,7 @@ describe("copyStreamV2", () => { }); test("can stream data into a table", async () => { - await copyStreamV2({ + await copyStream({ table: "test", stream: Readable.from(sourceData), fields: { @@ -64,7 +64,7 @@ describe("copyStreamV2", () => { }); test("can stream a subset of columns into a table", async () => { - await copyStreamV2({ + await copyStream({ table: "test", stream: Readable.from(sourceData), fields: { @@ -91,7 +91,7 @@ describe("copyStreamV2", () => { chunks.push(sourceData.slice(10 * i, 10 * (i + 1))); } - await copyStreamV2({ + await copyStream({ table: "test", stream: Readable.from(chunks), fields: { diff --git a/src/db.ts b/src/db.ts index 47e05f09..a3b2876f 100644 --- a/src/db.ts +++ b/src/db.ts @@ -139,20 +139,6 @@ export async function queryStream( return stream; } -export async function copyStream( - table: string, - stream: Readable, -): Promise { - const client = await getPool().connect(); - - try { - const dbStream = client.query(copyFrom(`copy ${table} from stdin`)); - await pipeline(stream, dbStream); - } finally { - client.release(); - } -} - export async function transaction( tx: (q: typeof query) => Promise, ): Promise { @@ -196,7 +182,7 @@ export async function reconnect() { _pool = undefined; } -export async function copyStreamV2< +export async function copyStream< Record = unknown, Table extends keyof Database = keyof Database, >({ diff --git a/src/modules/translation/data-access/machineGlossRepository.ts b/src/modules/translation/data-access/machineGlossRepository.ts index ec809cd8..48efe6d5 100644 --- a/src/modules/translation/data-access/machineGlossRepository.ts +++ b/src/modules/translation/data-access/machineGlossRepository.ts @@ -1,4 +1,4 @@ -import { copyStreamV2, getDb } from "@/db"; +import { copyStream, getDb } from "@/db"; import { Readable, Transform } from "stream"; export interface StreamedMachineGloss { @@ -30,7 +30,7 @@ export const machineGlossRepository = { .where("language_id", "=", languageId) .execute(); - await copyStreamV2({ + await copyStream({ table: "machine_gloss", stream: stream.pipe(new FilterMissingWordsTransform(wordIdSet)), fields: { From 3c77c2f2bc0d52fcc6335fd25c06ba8e7c43c595 Mon Sep 17 00:00:00 2001 From: Adrian Rocke Date: Thu, 23 Apr 2026 15:58:15 -0500 Subject: [PATCH 2/4] fix: use compose to propagate stream errors --- package-lock.json | 8 +- package.json | 2 +- src/db.test.ts | 42 ++++++- .../machineGlossRepository.test.ts | 115 +++++++++++++++++- .../data-access/machineGlossRepository.ts | 49 +++++++- .../translation/jobs/importAIGlosses.ts | 47 ++----- 6 files changed, 214 insertions(+), 49 deletions(-) diff --git a/package-lock.json b/package-lock.json index 4339eee4..e1748f5b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -41,7 +41,7 @@ "@types/d3": "7.4.3", "@types/dompurify": "3.0.5", "@types/lodash": "4.17.7", - "@types/node": "22.19.15", + "@types/node": "22.19.17", "@types/nodemailer": "6.4.15", "@types/pg": "8.11.6", "@types/pg-copy-streams": "1.2.5", @@ -7306,9 +7306,9 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "22.19.15", - "resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.15.tgz", - "integrity": "sha512-F0R/h2+dsy5wJAUe3tAU6oqa2qbWY5TpNfL/RGmo1y38hiyO1w3x2jPtt76wmuaJI4DQnOBu21cNXQ2STIUUWg==", + "version": "22.19.17", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.19.17.tgz", + "integrity": "sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==", "dev": true, "license": "MIT", "dependencies": { diff --git a/package.json b/package.json index b33b82fa..c3a38126 100644 --- a/package.json +++ b/package.json @@ -50,7 +50,7 @@ "@types/d3": "7.4.3", "@types/dompurify": "3.0.5", "@types/lodash": "4.17.7", - "@types/node": "22.19.15", + "@types/node": "22.19.17", "@types/nodemailer": "6.4.15", "@types/pg": "8.11.6", "@types/pg-copy-streams": "1.2.5", diff --git a/src/db.test.ts b/src/db.test.ts index c3c5da3e..230bd5bc 100644 --- a/src/db.test.ts +++ b/src/db.test.ts @@ -1,7 +1,7 @@ import { initializeDatabase } from "@/tests/vitest/dbUtils"; import { beforeEach, describe, expect, test } from "vitest"; import { copyStream, query } from "./db"; -import { Readable } from "stream"; +import { PassThrough, Readable } from "stream"; initializeDatabase(); @@ -114,4 +114,44 @@ describe("copyStream", () => { })), ); }); + + test("errors in copy stream are handled", async () => { + const result = copyStream({ + table: "missing_table", + fields: { + id: (record: any) => record.id.toString(), + flag: (record: any) => record.flag.toString(), + content: (record: any) => record.content, + total: (record: any) => record.total.toString(), + created_at: (record: any) => record.createdAt.toISOString(), + }, + stream: Readable.from(sourceData), + }); + + await expect(result).rejects.toThrowError(); + }); + + test("errors in source stream are handled", async () => { + async function* testData() { + yield sourceData[0]; + throw new Error("test error"); + } + + const result = copyStream({ + table: "missing_table", + fields: { + id: (record: any) => record.id.toString(), + flag: (record: any) => record.flag.toString(), + content: (record: any) => record.content, + total: (record: any) => record.total.toString(), + created_at: (record: any) => record.createdAt.toISOString(), + }, + // This extra pass through ensures that we cover the case where the error is a stream not directly passed to the db function. + stream: Readable.from(testData()).compose( + new PassThrough({ objectMode: true }), + ), + }); + + await expect(result).rejects.toThrowError(new Error("test error")); + }); }); diff --git a/src/modules/translation/data-access/machineGlossRepository.test.ts b/src/modules/translation/data-access/machineGlossRepository.test.ts index a8c05457..c1689cec 100644 --- a/src/modules/translation/data-access/machineGlossRepository.test.ts +++ b/src/modules/translation/data-access/machineGlossRepository.test.ts @@ -1,9 +1,10 @@ import { initializeDatabase } from "@/tests/vitest/dbUtils"; import { getDb } from "@/db"; -import { beforeEach, describe, expect, test } from "vitest"; +import { beforeEach, describe, expect, test, vi } from "vitest"; import { machineGlossRepository } from "./machineGlossRepository"; import { Readable } from "stream"; import { languageFactory } from "@/modules/languages/test-utils/languageFactory"; +import type { AIGlossChapter } from "./aiGlossImportService"; initializeDatabase(); @@ -115,11 +116,23 @@ describe("updateAllForLanguage", () => { gloss: "Should be dropped", }, ]; + const chapters: Array = [ + { + bookId: 1, + chapterNumber: 1, + glosses: newGlosses.slice(0, 2), + }, + { + bookId: 1, + chapterNumber: 1, + glosses: newGlosses.slice(2), + }, + ]; await machineGlossRepository.updateAllForLanguage({ languageId: spaLanguage.id, modelCode: "llm_import", - stream: Readable.from(newGlosses), + stream: Readable.from(chapters), }); const insertedGlosses = await getDb() @@ -138,4 +151,102 @@ describe("updateAllForLanguage", () => { })), ]); }); + + test("tracks progress when streaming AI gloss chapters", async () => { + const { language } = await languageFactory.build({ + members: [], + }); + + const onBookIdChange = vi.fn().mockResolvedValue(undefined); + const chapterStream: Array = [ + { + bookId: 1, + chapterNumber: 1, + glosses: [{ wordId: "0100100101", gloss: "One" }], + }, + { + bookId: 1, + chapterNumber: 2, + glosses: [{ wordId: "0100100102", gloss: "Two" }], + }, + { + bookId: 2, + chapterNumber: 1, + glosses: [{ wordId: "0100100103", gloss: "Three" }], + }, + ]; + + await machineGlossRepository.updateAllForLanguage({ + languageId: language.id, + modelCode: "llm_import", + stream: Readable.from(chapterStream), + onBookIdChange, + }); + + expect(onBookIdChange).toHaveBeenCalledTimes(2); + expect(onBookIdChange).toHaveBeenNthCalledWith(1, 1); + expect(onBookIdChange).toHaveBeenNthCalledWith(2, 2); + + const insertedGlosses = await getDb() + .selectFrom("machine_gloss") + .where("language_id", "=", language.id) + .orderBy("id") + .select(["word_id", "gloss"]) + .execute(); + + expect(insertedGlosses).toEqual([ + { word_id: "0100100101", gloss: "One" }, + { word_id: "0100100102", gloss: "Two" }, + { word_id: "0100100103", gloss: "Three" }, + ]); + }); + + test("errors in tracks progress don't crash the stream", async () => { + const { language } = await languageFactory.build({ + members: [], + }); + + const onBookIdChange = vi.fn().mockRejectedValue(new Error("test error")); + const chapterStream: Array = [ + { + bookId: 1, + chapterNumber: 1, + glosses: [{ wordId: "0100100101", gloss: "One" }], + }, + { + bookId: 1, + chapterNumber: 2, + glosses: [{ wordId: "0100100102", gloss: "Two" }], + }, + { + bookId: 2, + chapterNumber: 1, + glosses: [{ wordId: "0100100103", gloss: "Three" }], + }, + ]; + + await machineGlossRepository.updateAllForLanguage({ + languageId: language.id, + modelCode: "llm_import", + stream: Readable.from(chapterStream), + onBookIdChange, + }); + + expect(onBookIdChange).toHaveBeenCalledTimes(2); + expect(onBookIdChange).toHaveBeenNthCalledWith(1, 1); + expect(onBookIdChange).toHaveBeenNthCalledWith(2, 2); + + const insertedGlosses = await getDb() + .selectFrom("machine_gloss") + .where("language_id", "=", language.id) + .orderBy("id") + .select(["word_id", "gloss"]) + .execute(); + + expect(insertedGlosses).toEqual([ + { word_id: "0100100101", gloss: "One" }, + { word_id: "0100100102", gloss: "Two" }, + { word_id: "0100100103", gloss: "Three" }, + ]); + }); }); diff --git a/src/modules/translation/data-access/machineGlossRepository.ts b/src/modules/translation/data-access/machineGlossRepository.ts index 48efe6d5..66fed4e8 100644 --- a/src/modules/translation/data-access/machineGlossRepository.ts +++ b/src/modules/translation/data-access/machineGlossRepository.ts @@ -1,4 +1,5 @@ import { copyStream, getDb } from "@/db"; +import type { AIGloss, AIGlossChapter } from "./aiGlossImportService"; import { Readable, Transform } from "stream"; export interface StreamedMachineGloss { @@ -11,10 +12,12 @@ export const machineGlossRepository = { languageId, modelCode, stream, + onProgress, }: { languageId: string; modelCode: string; stream: Readable; + onProgress?: (bookId: number) => Promise; }): Promise { const words = await getDb().selectFrom("word").select("id").execute(); const wordIdSet = buildWordIdsSet(words); @@ -30,19 +33,50 @@ export const machineGlossRepository = { .where("language_id", "=", languageId) .execute(); + const progressTransform = new TrackBookProgressTransform(onProgress); + const filterTransform = new FilterMissingWordsTransform(wordIdSet); + await copyStream({ table: "machine_gloss", - stream: stream.pipe(new FilterMissingWordsTransform(wordIdSet)), fields: { word_id: (record) => record.wordId, language_id: () => languageId, model_id: () => model.id.toString(), gloss: (record) => record.gloss, }, + stream: stream.compose(progressTransform).compose(filterTransform), }); }, }; +class TrackBookProgressTransform extends Transform { + private currentBookId: number | undefined; + + constructor(private onBookIdChange?: (bookId: number) => Promise) { + super({ writableObjectMode: true, readableObjectMode: true }); + } + + override _transform( + chapter: AIGlossChapter, + _encoding: BufferEncoding, + cb: (error?: Error | null, data?: Array) => void, + ) { + if (chapter.bookId !== this.currentBookId) { + this.currentBookId = chapter.bookId; + // This is intentionally not awaited since we don't want to block the stream + if (this.onBookIdChange) { + this.onBookIdChange(chapter.bookId).catch((err) => { + console.error( + `Unhandled failure in TrackBookProgressTransform.onBookIdChange: ${err}`, + ); + }); + } + } + + cb(null, chapter.glosses); + } +} + export class FilterMissingWordsTransform extends Transform { constructor(private readonly existingWordIds: ReadonlySet) { super({ @@ -89,3 +123,16 @@ export function normalizeWordIdToNumber(wordId: string): number { return Number(wordId); } } + +function isAIGlossChapter(chunk: unknown): chunk is AIGlossChapter { + if (!chunk || typeof chunk !== "object") { + return false; + } + + return ( + "bookId" in chunk && + "chapterNumber" in chunk && + "glosses" in chunk && + Array.isArray(chunk.glosses) + ); +} diff --git a/src/modules/translation/jobs/importAIGlosses.ts b/src/modules/translation/jobs/importAIGlosses.ts index 56880203..105e76b9 100644 --- a/src/modules/translation/jobs/importAIGlosses.ts +++ b/src/modules/translation/jobs/importAIGlosses.ts @@ -2,14 +2,10 @@ import { logger } from "@/logging"; import { Job } from "@/shared/jobs/model"; import jobRepository from "@/shared/jobs/data-access/jobRepository"; import { TRANSLATION_JOB_TYPES } from "./jobType"; -import { - AIGloss, - aiGlossImportService, - type AIGlossChapter, -} from "../data-access/aiGlossImportService"; +import { aiGlossImportService } from "../data-access/aiGlossImportService"; import { machineGlossRepository } from "../data-access/machineGlossRepository"; import { resolveLanguageByCode } from "@/modules/languages"; -import { Readable, Transform } from "stream"; +import { Readable } from "stream"; interface ImportAIGlossesJobData { bookId?: number; @@ -52,47 +48,18 @@ export async function importAIGlosses(job: ImportAIGlossesJob) { const requestStream = aiGlossImportService.streamGlosses( job.payload.languageCode, ); - const progressTransform = new TrackBookProgressTransform({ - onBookIdChange: async (bookId) => { - jobLogger.info(`Importing AI glosses for book ${bookId}`); - await jobRepository.updateData(job.id, { bookId }); - }, - }); await machineGlossRepository.updateAllForLanguage({ languageId: language.id, modelCode: "llm_import", - stream: Readable.from(requestStream).pipe(progressTransform), + stream: Readable.from(requestStream), + onProgress: async (bookId) => { + jobLogger.info(`Importing AI glosses for book ${bookId}`); + await jobRepository.updateData(job.id, { bookId }); + }, }); jobLogger.info( `Imported AI glosses for language ${job.payload.languageCode}`, ); } - -interface TrackBookProgressTransformProps { - onBookIdChange(bookId: number): Promise; -} - -class TrackBookProgressTransform extends Transform { - private currentBookId: number | undefined; - private readonly onBookIdChange: (bookId: number) => Promise; - - constructor({ onBookIdChange }: TrackBookProgressTransformProps) { - super({ writableObjectMode: true, readableObjectMode: true }); - this.onBookIdChange = onBookIdChange; - } - - override _transform( - chapter: AIGlossChapter, - _encoding: BufferEncoding, - cb: (error?: Error | null, data?: Array) => void, - ) { - if (chapter.bookId !== this.currentBookId) { - this.currentBookId = chapter.bookId; - this.onBookIdChange(chapter.bookId); - } - - cb(null, chapter.glosses); - } -} From a9649c4a3364fae7e29a2f80683b930194da980b Mon Sep 17 00:00:00 2001 From: Adrian Rocke Date: Thu, 23 Apr 2026 16:03:24 -0500 Subject: [PATCH 3/4] fix: lint issue --- .../data-access/machineGlossRepository.ts | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/modules/translation/data-access/machineGlossRepository.ts b/src/modules/translation/data-access/machineGlossRepository.ts index 66fed4e8..a2ff9a8a 100644 --- a/src/modules/translation/data-access/machineGlossRepository.ts +++ b/src/modules/translation/data-access/machineGlossRepository.ts @@ -123,16 +123,3 @@ export function normalizeWordIdToNumber(wordId: string): number { return Number(wordId); } } - -function isAIGlossChapter(chunk: unknown): chunk is AIGlossChapter { - if (!chunk || typeof chunk !== "object") { - return false; - } - - return ( - "bookId" in chunk && - "chapterNumber" in chunk && - "glosses" in chunk && - Array.isArray(chunk.glosses) - ); -} From 9b9abd638d5c481a421c129f59ae092077e649dd Mon Sep 17 00:00:00 2001 From: Adrian Rocke Date: Thu, 23 Apr 2026 16:09:40 -0500 Subject: [PATCH 4/4] fix: rename variable in tests --- .../machineGlossRepository.test.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/modules/translation/data-access/machineGlossRepository.test.ts b/src/modules/translation/data-access/machineGlossRepository.test.ts index c1689cec..d56a296d 100644 --- a/src/modules/translation/data-access/machineGlossRepository.test.ts +++ b/src/modules/translation/data-access/machineGlossRepository.test.ts @@ -157,7 +157,7 @@ describe("updateAllForLanguage", () => { members: [], }); - const onBookIdChange = vi.fn().mockResolvedValue(undefined); + const onProgress = vi.fn().mockResolvedValue(undefined); const chapterStream: Array = [ { bookId: 1, @@ -180,12 +180,12 @@ describe("updateAllForLanguage", () => { languageId: language.id, modelCode: "llm_import", stream: Readable.from(chapterStream), - onBookIdChange, + onProgress, }); - expect(onBookIdChange).toHaveBeenCalledTimes(2); - expect(onBookIdChange).toHaveBeenNthCalledWith(1, 1); - expect(onBookIdChange).toHaveBeenNthCalledWith(2, 2); + expect(onProgress).toHaveBeenCalledTimes(2); + expect(onProgress).toHaveBeenNthCalledWith(1, 1); + expect(onProgress).toHaveBeenNthCalledWith(2, 2); const insertedGlosses = await getDb() .selectFrom("machine_gloss") @@ -206,7 +206,7 @@ describe("updateAllForLanguage", () => { members: [], }); - const onBookIdChange = vi.fn().mockRejectedValue(new Error("test error")); + const onProgress = vi.fn().mockRejectedValue(new Error("test error")); const chapterStream: Array = [ { bookId: 1, @@ -229,12 +229,12 @@ describe("updateAllForLanguage", () => { languageId: language.id, modelCode: "llm_import", stream: Readable.from(chapterStream), - onBookIdChange, + onProgress, }); - expect(onBookIdChange).toHaveBeenCalledTimes(2); - expect(onBookIdChange).toHaveBeenNthCalledWith(1, 1); - expect(onBookIdChange).toHaveBeenNthCalledWith(2, 2); + expect(onProgress).toHaveBeenCalledTimes(2); + expect(onProgress).toHaveBeenNthCalledWith(1, 1); + expect(onProgress).toHaveBeenNthCalledWith(2, 2); const insertedGlosses = await getDb() .selectFrom("machine_gloss")