diff --git a/apps/pdftk/project.json b/apps/pdftk/project.json index 98761203..40b7f2d3 100644 --- a/apps/pdftk/project.json +++ b/apps/pdftk/project.json @@ -25,7 +25,9 @@ "executor": "@riwi/bun:test", "outputs": ["{workspaceRoot}/coverage/{projectRoot}"], "dependsOn": ["docker-build-test"], - "options": {} + "options": { + "maxConcurrency": 1 + } }, "local-test": { "executor": "@riwi/bun:test", diff --git a/apps/pdftk/src/main.ts b/apps/pdftk/src/main.ts index 7dd2abbc..b2f7d2be 100644 --- a/apps/pdftk/src/main.ts +++ b/apps/pdftk/src/main.ts @@ -11,6 +11,7 @@ import { formFillStream, uncompressStream, } from '@riwi/binary/pdftk'; +import { handleRouteError } from '@riwi/http/error'; import { connect, healthEndpoints, post, processEndpoints } from '@riwi/http/route'; import { httpServer } from '@riwi/http/server'; import { middlewareQuery } from '@riwi/http/validate'; @@ -30,20 +31,14 @@ httpServer( try { await compressStream({ input: req, output: res }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }), post({ path: '/uncompress' }, async ({ req, res }) => { try { await uncompressStream({ input: req, output: res }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }), post( @@ -53,10 +48,7 @@ httpServer( try { await encryptStream({ input: req, output: res, password, userPassword, allow }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }, ), @@ -64,30 +56,21 @@ httpServer( try { await decryptStream({ input: req, output: res, password }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }), post({ path: '/data/fields' }, async ({ req, res }) => { try { await dataFieldsStream({ input: req, output: res }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }), post({ path: '/data/dump' }, async ({ req, res }) => { try { await dataDumpStream({ input: req, output: res }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }), // post({ path: '/data/annots' }, async ({ req, res }) => { @@ -109,10 +92,7 @@ httpServer( try { await formFillStream({ input: req, output: res, flag, fontName, data }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }, ), @@ -120,10 +100,7 @@ httpServer( try { await dataFdfStream({ input: req, output: res }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'pdftk failed'); - } + handleRouteError(res, error, 'pdftk failed'); } }), ...healthEndpoints, diff --git a/apps/pdftk/src/test/pdftk.spec.ts b/apps/pdftk/src/test/pdftk.spec.ts index 453e0f18..4d91cb49 100644 --- a/apps/pdftk/src/test/pdftk.spec.ts +++ b/apps/pdftk/src/test/pdftk.spec.ts @@ -3,6 +3,7 @@ import { readFileSync, statSync } from 'node:fs'; import { dirname, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; import { currentArch } from '@riwi/docker'; +import { wait } from '@riwi/helper'; import { streamLength, streamToBuffer } from '@riwi/stream'; import { createProcessEndpointsDisabledTest, createProcessEndpointTests, useTestContainer } from '@riwi/test/bun'; import { beautifyJson, streamRequest, testRequest } from '@riwi/test/request'; @@ -18,6 +19,12 @@ const assertSnapshot = (actual: string, relativePath: string): void => { expect(actual).toBe(expected); }; +const expectStatusOk = (statusCode: number | undefined): void => { + expect(statusCode).toBe(200); +}; + +const requestSettleDelayMs = 75; + describe('pdftk', () => { [currentArch()].forEach((arch) => { describe(`arch: ${arch}`, () => { @@ -25,6 +32,7 @@ describe('pdftk', () => { image: `philiplehmann/pdftk:test-${arch}`, containerPort, env: { PDFTK_PROCESS_ENABLED: 'true' }, + type: 'each', }); describe('compress', async () => { @@ -39,12 +47,38 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(response.statusCode); const size = await streamLength(response); + await wait(requestSettleDelayMs); expect(stats.size).toBeGreaterThan(size); }); }); + describe.skip('compress parallel', async () => { + it('pdf file reduces in size', async () => { + const file = resolve(__dirname, 'assets/uncompressed.pdf'); + const stats = statSync(file); + const responses = await Promise.all( + Array.from({ length: 20 }).map(() => + streamRequest({ + method: 'POST', + host: 'localhost', + port: setup.port, + path: '/compress', + headers: { 'Content-Type': 'application/pdf' }, + file, + }), + ), + ); + for (const response of responses) { + expectStatusOk(response.statusCode); + const size1 = await streamLength(response); + expect(stats.size > size1).toBeTruthy(); + } + }); + }); + describe('uncompress', () => { it('pdf file increases in size', async () => { const file = resolve(__dirname, 'assets/compressed.pdf'); @@ -57,7 +91,9 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(response.statusCode); const size = await streamLength(response); + await wait(requestSettleDelayMs); expect(stats.size).toBeLessThan(size); }); @@ -66,7 +102,7 @@ describe('pdftk', () => { describe('encrypt', () => { it('pdf file is encrypted', async () => { const file = resolve(__dirname, 'assets/form.pdf'); - const [, text] = await testRequest({ + const encryptResponse = await streamRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -74,13 +110,27 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(encryptResponse.statusCode); - expect(text).toInclude('/Encrypt'); + const encryptedPdf = await streamToBuffer(encryptResponse); + await wait(requestSettleDelayMs); + const [decryptResponse, text] = await testRequest({ + method: 'POST', + host: 'localhost', + port: setup.port, + path: '/decrypt?password=1234', + headers: { 'Content-Type': 'application/pdf' }, + body: encryptedPdf, + }); + expectStatusOk(decryptResponse.statusCode); + await wait(requestSettleDelayMs); + + expect(text.includes('/Encrypt')).toBeFalsy(); }); it('pdf file is encrypted and has password', async () => { const file = resolve(__dirname, 'assets/form.pdf'); - const [, text] = await testRequest({ + const encryptResponse = await streamRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -88,13 +138,27 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(encryptResponse.statusCode); + + const encryptedPdf = await streamToBuffer(encryptResponse); + await wait(requestSettleDelayMs); + const [decryptResponse, text] = await testRequest({ + method: 'POST', + host: 'localhost', + port: setup.port, + path: '/decrypt?password=5678', + headers: { 'Content-Type': 'application/pdf' }, + body: encryptedPdf, + }); + expectStatusOk(decryptResponse.statusCode); + await wait(requestSettleDelayMs); - expect(text).toInclude('/Encrypt'); + expect(text.includes('/Encrypt')).toBeFalsy(); }); it('pdf file is encrypted, has password and allow is defined', async () => { const file = resolve(__dirname, 'assets/form.pdf'); - const [, text] = await testRequest({ + const encryptResponse = await streamRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -102,15 +166,29 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(encryptResponse.statusCode); + + const encryptedPdf = await streamToBuffer(encryptResponse); + await wait(requestSettleDelayMs); + const [decryptResponse, text] = await testRequest({ + method: 'POST', + host: 'localhost', + port: setup.port, + path: '/decrypt?password=1234', + headers: { 'Content-Type': 'application/pdf' }, + body: encryptedPdf, + }); + expectStatusOk(decryptResponse.statusCode); + await wait(requestSettleDelayMs); - expect(text).toInclude('/Encrypt'); + expect(text.includes('/Encrypt')).toBeFalsy(); }); }); describe('decrypt', () => { it('pdf file is decrypted', async () => { const file = resolve(__dirname, 'assets/encrypted.pdf'); - const [, text] = await testRequest({ + const [response, text] = await testRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -118,6 +196,8 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(response.statusCode); + await wait(requestSettleDelayMs); expect(text).not.toInclude('/Encrypt'); }); @@ -126,7 +206,7 @@ describe('pdftk', () => { describe('dataFields', () => { it('return pdf data fields', async () => { const file = resolve(__dirname, 'assets/form.pdf'); - const [, text] = await testRequest({ + const [response, text] = await testRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -134,6 +214,8 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(response.statusCode); + await wait(requestSettleDelayMs); assertSnapshot(beautifyJson(text), './snapshots/dataFields.json'); }); @@ -142,7 +224,7 @@ describe('pdftk', () => { describe('dataDump', () => { it('return pdf data dump', async () => { const file = resolve(__dirname, 'assets/form.pdf'); - const [, text] = await testRequest({ + const [response, text] = await testRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -150,6 +232,8 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(response.statusCode); + await wait(requestSettleDelayMs); assertSnapshot(beautifyJson(text), './snapshots/dataDump.json'); }); @@ -158,7 +242,7 @@ describe('pdftk', () => { describe('dataFDF', () => { it('return pdf generated fdf', async () => { const file = resolve(__dirname, 'assets/form.pdf'); - const [, text] = await testRequest({ + const [response, text] = await testRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -166,6 +250,8 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); + expectStatusOk(response.statusCode); + await wait(requestSettleDelayMs); assertSnapshot(text, './snapshots/dataFdf.fdf'); }); @@ -196,11 +282,12 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, file, }); - expect(response.statusCode).toBe(200); + expectStatusOk(response.statusCode); const pdf = await streamToBuffer(response); + await wait(requestSettleDelayMs); - const [, text] = await testRequest({ + const [dataFieldsResponse, text] = await testRequest({ method: 'POST', host: 'localhost', port: setup.port, @@ -208,6 +295,8 @@ describe('pdftk', () => { headers: { 'Content-Type': 'application/pdf' }, body: pdf, }); + expectStatusOk(dataFieldsResponse.statusCode); + await wait(requestSettleDelayMs); assertSnapshot(beautifyJson(text), './snapshots/formFill.json'); }); }); diff --git a/apps/puppeteer/src/e2e/snapshot.spec.ts-snapshots/page-0-chromium.png b/apps/puppeteer/src/e2e/snapshot.spec.ts-snapshots/page-0-chromium.png index fe7ccb97..2d906217 100644 Binary files a/apps/puppeteer/src/e2e/snapshot.spec.ts-snapshots/page-0-chromium.png and b/apps/puppeteer/src/e2e/snapshot.spec.ts-snapshots/page-0-chromium.png differ diff --git a/apps/tesseract/src/main.ts b/apps/tesseract/src/main.ts index 19db85b0..112bdab3 100644 --- a/apps/tesseract/src/main.ts +++ b/apps/tesseract/src/main.ts @@ -1,4 +1,5 @@ import { imageToText } from '@riwi/binary/tesseract'; +import { handleRouteError } from '@riwi/http/error'; import { connect, healthEndpoints, post, processEndpoints } from '@riwi/http/route'; import { httpServer } from '@riwi/http/server'; @@ -18,10 +19,7 @@ httpServer( try { await imageToText({ input: req, output: res }); } catch (error) { - if (!res.headersSent) { - res.statusCode = 500; - res.end(error instanceof Error ? error.message : 'tesseract failed'); - } + handleRouteError(res, error, 'tesseract failed'); } }), ...healthEndpoints, diff --git a/libs/binary/libreoffice-fs/src/lib/directFsConvert.ts b/libs/binary/libreoffice-fs/src/lib/directFsConvert.ts index cc58ec8f..82a3ee96 100644 --- a/libs/binary/libreoffice-fs/src/lib/directFsConvert.ts +++ b/libs/binary/libreoffice-fs/src/lib/directFsConvert.ts @@ -1,6 +1,6 @@ -import { libreoffice } from '@riwi/binary/libreoffice'; import { mkdir, stat } from 'node:fs/promises'; import { dirname } from 'node:path'; +import { libreoffice } from '@riwi/binary/libreoffice'; import { assertReadableInputFile } from './assertReadableInputFile'; import type { DirectFsConvertOptions } from './convert'; diff --git a/libs/binary/pdftk/src/lib/compress.ts b/libs/binary/pdftk/src/lib/compress.ts index 7c21776a..deca74f1 100644 --- a/libs/binary/pdftk/src/lib/compress.ts +++ b/libs/binary/pdftk/src/lib/compress.ts @@ -5,6 +5,21 @@ import { type PdftkOptions, pdftk } from './pdftk'; export async function compressStream( { input, output }: { input: Readable; output: Writable }, options?: PdftkOptions, +): Promise; +export async function compressStream( + { input, output }: { input: string; output: string }, + options?: PdftkOptions, +): Promise; +export async function compressStream( + { input, output }: { input: Readable | string; output: Writable | string }, + options?: PdftkOptions, ): Promise { + if (typeof input === 'string' && typeof output === 'string') { + await pdftk([input, 'output', output, 'compress'], options); + return; + } + if (typeof input === 'string' || typeof output === 'string') { + throw new Error('input and output must both be streams or both be file paths'); + } return streamChildProcess(input, output, pdftk(['-', 'output', '-', 'compress'], options)); } diff --git a/libs/http/error/src/index.ts b/libs/http/error/src/index.ts index 1a0b1fb4..c136ba9a 100644 --- a/libs/http/error/src/index.ts +++ b/libs/http/error/src/index.ts @@ -1 +1,2 @@ +export * from './lib/handle-route-error'; export * from './lib/http-error'; diff --git a/libs/http/error/src/lib/handle-route-error.ts b/libs/http/error/src/lib/handle-route-error.ts new file mode 100644 index 00000000..902b4787 --- /dev/null +++ b/libs/http/error/src/lib/handle-route-error.ts @@ -0,0 +1,13 @@ +import type { ServerResponse } from 'node:http'; + +export const handleRouteError = (res: ServerResponse, error: unknown, fallbackMessage = 'request failed'): void => { + const normalizedError = error instanceof Error ? error : new Error(fallbackMessage); + + if (res.headersSent) { + res.destroy(normalizedError); + return; + } + + res.statusCode = 500; + res.end(error instanceof Error ? error.message : fallbackMessage); +}; diff --git a/libs/stream/src/index.ts b/libs/stream/src/index.ts index 768b8c5c..88b49164 100644 --- a/libs/stream/src/index.ts +++ b/libs/stream/src/index.ts @@ -1,8 +1,15 @@ -export * from './lib/process-tracker'; -export * from './lib/stream-child-process'; -export * from './lib/stream-child-process-to-buffer'; -export * from './lib/stream-includes'; -export * from './lib/stream-length'; -export * from './lib/stream-to-buffer'; -export * from './lib/stream-to-json'; -export * from './lib/stream-to-string'; +export { awaitChildProcess } from './lib/await-child-process'; +export type { ProcessTrackerConfig } from './lib/process-tracker'; +export { configureProcessTracker, processTracker } from './lib/process-tracker'; +export { + type InputType, + type StreamChildProcessOptions, + streamChildProcess, + streamInputToWriteable, +} from './lib/stream-child-process'; +export { streamChildProcessToBuffer } from './lib/stream-child-process-to-buffer'; +export { streamIncludes } from './lib/stream-includes'; +export { streamLength } from './lib/stream-length'; +export { streamToBuffer } from './lib/stream-to-buffer'; +export { streamToJson } from './lib/stream-to-json'; +export { streamToString } from './lib/stream-to-string'; diff --git a/libs/stream/src/lib/await-child-process.ts b/libs/stream/src/lib/await-child-process.ts new file mode 100644 index 00000000..1985cf89 --- /dev/null +++ b/libs/stream/src/lib/await-child-process.ts @@ -0,0 +1,34 @@ +import type { ChildProcessWithoutNullStreams } from 'node:child_process'; + +export async function awaitChildProcess(child: ChildProcessWithoutNullStreams): Promise { + const stderrChunks: Buffer[] = []; + child.stderr.on('data', (chunk) => { + stderrChunks.push(Buffer.from(chunk)); + }); + + child.stderr.on('error', (error) => { + console.error(error); + }); + + child.stdout.on('error', (error) => { + console.error(error); + }); + + // Set up exit listener before awaiting to avoid race condition + const exitResult = await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => { + child.on('exit', (code, signal) => { + resolve({ code, signal }); + }); + }); + + const stderrOutput = Buffer.concat(stderrChunks).toString('utf-8'); + + const { code, signal } = exitResult; + + if (code !== 0) { + if (code === null && signal) { + throw new Error(`Child process exited with signal ${signal}${stderrOutput ? `: ${stderrOutput}` : ''}`); + } + throw new Error(`Child process exited with code ${code}${stderrOutput ? `: ${stderrOutput}` : ''}`); + } +} diff --git a/libs/stream/src/lib/stream-child-process-to-buffer.ts b/libs/stream/src/lib/stream-child-process-to-buffer.ts index f3e70533..0c201288 100644 --- a/libs/stream/src/lib/stream-child-process-to-buffer.ts +++ b/libs/stream/src/lib/stream-child-process-to-buffer.ts @@ -1,6 +1,6 @@ import type { ChildProcessWithoutNullStreams } from 'node:child_process'; import { processTracker } from './process-tracker'; -import type { InputType } from './stream-child-process'; +import { type InputType, isEpipeError, streamInputToWriteable } from './stream-child-process'; import { streamToBuffer } from './stream-to-buffer'; export interface StreamChildProcessToBufferOptions { @@ -19,19 +19,71 @@ export async function streamChildProcessToBuffer( if (track) { processTracker.register(child); } + const stderrChunks: Buffer[] = []; - child.stdin.on('error', (error) => { + child.stderr.on('data', (chunk) => { + const stderrChunk = Buffer.from(chunk); + stderrChunks.push(stderrChunk); + process.stderr.write(stderrChunk); + }); + + child.stderr.on('error', (error) => { console.error(error); }); - if (typeof input === 'string' || Buffer.isBuffer(input)) { - child.stdin.end(input); - } else { - input.pipe(child.stdin, { end: true }); - } + child.stdin.on('error', (error) => { + if (!isEpipeError(error)) { + console.error(error); + } + }); - child.stderr.pipe(process.stderr).on('error', (error) => { - console.error(error); + const exitPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => { + child.on('exit', (code, signal) => { + resolve({ code, signal }); + }); + }); + + const inputPromise = streamInputToWriteable(input, child.stdin, { end: true }).catch((error) => { + if (isEpipeError(error)) { + return; + } + child.kill(); + throw error; }); - return streamToBuffer(child.stdout); + const outputPromise = streamToBuffer(child.stdout); + + const [inputResult, outputResult, exitResult] = await Promise.allSettled([inputPromise, outputPromise, exitPromise]); + + const stderrOutput = Buffer.concat(stderrChunks).toString('utf-8'); + + if (inputResult.status === 'rejected') { + const message = inputResult.reason instanceof Error ? inputResult.reason.message : String(inputResult.reason); + if (stderrOutput) { + throw new Error(`${message}: ${stderrOutput}`); + } + throw inputResult.reason; + } + + if (outputResult.status === 'rejected') { + const message = outputResult.reason instanceof Error ? outputResult.reason.message : String(outputResult.reason); + if (stderrOutput) { + throw new Error(`${message}: ${stderrOutput}`); + } + throw outputResult.reason; + } + + if (exitResult.status === 'rejected') { + throw exitResult.reason; + } + + const { code, signal } = exitResult.value; + + if (code !== 0) { + if (code === null && signal) { + throw new Error(`Child process exited with signal ${signal}${stderrOutput ? `: ${stderrOutput}` : ''}`); + } + throw new Error(`Child process exited with code ${code}${stderrOutput ? `: ${stderrOutput}` : ''}`); + } + + return outputResult.value; } diff --git a/libs/stream/src/lib/stream-child-process.spec.ts b/libs/stream/src/lib/stream-child-process.spec.ts index 5387e3e2..1632698c 100644 --- a/libs/stream/src/lib/stream-child-process.spec.ts +++ b/libs/stream/src/lib/stream-child-process.spec.ts @@ -1,7 +1,18 @@ import { describe, expect, it } from 'bun:test'; +import { isEpipeError } from './stream-child-process'; describe('streamChildProcess', () => { it('should work', () => { expect(true).toBe(true); }); + + it('detects epipe error', () => { + const error = { code: 'EPIPE' } as NodeJS.ErrnoException; + expect(isEpipeError(error)).toBe(true); + }); + + it('does not treat other errors as epipe', () => { + const error = { code: 'ECONNRESET' } as NodeJS.ErrnoException; + expect(isEpipeError(error)).toBe(false); + }); }); diff --git a/libs/stream/src/lib/stream-child-process.ts b/libs/stream/src/lib/stream-child-process.ts index ceced14f..c4315827 100644 --- a/libs/stream/src/lib/stream-child-process.ts +++ b/libs/stream/src/lib/stream-child-process.ts @@ -11,6 +11,36 @@ export interface StreamChildProcessOptions { track?: boolean; } +export const isEpipeError = (error: unknown): boolean => { + return (error as NodeJS.ErrnoException | undefined)?.code === 'EPIPE'; +}; + +export const writeToWritable = (writable: Writable, chunk: string | Buffer): Promise => { + return new Promise((resolve, reject) => { + writable.write(chunk, (error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); +}; + +export const endWritable = (writable: Writable): Promise => { + return new Promise((resolve, reject) => { + const onError = (error: Error) => { + writable.off('error', onError); + reject(error); + }; + writable.on('error', onError); + writable.end(() => { + writable.off('error', onError); + resolve(); + }); + }); +}; + export async function streamInputToWriteable( input: InputType, writable: Writable, @@ -18,9 +48,9 @@ export async function streamInputToWriteable( ): Promise { const { end = true } = options ?? {}; if (typeof input === 'string' || Buffer.isBuffer(input)) { - writable.write(input); + await writeToWritable(writable, input); if (end) { - writable.end(); + await endWritable(writable); } } else { await finished(input.pipe(writable, { end })); @@ -50,21 +80,10 @@ export async function streamChildProcess( }); child.stdin.on('error', (error) => { - console.error(error); - child.kill(); - }); - - try { - await streamInputToWriteable(input, child.stdin, { end: true }); - } catch (error) { - child.kill(); - const stderrOutput = Buffer.concat(stderrChunks).toString('utf-8'); - if (stderrOutput) { - const message = error instanceof Error ? error.message : String(error); - throw new Error(`${message}: ${stderrOutput}`); + if (!isEpipeError(error)) { + console.error(error); } - throw error; - } + }); child.stdout .on('error', (error) => { @@ -87,17 +106,47 @@ export async function streamChildProcess( }); }); - await finished(child.stdout); - stdoutFinished = true; + const inputPromise = streamInputToWriteable(input, child.stdin, { end: true }).catch((error) => { + if (isEpipeError(error)) { + return; + } + child.kill(); + throw error; + }); + + const stdoutPromise = finished(child.stdout).then(() => { + stdoutFinished = true; + }); + + const [inputResult, stdoutResult, exitResult] = await Promise.allSettled([inputPromise, stdoutPromise, exitPromise]); + const stderrOutput = Buffer.concat(stderrChunks).toString('utf-8'); - // Wait for the process to exit and check the exit code - const { code, signal } = await exitPromise; + if (inputResult.status === 'rejected') { + const message = inputResult.reason instanceof Error ? inputResult.reason.message : String(inputResult.reason); + if (stderrOutput) { + throw new Error(`${message}: ${stderrOutput}`); + } + throw inputResult.reason; + } + + if (exitResult.status === 'rejected') { + throw exitResult.reason; + } + + const { code, signal } = exitResult.value; if (code !== 0) { - const stderrOutput = Buffer.concat(stderrChunks).toString('utf-8'); if (code === null && signal) { throw new Error(`Child process exited with signal ${signal}${stderrOutput ? `: ${stderrOutput}` : ''}`); } throw new Error(`Child process exited with code ${code}${stderrOutput ? `: ${stderrOutput}` : ''}`); } + + if (stdoutResult.status === 'rejected') { + const message = stdoutResult.reason instanceof Error ? stdoutResult.reason.message : String(stdoutResult.reason); + if (stderrOutput) { + throw new Error(`${message}: ${stderrOutput}`); + } + throw stdoutResult.reason; + } } diff --git a/libs/test/bun/src/lib/use-test-container.ts b/libs/test/bun/src/lib/use-test-container.ts index 6a5c64ec..30f4d95a 100644 --- a/libs/test/bun/src/lib/use-test-container.ts +++ b/libs/test/bun/src/lib/use-test-container.ts @@ -1,4 +1,4 @@ -import { afterAll, beforeAll } from 'bun:test'; +import { afterAll, afterEach, beforeAll, beforeEach } from 'bun:test'; import { type TestContainerProps, testContainer } from '@riwi/test/server'; import type { StartedTestContainer } from 'testcontainers'; @@ -9,30 +9,38 @@ export interface TestContainerOutput { export const useTestContainer = ({ localPort = 3000, timeout = 30_000, + type = 'all', ...props -}: TestContainerProps & { localPort?: number; timeout?: number }): TestContainerOutput => { +}: TestContainerProps & { localPort?: number; timeout?: number; type?: 'each' | 'all' }): TestContainerOutput => { const output: TestContainerOutput = {} as TestContainerOutput; let container: StartedTestContainer | undefined; - beforeAll( - async () => { - if (process.env.TEST_SERVER_RUNNER === 'local') { - output.port = localPort; - return; - } + const before = async () => { + if (process.env.TEST_SERVER_RUNNER === 'local') { + output.port = localPort; + return; + } - const [startedContainer, mappedPort] = await testContainer(props); - container = startedContainer; - output.port = mappedPort; - }, - { timeout }, - ); + const [startedContainer, mappedPort] = await testContainer(props); + container = startedContainer; + output.port = mappedPort; + }; - afterAll(async () => { + const after = async () => { if (process.env.TEST_SERVER_RUNNER !== 'local') { await container?.stop(); } - }); + }; + + if (type === 'all') { + beforeAll(before, { timeout }); + afterAll(after); + } else if (type === 'each') { + beforeEach(before, { timeout }); + afterEach(after); + } else { + throw new Error(`Invalid type: ${type}`); + } return output; }; diff --git a/libs/test/request/src/lib/test-request.ts b/libs/test/request/src/lib/test-request.ts index ba048b3a..ea5ced18 100644 --- a/libs/test/request/src/lib/test-request.ts +++ b/libs/test/request/src/lib/test-request.ts @@ -1,30 +1,71 @@ -import { createReadStream, type ReadStream } from 'node:fs'; -import { type IncomingMessage, type RequestOptions, request } from 'node:http'; +import type { ReadStream } from 'node:fs'; +import { readFile } from 'node:fs/promises'; +import { type IncomingMessage, type OutgoingHttpHeaders, type RequestOptions, request } from 'node:http'; import { Readable } from 'node:stream'; import { streamToString } from '@riwi/stream'; export const streamRequest = async ({ file, body, + timeout = 30_000, ...requestParams }: RequestOptions & { file?: string; body?: string | Buffer | Readable | ReadStream; + timeout?: number; }): Promise => { + const payload = (() => { + if (file) { + return readFile(file); + } + if (typeof body === 'string' || Buffer.isBuffer(body)) { + return Promise.resolve(Buffer.from(body)); + } + return Promise.resolve(null); + })(); + + const resolvedPayload = await payload; + return new Promise((resolve, reject) => { - const req = request({ host: 'localhost', pathname: '/', ...requestParams }, async (response) => { - try { - resolve(response); - } catch (e) { - reject(e); - } + const requestHeaders = requestParams.headers; + const headers = ( + requestHeaders && !Array.isArray(requestHeaders) ? { ...requestHeaders } : {} + ) as OutgoingHttpHeaders; + + if (resolvedPayload && headers['Content-Length'] == null && headers['content-length'] == null) { + headers['Content-Length'] = String(resolvedPayload.length); + } + + if (headers['Connection'] == null && headers['connection'] == null) { + headers['Connection'] = 'close'; + } + + const req = request( + { host: 'localhost', pathname: '/', agent: false, ...requestParams, headers }, + async (response) => { + try { + resolve(response); + } catch (e) { + reject(e); + } + }, + ); + + req.on('error', (error) => { + reject(error); }); - if (file) { - createReadStream(file).pipe(req, { end: true }); - } else if (typeof body === 'string' || Buffer.isBuffer(body)) { - req.write(body); + + req.setTimeout(timeout, () => { + req.destroy(new Error(`Request timed out after ${timeout}ms`)); + }); + + if (resolvedPayload) { + req.write(resolvedPayload); req.end(); } else if (body instanceof Readable) { + body.on('error', (error: Error) => { + req.destroy(error); + }); body.pipe(req, { end: true }); } else { req.end(); @@ -35,12 +76,14 @@ export const streamRequest = async ({ export const testRequest = async ({ file, body, + timeout, ...requestParams }: RequestOptions & { file?: string; body?: string | Buffer | Readable | ReadStream; + timeout?: number; }): Promise<[IncomingMessage, string]> => { - const response = await streamRequest({ file, body, ...requestParams }); + const response = await streamRequest({ file, body, timeout, ...requestParams }); const text = await streamToString(response); return [response, text]; }; diff --git a/libs/test/server/src/lib/test-container.ts b/libs/test/server/src/lib/test-container.ts index 2dd7ed32..cdda3a5c 100644 --- a/libs/test/server/src/lib/test-container.ts +++ b/libs/test/server/src/lib/test-container.ts @@ -7,6 +7,7 @@ export interface TestContainerProps { healthPath?: string; healthPort?: number; healthStatusCode?: number; + memorySizeMB?: number; env?: Environment; hook?: (container: GenericContainer) => GenericContainer | Promise; } @@ -17,6 +18,7 @@ export const testContainer = async ({ healthPath = '/health/readiness', healthPort = containerPort, healthStatusCode = 200, + memorySizeMB = 128, env, hook, }: TestContainerProps): Promise<[StartedTestContainer, number]> => { @@ -25,7 +27,8 @@ export const testContainer = async ({ .withExposedPorts(containerPort) .withUser('1000:1000') .withLogConsumer((stream) => stream.pipe(process.stdout)) - .withWaitStrategy(Wait.forHttp(healthPath, healthPort).forStatusCode(healthStatusCode)); + .withWaitStrategy(Wait.forHttp(healthPath, healthPort).forStatusCode(healthStatusCode)) + .withSharedMemorySize(memorySizeMB * 1024 * 1024); if (hook) { genericContainer = await hook(genericContainer);