diff --git a/README.md b/README.md index 11b7855..63c6963 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,38 @@ const objectDetectionResp = await jigsaw.vision.object_detection({ }); ``` +### Live speech-to-text (streaming) + +Pipe real-time PCM16 audio (microphone, WebRTC, file) into the SDK and receive incremental and committed transcripts. + +```js +import { Readable } from "stream"; +import recorder from "node-record-lpcm16"; +import { JigsawStack } from "jigsawstack"; + +const jigsaw = JigsawStack({ apiKey: process.env.JIGSAWSTACK_API_KEY }); +// Streaming is English-only and expects mono 16-bit PCM input (downmix stereo sources beforehand). +const transcriber = jigsaw.audio.speech_to_text_live({ + sampleRate: 16000, +}); + +transcriber.on("delta", ({ text }) => process.stdout.write(`\r… ${text}`)); +transcriber.on("turn", ({ text }) => console.log(`\n${text}`)); + +await transcriber.connect(); + +const rec = recorder.record({ sampleRate: 16000, channels: 1, audioType: "raw" }); +Readable.toWeb(rec.stream()).pipeTo(transcriber.stream()); + +process.on("SIGINT", async () => { + rec.stop(); + await transcriber.close(); + process.exit(); +}); +``` + +See `examples/live-mic.js` for a full working example. + ## Community Join JigsawStack community on [Discord](https://discord.gg/dj8fMBpnqd) to connect with other developers, share ideas, and get help with the SDK. diff --git a/docs/superpowers/plans/2026-04-21-live-stt.md b/docs/superpowers/plans/2026-04-21-live-stt.md new file mode 100644 index 0000000..7e047b0 --- /dev/null +++ b/docs/superpowers/plans/2026-04-21-live-stt.md @@ -0,0 +1,1623 @@ +# Live Speech-to-Text Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add `jigsaw.audio.speech_to_text_live(config)` — a Node-focused live transcriber that accepts a WritableStream of PCM16 audio, internally chunks with overlap, streams each chunk to `/v1/ai/transcribe?stream=true`, and emits `open`/`delta`/`turn`/`warning`/`error`/`close` events. + +**Architecture:** New subtree at `src/audio/live/` with four files: `chunker.ts` (PCM→WAV chunks with overlap), `stitcher.ts` (token overlap + fuzzy match), `sse.ts` (POST + SSE parser), `transcriber.ts` (public class — state machine, events, WritableStream sink). `RequestClient` gains a new `fetchJSSStream` method that returns the raw `Response` so the SSE module can iterate the body. + +**Tech Stack:** TypeScript, native `fetch`, WHATWG `WritableStream`/`ReadableStream`, `AbortController`, `crypto.randomUUID()`. Tests use `node:test` + `tsx` (existing infra). No new runtime dependencies. + +**Reference spec:** `docs/superpowers/specs/2026-04-21-live-stt-design.md` + +--- + +## File Structure + +**New files:** +- `src/audio/live/chunker.ts` — PCM16 buffer with overlap retention, produces WAV chunks +- `src/audio/live/stitcher.ts` — token-level overlap detection + fuzzy match +- `src/audio/live/sse.ts` — POST audio + parse SSE transcript events +- `src/audio/live/transcriber.ts` — public `Transcriber` class implementing `LiveTranscriber` +- `tests/live/chunker.test.ts` +- `tests/live/stitcher.test.ts` +- `tests/live/sse.test.ts` +- `tests/live/transcriber.test.ts` +- `tests/audio-live.test.ts` — opt-in live API test +- `examples/live-mic.js` — reference mic→SDK example (was `node-record.js`) + +**Modified files:** +- `src/audio/interfaces.ts` — add `LiveSTTConfig`, event payload types, `LiveTranscriber` interface +- `src/audio/audio.ts` — add `speech_to_text_live(config?)` method on `Audio` class +- `src/request.ts` — add `fetchJSSStream(...)` method returning raw `Response` +- `package.json` — remove `node-record-lpcm16` from `dependencies`, add `test:live` script + +**Deleted files:** +- `node-record.js` (moved to `examples/live-mic.js`) + +--- + +## Task 1: Extend RequestClient with streaming method + +**Files:** +- Modify: `src/request.ts` +- Test: `tests/live/request-stream.test.ts` (new) + +- [ ] **Step 1: Create the tests directory and write the failing test** + +Create `tests/live/request-stream.test.ts`: + +```ts +import { describe, test, beforeEach, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { RequestClient } from "../../src/request"; + +describe("RequestClient.fetchJSSStream", () => { + let origFetch: typeof fetch; + let lastCall: { url: string; init: RequestInit } | null; + + beforeEach(() => { + origFetch = globalThis.fetch; + lastCall = null; + globalThis.fetch = (async (url: any, init: any) => { + lastCall = { url: String(url), init }; + return new Response("hello", { status: 200, headers: { "content-type": "text/plain" } }); + }) as typeof fetch; + }); + + afterEach(() => { + globalThis.fetch = origFetch; + }); + + test("returns raw Response without reading the body", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const body = new Uint8Array([1, 2, 3]); + const resp = await client.fetchJSSStream("/v1/stream", "POST", body, { stream: "true" }, { "Content-Type": "audio/wav" }); + assert.equal(resp.status, 200); + assert.equal(resp.bodyUsed, false); + assert.equal(lastCall!.url, "https://api.test/v1/stream?stream=true"); + const headers = lastCall!.init.headers as Record; + assert.equal(headers["x-api-key"], "k"); + assert.equal(headers["Content-Type"], "audio/wav"); + }); + + test("forwards AbortSignal", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const ac = new AbortController(); + await client.fetchJSSStream("/v1/stream", "POST", new Uint8Array([1]), undefined, undefined, ac.signal); + assert.equal(lastCall!.init.signal, ac.signal); + }); + + test("omits undefined query params", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + await client.fetchJSSStream("/v1/stream", "POST", new Uint8Array([1]), { stream: "true", translate: undefined }); + assert.equal(lastCall!.url, "https://api.test/v1/stream?stream=true"); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `yarn test:run tests/live/request-stream.test.ts` +Expected: FAIL with "client.fetchJSSStream is not a function". + +- [ ] **Step 3: Implement `fetchJSSStream`** + +Modify `src/request.ts` — add this method to the `RequestClient` class, right after `fetchJSS`: + +```ts + readonly fetchJSSStream = async ( + path: string, + method: "POST" | "GET", + body?: Uint8Array | Record, + searchParams?: { + [key: string]: any; + }, + headers?: { + [key: string]: string; + }, + signal?: AbortSignal + ): Promise => { + const isBinary = body instanceof Uint8Array; + + searchParams = searchParams ? removeUndefinedProperties(searchParams) : undefined; + + const _headers = { + "x-api-key": this.config?.apiKey, + ...(!isBinary && body !== undefined ? { "Content-Type": "application/json" } : {}), + ...this.config?.headers, + ...headers, + }; + + const _body = isBinary ? body : body !== undefined ? JSON.stringify(body) : undefined; + + const url = `${this.config?.baseURL || baseURL}${path}`; + const urlParams = searchParams && Object.keys(searchParams).length ? `?${new URLSearchParams(searchParams).toString()}` : ""; + + return fetch(`${url}${urlParams}`, { + method, + headers: _headers, + body: method === "POST" ? _body : undefined, + signal, + }); + }; +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `yarn test:run tests/live/request-stream.test.ts` +Expected: all 3 tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/request.ts tests/live/request-stream.test.ts +git commit -m "feat(request): add fetchJSSStream for raw Response access" +``` + +--- + +## Task 2: Chunker + +**Files:** +- Create: `src/audio/live/chunker.ts` +- Test: `tests/live/chunker.test.ts` + +- [ ] **Step 1: Write the failing test** + +Create `tests/live/chunker.test.ts`: + +```ts +import { describe, test } from "node:test"; +import assert from "node:assert/strict"; +import { Chunker } from "../../src/audio/live/chunker"; + +const RATE = 16000; +const BYTES_PER_SEC = RATE * 1 * 2; // mono 16-bit = 32000 bytes/s + +function makeChunker(overrides: Partial[0]> = {}) { + return new Chunker({ + sampleRate: RATE, + channels: 1, + chunkSeconds: 5, + overlapSeconds: 2, + maxBufferSeconds: 30, + ...overrides, + }); +} + +describe("Chunker", () => { + test("buffers bytes and emits nothing before chunkSeconds is reached", () => { + const c = makeChunker(); + const { dropped } = c.push(new Uint8Array(BYTES_PER_SEC)); // 1s + assert.equal(dropped, 0); + assert.equal(c.tryEmit(), null); + }); + + test("emits a WAV chunk once chunkSeconds of audio is buffered", () => { + const c = makeChunker(); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); // 5s of silence + const wav = c.tryEmit(); + assert.ok(wav); + assert.equal(wav!.byteLength, 44 + 5 * BYTES_PER_SEC); + // WAV header: RIFF / WAVE / fmt / data + const dv = new DataView(wav!.buffer, wav!.byteOffset, wav!.byteLength); + assert.equal(String.fromCharCode(dv.getUint8(0), dv.getUint8(1), dv.getUint8(2), dv.getUint8(3)), "RIFF"); + assert.equal(String.fromCharCode(dv.getUint8(8), dv.getUint8(9), dv.getUint8(10), dv.getUint8(11)), "WAVE"); + assert.equal(String.fromCharCode(dv.getUint8(12), dv.getUint8(13), dv.getUint8(14), dv.getUint8(15)), "fmt "); + assert.equal(String.fromCharCode(dv.getUint8(36), dv.getUint8(37), dv.getUint8(38), dv.getUint8(39)), "data"); + assert.equal(dv.getUint32(24, true), RATE); // sample rate little-endian + assert.equal(dv.getUint16(22, true), 1); // channels + assert.equal(dv.getUint16(34, true), 16); // bits per sample + assert.equal(dv.getUint32(40, true), 5 * BYTES_PER_SEC); // data size + }); + + test("tryEmit returns null while a chunk is pending ack", () => { + const c = makeChunker(); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); + assert.ok(c.tryEmit()); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); // more audio arrives + assert.equal(c.tryEmit(), null); // still pending + }); + + test("ackChunk trims the emitted chunk but keeps overlapSeconds at front", () => { + const c = makeChunker(); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); + c.tryEmit(); + c.ackChunk(); + // 2s overlap retained from the 5s chunk; next chunk needs 3s more before emitting. + c.push(new Uint8Array(2 * BYTES_PER_SEC)); + assert.equal(c.tryEmit(), null); // 2s overlap + 2s new = 4s < 5s + c.push(new Uint8Array(1 * BYTES_PER_SEC)); + const wav = c.tryEmit(); // 2 + 2 + 1 = 5s + assert.ok(wav); + assert.equal(wav!.byteLength, 44 + 5 * BYTES_PER_SEC); + }); + + test("buffer overflow drops oldest bytes and reports count", () => { + const c = makeChunker({ maxBufferSeconds: 4 }); // 4s cap, 5s chunk — overflow expected + const { dropped } = c.push(new Uint8Array(10 * BYTES_PER_SEC)); // push 10s + assert.equal(dropped, 6 * BYTES_PER_SEC); // keeps last 4s + assert.equal(c.tryEmit(), null); // only 4s < 5s chunk + }); + + test("flush returns remaining audio as WAV when ≥ minFlushSeconds, else null", () => { + const c = makeChunker(); + c.push(new Uint8Array(Math.floor(0.3 * BYTES_PER_SEC))); // 0.3s + assert.equal(c.flush(), null); + + const c2 = makeChunker(); + c2.push(new Uint8Array(Math.floor(1 * BYTES_PER_SEC))); // 1s + const wav = c2.flush(); + assert.ok(wav); + const dv = new DataView(wav!.buffer, wav!.byteOffset, wav!.byteLength); + assert.equal(dv.getUint32(40, true), 1 * BYTES_PER_SEC); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `yarn test:run tests/live/chunker.test.ts` +Expected: FAIL — module `src/audio/live/chunker` does not exist. + +- [ ] **Step 3: Implement the Chunker** + +Create `src/audio/live/chunker.ts`: + +```ts +export interface ChunkerConfig { + sampleRate: number; + channels: number; + chunkSeconds: number; + overlapSeconds: number; + maxBufferSeconds: number; + minFlushSeconds?: number; +} + +const SAMPLE_WIDTH = 2; // 16-bit PCM + +export class Chunker { + private buffer = new Uint8Array(0); + private pendingChunkBytes = 0; + private readonly bytesPerSecond: number; + private readonly chunkBytes: number; + private readonly overlapBytes: number; + private readonly maxBufferBytes: number; + private readonly minFlushBytes: number; + + constructor(private readonly config: ChunkerConfig) { + this.bytesPerSecond = config.sampleRate * config.channels * SAMPLE_WIDTH; + this.chunkBytes = Math.floor(config.chunkSeconds * this.bytesPerSecond); + this.overlapBytes = Math.floor(config.overlapSeconds * this.bytesPerSecond); + this.maxBufferBytes = Math.floor(config.maxBufferSeconds * this.bytesPerSecond); + this.minFlushBytes = Math.floor((config.minFlushSeconds ?? 0.5) * this.bytesPerSecond); + } + + push(bytes: Uint8Array): { dropped: number } { + const next = new Uint8Array(this.buffer.byteLength + bytes.byteLength); + next.set(this.buffer, 0); + next.set(bytes, this.buffer.byteLength); + this.buffer = next; + + let dropped = 0; + if (this.buffer.byteLength > this.maxBufferBytes) { + dropped = this.buffer.byteLength - this.maxBufferBytes; + this.buffer = this.buffer.slice(dropped); + } + return { dropped }; + } + + tryEmit(): Uint8Array | null { + if (this.pendingChunkBytes > 0) return null; + if (this.buffer.byteLength < this.chunkBytes) return null; + this.pendingChunkBytes = this.chunkBytes; + const pcm = this.buffer.slice(0, this.chunkBytes); + return buildWav(pcm, this.config.sampleRate, this.config.channels); + } + + ackChunk(): void { + if (this.pendingChunkBytes === 0) return; + const drop = Math.max(0, this.pendingChunkBytes - this.overlapBytes); + this.buffer = drop >= this.buffer.byteLength ? new Uint8Array(0) : this.buffer.slice(drop); + this.pendingChunkBytes = 0; + } + + flush(): Uint8Array | null { + if (this.buffer.byteLength < this.minFlushBytes) return null; + const wav = buildWav(this.buffer, this.config.sampleRate, this.config.channels); + this.buffer = new Uint8Array(0); + this.pendingChunkBytes = 0; + return wav; + } +} + +function buildWav(pcm: Uint8Array, sampleRate: number, channels: number): Uint8Array { + const bitsPerSample = SAMPLE_WIDTH * 8; + const byteRate = sampleRate * channels * SAMPLE_WIDTH; + const blockAlign = channels * SAMPLE_WIDTH; + const dataSize = pcm.byteLength; + const out = new Uint8Array(44 + dataSize); + const dv = new DataView(out.buffer); + writeAscii(dv, 0, "RIFF"); + dv.setUint32(4, 36 + dataSize, true); + writeAscii(dv, 8, "WAVE"); + writeAscii(dv, 12, "fmt "); + dv.setUint32(16, 16, true); + dv.setUint16(20, 1, true); + dv.setUint16(22, channels, true); + dv.setUint32(24, sampleRate, true); + dv.setUint32(28, byteRate, true); + dv.setUint16(32, blockAlign, true); + dv.setUint16(34, bitsPerSample, true); + writeAscii(dv, 36, "data"); + dv.setUint32(40, dataSize, true); + out.set(pcm, 44); + return out; +} + +function writeAscii(dv: DataView, offset: number, str: string): void { + for (let i = 0; i < str.length; i++) dv.setUint8(offset + i, str.charCodeAt(i)); +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `yarn test:run tests/live/chunker.test.ts` +Expected: all 6 tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/audio/live/chunker.ts tests/live/chunker.test.ts +git commit -m "feat(audio/live): add PCM16 Chunker with overlap + WAV framing" +``` + +--- + +## Task 3: Stitcher + +**Files:** +- Create: `src/audio/live/stitcher.ts` +- Test: `tests/live/stitcher.test.ts` + +- [ ] **Step 1: Write the failing test** + +Create `tests/live/stitcher.test.ts`: + +```ts +import { describe, test } from "node:test"; +import assert from "node:assert/strict"; +import { Stitcher } from "../../src/audio/live/stitcher"; + +describe("Stitcher", () => { + test("returns current text when there is no previous transcript", () => { + const s = new Stitcher(); + assert.equal(s.preview("hello world"), "hello world"); + assert.equal(s.commit("hello world"), "hello world"); + }); + + test("strips exact-match token overlap between commits", () => { + const s = new Stitcher(); + s.commit("the quick brown fox jumps"); + // next chunk starts with overlap "brown fox jumps" then continues + const out = s.commit("brown fox jumps over the lazy dog"); + assert.equal(out, "over the lazy dog"); + }); + + test("preview does not mutate state", () => { + const s = new Stitcher(); + s.commit("hello world"); + s.preview("world there"); // should match overlap "world" + const out = s.commit("world there"); // still works because state unchanged by preview + assert.equal(out, "there"); + }); + + test("fuzzy match handles single-character substitution on long tokens", () => { + const s = new Stitcher(); + s.commit("the quick brown foxes"); + // model returns "foxxs" instead of "foxes" in the overlap region + const out = s.commit("foxxs jumped high"); + assert.equal(out, "jumped high"); + }); + + test("fuzzy match handles single insertion/deletion on long tokens", () => { + const s = new Stitcher(); + s.commit("hello international world"); + // "internationl" (missing one char) + const out = s.commit("internationl world, how are you"); + assert.equal(out, ", how are you"); + }); + + test("no overlap returns current unchanged", () => { + const s = new Stitcher(); + s.commit("one two three"); + const out = s.commit("apple banana cherry"); + assert.equal(out, "apple banana cherry"); + }); + + test("empty inputs are handled", () => { + const s = new Stitcher(); + assert.equal(s.preview(""), ""); + s.commit("hello"); + assert.equal(s.commit(""), ""); + }); + + test("returns empty string when current is entirely overlap", () => { + const s = new Stitcher(); + s.commit("hello world foo bar"); + const out = s.commit("foo bar"); + assert.equal(out, ""); + }); + + test("reset clears state", () => { + const s = new Stitcher(); + s.commit("previous"); + s.reset(); + assert.equal(s.commit("previous again"), "previous again"); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `yarn test:run tests/live/stitcher.test.ts` +Expected: FAIL — module not found. + +- [ ] **Step 3: Implement the Stitcher** + +Create `src/audio/live/stitcher.ts`: + +```ts +export class Stitcher { + private prevTranscript = ""; + + preview(current: string): string { + return stitch(this.prevTranscript, current); + } + + commit(current: string): string { + const stitched = stitch(this.prevTranscript, current); + this.prevTranscript = current; + return stitched; + } + + reset(): void { + this.prevTranscript = ""; + } +} + +function stitch(prev: string, current: string): string { + if (!prev || !current) return current; + const prevTokens = tokenize(prev); + const curTokens = tokenize(current); + const overlap = findTokenOverlap(prevTokens, curTokens); + if (overlap > 0) { + const newTokens = curTokens.slice(overlap); + return newTokens.length ? newTokens.join(" ") : ""; + } + return current; +} + +function tokenize(text: string): string[] { + return text.trim().split(/\s+/).filter(Boolean); +} + +function normalizeToken(token: string): string { + return token.toLowerCase().replace(/[^a-z0-9]+/g, ""); +} + +function fuzzyTokenMatch(a: string, b: string): boolean { + const na = normalizeToken(a); + const nb = normalizeToken(b); + if (na === nb) return true; + if (na.length < 4 || nb.length < 4) return false; + if (Math.abs(na.length - nb.length) > 1) return false; + if (na.length === nb.length) { + let diffs = 0; + for (let i = 0; i < na.length; i++) if (na[i] !== nb[i]) diffs++; + return diffs <= 1; + } + const [short, long] = na.length < nb.length ? [na, nb] : [nb, na]; + let diffs = 0; + let si = 0; + let li = 0; + while (si < short.length && li < long.length) { + if (short[si] !== long[li]) { + diffs++; + li++; + } else { + si++; + li++; + } + } + return diffs + (long.length - li) <= 1; +} + +function findTokenOverlap(prevTokens: string[], curTokens: string[]): number { + const max = Math.min(prevTokens.length, curTokens.length); + for (let overlap = max; overlap > 0; overlap--) { + const prevSlice = prevTokens.slice(-overlap); + const curSlice = curTokens.slice(0, overlap); + if (prevSlice.every((t, i) => fuzzyTokenMatch(t, curSlice[i]))) return overlap; + } + return 0; +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `yarn test:run tests/live/stitcher.test.ts` +Expected: all 9 tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/audio/live/stitcher.ts tests/live/stitcher.test.ts +git commit -m "feat(audio/live): add Stitcher with token overlap + fuzzy match" +``` + +--- + +## Task 4: SSE transport + +**Files:** +- Create: `src/audio/live/sse.ts` +- Test: `tests/live/sse.test.ts` + +- [ ] **Step 1: Write the failing test** + +Create `tests/live/sse.test.ts`: + +```ts +import { describe, test, beforeEach, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { RequestClient } from "../../src/request"; +import { transcribeChunk } from "../../src/audio/live/sse"; + +function sseResponse(events: string[]): Response { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + for (const e of events) controller.enqueue(encoder.encode(e)); + controller.close(); + }, + }); + return new Response(stream, { status: 200, headers: { "content-type": "text/event-stream" } }); +} + +describe("transcribeChunk (SSE)", () => { + let origFetch: typeof fetch; + beforeEach(() => { origFetch = globalThis.fetch; }); + afterEach(() => { globalThis.fetch = origFetch; }); + + test("invokes onDelta for each transcript.delta and returns final text on transcript.done", async () => { + globalThis.fetch = (async () => sseResponse([ + 'data: {"type":"transcript.delta","delta":"hello"}\n', + 'data: {"type":"transcript.delta","delta":" world"}\n', + 'data: {"type":"transcript.done","text":"hello world"}\n', + "data: [DONE]\n", + ])) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const deltas: string[] = []; + const final = await transcribeChunk( + client, + new Uint8Array([1, 2, 3]), + { language: "en", vadThreshold: 0.4 }, + (d) => deltas.push(d) + ); + assert.deepEqual(deltas, ["hello", " world"]); + assert.equal(final, "hello world"); + }); + + test("handles events split across network chunks", async () => { + globalThis.fetch = (async () => sseResponse([ + 'data: {"type":"transcript.delt', + 'a","delta":"hi"}\n', + 'data: {"type":"transcript.done","text":"hi"}\n', + ])) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const deltas: string[] = []; + const final = await transcribeChunk( + client, + new Uint8Array([1]), + { language: "en", vadThreshold: 0.4 }, + (d) => deltas.push(d) + ); + assert.deepEqual(deltas, ["hi"]); + assert.equal(final, "hi"); + }); + + test("skips malformed JSON lines without throwing", async () => { + globalThis.fetch = (async () => sseResponse([ + "data: {not valid json}\n", + 'data: {"type":"transcript.delta","delta":"ok"}\n', + 'data: {"type":"transcript.done","text":"ok"}\n', + ])) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const deltas: string[] = []; + const final = await transcribeChunk( + client, + new Uint8Array([1]), + { language: "en", vadThreshold: 0.4 }, + (d) => deltas.push(d) + ); + assert.deepEqual(deltas, ["ok"]); + assert.equal(final, "ok"); + }); + + test("throws on non-2xx with truncated body in message", async () => { + globalThis.fetch = (async () => new Response("server on fire".repeat(50), { status: 500 })) as typeof fetch; + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + await assert.rejects( + transcribeChunk(client, new Uint8Array([1]), { language: "en", vadThreshold: 0.4 }, () => {}), + /Transcribe failed 500/ + ); + }); + + test("forwards query params including translate when set", async () => { + let captured = ""; + globalThis.fetch = (async (url: any) => { + captured = String(url); + return sseResponse(['data: {"type":"transcript.done","text":""}\n']); + }) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + await transcribeChunk( + client, + new Uint8Array([1]), + { language: "fr", vadThreshold: 0.5, translate: true }, + () => {} + ); + assert.match(captured, /stream=true/); + assert.match(captured, /vad=true/); + assert.match(captured, /vad_threshold=0\.5/); + assert.match(captured, /language=fr/); + assert.match(captured, /translate=true/); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `yarn test:run tests/live/sse.test.ts` +Expected: FAIL — module not found. + +- [ ] **Step 3: Implement the SSE transport** + +Create `src/audio/live/sse.ts`: + +```ts +import { RequestClient } from "../../request"; + +export interface SSETranscribeParams { + language: string; + vadThreshold: number; + translate?: boolean; +} + +export async function transcribeChunk( + client: RequestClient, + wavBuf: Uint8Array, + params: SSETranscribeParams, + onDelta: (text: string) => void, + signal?: AbortSignal +): Promise { + const searchParams: Record = { + stream: "true", + vad: "true", + vad_threshold: params.vadThreshold, + language: params.language, + translate: params.translate ? "true" : undefined, + }; + + const resp = await client.fetchJSSStream( + "/v1/ai/transcribe", + "POST", + wavBuf, + searchParams, + { "Content-Type": "audio/wav" }, + signal + ); + + if (!resp.ok) { + const text = await resp.text().catch(() => ""); + throw new Error(`Transcribe failed ${resp.status}: ${text.slice(0, 200)}`); + } + + if (!resp.body) { + throw new Error("Transcribe response has no body"); + } + + const reader = resp.body.getReader(); + const decoder = new TextDecoder(); + let buf = ""; + let finalText = ""; + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + let idx: number; + while ((idx = buf.indexOf("\n")) !== -1) { + const line = buf.slice(0, idx).trim(); + buf = buf.slice(idx + 1); + if (!line.startsWith("data:")) continue; + const data = line.slice(5).trim(); + if (!data || data === "[DONE]") continue; + let event: any; + try { + event = JSON.parse(data); + } catch { + continue; + } + if (event.type === "transcript.delta" && typeof event.delta === "string") { + onDelta(event.delta); + } else if ( + (event.type === "transcript.done" || event.type === "transcript.final") && + typeof event.text === "string" + ) { + finalText = event.text.trim(); + } + } + } + } finally { + reader.releaseLock?.(); + } + + return finalText; +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `yarn test:run tests/live/sse.test.ts` +Expected: all 5 tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/audio/live/sse.ts tests/live/sse.test.ts +git commit -m "feat(audio/live): add SSE transcript parser" +``` + +--- + +## Task 5: Add LiveSTT types to interfaces + +**Files:** +- Modify: `src/audio/interfaces.ts` + +- [ ] **Step 1: Append the live-stt types** + +Add these exports at the bottom of `src/audio/interfaces.ts`: + +```ts +export interface LiveSTTConfig { + language?: LanguageCodes | "auto"; + sampleRate?: number; + channels?: 1 | 2; + translate?: boolean; + chunkSeconds?: number; + overlapSeconds?: number; + vadThreshold?: number; + maxBufferSeconds?: number; +} + +export interface LiveSTTDelta { + text: string; + chunkIndex: number; +} + +export interface LiveSTTTurn { + text: string; + chunkIndex: number; + isFinal: boolean; +} + +export interface LiveSTTWarning { + code: "buffer_overflow" | "chunk_error"; + message: string; +} + +export interface LiveSTTEvents { + open: (payload: { id: string }) => void; + delta: (payload: LiveSTTDelta) => void; + turn: (payload: LiveSTTTurn) => void; + warning: (payload: LiveSTTWarning) => void; + error: (err: Error) => void; + close: () => void; +} + +export interface LiveTranscriber { + on(event: E, handler: LiveSTTEvents[E]): this; + off(event: E, handler: LiveSTTEvents[E]): this; + connect(): Promise; + stream(): WritableStream; + close(): Promise; +} +``` + +- [ ] **Step 2: Verify the file compiles** + +Run: `yarn build` +Expected: pkgroll build succeeds. If it fails with "LanguageCodes not found", the existing import in this file already resolves it — no extra import needed. + +- [ ] **Step 3: Commit** + +```bash +git add src/audio/interfaces.ts +git commit -m "feat(audio): add LiveSTT types" +``` + +--- + +## Task 6: Transcriber class + +**Files:** +- Create: `src/audio/live/transcriber.ts` +- Test: `tests/live/transcriber.test.ts` + +- [ ] **Step 1: Write the failing test** + +Create `tests/live/transcriber.test.ts`: + +```ts +import { describe, test, beforeEach, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { RequestClient } from "../../src/request"; +import { Transcriber } from "../../src/audio/live/transcriber"; + +const RATE = 16000; +const BYTES_PER_SEC = RATE * 2; + +function encode(events: string[]): ReadableStream { + const enc = new TextEncoder(); + return new ReadableStream({ + start(c) { + for (const e of events) c.enqueue(enc.encode(e)); + c.close(); + }, + }); +} + +function mockFetchSequence(responses: Array<() => Response>): () => typeof fetch { + let i = 0; + const original = globalThis.fetch; + globalThis.fetch = (async () => { + const idx = Math.min(i++, responses.length - 1); + return responses[idx](); + }) as typeof fetch; + return () => { + globalThis.fetch = original; + }; +} + +function successResponse(deltas: string[], final: string): Response { + const evts = [ + ...deltas.map((d) => `data: ${JSON.stringify({ type: "transcript.delta", delta: d })}\n`), + `data: ${JSON.stringify({ type: "transcript.done", text: final })}\n`, + ]; + return new Response(encode(evts), { status: 200 }); +} + +function makeTranscriber() { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + return new Transcriber(client, { sampleRate: RATE, channels: 1, chunkSeconds: 5, overlapSeconds: 2, maxBufferSeconds: 30 }); +} + +async function writeBytes(writer: WritableStreamDefaultWriter, bytes: Uint8Array, chunkSize = 0) { + if (chunkSize <= 0) { + await writer.write(bytes); + return; + } + for (let offset = 0; offset < bytes.byteLength; offset += chunkSize) { + await writer.write(bytes.slice(offset, Math.min(offset + chunkSize, bytes.byteLength))); + } +} + +describe("Transcriber", () => { + let restore: (() => void) | null = null; + beforeEach(() => { restore = null; }); + afterEach(() => { restore?.(); }); + + test("connect() emits open once with a session id", async () => { + const t = makeTranscriber(); + const opens: any[] = []; + t.on("open", (p) => opens.push(p)); + await t.connect(); + assert.equal(opens.length, 1); + assert.equal(typeof opens[0].id, "string"); + assert.ok(opens[0].id.length > 0); + }); + + test("stream() before connect() throws", () => { + const t = makeTranscriber(); + assert.throws(() => t.stream(), /open/); + }); + + test("invalid config rejects connect()", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const bad = new Transcriber(client, { chunkSeconds: 2, overlapSeconds: 3 }); + await assert.rejects(bad.connect(), /chunkSeconds/); + }); + + test("pipes PCM bytes, emits delta+turn+close, in order, with correct chunkIndex", async () => { + restore = mockFetchSequence([ + () => successResponse(["hello"], "hello world"), + () => successResponse(["bye"], "bye now"), + ]); + const t = makeTranscriber(); + const events: Array<{ kind: string; payload?: any }> = []; + t.on("open", (p) => events.push({ kind: "open", payload: p })); + t.on("delta", (p) => events.push({ kind: "delta", payload: p })); + t.on("turn", (p) => events.push({ kind: "turn", payload: p })); + t.on("close", () => events.push({ kind: "close" })); + await t.connect(); + + const writer = t.stream().getWriter(); + // 10s of silence → triggers two chunks (5s each, serialized) + await writeBytes(writer, new Uint8Array(10 * BYTES_PER_SEC)); + await writer.close(); + await t.close(); + + const kinds = events.map((e) => e.kind); + assert.equal(kinds[0], "open"); + assert.equal(kinds[kinds.length - 1], "close"); + + const turns = events.filter((e) => e.kind === "turn").map((e) => e.payload); + assert.ok(turns.length >= 1); + assert.equal(turns[0].chunkIndex, 0); + assert.equal(typeof turns[0].text, "string"); + }); + + test("empty transcripts are suppressed", async () => { + restore = mockFetchSequence([() => successResponse([], "")]); + const t = makeTranscriber(); + const turns: any[] = []; + const deltas: any[] = []; + t.on("delta", (p) => deltas.push(p)); + t.on("turn", (p) => turns.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(5 * BYTES_PER_SEC)); + await writer.close(); + await t.close(); + assert.equal(deltas.length, 0); + assert.equal(turns.length, 0); + }); + + test("single chunk HTTP error emits warning and continues", async () => { + restore = mockFetchSequence([ + () => new Response("boom", { status: 500 }), + () => successResponse(["ok"], "ok"), + ]); + const t = makeTranscriber(); + const warnings: any[] = []; + const turns: any[] = []; + t.on("warning", (p) => warnings.push(p)); + t.on("turn", (p) => turns.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(10 * BYTES_PER_SEC)); + await writer.close(); + await t.close(); + assert.ok(warnings.some((w) => w.code === "chunk_error")); + assert.ok(turns.some((tr) => tr.text === "ok")); + }); + + test("3 consecutive chunk errors escalate to fatal error", async () => { + restore = mockFetchSequence([ + () => new Response("e", { status: 500 }), + () => new Response("e", { status: 500 }), + () => new Response("e", { status: 500 }), + ]); + const t = makeTranscriber(); + let errored: Error | null = null; + let closed = false; + t.on("error", (e) => { errored = e; }); + t.on("close", () => { closed = true; }); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(15 * BYTES_PER_SEC)); // 3 chunks + try { await writer.close(); } catch {} + await t.close(); + assert.ok(errored); + assert.match((errored as Error).message, /3 consecutive/); + assert.equal(closed, true); + }); + + test("close() flushes remaining ≥ 0.5s buffer as final chunk", async () => { + restore = mockFetchSequence([() => successResponse(["tail"], "tail")]); + const t = makeTranscriber(); + const turns: any[] = []; + t.on("turn", (p) => turns.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(1 * BYTES_PER_SEC)); // only 1s (below chunkSeconds) + await writer.close(); + await t.close(); + assert.equal(turns.length, 1); + assert.equal(turns[0].isFinal, true); + assert.equal(turns[0].text, "tail"); + }); + + test("buffer overflow emits warning", async () => { + // Mock fetch so the flush at close() time (if any) doesn't hit the network. + restore = mockFetchSequence([() => successResponse([], "")]); + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const t = new Transcriber(client, { chunkSeconds: 5, overlapSeconds: 2, maxBufferSeconds: 4 }); + const warnings: any[] = []; + t.on("warning", (p) => warnings.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writer.write(new Uint8Array(10 * BYTES_PER_SEC)); + try { await writer.close(); } catch {} + await t.close(); + assert.ok(warnings.some((w) => w.code === "buffer_overflow")); + }); +}); +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `yarn test:run tests/live/transcriber.test.ts` +Expected: FAIL — module not found. + +- [ ] **Step 3: Implement the Transcriber** + +Create `src/audio/live/transcriber.ts`: + +```ts +import { RequestClient } from "../../request"; +import { LiveSTTConfig, LiveSTTEvents, LiveTranscriber } from "../interfaces"; +import { Chunker } from "./chunker"; +import { transcribeChunk } from "./sse"; +import { Stitcher } from "./stitcher"; + +type State = "idle" | "open" | "closing" | "closed" | "errored"; + +const DEFAULTS = { + language: "en" as const, + sampleRate: 16000, + channels: 1 as 1 | 2, + translate: false, + chunkSeconds: 5, + overlapSeconds: 2, + vadThreshold: 0.4, + maxBufferSeconds: 30, +}; + +const MAX_CONSECUTIVE_ERRORS = 3; +const CHUNK_TIMEOUT_MS = 30_000; + +export class Transcriber implements LiveTranscriber { + private readonly listeners = new Map>(); + private state: State = "idle"; + private readonly cfg: Required; + private readonly chunker: Chunker; + private readonly stitcher = new Stitcher(); + private chunkIndex = 0; + private consecutiveErrors = 0; + private inFlight: Promise | null = null; + private readonly topAbort = new AbortController(); + private closePromise: Promise | null = null; + private sessionId = ""; + + constructor(private readonly client: RequestClient, config?: LiveSTTConfig) { + this.cfg = { ...DEFAULTS, ...(config ?? {}) } as Required; + this.chunker = new Chunker({ + sampleRate: this.cfg.sampleRate, + channels: this.cfg.channels, + chunkSeconds: this.cfg.chunkSeconds, + overlapSeconds: this.cfg.overlapSeconds, + maxBufferSeconds: this.cfg.maxBufferSeconds, + }); + } + + on(event: E, handler: LiveSTTEvents[E]): this { + let set = this.listeners.get(event); + if (!set) { + set = new Set(); + this.listeners.set(event, set); + } + set.add(handler as unknown as Function); + return this; + } + + off(event: E, handler: LiveSTTEvents[E]): this { + this.listeners.get(event)?.delete(handler as unknown as Function); + return this; + } + + private emit(event: E, ...args: Parameters): void { + const set = this.listeners.get(event); + if (!set) return; + for (const h of set) { + try { + (h as (...a: any[]) => void)(...(args as any[])); + } catch { + // swallow listener errors to preserve session + } + } + } + + async connect(): Promise { + if (this.state !== "idle") throw new Error("connect() called on transcriber that is not idle"); + if (!(this.cfg.sampleRate > 0)) throw new Error("sampleRate must be > 0"); + if (!(this.cfg.channels === 1 || this.cfg.channels === 2)) throw new Error("channels must be 1 or 2"); + if (!(this.cfg.chunkSeconds > this.cfg.overlapSeconds && this.cfg.overlapSeconds > 0)) + throw new Error("chunkSeconds > overlapSeconds > 0 required"); + if (!(this.cfg.maxBufferSeconds > this.cfg.chunkSeconds)) + throw new Error("maxBufferSeconds must be > chunkSeconds"); + this.sessionId = crypto.randomUUID(); + this.state = "open"; + this.emit("open", { id: this.sessionId }); + } + + stream(): WritableStream { + if (this.state !== "open") throw new Error("stream() can only be called on an open transcriber"); + return new WritableStream({ + write: async (bytes) => { + if (this.state !== "open") return; + const { dropped } = this.chunker.push(bytes); + if (dropped > 0) { + this.emit("warning", { + code: "buffer_overflow", + message: `dropped ${dropped} bytes to stay under maxBufferSeconds`, + }); + } + this.pump(); + }, + close: async () => { + await this.finalize(); + }, + abort: async (reason) => { + this.fail(reason instanceof Error ? reason : new Error(String(reason ?? "stream aborted"))); + }, + }); + } + + private pump(): void { + if (this.inFlight) return; + if (this.state !== "open" && this.state !== "closing") return; + const chunk = this.chunker.tryEmit(); + if (!chunk) return; + const idx = this.chunkIndex++; + this.inFlight = this.processChunk(chunk, idx, false).finally(() => { + this.inFlight = null; + if (this.state === "open") this.pump(); + }); + } + + private async processChunk(wav: Uint8Array, idx: number, isFinal: boolean): Promise { + let committed = ""; + const chunkAbort = new AbortController(); + const onTop = () => chunkAbort.abort(this.topAbort.signal.reason ?? new Error("aborted")); + this.topAbort.signal.addEventListener("abort", onTop); + const timer = setTimeout(() => chunkAbort.abort(new Error("transcribe timeout")), CHUNK_TIMEOUT_MS); + + try { + committed = await transcribeChunk( + this.client, + wav, + { language: this.cfg.language, vadThreshold: this.cfg.vadThreshold, translate: this.cfg.translate }, + (delta) => { + if (this.state !== "open" && this.state !== "closing") return; + const preview = this.stitcher.preview(delta); + if (preview) this.emit("delta", { text: preview, chunkIndex: idx }); + }, + chunkAbort.signal + ); + this.consecutiveErrors = 0; + } catch (err: any) { + if (this.topAbort.signal.aborted) { + this.chunker.ackChunk(); + return; + } + this.consecutiveErrors++; + this.emit("warning", { code: "chunk_error", message: err?.message ?? String(err) }); + this.chunker.ackChunk(); + if (this.consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { + this.fail(new Error("live stt aborted after 3 consecutive chunk failures")); + } + return; + } finally { + clearTimeout(timer); + this.topAbort.signal.removeEventListener("abort", onTop); + } + + this.chunker.ackChunk(); + if (committed) { + const stitched = this.stitcher.commit(committed); + if (stitched) this.emit("turn", { text: stitched, chunkIndex: idx, isFinal }); + } + } + + private async finalize(): Promise { + if (this.closePromise) return this.closePromise; + this.closePromise = (async () => { + if (this.state === "closed") return; + if (this.state === "errored") { + this.emitClose(); + return; + } + if (this.state !== "open") return; + this.state = "closing"; + while (this.inFlight) { + try { + await this.inFlight; + } catch { + // errors already surfaced via events + } + } + const flush = this.chunker.flush(); + if (flush && (this.state as State) === "closing") { + await this.processChunk(flush, this.chunkIndex++, true); + } + this.emitClose(); + })(); + return this.closePromise; + } + + async close(): Promise { + return this.finalize(); + } + + private emitClose(): void { + if (this.state === "closed") return; + this.state = "closed"; + this.emit("close"); + } + + private fail(err: Error): void { + if (this.state === "errored" || this.state === "closed") return; + this.state = "errored"; + this.topAbort.abort(err); + this.emit("error", err); + this.emitClose(); + } +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `yarn test:run tests/live/transcriber.test.ts` +Expected: all 8 tests pass. + +If any test times out, the most likely cause is the pump loop not being re-entered after an error path — re-check the `.finally` block on `inFlight` and the error-path early returns. + +- [ ] **Step 5: Commit** + +```bash +git add src/audio/live/transcriber.ts tests/live/transcriber.test.ts +git commit -m "feat(audio/live): add Transcriber with state machine + events" +``` + +--- + +## Task 7: Wire `speech_to_text_live` onto the Audio API + +**Files:** +- Modify: `src/audio/audio.ts` + +- [ ] **Step 1: Add the method** + +Modify `src/audio/audio.ts` — add the import and the method: + +Change the imports at the top: + +```ts +import { RequestClient } from "../request"; +import { createFileUploadFormData } from "../utils"; +import { + LiveSTTConfig, + LiveTranscriber, + SpeechToTextParams, + SpeechToTextParamsWithWebhook, + SpeechToTextParamsWithoutWebhook, + SpeechToTextResponse, + SpeechToTextWebhookResponse, +} from "./interfaces"; +import { Transcriber } from "./live/transcriber"; +``` + +Add this method inside the `Audio` class, after the existing `speech_to_text` implementation: + +```ts + speech_to_text_live(config?: LiveSTTConfig): LiveTranscriber { + return new Transcriber(this.client, config); + } +``` + +- [ ] **Step 2: Verify the build succeeds** + +Run: `yarn build` +Expected: pkgroll build succeeds. No type errors. + +- [ ] **Step 3: Add a smoke test for the wiring** + +Append to `tests/live/transcriber.test.ts` at the end of the `describe` block (inside it): + +```ts + test("jigsaw.audio.speech_to_text_live is exposed on the SDK", async () => { + const { JigsawStack } = await import("../../index"); + const jigsaw = JigsawStack({ apiKey: "k", baseURL: "https://api.test" }); + const t = jigsaw.audio.speech_to_text_live({ chunkSeconds: 5, overlapSeconds: 2 }); + await t.connect(); + await t.close(); + assert.ok(t); // connect + close roundtrip succeeded + }); +``` + +- [ ] **Step 4: Run tests to verify** + +Run: `yarn test:run tests/live/transcriber.test.ts` +Expected: all tests (including the new one) pass. + +- [ ] **Step 5: Commit** + +```bash +git add src/audio/audio.ts tests/live/transcriber.test.ts +git commit -m "feat(audio): expose speech_to_text_live on audio namespace" +``` + +--- + +## Task 8: Add examples/live-mic.js reference example + +**Files:** +- Create: `examples/live-mic.js` +- (Conditional) Modify: `package.json` — only if `node-record-lpcm16` is present in `dependencies` +- (Conditional) Delete: `node-record.js` — only if present at repo root + +**Note:** This branch was started from `main`, so the scratch `node-record.js` file and the `node-record-lpcm16` dependency may not exist. Step 1 and Step 3 are no-ops in that case. + +- [ ] **Step 1: Remove `node-record-lpcm16` from dependencies (if present)** + +Check `package.json`. If `dependencies` contains `"node-record-lpcm16": ...`, delete that line and run `yarn install` to update `yarn.lock`. If it's not present, skip this step. + +- [ ] **Step 2: Create `examples/` directory and the example file** + +Create `examples/live-mic.js`: + +```js +import { Readable } from "stream"; +import recorder from "node-record-lpcm16"; +import { JigsawStack } from "jigsawstack"; + +const jigsaw = JigsawStack({ apiKey: process.env.JIGSAWSTACK_API_KEY }); + +const transcriber = jigsaw.audio.speech_to_text_live({ + language: "en", + sampleRate: 16000, + channels: 1, +}); + +transcriber.on("open", ({ id }) => console.log("session", id)); +transcriber.on("delta", ({ text }) => process.stdout.write(`\r… ${text}`)); +transcriber.on("turn", ({ text }) => console.log(`\n${text}`)); +transcriber.on("warning", ({ code, message }) => console.warn("\n[warn]", code, message)); +transcriber.on("error", (err) => console.error("\n[error]", err)); +transcriber.on("close", () => console.log("\n[done]")); + +await transcriber.connect(); + +const rec = recorder.record({ + sampleRate: 16000, + channels: 1, + audioType: "raw", + recorder: "sox", + encoding: "signed-integer", + endianness: "little", + bits: 16, +}); + +Readable.toWeb(rec.stream()).pipeTo(transcriber.stream()); + +process.on("SIGINT", async () => { + rec.stop(); + await transcriber.close(); + process.exit(0); +}); +``` + +- [ ] **Step 3: Delete the root `node-record.js` (if present)** + +If a file `node-record.js` exists at the repo root, delete it with `rm node-record.js`. Otherwise skip. + +- [ ] **Step 4: Verify tests still pass and the build still succeeds** + +Run: `yarn build && yarn test:run tests/live/*.ts tests/live/request-stream.test.ts` +Expected: all pass. + +- [ ] **Step 5: Commit** + +Stage whichever of these actually changed (skip files that are already in the desired state): + +```bash +git add examples/live-mic.js +# if package.json / yarn.lock changed: +git add package.json yarn.lock +# if node-record.js existed and was deleted: +git rm node-record.js +git commit -m "chore: add examples/live-mic.js reference" +``` + +--- + +## Task 9: Add opt-in live integration test + +**Files:** +- Create: `tests/audio-live.test.ts` +- Modify: `package.json` (add `test:audio:live` script) + +- [ ] **Step 1: Download a short fixture audio (reuse the existing preview URL)** + +The test streams the remote preview file as PCM bytes. No new fixture needed — we reuse the sample at `https://jigsawstack.com/preview/stt-example.wav` and decode its PCM payload. + +- [ ] **Step 2: Create the test** + +Create `tests/audio-live.test.ts`: + +```ts +import { describe, test } from "node:test"; +import assert from "node:assert/strict"; +import { createJigsawStackClient } from "./test-helpers.js"; + +const PREVIEW_WAV_URL = "https://jigsawstack.com/preview/stt-example.wav"; + +async function fetchPcm16(url: string): Promise<{ pcm: Uint8Array; sampleRate: number; channels: number }> { + const resp = await fetch(url); + if (!resp.ok) throw new Error(`fixture fetch failed ${resp.status}`); + const buf = new Uint8Array(await resp.arrayBuffer()); + const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength); + // Minimal WAV parse: seek to "data" chunk. + const channels = dv.getUint16(22, true); + const sampleRate = dv.getUint32(24, true); + let offset = 12; + while (offset < buf.byteLength - 8) { + const id = String.fromCharCode(buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3]); + const size = dv.getUint32(offset + 4, true); + if (id === "data") { + return { pcm: buf.slice(offset + 8, offset + 8 + size), sampleRate, channels }; + } + offset += 8 + size; + } + throw new Error("data chunk not found"); +} + +describe("Live STT (integration)", { skip: !process.env.JIGSAWSTACK_API_KEY }, () => { + test("streams PCM through transcriber and receives at least one turn", async () => { + const client = createJigsawStackClient(); + const { pcm, sampleRate, channels } = await fetchPcm16(PREVIEW_WAV_URL); + + const transcriber = client.audio.speech_to_text_live({ + language: "en", + sampleRate, + channels: channels === 2 ? 2 : 1, + }); + + const turns: string[] = []; + transcriber.on("turn", ({ text }) => turns.push(text)); + + await transcriber.connect(); + const writer = transcriber.stream().getWriter(); + // Write the whole fixture in ~0.5s slices to mimic a live stream. + const sliceBytes = sampleRate * channels * 2 * 0.5; + for (let off = 0; off < pcm.byteLength; off += sliceBytes) { + await writer.write(pcm.slice(off, Math.min(off + sliceBytes, pcm.byteLength))); + } + await writer.close(); + await transcriber.close(); + + assert.ok(turns.length > 0, "expected at least one turn"); + assert.ok(turns.join(" ").length > 0, "expected non-empty transcript"); + }); +}); +``` + +- [ ] **Step 3: Add the script to package.json** + +Modify `package.json` — add under `scripts` (keep alphabetical-ish grouping with other `test:*` entries): + +```json + "test:audio:live": "yarn test:run tests/audio-live.test.ts", +``` + +- [ ] **Step 4: Run the live test locally with an API key** + +Run: `JIGSAWSTACK_API_KEY= yarn test:audio:live` +Expected: test passes. Without the env var, the describe block is skipped — `yarn test:audio:live` exits clean. + +- [ ] **Step 5: Commit** + +```bash +git add tests/audio-live.test.ts package.json +git commit -m "test(audio): add opt-in live STT integration test" +``` + +--- + +## Task 10: README — document the new method + +**Files:** +- Modify: `README.md` + +- [ ] **Step 1: Add a short section to the README** + +Append this section to `README.md` after the existing usage examples (before any closing section): + +```markdown + +### Live speech-to-text (streaming) + +Pipe real-time PCM16 audio (microphone, WebRTC, file) into the SDK and receive incremental and committed transcripts. + +```js +import { Readable } from "stream"; +import recorder from "node-record-lpcm16"; +import { JigsawStack } from "jigsawstack"; + +const jigsaw = JigsawStack({ apiKey: process.env.JIGSAWSTACK_API_KEY }); +const transcriber = jigsaw.audio.speech_to_text_live({ + language: "en", + sampleRate: 16000, + channels: 1, +}); + +transcriber.on("delta", ({ text }) => process.stdout.write(`\r… ${text}`)); +transcriber.on("turn", ({ text }) => console.log(`\n${text}`)); + +await transcriber.connect(); + +const rec = recorder.record({ sampleRate: 16000, channels: 1, audioType: "raw" }); +Readable.toWeb(rec.stream()).pipeTo(transcriber.stream()); + +process.on("SIGINT", async () => { + rec.stop(); + await transcriber.close(); + process.exit(); +}); +``` + +See `examples/live-mic.js` for a full working example. +``` + +- [ ] **Step 2: Commit** + +```bash +git add README.md +git commit -m "docs: document speech_to_text_live in README" +``` + +--- + +## Final verification + +- [ ] **Step 1: Run the full build and non-live test suite** + +Run: `yarn build` +Expected: clean build. + +Run: `yarn test:run tests/live/*.ts tests/live/request-stream.test.ts` +Expected: all unit + integration tests pass. + +- [ ] **Step 2: Format check** + +Run: `yarn format` +Expected: biome applies any formatting fixes. Commit the resulting diff if non-empty: + +```bash +git add -u +git commit -m "chore: apply biome formatter" +``` + +- [ ] **Step 3: Smoke-run the example (optional, requires sox + a mic)** + +Run: `brew install sox` (if not already), then `JIGSAWSTACK_API_KEY= node examples/live-mic.js` +Expected: speak for ~15s; preview text streams, committed turns print on newlines, Ctrl-C exits cleanly. diff --git a/docs/superpowers/specs/2026-04-21-live-stt-design.md b/docs/superpowers/specs/2026-04-21-live-stt-design.md new file mode 100644 index 0000000..5bcafc3 --- /dev/null +++ b/docs/superpowers/specs/2026-04-21-live-stt-design.md @@ -0,0 +1,249 @@ +# Live Speech-to-Text — Design + +## Goal + +Expose a live streaming speech-to-text transcriber on the JigsawStack JS SDK so users can pipe real-time audio (microphone, WebRTC, file stream) and receive incremental + committed transcripts. The SDK must internally handle chunking, overlap stitching, and SSE parsing because the transcribe endpoint is HTTP/SSE, not WebSocket. + +## Non-goals + +- Mic capture helper (users bring their own audio source). +- Official browser support (the implementation is WHATWG-stream-based so it will likely work, but v1 is Node-tested only). +- Parallel chunk transcription. +- Custom pluggable stitcher/chunker. +- Automatic mid-stream language switching. +- Persistent server-side session state. +- Retry of failed chunks (a failed chunk is dropped; the next chunk picks up from retained overlap audio). + +## Public API + +```ts +const transcriber = jigsaw.audio.speech_to_text_live(config?: LiveSTTConfig); +``` + +### `LiveSTTConfig` + +| Field | Type | Default | Purpose | +|------------------------|-------------------------------------|---------|---------| +| `language` | `LanguageCodes \| "auto"` | `"en"` | Forwarded to transcribe endpoint | +| `sampleRate` | `number` | `16000` | PCM16 sample rate of piped bytes | +| `channels` | `1 \| 2` | `1` | PCM16 channel count | +| `translate` | `boolean` | `false` | Server-side translation to English | +| `chunkSeconds` | `number` | `5` | Chunk size sent per transcribe request | +| `overlapSeconds` | `number` | `2` | Audio retained between chunks for stitching | +| `vadThreshold` | `number` | `0.4` | Forwarded as `vad_threshold` query param | +| `maxBufferSeconds` | `number` | `30` | Upper bound on internal buffer before oldest frames are dropped | + +### `LiveTranscriber` + +```ts +interface LiveTranscriber { + on(event: E, handler: LiveSTTEvents[E]): this; + off(event: E, handler: LiveSTTEvents[E]): this; + connect(): Promise; + stream(): WritableStream; + close(): Promise; +} +``` + +- `connect()` validates config (`sampleRate > 0`, `channels ∈ {1,2}`, `chunkSeconds > overlapSeconds > 0`, `maxBufferSeconds > chunkSeconds`), generates a session id via `crypto.randomUUID()` (Node ≥18 built-in), transitions to `open`, emits `open`. No network call. Throws if called twice. +- `stream()` returns the sink. Calling before `connect()` throws. Users pipe raw PCM16 little-endian bytes at the configured sample rate/channel count. +- `close()` stops accepting writes, awaits the in-flight chunk, flushes any remaining buffered audio ≥ 0.5s as a final chunk (`isFinal: true`), emits `close`. Idempotent — repeated calls return the same promise. + +### Events + +```ts +interface LiveSTTEvents { + open: (payload: { id: string }) => void; + delta: (payload: { text: string; chunkIndex: number }) => void; + turn: (payload: { text: string; chunkIndex: number; isFinal: boolean }) => void; + warning: (payload: { code: "buffer_overflow" | "chunk_error"; message: string }) => void; + error: (err: Error) => void; + close: () => void; +} +``` + +- `open` fires exactly once after `connect()`. `id` is a locally-generated UUID for user-side logging/correlation (no server session exists). +- `delta` fires as SSE `transcript.delta` events stream in during a chunk. `text` is the preview already stitched against the last committed transcript. Drives ephemeral preview UI. +- `turn` fires once per chunk on `transcript.done`/`transcript.final`, after stitching. `isFinal` is true only on the flush-on-close chunk. +- `warning` fires for non-fatal issues (buffer overflow drops, single-chunk HTTP errors). Session continues. +- `error` fires for fatal issues (invalid config, 3 consecutive chunk failures, stream aborted). Terminates session. +- `close` fires exactly once — either after clean flush or after a fatal error. + +**Ordering guarantees:** `delta` events for chunk N precede `turn` for chunk N; `turn` for chunk N precedes any event for chunk N+1; `close` is always last. Empty deltas/turns (silent chunks) are suppressed. + +### Example + +```ts +import { Readable } from "stream"; +import recorder from "node-record-lpcm16"; +import { JigsawStack } from "jigsawstack"; + +const jigsaw = JigsawStack({ apiKey: process.env.JIGSAWSTACK_API_KEY }); +const transcriber = jigsaw.audio.speech_to_text_live({ language: "en" }); + +transcriber.on("open", ({ id }) => console.log("session", id)); +transcriber.on("delta", ({ text }) => process.stdout.write(`\r… ${text}`)); +transcriber.on("turn", ({ text }) => console.log(`\n${text}`)); +transcriber.on("warning", ({ code, message }) => console.warn(code, message)); +transcriber.on("error", (err) => console.error(err)); +transcriber.on("close", () => console.log("done")); + +await transcriber.connect(); + +const rec = recorder.record({ sampleRate: 16000, channels: 1, audioType: "raw" }); +Readable.toWeb(rec.stream()).pipeTo(transcriber.stream()); + +process.on("SIGINT", async () => { + rec.stop(); + await transcriber.close(); + process.exit(); +}); +``` + +## Internal architecture + +``` +src/audio/ +├── audio.ts # existing, gains speech_to_text_live method +├── interfaces.ts # existing, gains LiveSTT types +└── live/ + ├── transcriber.ts # public class: event emitter, lifecycle, WritableStream sink + ├── chunker.ts # PCM buffer → WAV chunks with overlap retention + ├── stitcher.ts # token overlap detection, fuzzy match + └── sse.ts # POST + SSE parser via RequestClient.fetchJSSStream +``` + +### Data flow per chunk + +``` +user pipe + → WritableStream (transcriber.stream()) + → Chunker.push(bytes) + → when buffered ≥ chunkSeconds: Chunker.emit(wavBuf) + → SSE.transcribe(wavBuf, onDelta) + → each delta: Stitcher.preview(delta) → emit "delta" + → on done: Stitcher.commit(text) → emit "turn" + → Chunker.trimToOverlap() +``` + +### Components + +**`Chunker`** — maintains a rolling `Uint8Array` buffer of PCM16 bytes; knows sample rate, channels, chunk size, overlap. `push(bytes) → wavBuf | null` appends and returns a complete WAV when the buffer crosses `chunkSeconds`. `trimToOverlap()` retains only the last `overlapSeconds` worth of audio. `flush() → wavBuf | null` returns remaining audio if ≥ 0.5s, otherwise null. Pure — no I/O, no events. + +**`Stitcher`** — holds `prevTranscript` state. `preview(deltaStreamText) → string` returns the stitched preview (does not mutate state). `commit(chunkText) → string` returns the stitched committed text and updates state. Fuzzy token match handles minor diffs in the overlap region (exact match, single-char substitution, single insertion/deletion on tokens ≥ 4 chars). Pure. + +**`SSE`** — `transcribe(wavBuf, onDelta) → Promise`. Calls `RequestClient.fetchJSSStream("/v1/ai/transcribe", "POST", wavBuf, { stream: true, vad: true, vad_threshold, language }, { "Content-Type": "audio/wav" })`. Iterates `resp.body`, parses SSE lines (`data: ...`, `[DONE]` sentinel, malformed JSON skipped not thrown). Invokes `onDelta(text)` for each `transcript.delta`, resolves with final text on `transcript.done`/`transcript.final`. 30s `AbortSignal.timeout` per request. + +**`Transcriber`** — owns lifecycle state machine, emits events, implements the `WritableStream` sink via `new WritableStream({ write, close, abort })`. Serializes chunk processing: only one in-flight SSE request at a time (stitching state requires strict chunk ordering). Owns the buffer-overflow drop policy. Tracks consecutive chunk error count for fatal escalation. + +### Why serial requests + +Overlap stitching needs `prevTranscript` from chunk N to stitch chunk N+1. Parallel requests would produce out-of-order stitches and duplicated overlap regions. Serial is also cheaper on the server and fine in practice (5s chunks, typical transcribe latency < 2s). + +### Buffer overflow + +If a `push` would extend the buffer past `maxBufferSeconds` of audio, drop the oldest frames (shift the buffer forward by the overflow amount) and emit one `warning { code: "buffer_overflow" }`. Prevents OOM when the server lags or transcribe requests back up. + +### RequestClient extension + +Add `fetchJSSStream(path, method, body?, searchParams?, headers?): Promise` to `src/request.ts`. Same auth/base-URL logic as `fetchJSS`, but returns the raw `Response` without reading the body, so the SSE module can iterate `response.body`. Keeps auth/header handling in one place. + +## Lifecycle + error handling + +``` +idle → connecting → open → closing → closed + ↓ + errored → closed +``` + +| Source | Classification | Behavior | +|------------------------------------------|----------------|----------| +| Invalid config at `connect()` | fatal | throw + `error` + `close` | +| Single chunk HTTP failure (4xx/5xx/timeout) | warning | emit `warning { code: "chunk_error" }`, continue | +| 3 consecutive chunk failures (hardcoded) | fatal | `error` + `close`; abort in-flight | +| Network aborted / stream `abort()` | fatal | `error` + `close` | +| Buffer overflow | warning | drop oldest frames, emit `warning` | +| User calls `close()` | normal | flush ≥ 0.5s remainder, emit `close` | + +One `AbortController` per in-flight request for the 30s timeout; one top-level `AbortController` aborted on fatal error or explicit close to unwind pending work. + +The SDK installs no signal handlers — users own `SIGINT`/`SIGTERM` and call `close()` themselves, matching the AssemblyAI example. + +## TypeScript surface + +New types exported from `src/audio/interfaces.ts`: + +```ts +export interface LiveSTTConfig { + language?: LanguageCodes | "auto"; + sampleRate?: number; + channels?: 1 | 2; + translate?: boolean; + chunkSeconds?: number; + overlapSeconds?: number; + vadThreshold?: number; + maxBufferSeconds?: number; +} + +export interface LiveSTTDelta { text: string; chunkIndex: number; } +export interface LiveSTTTurn { text: string; chunkIndex: number; isFinal: boolean; } +export interface LiveSTTWarning { code: "buffer_overflow" | "chunk_error"; message: string; } + +export interface LiveSTTEvents { + open: (payload: { id: string }) => void; + delta: (payload: LiveSTTDelta) => void; + turn: (payload: LiveSTTTurn) => void; + warning: (payload: LiveSTTWarning) => void; + error: (err: Error) => void; + close: () => void; +} + +export interface LiveTranscriber { + on(event: E, handler: LiveSTTEvents[E]): this; + off(event: E, handler: LiveSTTEvents[E]): this; + connect(): Promise; + stream(): WritableStream; + close(): Promise; +} +``` + +New method on `AudioApis`: + +```ts +speech_to_text_live(config?: LiveSTTConfig): LiveTranscriber; +``` + +Existing `speech_to_text` overloads, `SpeechToTextParams`, `SpeechToTextResponse` are unchanged. + +Since `jigsaw.audio` is exposed as the full `AudioApis` instance, no `core.ts` changes are required — the new method appears automatically. + +## Testing strategy + +**Unit (pure, fast, no network):** + +- `tests/live/chunker.test.ts` — push various sizes, assert chunk boundaries sample-aligned, WAV header validity (RIFF/WAVE/fmt/data magic, correct byte sizes, little-endian), overlap retention, `flush()` returns sub-chunk remainder, `flush()` drops <0.5s. +- `tests/live/stitcher.test.ts` — exact token overlap, fuzzy single-char diff, insertion/deletion within tolerance, no overlap returns current unchanged, empty inputs, trailing punctuation. +- `tests/live/sse.test.ts` — line parser handles `data:` prefix, `[DONE]`, malformed JSON (skip, not throw), events split across network chunks. Uses a fake `Response` with a `ReadableStream` body. + +**Integration (mocked HTTP, tests wiring):** + +- `tests/live/transcriber.test.ts` — pipe synthetic PCM16 into `transcriber.stream()`, stub `global.fetch` to emit a canned SSE transcript, assert event sequence `open → delta* → turn → close` with correct `chunkIndex`. +- Lifecycle edges: `close()` mid-chunk flushes then closes; buffer overflow emits warning; 3 consecutive chunk errors escalate to fatal. + +**Live (opt-in, real API):** + +- `tests/audio-live.test.ts` — feed a recorded 16kHz PCM16 WAV file into `speech_to_text_live`, assert `turn` events contain expected keywords. Uses `JIGSAWSTACK_API_KEY`. Gated behind `test:audio:live` script; not part of default `yarn test`. + +**No mic in tests** — mic capture isn't part of the SDK. Tests feed bytes directly for determinism. + +**Infra:** reuses `node:test` and `tests/test-helpers.ts`. HTTP mocking stubs `global.fetch` per test, no new dependency. + +## Dependencies + cleanup + +- No new runtime dependencies. +- Remove `node-record-lpcm16` from `package.json` `dependencies` — it's a user-side concern, not SDK surface. +- Move `node-record.js` → `examples/live-mic.js` as a reference for users wiring a Node mic to the new transcriber. + +## Backward compatibility + +No breaking changes. All existing exports, types, and methods remain. The new method is additive on `AudioApis`. diff --git a/examples/live-mic.js b/examples/live-mic.js new file mode 100644 index 0000000..1e5f1c0 --- /dev/null +++ b/examples/live-mic.js @@ -0,0 +1,36 @@ +import { Readable } from "stream"; +import { JigsawStack } from "jigsawstack"; +import recorder from "node-record-lpcm16"; + +const jigsaw = JigsawStack({ apiKey: process.env.JIGSAWSTACK_API_KEY }); + +const transcriber = jigsaw.audio.speech_to_text_live({ + sampleRate: 16000, +}); + +transcriber.on("open", ({ id }) => console.log("session", id)); +transcriber.on("delta", ({ text }) => process.stdout.write(`\r… ${text}`)); +transcriber.on("turn", ({ text }) => console.log(`\n${text}`)); +transcriber.on("warning", ({ code, message }) => console.warn("\n[warn]", code, message)); +transcriber.on("error", (err) => console.error("\n[error]", err)); +transcriber.on("close", () => console.log("\n[done]")); + +await transcriber.connect(); + +const rec = recorder.record({ + sampleRate: 16000, + channels: 1, + audioType: "raw", + recorder: "sox", + encoding: "signed-integer", + endianness: "little", + bits: 16, +}); + +Readable.toWeb(rec.stream()).pipeTo(transcriber.stream()); + +process.on("SIGINT", async () => { + rec.stop(); + await transcriber.close(); + process.exit(0); +}); diff --git a/package.json b/package.json index 7cb0825..092c02a 100644 --- a/package.json +++ b/package.json @@ -36,8 +36,9 @@ "format": "biome check --write .", "test": "yarn build && yarn test:all", "test:run": "node --test --no-warnings --import tsx --test-reporter=spec", - "test:all": "yarn test:run tests/*.ts", + "test:all": "yarn test:run tests/*.ts tests/live/*.ts", "test:audio": "yarn test:run tests/audio.test.ts", + "test:audio:live": "yarn test:run tests/audio-live.test.ts", "test:classification": "yarn test:run tests/classification.test.ts", "test:validate": "yarn test:run tests/validate.test.ts", "test:vision": "yarn test:run tests/vision.test.ts", diff --git a/src/audio/audio.ts b/src/audio/audio.ts index a87c0c3..934f75f 100644 --- a/src/audio/audio.ts +++ b/src/audio/audio.ts @@ -1,12 +1,15 @@ import { RequestClient } from "../request"; import { createFileUploadFormData } from "../utils"; import { + LiveSTTConfig, + LiveTranscriber, SpeechToTextParams, SpeechToTextParamsWithWebhook, SpeechToTextParamsWithoutWebhook, SpeechToTextResponse, SpeechToTextWebhookResponse, } from "./interfaces"; +import { Transcriber } from "./live/transcriber"; class Audio { constructor(private readonly client: RequestClient) {} @@ -34,6 +37,10 @@ class Audio { } return await this.client.fetchJSS("/v1/ai/transcribe", "POST", params); } + + speech_to_text_live(config?: LiveSTTConfig): LiveTranscriber { + return new Transcriber(this.client, config); + } } export default Audio; diff --git a/src/audio/interfaces.ts b/src/audio/interfaces.ts index c5403d4..8912378 100644 --- a/src/audio/interfaces.ts +++ b/src/audio/interfaces.ts @@ -37,3 +37,51 @@ export interface SpeechToTextWebhookResponse extends BaseResponse { status: "processing" | "error"; id: string; } + +// Streaming transcribe is English-only per the JigsawStack docs, so `language` is not exposed. +// Audio must be mono 16-bit PCM — downmix stereo sources before piping. +export interface LiveSTTConfig { + // Server-side transcribe params. + translate?: boolean; + vad?: boolean; + vadThreshold?: number; + + // Client-side audio + chunking params. + sampleRate?: number; + chunkSeconds?: number; + overlapSeconds?: number; + maxBufferSeconds?: number; +} + +export interface LiveSTTDelta { + text: string; + chunkIndex: number; +} + +export interface LiveSTTTurn { + text: string; + chunkIndex: number; + isFinal: boolean; +} + +export interface LiveSTTWarning { + code: "buffer_overflow" | "chunk_error"; + message: string; +} + +export interface LiveSTTEvents { + open: (payload: { id: string }) => void; + delta: (payload: LiveSTTDelta) => void; + turn: (payload: LiveSTTTurn) => void; + warning: (payload: LiveSTTWarning) => void; + error: (err: Error) => void; + close: () => void; +} + +export interface LiveTranscriber { + on(event: E, handler: LiveSTTEvents[E]): this; + off(event: E, handler: LiveSTTEvents[E]): this; + connect(): Promise; + stream(): WritableStream; + close(): Promise; +} diff --git a/src/audio/live/chunker.ts b/src/audio/live/chunker.ts new file mode 100644 index 0000000..0a80f2e --- /dev/null +++ b/src/audio/live/chunker.ts @@ -0,0 +1,96 @@ +export interface ChunkerConfig { + sampleRate: number; + channels: number; + chunkSeconds: number; + overlapSeconds: number; + maxBufferSeconds: number; + minFlushSeconds?: number; +} + +const SAMPLE_WIDTH = 2; // 16-bit PCM + +export class Chunker { + private buffer = new Uint8Array(0); + private pendingChunkBytes = 0; + private readonly bytesPerSecond: number; + private readonly chunkBytes: number; + private readonly overlapBytes: number; + private readonly maxBufferBytes: number; + private readonly minFlushBytes: number; + + constructor(private readonly config: ChunkerConfig) { + this.bytesPerSecond = config.sampleRate * config.channels * SAMPLE_WIDTH; + this.chunkBytes = Math.floor(config.chunkSeconds * this.bytesPerSecond); + this.overlapBytes = Math.floor(config.overlapSeconds * this.bytesPerSecond); + this.maxBufferBytes = Math.floor(config.maxBufferSeconds * this.bytesPerSecond); + this.minFlushBytes = Math.floor((config.minFlushSeconds ?? 0.5) * this.bytesPerSecond); + } + + push(bytes: Uint8Array): { dropped: number } { + const next = new Uint8Array(this.buffer.byteLength + bytes.byteLength); + next.set(this.buffer, 0); + next.set(bytes, this.buffer.byteLength); + this.buffer = next; + + let dropped = 0; + if (this.buffer.byteLength > this.maxBufferBytes) { + dropped = this.buffer.byteLength - this.maxBufferBytes; + this.buffer = this.buffer.slice(dropped); + if (this.pendingChunkBytes > 0) { + this.pendingChunkBytes = Math.max(0, this.pendingChunkBytes - dropped); + } + } + return { dropped }; + } + + tryEmit(): Uint8Array | null { + if (this.pendingChunkBytes > 0) return null; + if (this.buffer.byteLength < this.chunkBytes) return null; + this.pendingChunkBytes = this.chunkBytes; + const pcm = this.buffer.slice(0, this.chunkBytes); + return buildWav(pcm, this.config.sampleRate, this.config.channels); + } + + ackChunk(): void { + if (this.pendingChunkBytes === 0) return; + const drop = Math.max(0, this.pendingChunkBytes - this.overlapBytes); + this.buffer = drop >= this.buffer.byteLength ? new Uint8Array(0) : this.buffer.slice(drop); + this.pendingChunkBytes = 0; + } + + flush(): Uint8Array | null { + if (this.buffer.byteLength < this.minFlushBytes) return null; + const wav = buildWav(this.buffer, this.config.sampleRate, this.config.channels); + this.buffer = new Uint8Array(0); + this.pendingChunkBytes = 0; + return wav; + } +} + +function buildWav(pcm: Uint8Array, sampleRate: number, channels: number): Uint8Array { + const bitsPerSample = SAMPLE_WIDTH * 8; + const byteRate = sampleRate * channels * SAMPLE_WIDTH; + const blockAlign = channels * SAMPLE_WIDTH; + const dataSize = pcm.byteLength; + const out = new Uint8Array(44 + dataSize); + const dv = new DataView(out.buffer); + writeAscii(dv, 0, "RIFF"); + dv.setUint32(4, 36 + dataSize, true); + writeAscii(dv, 8, "WAVE"); + writeAscii(dv, 12, "fmt "); + dv.setUint32(16, 16, true); + dv.setUint16(20, 1, true); + dv.setUint16(22, channels, true); + dv.setUint32(24, sampleRate, true); + dv.setUint32(28, byteRate, true); + dv.setUint16(32, blockAlign, true); + dv.setUint16(34, bitsPerSample, true); + writeAscii(dv, 36, "data"); + dv.setUint32(40, dataSize, true); + out.set(pcm, 44); + return out; +} + +function writeAscii(dv: DataView, offset: number, str: string): void { + for (let i = 0; i < str.length; i++) dv.setUint8(offset + i, str.charCodeAt(i)); +} diff --git a/src/audio/live/sse.ts b/src/audio/live/sse.ts new file mode 100644 index 0000000..8da85f7 --- /dev/null +++ b/src/audio/live/sse.ts @@ -0,0 +1,71 @@ +import { RequestClient } from "../../request"; + +export interface SSETranscribeParams { + language: string; + vadThreshold: number; + translate?: boolean; + vad?: boolean; +} + +export async function transcribeChunk( + client: RequestClient, + wavBuf: Uint8Array, + params: SSETranscribeParams, + onDelta: (text: string) => void, + signal?: AbortSignal +): Promise { + const searchParams: Record = { + stream: "true", + vad: params.vad === undefined ? undefined : params.vad ? "true" : "false", + vad_threshold: params.vadThreshold, + language: params.language, + translate: params.translate ? "true" : undefined, + }; + + const resp = await client.fetchJSSStream("/v1/ai/transcribe", "POST", wavBuf, searchParams, { "Content-Type": "audio/wav" }, signal); + + if (!resp.ok) { + const text = await resp.text().catch(() => ""); + throw new Error(`Transcribe failed ${resp.status}: ${text.slice(0, 200)}`); + } + + if (!resp.body) { + throw new Error("Transcribe response has no body"); + } + + const reader = resp.body.getReader(); + const decoder = new TextDecoder(); + let buf = ""; + let finalText = ""; + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + let idx: number; + while ((idx = buf.indexOf("\n")) !== -1) { + const line = buf.slice(0, idx).trim(); + buf = buf.slice(idx + 1); + if (!line.startsWith("data:")) continue; + const data = line.slice(5).trim(); + if (!data || data === "[DONE]") continue; + let event: any; + try { + event = JSON.parse(data); + } catch { + continue; + } + if (event.type === "transcript.delta" && typeof event.delta === "string") { + onDelta(event.delta); + } else if ((event.type === "transcript.done" || event.type === "transcript.final") && typeof event.text === "string") { + finalText = event.text.trim(); + } + } + } + } finally { + reader.releaseLock?.(); + } + + return finalText; +} diff --git a/src/audio/live/stitcher.ts b/src/audio/live/stitcher.ts new file mode 100644 index 0000000..a4aae26 --- /dev/null +++ b/src/audio/live/stitcher.ts @@ -0,0 +1,108 @@ +export class Stitcher { + private prevTranscript = ""; + + preview(current: string): string { + return stitch(this.prevTranscript, current); + } + + commit(current: string): string { + const stitched = stitch(this.prevTranscript, current); + this.prevTranscript = current; + return stitched; + } + + reset(): void { + this.prevTranscript = ""; + } +} + +function stitch(prev: string, current: string): string { + if (!prev || !current) return current; + const prevTokens = tokenize(prev); + const curTokens = tokenize(current); + const overlap = findTokenOverlap(prevTokens, curTokens); + if (overlap > 0) { + const cutPos = findCutPosition(current, prevTokens, curTokens, overlap); + if (cutPos === -1) return ""; + return current.slice(cutPos).trimStart(); + } + return current; +} + +/** + * Find the position in `current` immediately after the last overlap token's + * alphanumeric core. For the last overlap token, if prev's matching token + * carried the same trailing punctuation, strip that punctuation too — it was + * already emitted with prev. Otherwise preserve the trailing punctuation in + * the remainder (e.g. "world," → keep ","). + */ +function findCutPosition(current: string, prevTokens: string[], curTokens: string[], overlap: number): number { + let pos = 0; + for (let i = 0; i < overlap; i++) { + while (pos < current.length && /\s/.test(current[pos])) pos++; + const token = curTokens[i]; + const coreMatch = token.match(/^[^a-zA-Z0-9]*([a-zA-Z0-9].*?[a-zA-Z0-9]|[a-zA-Z0-9])[^a-zA-Z0-9]*$/); + const core = coreMatch ? coreMatch[1] : token; + const coreIdx = current.indexOf(core, pos); + if (coreIdx === -1) { + pos += token.length; + } else { + pos = coreIdx + core.length; + } + if (i === overlap - 1) { + const prevToken = prevTokens[prevTokens.length - overlap + i]; + const prevPunctMatch = prevToken.match(/[^a-zA-Z0-9]+$/); + const prevPunct = prevPunctMatch ? prevPunctMatch[0] : ""; + if (prevPunct && current.slice(pos, pos + prevPunct.length) === prevPunct) { + pos += prevPunct.length; + } + } + } + if (pos >= current.length) return -1; + return pos; +} + +function tokenize(text: string): string[] { + return text.trim().split(/\s+/).filter(Boolean); +} + +function normalizeToken(token: string): string { + return token.toLowerCase().replace(/[^a-z0-9]+/g, ""); +} + +function fuzzyTokenMatch(a: string, b: string): boolean { + const na = normalizeToken(a); + const nb = normalizeToken(b); + if (na === nb) return true; + if (na.length < 4 || nb.length < 4) return false; + if (Math.abs(na.length - nb.length) > 1) return false; + if (na.length === nb.length) { + let diffs = 0; + for (let i = 0; i < na.length; i++) if (na[i] !== nb[i]) diffs++; + return diffs <= 1; + } + const [short, long] = na.length < nb.length ? [na, nb] : [nb, na]; + let diffs = 0; + let si = 0; + let li = 0; + while (si < short.length && li < long.length) { + if (short[si] !== long[li]) { + diffs++; + li++; + } else { + si++; + li++; + } + } + return diffs + (long.length - li) <= 1; +} + +function findTokenOverlap(prevTokens: string[], curTokens: string[]): number { + const max = Math.min(prevTokens.length, curTokens.length); + for (let overlap = max; overlap > 0; overlap--) { + const prevSlice = prevTokens.slice(-overlap); + const curSlice = curTokens.slice(0, overlap); + if (prevSlice.every((t, i) => fuzzyTokenMatch(t, curSlice[i]))) return overlap; + } + return 0; +} diff --git a/src/audio/live/transcriber.ts b/src/audio/live/transcriber.ts new file mode 100644 index 0000000..987b674 --- /dev/null +++ b/src/audio/live/transcriber.ts @@ -0,0 +1,226 @@ +import { RequestClient } from "../../request"; +import { LiveSTTConfig, LiveSTTEvents, LiveTranscriber } from "../interfaces"; +import { Chunker } from "./chunker"; +import { transcribeChunk } from "./sse"; +import { Stitcher } from "./stitcher"; + +type State = "idle" | "open" | "closing" | "closed" | "errored"; + +const DEFAULTS = { + sampleRate: 16000, + translate: false, + chunkSeconds: 5, + overlapSeconds: 2, + vad: true, + vadThreshold: 0.4, + maxBufferSeconds: 30, +}; + +const LANGUAGE = "en"; // streaming is English-only per JigsawStack docs +const CHANNELS = 1; // audio must be mono PCM16 + +const MAX_CONSECUTIVE_ERRORS = 3; +const CHUNK_TIMEOUT_MS = 30_000; + +export class Transcriber implements LiveTranscriber { + private readonly listeners = new Map>(); + private state: State = "idle"; + private readonly cfg: Required; + private readonly chunker: Chunker; + private readonly stitcher = new Stitcher(); + private chunkIndex = 0; + private consecutiveErrors = 0; + private inFlight: Promise | null = null; + private readonly topAbort = new AbortController(); + private closePromise: Promise | null = null; + private sessionId = ""; + + constructor( + private readonly client: RequestClient, + config?: LiveSTTConfig + ) { + this.cfg = { ...DEFAULTS, ...(config ?? {}) } as Required; + this.chunker = new Chunker({ + sampleRate: this.cfg.sampleRate, + channels: CHANNELS, + chunkSeconds: this.cfg.chunkSeconds, + overlapSeconds: this.cfg.overlapSeconds, + maxBufferSeconds: this.cfg.maxBufferSeconds, + }); + } + + on(event: E, handler: LiveSTTEvents[E]): this { + let set = this.listeners.get(event); + if (!set) { + set = new Set(); + this.listeners.set(event, set); + } + set.add(handler as unknown as Function); + return this; + } + + off(event: E, handler: LiveSTTEvents[E]): this { + this.listeners.get(event)?.delete(handler as unknown as Function); + return this; + } + + private emit(event: E, ...args: Parameters): void { + const set = this.listeners.get(event); + if (!set) return; + for (const h of set) { + try { + (h as (...a: any[]) => void)(...(args as any[])); + } catch { + // swallow listener errors to preserve session + } + } + } + + async connect(): Promise { + if (this.state !== "idle") throw new Error("connect() called on transcriber that is not idle"); + if (!(this.cfg.sampleRate > 0)) throw new Error("sampleRate must be > 0"); + if (!(this.cfg.chunkSeconds > this.cfg.overlapSeconds && this.cfg.overlapSeconds > 0)) + throw new Error("chunkSeconds > overlapSeconds > 0 required"); + if (!(this.cfg.maxBufferSeconds > this.cfg.chunkSeconds)) throw new Error("maxBufferSeconds must be > chunkSeconds"); + this.sessionId = crypto.randomUUID(); + this.state = "open"; + this.emit("open", { id: this.sessionId }); + } + + stream(): WritableStream { + if (this.state !== "open") throw new Error("stream() can only be called on an open transcriber"); + return new WritableStream({ + write: async (bytes) => { + if (this.state !== "open") return; + const { dropped } = this.chunker.push(bytes); + if (dropped > 0) { + this.emit("warning", { + code: "buffer_overflow", + message: `dropped ${dropped} bytes to stay under maxBufferSeconds`, + }); + } + this.pump(); + }, + close: async () => { + await this.finalize(); + }, + abort: async (reason) => { + this.fail(reason instanceof Error ? reason : new Error(String(reason ?? "stream aborted"))); + }, + }); + } + + private pump(): void { + if (this.inFlight) return; + if (this.state !== "open" && this.state !== "closing") return; + const chunk = this.chunker.tryEmit(); + if (!chunk) return; + const idx = this.chunkIndex++; + this.inFlight = this.processChunk(chunk, idx, false).finally(() => { + this.inFlight = null; + if (this.state === "open" || this.state === "closing") this.pump(); + }); + } + + private async processChunk(wav: Uint8Array, idx: number, isFinal: boolean): Promise { + let committed = ""; + const chunkAbort = new AbortController(); + const onTop = () => chunkAbort.abort(this.topAbort.signal.reason ?? new Error("aborted")); + this.topAbort.signal.addEventListener("abort", onTop); + const timer = setTimeout(() => chunkAbort.abort(new Error("transcribe timeout")), CHUNK_TIMEOUT_MS); + + try { + committed = await transcribeChunk( + this.client, + wav, + { + language: LANGUAGE, + vad: this.cfg.vad, + vadThreshold: this.cfg.vadThreshold, + translate: this.cfg.translate, + }, + (delta) => { + if (this.state !== "open" && this.state !== "closing") return; + const preview = this.stitcher.preview(delta); + if (preview) this.emit("delta", { text: preview, chunkIndex: idx }); + }, + chunkAbort.signal + ); + this.consecutiveErrors = 0; + } catch (err: any) { + if (this.topAbort.signal.aborted) { + this.chunker.ackChunk(); + return; + } + this.consecutiveErrors++; + this.emit("warning", { code: "chunk_error", message: err?.message ?? String(err) }); + this.chunker.ackChunk(); + if (this.consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { + this.fail(new Error("live stt aborted after 3 consecutive chunk failures")); + } + return; + } finally { + clearTimeout(timer); + this.topAbort.signal.removeEventListener("abort", onTop); + } + + this.chunker.ackChunk(); + if (committed) { + const stitched = this.stitcher.commit(committed); + if (stitched) this.emit("turn", { text: stitched, chunkIndex: idx, isFinal }); + } + } + + private async finalize(): Promise { + if (this.closePromise) return this.closePromise; + this.closePromise = (async () => { + if (this.state === "closed") return; + if (this.state === "errored") { + this.emitClose(); + return; + } + if (this.state !== "open") return; + this.state = "closing"; + // Drain all remaining full chunks serially, then do a final flush. + // pump() will keep scheduling chunks while buffer has >= chunkBytes. + // We loop: kick pump, await inFlight, repeat until nothing more to schedule. + this.pump(); + while (this.inFlight) { + try { + await this.inFlight; + } catch { + // errors already surfaced via events + } + if (this.state !== "closing") break; + } + if ((this.state as State) !== "closing") { + this.emitClose(); + return; + } + const flush = this.chunker.flush(); + if (flush) { + await this.processChunk(flush, this.chunkIndex++, true); + } + this.emitClose(); + })(); + return this.closePromise; + } + + async close(): Promise { + return this.finalize(); + } + + private emitClose(): void { + if (this.state === "closed") return; + this.state = "closed"; + this.emit("close"); + } + + private fail(err: Error): void { + if (this.state === "errored" || this.state === "closed") return; + this.state = "errored"; + this.topAbort.abort(err); + this.emit("error", err); + this.emitClose(); + } +} diff --git a/src/request.ts b/src/request.ts index ce8f16f..84aa9f6 100644 --- a/src/request.ts +++ b/src/request.ts @@ -59,4 +59,40 @@ export class RequestClient { return result; }; + + readonly fetchJSSStream = async ( + path: string, + method: "POST" | "GET", + body?: Uint8Array | Record, + searchParams?: { + [key: string]: any; + }, + headers?: { + [key: string]: string; + }, + signal?: AbortSignal + ): Promise => { + const isBinary = body instanceof Uint8Array; + + searchParams = searchParams ? removeUndefinedProperties(searchParams) : undefined; + + const _headers = { + "x-api-key": this.config?.apiKey, + ...(!isBinary && body !== undefined ? { "Content-Type": "application/json" } : {}), + ...this.config?.headers, + ...headers, + }; + + const _body = isBinary ? body : body !== undefined ? JSON.stringify(body) : undefined; + + const url = `${this.config?.baseURL || baseURL}${path}`; + const urlParams = searchParams && Object.keys(searchParams).length ? `?${new URLSearchParams(searchParams).toString()}` : ""; + + return fetch(`${url}${urlParams}`, { + method, + headers: _headers, + body: method === "POST" ? _body : undefined, + signal, + }); + }; } diff --git a/tests/audio-live.test.ts b/tests/audio-live.test.ts new file mode 100644 index 0000000..a2b8495 --- /dev/null +++ b/tests/audio-live.test.ts @@ -0,0 +1,63 @@ +import assert from "node:assert/strict"; +import { describe, test } from "node:test"; +import { createJigsawStackClient } from "./test-helpers.js"; + +const PREVIEW_WAV_URL = "https://jigsawstack.com/preview/stt-example.wav"; + +async function fetchPcm16(url: string): Promise<{ pcm: Uint8Array; sampleRate: number; channels: number }> { + const resp = await fetch(url); + if (!resp.ok) throw new Error(`fixture fetch failed ${resp.status}`); + const buf = new Uint8Array(await resp.arrayBuffer()); + const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength); + const channels = dv.getUint16(22, true); + const sampleRate = dv.getUint32(24, true); + let offset = 12; + while (offset < buf.byteLength - 8) { + const id = String.fromCharCode(buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3]); + const size = dv.getUint32(offset + 4, true); + if (id === "data") { + return { pcm: buf.slice(offset + 8, offset + 8 + size), sampleRate, channels }; + } + offset += 8 + size; + } + throw new Error("data chunk not found"); +} + +function downmixToMono(pcm: Uint8Array, channels: number): Uint8Array { + if (channels === 1) return pcm; + const frames = pcm.byteLength / (channels * 2); + const out = new Uint8Array(frames * 2); + const src = new DataView(pcm.buffer, pcm.byteOffset, pcm.byteLength); + const dst = new DataView(out.buffer); + for (let i = 0; i < frames; i++) { + let sum = 0; + for (let c = 0; c < channels; c++) sum += src.getInt16((i * channels + c) * 2, true); + dst.setInt16(i * 2, Math.round(sum / channels), true); + } + return out; +} + +describe("Live STT (integration)", { skip: !process.env.JIGSAWSTACK_API_KEY }, () => { + test("streams PCM through transcriber and receives at least one turn", async () => { + const client = createJigsawStackClient(); + const { pcm, sampleRate, channels } = await fetchPcm16(PREVIEW_WAV_URL); + const monoPcm = downmixToMono(pcm, channels); + + const transcriber = client.audio.speech_to_text_live({ sampleRate }); + + const turns: string[] = []; + transcriber.on("turn", ({ text }) => turns.push(text)); + + await transcriber.connect(); + const writer = transcriber.stream().getWriter(); + const sliceBytes = sampleRate * 2 * 0.5; + for (let off = 0; off < monoPcm.byteLength; off += sliceBytes) { + await writer.write(monoPcm.slice(off, Math.min(off + sliceBytes, monoPcm.byteLength))); + } + await writer.close(); + await transcriber.close(); + + assert.ok(turns.length > 0, "expected at least one turn"); + assert.ok(turns.join(" ").length > 0, "expected non-empty transcript"); + }); +}); diff --git a/tests/live/chunker.test.ts b/tests/live/chunker.test.ts new file mode 100644 index 0000000..364d1fa --- /dev/null +++ b/tests/live/chunker.test.ts @@ -0,0 +1,100 @@ +import assert from "node:assert/strict"; +import { describe, test } from "node:test"; +import { Chunker } from "../../src/audio/live/chunker"; + +const RATE = 16000; +const BYTES_PER_SEC = RATE * 1 * 2; // mono 16-bit = 32000 bytes/s + +function makeChunker(overrides: Partial[0]> = {}) { + return new Chunker({ + sampleRate: RATE, + channels: 1, + chunkSeconds: 5, + overlapSeconds: 2, + maxBufferSeconds: 30, + ...overrides, + }); +} + +describe("Chunker", () => { + test("buffers bytes and emits nothing before chunkSeconds is reached", () => { + const c = makeChunker(); + const { dropped } = c.push(new Uint8Array(BYTES_PER_SEC)); // 1s + assert.equal(dropped, 0); + assert.equal(c.tryEmit(), null); + }); + + test("emits a WAV chunk once chunkSeconds of audio is buffered", () => { + const c = makeChunker(); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); // 5s of silence + const wav = c.tryEmit(); + assert.ok(wav); + assert.equal(wav!.byteLength, 44 + 5 * BYTES_PER_SEC); + const dv = new DataView(wav!.buffer, wav!.byteOffset, wav!.byteLength); + assert.equal(String.fromCharCode(dv.getUint8(0), dv.getUint8(1), dv.getUint8(2), dv.getUint8(3)), "RIFF"); + assert.equal(String.fromCharCode(dv.getUint8(8), dv.getUint8(9), dv.getUint8(10), dv.getUint8(11)), "WAVE"); + assert.equal(String.fromCharCode(dv.getUint8(12), dv.getUint8(13), dv.getUint8(14), dv.getUint8(15)), "fmt "); + assert.equal(String.fromCharCode(dv.getUint8(36), dv.getUint8(37), dv.getUint8(38), dv.getUint8(39)), "data"); + assert.equal(dv.getUint32(24, true), RATE); + assert.equal(dv.getUint16(22, true), 1); + assert.equal(dv.getUint16(34, true), 16); + assert.equal(dv.getUint32(40, true), 5 * BYTES_PER_SEC); + }); + + test("tryEmit returns null while a chunk is pending ack", () => { + const c = makeChunker(); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); + assert.ok(c.tryEmit()); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); + assert.equal(c.tryEmit(), null); + }); + + test("ackChunk trims the emitted chunk but keeps overlapSeconds at front", () => { + const c = makeChunker(); + c.push(new Uint8Array(5 * BYTES_PER_SEC)); + c.tryEmit(); + c.ackChunk(); + c.push(new Uint8Array(2 * BYTES_PER_SEC)); + assert.equal(c.tryEmit(), null); + c.push(new Uint8Array(1 * BYTES_PER_SEC)); + const wav = c.tryEmit(); + assert.ok(wav); + assert.equal(wav!.byteLength, 44 + 5 * BYTES_PER_SEC); + }); + + test("buffer overflow drops oldest bytes and reports count", () => { + const c = makeChunker({ maxBufferSeconds: 4 }); + const { dropped } = c.push(new Uint8Array(10 * BYTES_PER_SEC)); + assert.equal(dropped, 6 * BYTES_PER_SEC); + assert.equal(c.tryEmit(), null); + }); + + test("flush returns remaining audio as WAV when ≥ minFlushSeconds, else null", () => { + const c = makeChunker(); + c.push(new Uint8Array(Math.floor(0.3 * BYTES_PER_SEC))); + assert.equal(c.flush(), null); + + const c2 = makeChunker(); + c2.push(new Uint8Array(Math.floor(1 * BYTES_PER_SEC))); + const wav = c2.flush(); + assert.ok(wav); + const dv = new DataView(wav!.buffer, wav!.byteOffset, wav!.byteLength); + assert.equal(dv.getUint32(40, true), 1 * BYTES_PER_SEC); + }); + + test("overflow while a chunk is pending keeps ackChunk overlap math consistent", () => { + const c = makeChunker({ maxBufferSeconds: 6 }); // chunk=5s, overlap=2s, max=6s + c.push(new Uint8Array(5 * BYTES_PER_SEC)); // 5s — fills chunk exactly + assert.ok(c.tryEmit()); + // Push enough to overflow by 3s (buffer becomes 9s → clips to 6s → drops 3s from front) + c.push(new Uint8Array(4 * BYTES_PER_SEC)); + c.ackChunk(); + // After overflow, pendingChunkBytes should have been adjusted from 160_000 → 64_000 (5s - 3s dropped = 2s). + // drop = max(0, 64_000 - 64_000) = 0 → buffer is unchanged. + // The 6s buffer (2s overlap-ish + 4s new) should still be intact; tryEmit on next 5s window should succeed + // after we just append 0 more bytes (since buffer already has 6s, > chunkBytes). + const wav = c.tryEmit(); + assert.ok(wav); + assert.equal(wav!.byteLength, 44 + 5 * BYTES_PER_SEC); + }); +}); diff --git a/tests/live/request-stream.test.ts b/tests/live/request-stream.test.ts new file mode 100644 index 0000000..6a80772 --- /dev/null +++ b/tests/live/request-stream.test.ts @@ -0,0 +1,46 @@ +import assert from "node:assert/strict"; +import { afterEach, beforeEach, describe, test } from "node:test"; +import { RequestClient } from "../../src/request"; + +describe("RequestClient.fetchJSSStream", () => { + let origFetch: typeof fetch; + let lastCall: { url: string; init: RequestInit } | null; + + beforeEach(() => { + origFetch = globalThis.fetch; + lastCall = null; + globalThis.fetch = (async (url: any, init: any) => { + lastCall = { url: String(url), init }; + return new Response("hello", { status: 200, headers: { "content-type": "text/plain" } }); + }) as typeof fetch; + }); + + afterEach(() => { + globalThis.fetch = origFetch; + }); + + test("returns raw Response without reading the body", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const body = new Uint8Array([1, 2, 3]); + const resp = await client.fetchJSSStream("/v1/stream", "POST", body, { stream: "true" }, { "Content-Type": "audio/wav" }); + assert.equal(resp.status, 200); + assert.equal(resp.bodyUsed, false); + assert.equal(lastCall!.url, "https://api.test/v1/stream?stream=true"); + const headers = lastCall!.init.headers as Record; + assert.equal(headers["x-api-key"], "k"); + assert.equal(headers["Content-Type"], "audio/wav"); + }); + + test("forwards AbortSignal", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const ac = new AbortController(); + await client.fetchJSSStream("/v1/stream", "POST", new Uint8Array([1]), undefined, undefined, ac.signal); + assert.equal(lastCall!.init.signal, ac.signal); + }); + + test("omits undefined query params", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + await client.fetchJSSStream("/v1/stream", "POST", new Uint8Array([1]), { stream: "true", translate: undefined }); + assert.equal(lastCall!.url, "https://api.test/v1/stream?stream=true"); + }); +}); diff --git a/tests/live/sse.test.ts b/tests/live/sse.test.ts new file mode 100644 index 0000000..73e8a08 --- /dev/null +++ b/tests/live/sse.test.ts @@ -0,0 +1,92 @@ +import assert from "node:assert/strict"; +import { afterEach, beforeEach, describe, test } from "node:test"; +import { transcribeChunk } from "../../src/audio/live/sse"; +import { RequestClient } from "../../src/request"; + +function sseResponse(events: string[]): Response { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + for (const e of events) controller.enqueue(encoder.encode(e)); + controller.close(); + }, + }); + return new Response(stream, { status: 200, headers: { "content-type": "text/event-stream" } }); +} + +describe("transcribeChunk (SSE)", () => { + let origFetch: typeof fetch; + beforeEach(() => { + origFetch = globalThis.fetch; + }); + afterEach(() => { + globalThis.fetch = origFetch; + }); + + test("invokes onDelta for each transcript.delta and returns final text on transcript.done", async () => { + globalThis.fetch = (async () => + sseResponse([ + 'data: {"type":"transcript.delta","delta":"hello"}\n', + 'data: {"type":"transcript.delta","delta":" world"}\n', + 'data: {"type":"transcript.done","text":"hello world"}\n', + "data: [DONE]\n", + ])) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const deltas: string[] = []; + const final = await transcribeChunk(client, new Uint8Array([1, 2, 3]), { language: "en", vadThreshold: 0.4 }, (d) => deltas.push(d)); + assert.deepEqual(deltas, ["hello", " world"]); + assert.equal(final, "hello world"); + }); + + test("handles events split across network chunks", async () => { + globalThis.fetch = (async () => + sseResponse(['data: {"type":"transcript.delt', 'a","delta":"hi"}\n', 'data: {"type":"transcript.done","text":"hi"}\n'])) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const deltas: string[] = []; + const final = await transcribeChunk(client, new Uint8Array([1]), { language: "en", vadThreshold: 0.4 }, (d) => deltas.push(d)); + assert.deepEqual(deltas, ["hi"]); + assert.equal(final, "hi"); + }); + + test("skips malformed JSON lines without throwing", async () => { + globalThis.fetch = (async () => + sseResponse([ + "data: {not valid json}\n", + 'data: {"type":"transcript.delta","delta":"ok"}\n', + 'data: {"type":"transcript.done","text":"ok"}\n', + ])) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const deltas: string[] = []; + const final = await transcribeChunk(client, new Uint8Array([1]), { language: "en", vadThreshold: 0.4 }, (d) => deltas.push(d)); + assert.deepEqual(deltas, ["ok"]); + assert.equal(final, "ok"); + }); + + test("throws on non-2xx with truncated body in message", async () => { + globalThis.fetch = (async () => new Response("server on fire".repeat(50), { status: 500 })) as typeof fetch; + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + await assert.rejects( + transcribeChunk(client, new Uint8Array([1]), { language: "en", vadThreshold: 0.4 }, () => {}), + /Transcribe failed 500/ + ); + }); + + test("forwards query params including translate when set", async () => { + let captured = ""; + globalThis.fetch = (async (url: any) => { + captured = String(url); + return sseResponse(['data: {"type":"transcript.done","text":""}\n']); + }) as typeof fetch; + + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + await transcribeChunk(client, new Uint8Array([1]), { language: "fr", vad: true, vadThreshold: 0.5, translate: true }, () => {}); + assert.match(captured, /stream=true/); + assert.match(captured, /vad=true/); + assert.match(captured, /vad_threshold=0\.5/); + assert.match(captured, /language=fr/); + assert.match(captured, /translate=true/); + }); +}); diff --git a/tests/live/stitcher.test.ts b/tests/live/stitcher.test.ts new file mode 100644 index 0000000..51ebff2 --- /dev/null +++ b/tests/live/stitcher.test.ts @@ -0,0 +1,82 @@ +import assert from "node:assert/strict"; +import { describe, test } from "node:test"; +import { Stitcher } from "../../src/audio/live/stitcher"; + +describe("Stitcher", () => { + test("returns current text when there is no previous transcript", () => { + const s = new Stitcher(); + assert.equal(s.preview("hello world"), "hello world"); + assert.equal(s.commit("hello world"), "hello world"); + }); + + test("strips exact-match token overlap between commits", () => { + const s = new Stitcher(); + s.commit("the quick brown fox jumps"); + const out = s.commit("brown fox jumps over the lazy dog"); + assert.equal(out, "over the lazy dog"); + }); + + test("preview does not mutate state", () => { + const s = new Stitcher(); + s.commit("hello world"); + s.preview("world there"); + const out = s.commit("world there"); + assert.equal(out, "there"); + }); + + test("fuzzy match handles single-character substitution on long tokens", () => { + const s = new Stitcher(); + s.commit("the quick brown foxes"); + const out = s.commit("foxxs jumped high"); + assert.equal(out, "jumped high"); + }); + + test("fuzzy match handles single insertion/deletion on long tokens", () => { + const s = new Stitcher(); + s.commit("hello international world"); + const out = s.commit("internationl world, how are you"); + assert.equal(out, ", how are you"); + }); + + test("no overlap returns current unchanged", () => { + const s = new Stitcher(); + s.commit("one two three"); + const out = s.commit("apple banana cherry"); + assert.equal(out, "apple banana cherry"); + }); + + test("empty inputs are handled", () => { + const s = new Stitcher(); + assert.equal(s.preview(""), ""); + s.commit("hello"); + assert.equal(s.commit(""), ""); + }); + + test("returns empty string when current is entirely overlap", () => { + const s = new Stitcher(); + s.commit("hello world foo bar"); + const out = s.commit("foo bar"); + assert.equal(out, ""); + }); + + test("reset clears state", () => { + const s = new Stitcher(); + s.commit("previous"); + s.reset(); + assert.equal(s.commit("previous again"), "previous again"); + }); + + test("strips trailing punctuation when it was already in the previous commit", () => { + const s = new Stitcher(); + s.commit("Hi Vineet, how are you doing?"); + const out = s.commit("Hi Vineet, how are you doing? I am doing fine."); + assert.equal(out, "I am doing fine."); + }); + + test("preserves trailing punctuation that is new in current (not in prev)", () => { + const s = new Stitcher(); + s.commit("hello international world"); + const out = s.commit("internationl world, how are you"); + assert.equal(out, ", how are you"); + }); +}); diff --git a/tests/live/transcriber.test.ts b/tests/live/transcriber.test.ts new file mode 100644 index 0000000..a5ea3a9 --- /dev/null +++ b/tests/live/transcriber.test.ts @@ -0,0 +1,207 @@ +import assert from "node:assert/strict"; +import { afterEach, beforeEach, describe, test } from "node:test"; +import { Transcriber } from "../../src/audio/live/transcriber"; +import { RequestClient } from "../../src/request"; + +const RATE = 16000; +const BYTES_PER_SEC = RATE * 2; + +function encode(events: string[]): ReadableStream { + const enc = new TextEncoder(); + return new ReadableStream({ + start(c) { + for (const e of events) c.enqueue(enc.encode(e)); + c.close(); + }, + }); +} + +function mockFetchSequence(responses: Array<() => Response>): () => void { + let i = 0; + const original = globalThis.fetch; + globalThis.fetch = (async () => { + const idx = Math.min(i++, responses.length - 1); + return responses[idx](); + }) as typeof fetch; + return () => { + globalThis.fetch = original; + }; +} + +function successResponse(deltas: string[], final: string): Response { + const evts = [ + ...deltas.map((d) => `data: ${JSON.stringify({ type: "transcript.delta", delta: d })}\n`), + `data: ${JSON.stringify({ type: "transcript.done", text: final })}\n`, + ]; + return new Response(encode(evts), { status: 200 }); +} + +function makeTranscriber() { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + return new Transcriber(client, { sampleRate: RATE, chunkSeconds: 5, overlapSeconds: 2, maxBufferSeconds: 30 }); +} + +async function writeBytes(writer: WritableStreamDefaultWriter, bytes: Uint8Array, chunkSize = 0) { + if (chunkSize <= 0) { + await writer.write(bytes); + return; + } + for (let offset = 0; offset < bytes.byteLength; offset += chunkSize) { + await writer.write(bytes.slice(offset, Math.min(offset + chunkSize, bytes.byteLength))); + } +} + +describe("Transcriber", () => { + let restore: (() => void) | null = null; + beforeEach(() => { + restore = null; + }); + afterEach(() => { + restore?.(); + }); + + test("connect() emits open once with a session id", async () => { + const t = makeTranscriber(); + const opens: any[] = []; + t.on("open", (p) => opens.push(p)); + await t.connect(); + assert.equal(opens.length, 1); + assert.equal(typeof opens[0].id, "string"); + assert.ok(opens[0].id.length > 0); + }); + + test("stream() before connect() throws", () => { + const t = makeTranscriber(); + assert.throws(() => t.stream(), /open/); + }); + + test("invalid config rejects connect()", async () => { + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const bad = new Transcriber(client, { chunkSeconds: 2, overlapSeconds: 3 }); + await assert.rejects(bad.connect(), /chunkSeconds/); + }); + + test("pipes PCM bytes, emits delta+turn+close, in order, with correct chunkIndex", async () => { + restore = mockFetchSequence([() => successResponse(["hello"], "hello world"), () => successResponse(["bye"], "bye now")]); + const t = makeTranscriber(); + const events: Array<{ kind: string; payload?: any }> = []; + t.on("open", (p) => events.push({ kind: "open", payload: p })); + t.on("delta", (p) => events.push({ kind: "delta", payload: p })); + t.on("turn", (p) => events.push({ kind: "turn", payload: p })); + t.on("close", () => events.push({ kind: "close" })); + await t.connect(); + + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(10 * BYTES_PER_SEC)); + await writer.close(); + await t.close(); + + const kinds = events.map((e) => e.kind); + assert.equal(kinds[0], "open"); + assert.equal(kinds[kinds.length - 1], "close"); + + const turns = events.filter((e) => e.kind === "turn").map((e) => e.payload); + assert.ok(turns.length >= 1); + assert.equal(turns[0].chunkIndex, 0); + assert.equal(typeof turns[0].text, "string"); + }); + + test("empty transcripts are suppressed", async () => { + restore = mockFetchSequence([() => successResponse([], "")]); + const t = makeTranscriber(); + const turns: any[] = []; + const deltas: any[] = []; + t.on("delta", (p) => deltas.push(p)); + t.on("turn", (p) => turns.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(5 * BYTES_PER_SEC)); + await writer.close(); + await t.close(); + assert.equal(deltas.length, 0); + assert.equal(turns.length, 0); + }); + + test("single chunk HTTP error emits warning and continues", async () => { + restore = mockFetchSequence([() => new Response("boom", { status: 500 }), () => successResponse(["ok"], "ok")]); + const t = makeTranscriber(); + const warnings: any[] = []; + const turns: any[] = []; + t.on("warning", (p) => warnings.push(p)); + t.on("turn", (p) => turns.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(10 * BYTES_PER_SEC)); + await writer.close(); + await t.close(); + assert.ok(warnings.some((w) => w.code === "chunk_error")); + assert.ok(turns.some((tr) => tr.text === "ok")); + }); + + test("3 consecutive chunk errors escalate to fatal error", async () => { + restore = mockFetchSequence([ + () => new Response("e", { status: 500 }), + () => new Response("e", { status: 500 }), + () => new Response("e", { status: 500 }), + ]); + const t = makeTranscriber(); + let errored: Error | null = null; + let closed = false; + t.on("error", (e) => { + errored = e; + }); + t.on("close", () => { + closed = true; + }); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(15 * BYTES_PER_SEC)); + try { + await writer.close(); + } catch {} + await t.close(); + assert.ok(errored); + assert.match((errored as Error).message, /3 consecutive/); + assert.equal(closed, true); + }); + + test("close() flushes remaining ≥ 0.5s buffer as final chunk", async () => { + restore = mockFetchSequence([() => successResponse(["tail"], "tail")]); + const t = makeTranscriber(); + const turns: any[] = []; + t.on("turn", (p) => turns.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writeBytes(writer, new Uint8Array(1 * BYTES_PER_SEC)); + await writer.close(); + await t.close(); + assert.equal(turns.length, 1); + assert.equal(turns[0].isFinal, true); + assert.equal(turns[0].text, "tail"); + }); + + test("buffer overflow emits warning", async () => { + restore = mockFetchSequence([() => successResponse([], "")]); + const client = new RequestClient({ apiKey: "k", baseURL: "https://api.test" }); + const t = new Transcriber(client, { chunkSeconds: 5, overlapSeconds: 2, maxBufferSeconds: 6 }); + const warnings: any[] = []; + t.on("warning", (p) => warnings.push(p)); + await t.connect(); + const writer = t.stream().getWriter(); + await writer.write(new Uint8Array(10 * BYTES_PER_SEC)); + try { + await writer.close(); + } catch {} + await t.close(); + assert.ok(warnings.some((w) => w.code === "buffer_overflow")); + }); + + test("jigsaw.audio.speech_to_text_live is exposed on the SDK", async () => { + const { JigsawStack } = await import("../../index"); + const jigsaw = JigsawStack({ apiKey: "k", baseURL: "https://api.test" }); + const t = jigsaw.audio.speech_to_text_live({ chunkSeconds: 5, overlapSeconds: 2 }); + await t.connect(); + await t.close(); + assert.ok(t); + }); +});