From 0354f9d825079acaf1dd2ed45a946f580ddd8fae Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 11 Mar 2026 21:58:58 +0000 Subject: [PATCH 1/4] Replace AssemblyAI STT with Inworld STT API - Create InworldSTTNode using Inworld's REST API (POST /stt/v1/transcribe) with energy-based VAD for end-of-turn detection - Remove assembly-ai-stt-ws-node.ts and its WebSocket-based streaming logic - Update ConversationGraphWrapper to hold inworldSTTNode reference - Update ConversationGraphConfig to accept inworldApiKey (was assemblyAIApiKey) - Replace ASSEMBLY_AI_API_KEY env var with INWORLD_API_KEY in graph-service, .env.example, and render.yaml (single key for all Inworld services) - Replace AssemblyAI turn-detection presets in server config with equivalent Inworld STT VAD presets (silenceThresholdMs / minSpeechMs / energyThreshold) - Rename ASSEMBLY_AI_EAGERNESS env var to INWORLD_STT_EAGERNESS - Update comments in connection-manager, transcript-extractor-node, server, and audio-processor.js to reflect the new STT provider https://claude.ai/code/session_01EDqcCeQHNj2f2TVeFb5Dxh --- backend/.env.example | 3 +- backend/src/config/server.ts | 69 +- backend/src/graphs/conversation-graph.ts | 53 +- .../graphs/nodes/assembly-ai-stt-ws-node.ts | 707 ------------------ backend/src/graphs/nodes/inworld-stt-node.ts | 457 +++++++++++ .../graphs/nodes/transcript-extractor-node.ts | 2 +- backend/src/helpers/connection-manager.ts | 10 +- backend/src/server.ts | 4 +- backend/src/services/graph-service.ts | 8 +- frontend/public/audio-processor.js | 4 +- render.yaml | 2 - 11 files changed, 529 insertions(+), 790 deletions(-) delete mode 100644 backend/src/graphs/nodes/assembly-ai-stt-ws-node.ts create mode 100644 backend/src/graphs/nodes/inworld-stt-node.ts diff --git a/backend/.env.example b/backend/.env.example index 2488c28..39a9da0 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -1,5 +1,4 @@ INWORLD_API_KEY= -ASSEMBLY_AI_API_KEY= SUPABASE_URL= -SUPABASE_SECRET_KEY= \ No newline at end of file +SUPABASE_SECRET_KEY= diff --git a/backend/src/config/server.ts b/backend/src/config/server.ts index aa6f9ac..9d7cef5 100644 --- a/backend/src/config/server.ts +++ b/backend/src/config/server.ts @@ -5,31 +5,29 @@ * Environment variables can override defaults where appropriate. */ -export interface AssemblyAITurnDetectionSettings { - endOfTurnConfidenceThreshold: number; - minEndOfTurnSilenceWhenConfident: number; - maxTurnSilence: number; +export interface InworldSTTSettings { + silenceThresholdMs: number; + minSpeechMs: number; + silenceEnergyThreshold: number; description: string; } -export type AssemblyAIEagerness = 'low' | 'medium' | 'high'; +export type InworldSTTEagerness = 'low' | 'medium' | 'high'; /** - * AssemblyAI turn detection presets based on their documentation - * @see https://www.assemblyai.com/docs/speech-to-text/universal-streaming/turn-detection + * Inworld STT VAD presets controlling how eagerly the system ends a turn. + * These mirror the former AssemblyAI turn-detection presets so existing + * environment-variable overrides (INWORLD_STT_EAGERNESS) behave predictably. 
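+ * For example, INWORLD_STT_EAGERNESS=low selects the most patient preset
+ * (1000 ms of trailing silence, 200 ms of minimum speech before a turn counts).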
*/ -const assemblyAIPresets: Record< - AssemblyAIEagerness, - AssemblyAITurnDetectionSettings -> = { +const inworldSTTPresets: Record = { /** * Aggressive - Quick responses for rapid back-and-forth * Use cases: Agent Assist, IVR replacements, Retail/E-commerce, Telecom */ high: { - endOfTurnConfidenceThreshold: 0.4, - minEndOfTurnSilenceWhenConfident: 160, - maxTurnSilence: 400, + silenceThresholdMs: 400, + minSpeechMs: 100, + silenceEnergyThreshold: 0.01, description: 'Aggressive - Quick responses for rapid back-and-forth (IVR, order confirmations)', }, @@ -39,11 +37,10 @@ const assemblyAIPresets: Record< * Use cases: Customer Support, Tech Support, Financial Services, Travel */ medium: { - endOfTurnConfidenceThreshold: 0.4, - minEndOfTurnSilenceWhenConfident: 400, - maxTurnSilence: 1280, - description: - 'Balanced - Natural middle ground for most conversational turns', + silenceThresholdMs: 700, + minSpeechMs: 150, + silenceEnergyThreshold: 0.01, + description: 'Balanced - Natural middle ground for most conversational turns', }, /** @@ -51,9 +48,9 @@ const assemblyAIPresets: Record< * Use cases: Healthcare, Mental Health, Sales, Legal, Language Learning */ low: { - endOfTurnConfidenceThreshold: 0.7, - minEndOfTurnSilenceWhenConfident: 800, - maxTurnSilence: 3600, + silenceThresholdMs: 1000, + minSpeechMs: 200, + silenceEnergyThreshold: 0.01, description: 'Conservative - Patient, allows thinking pauses (Language Learning, Healthcare)', }, @@ -76,14 +73,12 @@ export const serverConfig = { }, /** - * AssemblyAI speech-to-text configuration + * Inworld STT configuration */ - assemblyAI: { - /** Turn detection eagerness level */ - eagerness: (process.env.ASSEMBLY_AI_EAGERNESS || - 'high') as AssemblyAIEagerness, - /** Format turns in output (typically false for real-time processing) */ - formatTurns: false, + inworldSTT: { + /** VAD eagerness level */ + eagerness: (process.env.INWORLD_STT_EAGERNESS || + 'high') as InworldSTTEagerness, }, /** @@ -96,18 +91,18 @@ export const serverConfig = { } as const; /** - * Get AssemblyAI turn detection settings for the configured eagerness level + * Get Inworld STT VAD settings for the configured eagerness level */ -export function getAssemblyAISettings(): AssemblyAITurnDetectionSettings { - return assemblyAIPresets[serverConfig.assemblyAI.eagerness]; +export function getInworldSTTSettings(): InworldSTTSettings { + return inworldSTTPresets[serverConfig.inworldSTT.eagerness]; } /** - * Get AssemblyAI turn detection settings for a specific eagerness level + * Get Inworld STT VAD settings for a specific eagerness level * @param eagerness - The eagerness level ('low' | 'medium' | 'high') */ -export function getAssemblyAISettingsForEagerness( - eagerness: AssemblyAIEagerness -): AssemblyAITurnDetectionSettings { - return assemblyAIPresets[eagerness]; +export function getInworldSTTSettingsForEagerness( + eagerness: InworldSTTEagerness +): InworldSTTSettings { + return inworldSTTPresets[eagerness]; } diff --git a/backend/src/graphs/conversation-graph.ts b/backend/src/graphs/conversation-graph.ts index 1c4f578..7e2f6d7 100644 --- a/backend/src/graphs/conversation-graph.ts +++ b/backend/src/graphs/conversation-graph.ts @@ -2,13 +2,13 @@ * Conversation Graph for Language Learning App - Inworld Runtime 0.9 * * This is a long-running circular graph that: - * - Processes continuous audio streams via AssemblyAI STT with built-in VAD + * - Processes continuous audio streams via Inworld STT with energy-based VAD * - Queues interactions for sequential processing * - Uses 
language-specific prompts and TTS voices * - Loops back for the next interaction automatically * * Graph Flow: - * AudioInput → AssemblyAI STT (loop) → TranscriptExtractor → InteractionQueue + * AudioInput → Inworld STT (loop) → TranscriptExtractor → InteractionQueue * → TextInput → DialogPromptBuilder → LLM → TextChunking → TTSRequestBuilder → TTS * → TextAggregator → StateUpdate → (loop back to InteractionQueue) */ @@ -23,7 +23,7 @@ import { TextAggregatorNode, } from '@inworld/runtime/graph'; -import { AssemblyAISTTWebSocketNode } from './nodes/assembly-ai-stt-ws-node.js'; +import { InworldSTTNode } from './nodes/inworld-stt-node.js'; import { DialogPromptBuilderNode } from './nodes/dialog-prompt-builder-node.js'; import { InteractionQueueNode } from './nodes/interaction-queue-node.js'; import { MemoryRetrievalNode } from './nodes/memory-retrieval-node.js'; @@ -37,33 +37,33 @@ import { DEFAULT_LANGUAGE_CODE, } from '../config/languages.js'; import { llmConfig } from '../config/llm.js'; -import { serverConfig, getAssemblyAISettings } from '../config/server.js'; +import { serverConfig, getInworldSTTSettings } from '../config/server.js'; import { graphLogger as logger } from '../utils/logger.js'; export interface ConversationGraphConfig { - assemblyAIApiKey: string; + inworldApiKey: string; connections: ConnectionsMap; defaultLanguageCode?: string; } /** * Wrapper class for the conversation graph - * Provides access to the graph and the AssemblyAI node for session management + * Provides access to the graph and the Inworld STT node for session management */ export class ConversationGraphWrapper { graph: Graph; - assemblyAINode: AssemblyAISTTWebSocketNode; + inworldSTTNode: InworldSTTNode; private constructor(params: { graph: Graph; - assemblyAINode: AssemblyAISTTWebSocketNode; + inworldSTTNode: InworldSTTNode; }) { this.graph = params.graph; - this.assemblyAINode = params.assemblyAINode; + this.inworldSTTNode = params.inworldSTTNode; } async destroy(): Promise { - await this.assemblyAINode.destroy(); + await this.inworldSTTNode.destroy(); await this.graph.stop(); } @@ -73,7 +73,7 @@ export class ConversationGraphWrapper { static create(config: ConversationGraphConfig): ConversationGraphWrapper { const { connections, - assemblyAIApiKey, + inworldApiKey, defaultLanguageCode = DEFAULT_LANGUAGE_CODE, } = config; // Use provided language code or default to Spanish @@ -92,20 +92,17 @@ export class ConversationGraphWrapper { // Start node (audio input proxy) const audioInputNode = new ProxyNode({ id: `audio-input-proxy${postfix}` }); - // AssemblyAI STT with built-in VAD (always uses multilingual model) - const turnDetectionSettings = getAssemblyAISettings(); - const assemblyAISTTNode = new AssemblyAISTTWebSocketNode({ - id: `assembly-ai-stt-ws-node${postfix}`, + // Inworld STT with energy-based VAD + const sttSettings = getInworldSTTSettings(); + const inworldSTTNode = new InworldSTTNode({ + id: `inworld-stt-node${postfix}`, config: { - apiKey: assemblyAIApiKey, + apiKey: inworldApiKey, connections: connections, sampleRate: serverConfig.audio.inputSampleRate, - formatTurns: serverConfig.assemblyAI.formatTurns, - endOfTurnConfidenceThreshold: - turnDetectionSettings.endOfTurnConfidenceThreshold, - minEndOfTurnSilenceWhenConfident: - turnDetectionSettings.minEndOfTurnSilenceWhenConfident, - maxTurnSilence: turnDetectionSettings.maxTurnSilence, + silenceThresholdMs: sttSettings.silenceThresholdMs, + minSpeechMs: sttSettings.minSpeechMs, + silenceEnergyThreshold: sttSettings.silenceEnergyThreshold, 
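+        // VAD thresholds above come from the INWORLD_STT_EAGERNESS preset
+        // resolved by getInworldSTTSettings() in config/server.ts.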
}, }); @@ -190,7 +187,7 @@ export class ConversationGraphWrapper { graphBuilder // Add all nodes .addNode(audioInputNode) - .addNode(assemblyAISTTNode) + .addNode(inworldSTTNode) .addNode(transcriptExtractorNode) .addNode(interactionQueueNode) .addNode(textInputNode) @@ -206,10 +203,10 @@ export class ConversationGraphWrapper { // ============================================================ // Audio Input Flow (STT with VAD) // ============================================================ - .addEdge(audioInputNode, assemblyAISTTNode) + .addEdge(audioInputNode, inworldSTTNode) - // AssemblyAI loops back to itself while stream is active - .addEdge(assemblyAISTTNode, assemblyAISTTNode, { + // Inworld STT loops back to itself while stream is active + .addEdge(inworldSTTNode, inworldSTTNode, { condition: async (input: unknown) => { const data = input as { stream_exhausted?: boolean }; return data?.stream_exhausted !== true; @@ -219,7 +216,7 @@ export class ConversationGraphWrapper { }) // When interaction is complete, extract transcript - .addEdge(assemblyAISTTNode, transcriptExtractorNode, { + .addEdge(inworldSTTNode, transcriptExtractorNode, { condition: async (input: unknown) => { const data = input as { interaction_complete?: boolean }; return data?.interaction_complete === true; @@ -283,7 +280,7 @@ export class ConversationGraphWrapper { return new ConversationGraphWrapper({ graph, - assemblyAINode: assemblyAISTTNode, + inworldSTTNode, }); } } diff --git a/backend/src/graphs/nodes/assembly-ai-stt-ws-node.ts b/backend/src/graphs/nodes/assembly-ai-stt-ws-node.ts deleted file mode 100644 index 3b566e9..0000000 --- a/backend/src/graphs/nodes/assembly-ai-stt-ws-node.ts +++ /dev/null @@ -1,707 +0,0 @@ -import { DataStreamWithMetadata } from '@inworld/runtime'; -import { CustomNode, GraphTypes, ProcessContext } from '@inworld/runtime/graph'; -import WebSocket from 'ws'; -import { v4 as uuidv4 } from 'uuid'; - -import { Connection } from '../../types/index.js'; -import { audioDataToPCM16 } from '../../helpers/audio-utils.js'; -import { createLogger } from '../../utils/logger.js'; - -const logger = createLogger('AssemblyAI'); - -/** - * Configuration interface for AssemblyAISTTWebSocketNode - */ -export interface AssemblyAISTTWebSocketNodeConfig { - /** Assembly.AI API key */ - apiKey: string; - /** Connections map to access session state */ - connections: { [sessionId: string]: Connection }; - /** Sample rate of the audio stream in Hz */ - sampleRate?: number; - /** Enable turn formatting from Assembly.AI */ - formatTurns?: boolean; - /** End of turn confidence threshold (0-1) */ - endOfTurnConfidenceThreshold?: number; - /** Minimum silence duration when confident (in milliseconds) */ - minEndOfTurnSilenceWhenConfident?: number; - /** Maximum turn silence (in milliseconds) */ - maxTurnSilence?: number; -} - -/** - * Manages a persistent WebSocket connection to Assembly.AI for a single session. 
- */ -class AssemblyAISession { - private ws: WebSocket | null = null; - private wsReady: boolean = false; - private wsConnectionPromise: Promise | null = null; - - public assemblySessionId: string = ''; - public sessionExpiresAt: number = 0; - public shouldStopProcessing: boolean = false; - - private inactivityTimeout: NodeJS.Timeout | null = null; - private lastActivityTime: number = Date.now(); - private readonly INACTIVITY_TIMEOUT_MS = 60000; // 60 seconds - - constructor( - public readonly sessionId: string, - private apiKey: string, - private url: string - ) {} - - /** - * Ensure WebSocket connection is ready, reconnecting if needed - */ - public async ensureConnection(): Promise { - const now = Math.floor(Date.now() / 1000); - const isExpired = this.sessionExpiresAt > 0 && now >= this.sessionExpiresAt; - - if ( - !this.ws || - !this.wsReady || - this.ws.readyState !== WebSocket.OPEN || - isExpired - ) { - if (isExpired) { - logger.info( - { sessionId: this.sessionId }, - 'session_expired_reconnecting' - ); - } - this.closeWebSocket(); - this.initializeWebSocket(); - } - - if (this.wsConnectionPromise) { - await this.wsConnectionPromise; - } - - this.shouldStopProcessing = false; - this.resetInactivityTimer(); - } - - private initializeWebSocket(): void { - logger.debug({ sessionId: this.sessionId }, 'initializing_websocket'); - - this.wsConnectionPromise = new Promise((resolve, reject) => { - this.ws = new WebSocket(this.url, { - headers: { Authorization: this.apiKey }, - }); - - this.ws.on('open', () => { - logger.debug({ sessionId: this.sessionId }, 'websocket_opened'); - this.wsReady = true; - resolve(); - }); - - // Permanent message handler for session metadata - this.ws.on('message', (data: WebSocket.Data) => { - try { - const message = JSON.parse(data.toString()); - if (message.type === 'Begin') { - this.assemblySessionId = message.id || message.session_id || ''; - this.sessionExpiresAt = message.expires_at || 0; - logger.debug( - { assemblySessionId: this.assemblySessionId }, - 'session_began' - ); - } - } catch { - // Ignore parsing errors - } - }); - - this.ws.on('error', (error: Error) => { - logger.error({ err: error }, 'websocket_error'); - this.wsReady = false; - reject(error); - }); - - this.ws.on('close', (code: number, reason: Buffer) => { - logger.debug({ code, reason: reason.toString() }, 'websocket_closed'); - this.wsReady = false; - }); - }); - } - - public onMessage(listener: (data: WebSocket.Data) => void): void { - if (this.ws) { - this.ws.on('message', listener); - } - } - - public offMessage(listener: (data: WebSocket.Data) => void): void { - if (this.ws) { - this.ws.off('message', listener); - } - } - - public sendAudio(pcm16Data: Int16Array): void { - if (this.ws && this.ws.readyState === WebSocket.OPEN) { - this.ws.send(Buffer.from(pcm16Data.buffer)); - this.resetInactivityTimer(); - } - } - - private resetInactivityTimer(): void { - if (this.inactivityTimeout) { - clearTimeout(this.inactivityTimeout); - } - this.lastActivityTime = Date.now(); - this.inactivityTimeout = setTimeout(() => { - this.closeDueToInactivity(); - }, this.INACTIVITY_TIMEOUT_MS); - } - - /** - * Clear the inactivity timer without closing the connection. - * Used when we know no audio will be coming (e.g., text-only interactions). 
- */ - public clearInactivityTimer(): void { - if (this.inactivityTimeout) { - clearTimeout(this.inactivityTimeout); - this.inactivityTimeout = null; - } - } - - private closeDueToInactivity(): void { - const inactiveFor = Date.now() - this.lastActivityTime; - logger.info( - { sessionId: this.sessionId, inactiveMs: inactiveFor }, - 'closing_due_to_inactivity' - ); - // Only close the WebSocket to stop billing, but keep the session reusable. - // Don't set shouldStopProcessing - this allows the graph to continue waiting - // for input and reconnect when audio arrives. - this.closeWebSocket(); - // Note: We intentionally do NOT call onCleanup here anymore. - // The session stays in the map and can be reactivated by ensureConnection(). - } - - private closeWebSocket(): void { - if (this.ws) { - try { - this.ws.removeAllListeners(); - if (this.ws.readyState === WebSocket.OPEN) { - this.ws.close(); - } - } catch (e) { - logger.warn({ err: e }, 'error_closing_socket'); - } - this.ws = null; - this.wsReady = false; - } - } - - public async close(): Promise { - if (this.inactivityTimeout) { - clearTimeout(this.inactivityTimeout); - } - - if (this.ws && this.ws.readyState === WebSocket.OPEN) { - try { - this.ws.send(JSON.stringify({ type: 'Terminate' })); - await new Promise((resolve) => setTimeout(resolve, 100)); - } catch { - // Ignore - } - } - - this.closeWebSocket(); - } -} - -/** - * AssemblyAISTTWebSocketNode processes continuous multimodal streams using Assembly.AI's - * streaming Speech-to-Text service via direct WebSocket connection. - * - * This node: - * - Receives MultimodalContent stream (audio and/or text) - * - For audio: extracts audio and feeds to Assembly.AI streaming transcriber - * - For text: bypasses STT and returns text directly - * - Detects turn endings using Assembly.AI's neural turn detection - * - Returns DataStreamWithMetadata with transcribed text when a turn completes - */ -export class AssemblyAISTTWebSocketNode extends CustomNode { - private apiKey: string; - private connections: { [sessionId: string]: Connection }; - private sampleRate: number; - private formatTurns: boolean; - private endOfTurnConfidenceThreshold: number; - private minEndOfTurnSilenceWhenConfident: number; - private maxTurnSilence: number; - private wsEndpointBaseUrl: string = 'wss://streaming.assemblyai.com/v3/ws'; - - private sessions: Map = new Map(); - private readonly TURN_COMPLETION_TIMEOUT_MS = 2000; - private readonly MAX_TRANSCRIPTION_DURATION_MS = 40000; - - constructor(props: { - id?: string; - config: AssemblyAISTTWebSocketNodeConfig; - }) { - const { config, ...nodeProps } = props; - - if (!config.apiKey) { - throw new Error('AssemblyAISTTWebSocketNode requires an API key.'); - } - if (!config.connections) { - throw new Error( - 'AssemblyAISTTWebSocketNode requires a connections object.' - ); - } - - super({ id: nodeProps.id || 'assembly-ai-stt-ws-node' }); - - this.apiKey = config.apiKey; - this.connections = config.connections; - this.sampleRate = config.sampleRate || 16000; - this.formatTurns = config.formatTurns ?? false; - this.endOfTurnConfidenceThreshold = - config.endOfTurnConfidenceThreshold ?? 0.7; - this.minEndOfTurnSilenceWhenConfident = - config.minEndOfTurnSilenceWhenConfident ?? 800; - this.maxTurnSilence = config.maxTurnSilence ?? 
3600; - - logger.info( - { - threshold: this.endOfTurnConfidenceThreshold, - minSilenceMs: this.minEndOfTurnSilenceWhenConfident, - maxSilenceMs: this.maxTurnSilence, - }, - 'stt_node_configured' - ); - } - - /** - * Build WebSocket URL with query parameters - */ - private buildWebSocketUrl(): string { - const params = new URLSearchParams({ - sample_rate: this.sampleRate.toString(), - encoding: 'pcm_s16le', - format_turns: this.formatTurns.toString(), - end_of_turn_confidence_threshold: - this.endOfTurnConfidenceThreshold.toString(), - min_end_of_turn_silence_when_confident: - this.minEndOfTurnSilenceWhenConfident.toString(), - max_turn_silence: this.maxTurnSilence.toString(), - speech_model: 'universal-streaming-multilingual', - language_detection: 'true', - }); - - const url = `${this.wsEndpointBaseUrl}?${params.toString()}`; - logger.debug( - { - model: 'universal-streaming-multilingual', - threshold: this.endOfTurnConfidenceThreshold, - maxSilenceMs: this.maxTurnSilence, - }, - 'connecting_to_assemblyai' - ); - - return url; - } - - /** - * Process multimodal stream and transcribe using Assembly.AI WebSocket - */ - async process( - context: ProcessContext, - input0: AsyncIterableIterator, - input: DataStreamWithMetadata - ): Promise { - const multimodalStream = - input !== undefined && - input !== null && - input instanceof DataStreamWithMetadata - ? (input.toStream() as unknown as AsyncIterableIterator) - : input0; - - const sessionId = context.getDatastore().get('sessionId') as string; - const connection = this.connections[sessionId]; - - if (connection?.unloaded) { - throw Error(`Session unloaded for sessionId: ${sessionId}`); - } - if (!connection) { - throw Error(`Failed to read connection for sessionId: ${sessionId}`); - } - - // Get iteration from metadata or parse from interactionId - const metadata = input?.getMetadata?.() || {}; - let previousIteration = (metadata.iteration as number) || 0; - - if ( - !connection.state.interactionId || - connection.state.interactionId === '' - ) { - connection.state.interactionId = uuidv4(); - } - - const currentId = connection.state.interactionId; - const delimiterIndex = currentId.indexOf('#'); - - if (previousIteration === 0 && delimiterIndex !== -1) { - const iterationStr = currentId.substring(delimiterIndex + 1); - const parsedIteration = parseInt(iterationStr, 10); - if (!isNaN(parsedIteration) && /^\d+$/.test(iterationStr)) { - previousIteration = parsedIteration; - } - } - - const iteration = previousIteration + 1; - const baseId = - delimiterIndex !== -1 - ? 
currentId.substring(0, delimiterIndex) - : currentId; - const nextInteractionId = `${baseId}#${iteration}`; - - logger.debug({ iteration }, 'starting_transcription'); - - // State tracking - let transcriptText = ''; - let turnDetected = false; - let speechDetected = false; - let audioChunkCount = 0; - let totalAudioSamples = 0; - let isStreamExhausted = false; - let errorOccurred = false; - let errorMessage = ''; - let maxDurationReached = false; - let isTextInput = false; - let textContent: string | undefined; - - // Get or create session - let session = this.sessions.get(sessionId); - if (!session) { - session = new AssemblyAISession( - sessionId, - this.apiKey, - this.buildWebSocketUrl() - ); - this.sessions.set(sessionId, session); - } - - // Promise to capture turn result - let turnResolve: (value: string) => void = () => {}; - let turnReject: (error: Error) => void = () => {}; - let turnCompleted = false; - const turnPromise = new Promise((resolve, reject) => { - turnResolve = resolve; - turnReject = reject; - }); - const turnPromiseWithState = turnPromise.then((value) => { - turnCompleted = true; - return value; - }); - - // AssemblyAI message handler for this process() call - const messageHandler = (data: WebSocket.Data) => { - try { - const message = JSON.parse(data.toString()); - const msgType = message.type; - - if (msgType === 'Turn') { - if (session?.shouldStopProcessing) { - return; - } - - const transcript = message.transcript || ''; - const utterance = message.utterance || ''; - const isFinal = message.end_of_turn; - - if (!transcript) return; - - if (!isFinal) { - // Partial transcript - const textToSend = utterance || transcript; - if (textToSend) { - this.sendPartialTranscript( - sessionId, - nextInteractionId, - textToSend - ); - - if (connection?.onSpeechDetected && !speechDetected) { - logger.debug({ iteration }, 'speech_detected'); - speechDetected = true; - connection.onSpeechDetected(nextInteractionId); - } - } - return; - } - - // Final transcript - check for pending transcript to stitch - let finalTranscript = transcript; - - if (connection?.pendingTranscript) { - // Stitch the pending transcript with the new one - finalTranscript = - `${connection.pendingTranscript} ${transcript}`.trim(); - logger.debug( - { - iteration, - transcriptSnippet: finalTranscript.substring(0, 80), - }, - 'stitched_transcript' - ); - // Clear the pending transcript - connection.pendingTranscript = undefined; - } else { - logger.debug( - { iteration, transcriptSnippet: transcript.substring(0, 50) }, - 'turn_detected' - ); - } - - // Clear interrupt flag for new processing - if (connection) { - connection.isProcessingInterrupted = false; - } - - transcriptText = finalTranscript; - turnDetected = true; - if (session) session.shouldStopProcessing = true; - turnResolve(finalTranscript); - } else if (msgType === 'Termination') { - logger.debug({ iteration }, 'session_terminated'); - } - } catch (error) { - logger.error({ err: error }, 'error_handling_message'); - } - }; - - try { - await session.ensureConnection(); - session.onMessage(messageHandler); - - // Process multimodal content (audio chunks) - const audioProcessingPromise = (async () => { - let maxDurationTimeout: NodeJS.Timeout | null = null; - try { - // Safety timer: prevent infinite loops - maxDurationTimeout = setTimeout(() => { - maxDurationReached = true; - }, this.MAX_TRANSCRIPTION_DURATION_MS); - - while (true) { - if (session?.shouldStopProcessing) break; - - if (maxDurationReached && !transcriptText) { - logger.warn( - { 
maxDurationMs: this.MAX_TRANSCRIPTION_DURATION_MS }, - 'max_transcription_duration_reached' - ); - break; - } - - const result = await multimodalStream.next(); - - if (result.done) { - logger.debug( - { iteration, audioChunkCount }, - 'multimodal_stream_exhausted' - ); - isStreamExhausted = true; - break; - } - - if (session?.shouldStopProcessing) break; - - const content = result.value as GraphTypes.MultimodalContent; - - // Handle text input - if (content.text !== undefined && content.text !== null) { - logger.debug( - { iteration, textSnippet: content.text.substring(0, 50) }, - 'text_input_detected' - ); - isTextInput = true; - textContent = content.text; - transcriptText = content.text; - turnDetected = true; - if (session) { - session.shouldStopProcessing = true; - // Clear inactivity timer since we're handling text, not audio - // This prevents the 60s timeout from firing and disrupting the session - session.clearInactivityTimer(); - } - turnResolve(transcriptText); - break; - } - - // Extract audio - if (content.audio === undefined || content.audio === null) continue; - - const audioData = content.audio.data; - if (!audioData || audioData.length === 0) continue; - - audioChunkCount++; - totalAudioSamples += audioData.length; - - const pcm16Data = audioDataToPCM16(audioData); - session?.sendAudio(pcm16Data); - } - } catch (error) { - logger.error({ err: error }, 'error_processing_audio'); - errorOccurred = true; - errorMessage = error instanceof Error ? error.message : String(error); - throw error; - } finally { - if (maxDurationTimeout) { - clearTimeout(maxDurationTimeout); - } - } - })(); - - const raceResult = await Promise.race([ - turnPromiseWithState.then(() => ({ winner: 'turn' as const })), - audioProcessingPromise.then(() => ({ winner: 'audio' as const })), - ]); - - if ( - raceResult.winner === 'audio' && - !turnCompleted && - !maxDurationReached - ) { - logger.debug( - { waitMs: this.TURN_COMPLETION_TIMEOUT_MS }, - 'audio_ended_before_turn_waiting' - ); - - // Send silence to keep connection alive - AssemblyAI needs continuous audio - const silenceIntervalMs = 100; - const silenceSamples = Math.floor( - (silenceIntervalMs / 1000) * this.sampleRate - ); - const silenceFrame = new Int16Array(silenceSamples); - const silenceTimer = setInterval(() => { - if (session && !session.shouldStopProcessing) { - session.sendAudio(silenceFrame); - } - }, silenceIntervalMs); - - const timeoutPromise = new Promise<{ winner: 'timeout' }>((resolve) => - setTimeout( - () => resolve({ winner: 'timeout' }), - this.TURN_COMPLETION_TIMEOUT_MS - ) - ); - - const waitResult = await Promise.race([ - turnPromiseWithState.then(() => ({ winner: 'turn' as const })), - timeoutPromise, - ]); - - clearInterval(silenceTimer); - - if (waitResult.winner === 'timeout' && !turnCompleted) { - logger.warn('timed_out_waiting_for_turn'); - turnReject?.(new Error('Timed out waiting for turn completion')); - } - } - - await audioProcessingPromise.catch(() => {}); - - logger.debug( - { iteration, transcriptSnippet: transcriptText?.substring(0, 50) }, - 'transcription_complete' - ); - - if (turnDetected) { - connection.state.interactionId = ''; - } - - const taggedStream = Object.assign(multimodalStream, { - type: 'MultimodalContent', - abort: () => {}, - getMetadata: () => ({}), - }); - - return new DataStreamWithMetadata(taggedStream, { - elementType: 'MultimodalContent', - iteration: iteration, - interactionId: nextInteractionId, - session_id: sessionId, - assembly_session_id: session.assemblySessionId, - 
transcript: transcriptText, - turn_detected: turnDetected, - audio_chunk_count: audioChunkCount, - total_audio_samples: totalAudioSamples, - sample_rate: this.sampleRate, - stream_exhausted: isStreamExhausted, - interaction_complete: turnDetected && transcriptText.length > 0, - error_occurred: errorOccurred, - error_message: errorMessage, - is_text_input: isTextInput, - text_content: textContent, - }); - } catch (error) { - logger.error({ err: error, iteration }, 'transcription_failed'); - - const taggedStream = Object.assign(multimodalStream, { - type: 'MultimodalContent', - abort: () => {}, - getMetadata: () => ({}), - }); - - return new DataStreamWithMetadata(taggedStream, { - elementType: 'MultimodalContent', - iteration: iteration, - interactionId: nextInteractionId, - session_id: sessionId, - transcript: '', - turn_detected: false, - stream_exhausted: isStreamExhausted, - interaction_complete: false, - error_occurred: true, - error_message: error instanceof Error ? error.message : String(error), - is_text_input: isTextInput, - text_content: textContent, - }); - } finally { - if (session) { - session.offMessage(messageHandler); - } - } - } - - private sendPartialTranscript( - sessionId: string, - interactionId: string, - text: string - ): void { - const connection = this.connections[sessionId]; - if (!connection?.onPartialTranscript) return; - - try { - connection.onPartialTranscript(text, interactionId); - } catch (error) { - logger.error({ err: error }, 'error_sending_partial_transcript'); - } - } - - async closeSession(sessionId: string): Promise { - const session = this.sessions.get(sessionId); - if (session) { - logger.debug({ sessionId }, 'closing_session'); - await session.close(); - this.sessions.delete(sessionId); - } - } - - async destroy(): Promise { - logger.info({ sessionCount: this.sessions.size }, 'destroying_node'); - - const promises: Promise[] = []; - for (const session of this.sessions.values()) { - promises.push(session.close()); - } - - await Promise.all(promises); - this.sessions.clear(); - } -} diff --git a/backend/src/graphs/nodes/inworld-stt-node.ts b/backend/src/graphs/nodes/inworld-stt-node.ts new file mode 100644 index 0000000..fee66e4 --- /dev/null +++ b/backend/src/graphs/nodes/inworld-stt-node.ts @@ -0,0 +1,457 @@ +import { DataStreamWithMetadata } from '@inworld/runtime'; +import { CustomNode, GraphTypes, ProcessContext } from '@inworld/runtime/graph'; +import { v4 as uuidv4 } from 'uuid'; + +import { Connection } from '../../types/index.js'; +import { audioDataToPCM16 } from '../../helpers/audio-utils.js'; +import { createLogger } from '../../utils/logger.js'; + +const logger = createLogger('InworldSTT'); + +const INWORLD_STT_URL = 'https://api.inworld.ai/stt/v1/transcribe'; + +/** + * Configuration interface for InworldSTTNode + */ +export interface InworldSTTNodeConfig { + /** Inworld API key (Base64 credentials) */ + apiKey: string; + /** Connections map to access session state */ + connections: { [sessionId: string]: Connection }; + /** Sample rate of the audio stream in Hz */ + sampleRate?: number; + /** + * Duration of silence (ms) after speech that signals end-of-turn. + * Lower = more responsive; higher = more patient with pauses. + */ + silenceThresholdMs?: number; + /** Minimum speech duration (ms) before a turn is considered valid */ + minSpeechMs?: number; + /** + * RMS energy threshold below which audio is considered silence (0–1 scale). + * Tune this based on ambient noise levels. 
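+   * At the default of 0.01, frames whose RMS falls below 1% of full-scale
+   * Int16 amplitude are treated as silence.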
+ */ + silenceEnergyThreshold?: number; +} + +/** + * Compute root-mean-square energy of a PCM16 frame. + * Returns a value in [0, 1] (normalised by the max Int16 value). + */ +function computeRMS(pcm16: Int16Array): number { + if (pcm16.length === 0) return 0; + let sum = 0; + for (let i = 0; i < pcm16.length; i++) { + const s = pcm16[i] / 32768; + sum += s * s; + } + return Math.sqrt(sum / pcm16.length); +} + +/** + * Encode a list of PCM16 chunks as a base64 LINEAR16 string suitable for + * the Inworld STT REST API. + */ +function encodePCM16ToBase64(chunks: Int16Array[]): string { + const totalSamples = chunks.reduce((acc, c) => acc + c.length, 0); + const buffer = new Int16Array(totalSamples); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.length; + } + // Convert Int16Array to Buffer (little-endian bytes) + const byteBuffer = Buffer.from(buffer.buffer, buffer.byteOffset, buffer.byteLength); + return byteBuffer.toString('base64'); +} + +/** + * Call the Inworld STT REST API with buffered PCM16 audio. + */ +async function callInworldSTT( + apiKey: string, + audioBase64: string +): Promise { + const response = await fetch(INWORLD_STT_URL, { + method: 'POST', + headers: { + Authorization: `Basic ${apiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + transcribeConfig: { + modelId: 'groq/whisper-large-v3', + audioEncoding: 'LINEAR16', + }, + audioData: { + content: audioBase64, + }, + }), + }); + + if (!response.ok) { + const errText = await response.text().catch(() => ''); + throw new Error( + `Inworld STT request failed: ${response.status} ${response.statusText} - ${errText}` + ); + } + + const json = (await response.json()) as { + transcription?: { transcript?: string }; + }; + return json?.transcription?.transcript ?? ''; +} + +/** + * InworldSTTNode processes continuous multimodal streams using Inworld's + * Speech-to-Text REST API combined with energy-based VAD. + * + * This node: + * - Receives MultimodalContent stream (audio and/or text) + * - For audio: buffers PCM16 data, detects end-of-turn via silence energy, + * then POSTs to the Inworld STT API for transcription + * - For text: bypasses STT and returns text directly + * - Returns DataStreamWithMetadata with transcribed text when a turn completes + */ +export class InworldSTTNode extends CustomNode { + private apiKey: string; + private connections: { [sessionId: string]: Connection }; + private sampleRate: number; + private silenceThresholdMs: number; + private minSpeechMs: number; + private silenceEnergyThreshold: number; + + private readonly MAX_TRANSCRIPTION_DURATION_MS = 40000; + + constructor(props: { id?: string; config: InworldSTTNodeConfig }) { + const { config, ...nodeProps } = props; + + if (!config.apiKey) { + throw new Error('InworldSTTNode requires an API key.'); + } + if (!config.connections) { + throw new Error('InworldSTTNode requires a connections object.'); + } + + super({ id: nodeProps.id || 'inworld-stt-node' }); + + this.apiKey = config.apiKey; + this.connections = config.connections; + this.sampleRate = config.sampleRate ?? 16000; + this.silenceThresholdMs = config.silenceThresholdMs ?? 800; + this.minSpeechMs = config.minSpeechMs ?? 200; + this.silenceEnergyThreshold = config.silenceEnergyThreshold ?? 
0.01; + + logger.info( + { + silenceThresholdMs: this.silenceThresholdMs, + minSpeechMs: this.minSpeechMs, + energyThreshold: this.silenceEnergyThreshold, + }, + 'stt_node_configured' + ); + } + + /** + * Process multimodal stream, detect turn end via VAD, then transcribe via + * Inworld STT REST API. + */ + async process( + context: ProcessContext, + input0: AsyncIterableIterator, + input: DataStreamWithMetadata + ): Promise { + const multimodalStream = + input !== undefined && + input !== null && + input instanceof DataStreamWithMetadata + ? (input.toStream() as unknown as AsyncIterableIterator) + : input0; + + const sessionId = context.getDatastore().get('sessionId') as string; + const connection = this.connections[sessionId]; + + if (connection?.unloaded) { + throw Error(`Session unloaded for sessionId: ${sessionId}`); + } + if (!connection) { + throw Error(`Failed to read connection for sessionId: ${sessionId}`); + } + + // Compute iteration counter + const metadata = input?.getMetadata?.() || {}; + let previousIteration = (metadata.iteration as number) || 0; + + if ( + !connection.state.interactionId || + connection.state.interactionId === '' + ) { + connection.state.interactionId = uuidv4(); + } + + const currentId = connection.state.interactionId; + const delimiterIndex = currentId.indexOf('#'); + + if (previousIteration === 0 && delimiterIndex !== -1) { + const iterationStr = currentId.substring(delimiterIndex + 1); + const parsedIteration = parseInt(iterationStr, 10); + if (!isNaN(parsedIteration) && /^\d+$/.test(iterationStr)) { + previousIteration = parsedIteration; + } + } + + const iteration = previousIteration + 1; + const baseId = + delimiterIndex !== -1 + ? currentId.substring(0, delimiterIndex) + : currentId; + const nextInteractionId = `${baseId}#${iteration}`; + + logger.debug({ iteration }, 'starting_transcription'); + + // State tracking + let transcriptText = ''; + let turnDetected = false; + let speechDetected = false; + let audioChunkCount = 0; + let totalAudioSamples = 0; + let isStreamExhausted = false; + let errorOccurred = false; + let errorMessage = ''; + let isTextInput = false; + let textContent: string | undefined; + + // VAD state + const speechBuffer: Int16Array[] = []; + const samplesPerMs = this.sampleRate / 1000; + const silenceChunksThreshold = Math.ceil( + this.silenceThresholdMs / 100 // chunks are ~100ms each (1600 samples @ 16kHz) + ); + const minSpeechChunks = Math.ceil(this.minSpeechMs / 100); + + let silenceChunkCount = 0; + let speechChunkCount = 0; + let maxDurationReached = false; + let turnEndedByVAD = false; + + // Safety timer + const maxDurationTimer = setTimeout(() => { + maxDurationReached = true; + }, this.MAX_TRANSCRIPTION_DURATION_MS); + + try { + for await (const content of multimodalStream) { + // Handle text input — bypass STT entirely + if (content.text !== undefined && content.text !== null) { + logger.debug( + { iteration, textSnippet: content.text.substring(0, 50) }, + 'text_input_detected' + ); + isTextInput = true; + textContent = content.text; + transcriptText = content.text; + turnDetected = true; + turnEndedByVAD = false; + break; + } + + // Safety: stop if max duration reached and we have some speech + if (maxDurationReached) { + if (speechChunkCount >= minSpeechChunks) { + logger.warn( + { maxDurationMs: this.MAX_TRANSCRIPTION_DURATION_MS }, + 'max_transcription_duration_reached' + ); + turnEndedByVAD = true; + } + break; + } + + // Extract audio + const audioData = content.audio?.data; + if (!audioData || 
audioData.length === 0) continue; + + audioChunkCount++; + totalAudioSamples += audioData.length; + + const pcm16 = audioDataToPCM16(audioData); + const rms = computeRMS(pcm16); + + if (rms > this.silenceEnergyThreshold) { + // Active speech + silenceChunkCount = 0; + speechChunkCount++; + speechBuffer.push(pcm16); + + if (!speechDetected) { + speechDetected = true; + logger.debug({ iteration }, 'speech_detected'); + + // Notify connection of speech start (triggers UI interrupt / speech indicator) + if (connection.onSpeechDetected) { + connection.onSpeechDetected(nextInteractionId); + } + } + } else { + // Silence + if (speechDetected) { + silenceChunkCount++; + // Still buffer the silence (avoids cutting off trailing phonemes) + speechBuffer.push(pcm16); + + if (silenceChunkCount >= silenceChunksThreshold) { + // Enough silence after speech — treat as end of turn + if (speechChunkCount >= minSpeechChunks) { + logger.debug( + { iteration, speechChunkCount, silenceChunkCount }, + 'vad_end_of_turn' + ); + turnEndedByVAD = true; + break; + } else { + // Too short — discard and reset + logger.debug({ iteration }, 'vad_noise_discarded'); + speechBuffer.length = 0; + speechDetected = false; + speechChunkCount = 0; + silenceChunkCount = 0; + } + } + } + } + } + + // Stream exhausted naturally (no more audio) + if (!isTextInput && !turnEndedByVAD) { + isStreamExhausted = true; + if (speechDetected && speechChunkCount >= minSpeechChunks) { + turnEndedByVAD = true; + } + } + + // Transcribe buffered speech via Inworld REST API + if (turnEndedByVAD && speechBuffer.length > 0) { + try { + const audioBase64 = encodePCM16ToBase64(speechBuffer); + + logger.debug( + { + iteration, + speechChunks: speechBuffer.length, + totalSamples: speechBuffer.reduce((a, c) => a + c.length, 0), + }, + 'calling_inworld_stt' + ); + + let rawTranscript = await callInworldSTT(this.apiKey, audioBase64); + + // Stitch pending transcript if present + if (connection.pendingTranscript) { + rawTranscript = + `${connection.pendingTranscript} ${rawTranscript}`.trim(); + logger.debug( + { iteration, transcriptSnippet: rawTranscript.substring(0, 80) }, + 'stitched_transcript' + ); + connection.pendingTranscript = undefined; + } + + transcriptText = rawTranscript; + turnDetected = transcriptText.length > 0; + + if (turnDetected) { + // Clear interrupt flag for new processing + if (connection) { + connection.isProcessingInterrupted = false; + } + logger.debug( + { iteration, transcriptSnippet: transcriptText.substring(0, 50) }, + 'turn_detected' + ); + } + } catch (err) { + logger.error({ err, iteration }, 'inworld_stt_call_failed'); + errorOccurred = true; + errorMessage = err instanceof Error ? 
err.message : String(err); + } + } + + if (turnDetected) { + connection.state.interactionId = ''; + } + + logger.debug( + { iteration, transcriptSnippet: transcriptText?.substring(0, 50) }, + 'transcription_complete' + ); + + const taggedStream = Object.assign(multimodalStream, { + type: 'MultimodalContent', + abort: () => {}, + getMetadata: () => ({}), + }); + + return new DataStreamWithMetadata(taggedStream, { + elementType: 'MultimodalContent', + iteration, + interactionId: nextInteractionId, + session_id: sessionId, + transcript: transcriptText, + turn_detected: turnDetected, + audio_chunk_count: audioChunkCount, + total_audio_samples: totalAudioSamples, + sample_rate: this.sampleRate, + stream_exhausted: isStreamExhausted, + interaction_complete: turnDetected && transcriptText.length > 0, + error_occurred: errorOccurred, + error_message: errorMessage, + is_text_input: isTextInput, + text_content: textContent, + }); + } catch (error) { + logger.error({ err: error, iteration }, 'transcription_failed'); + + const taggedStream = Object.assign(multimodalStream, { + type: 'MultimodalContent', + abort: () => {}, + getMetadata: () => ({}), + }); + + return new DataStreamWithMetadata(taggedStream, { + elementType: 'MultimodalContent', + iteration, + interactionId: nextInteractionId, + session_id: sessionId, + transcript: '', + turn_detected: false, + stream_exhausted: isStreamExhausted, + interaction_complete: false, + error_occurred: true, + error_message: error instanceof Error ? error.message : String(error), + is_text_input: isTextInput, + text_content: textContent, + }); + } finally { + clearTimeout(maxDurationTimer); + } + } + + private sendPartialTranscript( + sessionId: string, + interactionId: string, + text: string + ): void { + const connection = this.connections[sessionId]; + if (!connection?.onPartialTranscript) return; + + try { + connection.onPartialTranscript(text, interactionId); + } catch (error) { + logger.error({ err: error }, 'error_sending_partial_transcript'); + } + } + + async destroy(): Promise { + logger.info('destroying_node'); + } +} diff --git a/backend/src/graphs/nodes/transcript-extractor-node.ts b/backend/src/graphs/nodes/transcript-extractor-node.ts index 75d6132..83ee37d 100644 --- a/backend/src/graphs/nodes/transcript-extractor-node.ts +++ b/backend/src/graphs/nodes/transcript-extractor-node.ts @@ -1,6 +1,6 @@ /** * TranscriptExtractorNode extracts transcript information from - * DataStreamWithMetadata (output from AssemblyAISTTNode) and converts + * DataStreamWithMetadata (output from InworldSTTNode) and converts * it to InteractionInfo for downstream processing. */ diff --git a/backend/src/helpers/connection-manager.ts b/backend/src/helpers/connection-manager.ts index 9659d5a..6e8671a 100644 --- a/backend/src/helpers/connection-manager.ts +++ b/backend/src/helpers/connection-manager.ts @@ -4,7 +4,7 @@ * This replaces the AudioProcessor for Inworld Runtime 0.9. 
* Key differences from AudioProcessor: * - Uses MultimodalStreamManager to feed audio to a long-running graph - * - VAD is handled inside the graph by AssemblyAI (not external Silero) + * - VAD is handled inside the graph by InworldSTTNode (energy-based silence detection) * - Graph runs continuously for the session duration */ @@ -540,7 +540,7 @@ export class ConnectionManager { } /** - * Handle speech detected event from AssemblyAI + * Handle speech detected event from Inworld STT */ private handleSpeechDetected(interactionId: string): void { this.logger.debug({ interactionId }, 'speech_detected'); @@ -578,7 +578,7 @@ export class ConnectionManager { } /** - * Handle partial transcript from AssemblyAI + * Handle partial transcript from Inworld STT */ private handlePartialTranscript(text: string, interactionId: string): void { this.sendToClient({ @@ -1085,8 +1085,8 @@ export class ConnectionManager { // End the multimodal stream this.multimodalStreamManager.end(); - // Close AssemblyAI session - await this.graphWrapper.assemblyAINode.closeSession(this.sessionId); + // Destroy Inworld STT node + await this.graphWrapper.inworldSTTNode.destroy(); // Remove from connections map delete this.connections[this.sessionId]; diff --git a/backend/src/server.ts b/backend/src/server.ts index 90fc1d3..40bb7e5 100644 --- a/backend/src/server.ts +++ b/backend/src/server.ts @@ -1,7 +1,7 @@ /** * Language Learning Server - Inworld Runtime 0.9 * - * This server uses a long-running circular graph with AssemblyAI for VAD/STT. + * This server uses a long-running circular graph with Inworld STT for VAD/STT. * Key components: * - ConversationGraphWrapper: The main graph that processes audio → STT → LLM → TTS * - ConnectionManager: Manages WebSocket connections and feeds audio to the graph @@ -107,7 +107,7 @@ async function startServer(): Promise { await exportGraphConfigs(); server.listen(serverConfig.port, () => { logger.info({ port: serverConfig.port }, 'server_started'); - logger.info('using_inworld_runtime_0.9_with_assemblyai_stt'); + logger.info('using_inworld_runtime_0.9_with_inworld_stt'); }); } catch (error) { logger.fatal({ err: error }, 'server_start_failed'); diff --git a/backend/src/services/graph-service.ts b/backend/src/services/graph-service.ts index 1bfd910..5a48ea7 100644 --- a/backend/src/services/graph-service.ts +++ b/backend/src/services/graph-service.ts @@ -31,14 +31,14 @@ export function getGraphWrapper(): ConversationGraphWrapper | null { } export async function initializeGraph(): Promise { - const assemblyAIApiKey = process.env.ASSEMBLY_AI_API_KEY; - if (!assemblyAIApiKey) { - throw new Error('ASSEMBLY_AI_API_KEY environment variable is required'); + const inworldApiKey = process.env.INWORLD_API_KEY; + if (!inworldApiKey) { + throw new Error('INWORLD_API_KEY environment variable is required'); } logger.info('initializing_conversation_graph'); graphWrapper = getConversationGraph({ - assemblyAIApiKey, + inworldApiKey, connections, defaultLanguageCode: 'es', // Always Spanish }); diff --git a/frontend/public/audio-processor.js b/frontend/public/audio-processor.js index b2f19cf..4c6b152 100644 --- a/frontend/public/audio-processor.js +++ b/frontend/public/audio-processor.js @@ -1,6 +1,6 @@ /** * AudioWorklet processor for capturing and resampling microphone audio - * Buffers to 100ms chunks (1600 samples at 16kHz) to meet AssemblyAI requirements + * Buffers to 100ms chunks (1600 samples at 16kHz) for the Inworld STT backend * Outputs Float32 audio (backend handles conversion to PCM16) */ class 
AudioProcessor extends AudioWorkletProcessor { @@ -14,7 +14,7 @@ class AudioProcessor extends AudioWorkletProcessor { this.inputBuffer = null; // Output buffer to collect 100ms of resampled audio (1600 samples at 16kHz) - // AssemblyAI requires chunks between 50-1000ms + // 100ms chunks provide good granularity for energy-based VAD this.outputBuffer = []; this.outputBufferSize = 1600; // 100ms at 16kHz } diff --git a/render.yaml b/render.yaml index f9f53ca..6425564 100644 --- a/render.yaml +++ b/render.yaml @@ -12,8 +12,6 @@ services: value: production - key: INWORLD_API_KEY sync: false - - key: ASSEMBLY_AI_API_KEY - sync: false - key: SUPABASE_URL sync: false - key: SUPABASE_SECRET_KEY From 7d5de000b2745e62da951770d6abd5d2d333d1a1 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 11 Mar 2026 22:46:51 +0000 Subject: [PATCH 2/4] Fix ESLint errors in InworldSTTNode and server config - Fix prettier line-length formatting for Buffer.from() call in inworld-stt-node.ts - Remove unused samplesPerMs variable in inworld-stt-node.ts - Fix prettier line-length formatting for description string in server.ts https://claude.ai/code/session_01EDqcCeQHNj2f2TVeFb5Dxh --- backend/src/config/server.ts | 3 ++- backend/src/graphs/nodes/inworld-stt-node.ts | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/backend/src/config/server.ts b/backend/src/config/server.ts index 9d7cef5..9da0992 100644 --- a/backend/src/config/server.ts +++ b/backend/src/config/server.ts @@ -40,7 +40,8 @@ const inworldSTTPresets: Record = { silenceThresholdMs: 700, minSpeechMs: 150, silenceEnergyThreshold: 0.01, - description: 'Balanced - Natural middle ground for most conversational turns', + description: + 'Balanced - Natural middle ground for most conversational turns', }, /** diff --git a/backend/src/graphs/nodes/inworld-stt-node.ts b/backend/src/graphs/nodes/inworld-stt-node.ts index fee66e4..54ca471 100644 --- a/backend/src/graphs/nodes/inworld-stt-node.ts +++ b/backend/src/graphs/nodes/inworld-stt-node.ts @@ -61,7 +61,11 @@ function encodePCM16ToBase64(chunks: Int16Array[]): string { offset += chunk.length; } // Convert Int16Array to Buffer (little-endian bytes) - const byteBuffer = Buffer.from(buffer.buffer, buffer.byteOffset, buffer.byteLength); + const byteBuffer = Buffer.from( + buffer.buffer, + buffer.byteOffset, + buffer.byteLength + ); return byteBuffer.toString('base64'); } @@ -223,7 +227,6 @@ export class InworldSTTNode extends CustomNode { // VAD state const speechBuffer: Int16Array[] = []; - const samplesPerMs = this.sampleRate / 1000; const silenceChunksThreshold = Math.ceil( this.silenceThresholdMs / 100 // chunks are ~100ms each (1600 samples @ 16kHz) ); From a95e42adaf870d54b59c469aa4ba3b5854f89339 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 11 Mar 2026 22:47:06 +0000 Subject: [PATCH 3/4] Update package-lock.json after npm install https://claude.ai/code/session_01EDqcCeQHNj2f2TVeFb5Dxh --- backend/package-lock.json | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/backend/package-lock.json b/backend/package-lock.json index 51c3e72..0672fc4 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -1695,7 +1695,6 @@ "integrity": "sha512-tK3GPFWbirvNgsNKto+UmB/cRtn6TZfyw0D6IKrW55n6Vbs7KJoZtI//kpTKzE/DUmmnAFD8/Ca46s7Obs92/w==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.46.4", "@typescript-eslint/types": "8.46.4", @@ -2191,7 +2190,6 @@ "integrity": 
"sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -3222,7 +3220,6 @@ "integrity": "sha512-BhHmn2yNOFA9H9JmmIVKJmd288g9hrVRDkdoIgRCRuSySRUHH7r/DI6aAXW9T1WwUuY3DFgrcaqB+deURBLR5g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -3283,7 +3280,6 @@ "integrity": "sha512-iI1f+D2ViGn+uvv5HuHVUamg8ll4tN+JRHGc6IJi4TP9Kl976C57fzPXgseXNs8v0iA8aSJpHsTWjDb9QJamGQ==", "dev": true, "license": "MIT", - "peer": true, "bin": { "eslint-config-prettier": "bin/cli.js" }, @@ -5343,7 +5339,6 @@ "integrity": "sha512-I7AIg5boAr5R0FFtJ6rCfD+LFsWHp81dolrFD8S79U9tb8Az2nGrJncnMSnys+bpQJfRUzqs9hnA81OAA3hCuQ==", "dev": true, "license": "MIT", - "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -6293,7 +6288,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -6394,7 +6388,6 @@ "integrity": "sha512-ytQKuwgmrrkDTFP4LjR0ToE2nqgy886GpvRSpU0JAnrdBYppuY5rLkRUYPU1yCryb24SsKBTL/hlDQAEFVwtZg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "~0.25.0", "get-tsconfig": "^4.7.5" @@ -6455,7 +6448,6 @@ "integrity": "sha512-CWBzXQrc/qOkhidw1OzBTQuYRbfyxDXJMVJ1XNwUHGROVmuaeiEm3OslpZ1RV96d7SKKjZKrSJu3+t/xlw3R9A==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -6573,7 +6565,6 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", @@ -7151,7 +7142,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -7165,7 +7155,6 @@ "integrity": "sha512-E4t7DJ9pESL6E3I8nFjPa4xGUd3PmiWDLsDztS2qXSJWfHtbQnwAWylaBvSNY48I3vr8PTqIZlyK8TE3V3CA4Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/expect": "4.0.16", "@vitest/mocker": "4.0.16", From 719889cadc8e46f7cfca6155f359cfa66b9e3de3 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 12 Mar 2026 17:45:31 +0000 Subject: [PATCH 4/4] Fix code review issues from PR #55 - Add 30s timeout protection to callInworldSTT to prevent network stalls - Fix session lifecycle: remove shared STT node destruction from ConnectionManager.destroy() since the node is shared across sessions https://claude.ai/code/session_01TJxsn7u4AVgj7UHWPffSha --- backend/src/graphs/nodes/inworld-stt-node.ts | 68 +++++++++++++------- backend/src/helpers/connection-manager.ts | 5 +- 2 files changed, 46 insertions(+), 27 deletions(-) diff --git a/backend/src/graphs/nodes/inworld-stt-node.ts b/backend/src/graphs/nodes/inworld-stt-node.ts index 54ca471..7fe9397 100644 --- a/backend/src/graphs/nodes/inworld-stt-node.ts +++ b/backend/src/graphs/nodes/inworld-stt-node.ts @@ -69,6 +69,9 @@ function encodePCM16ToBase64(chunks: Int16Array[]): string { return byteBuffer.toString('base64'); } +/** Timeout for STT API requests in milliseconds */ +const STT_REQUEST_TIMEOUT_MS = 30000; + /** * Call the Inworld STT REST API with buffered PCM16 audio. 
*/ @@ -76,34 +79,49 @@ async function callInworldSTT( apiKey: string, audioBase64: string ): Promise { - const response = await fetch(INWORLD_STT_URL, { - method: 'POST', - headers: { - Authorization: `Basic ${apiKey}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - transcribeConfig: { - modelId: 'groq/whisper-large-v3', - audioEncoding: 'LINEAR16', - }, - audioData: { - content: audioBase64, + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), STT_REQUEST_TIMEOUT_MS); + + try { + const response = await fetch(INWORLD_STT_URL, { + method: 'POST', + headers: { + Authorization: `Basic ${apiKey}`, + 'Content-Type': 'application/json', }, - }), - }); + body: JSON.stringify({ + transcribeConfig: { + modelId: 'groq/whisper-large-v3', + audioEncoding: 'LINEAR16', + }, + audioData: { + content: audioBase64, + }, + }), + signal: controller.signal, + }); + + if (!response.ok) { + const errText = await response.text().catch(() => ''); + throw new Error( + `Inworld STT request failed: ${response.status} ${response.statusText} - ${errText}` + ); + } - if (!response.ok) { - const errText = await response.text().catch(() => ''); - throw new Error( - `Inworld STT request failed: ${response.status} ${response.statusText} - ${errText}` - ); + const json = (await response.json()) as { + transcription?: { transcript?: string }; + }; + return json?.transcription?.transcript ?? ''; + } catch (error) { + if (error instanceof Error && error.name === 'AbortError') { + throw new Error( + `Inworld STT request timed out after ${STT_REQUEST_TIMEOUT_MS}ms` + ); + } + throw error; + } finally { + clearTimeout(timeoutId); } - - const json = (await response.json()) as { - transcription?: { transcript?: string }; - }; - return json?.transcription?.transcript ?? ''; } /** diff --git a/backend/src/helpers/connection-manager.ts b/backend/src/helpers/connection-manager.ts index 6e8671a..37fb28e 100644 --- a/backend/src/helpers/connection-manager.ts +++ b/backend/src/helpers/connection-manager.ts @@ -1085,8 +1085,9 @@ export class ConnectionManager { // End the multimodal stream this.multimodalStreamManager.end(); - // Destroy Inworld STT node - await this.graphWrapper.inworldSTTNode.destroy(); + // Note: Do NOT destroy the STT node here - it's shared across sessions + // via the cached ConversationGraphWrapper. The STT node will be cleaned + // up when destroyConversationGraph() is called during server shutdown. // Remove from connections map delete this.connections[this.sessionId];