From 746da6502a8fa30a71a6fc7f7c9157fbf90430a4 Mon Sep 17 00:00:00 2001 From: examples-bot Date: Sun, 5 Apr 2026 20:46:14 +0000 Subject: [PATCH] =?UTF-8?q?feat(examples):=20add=20490=20=E2=80=94=20Hayst?= =?UTF-8?q?ack=20Audio=20Transcription=20Pipeline=20(Python)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../.env.example | 2 + .../README.md | 55 ++++++ .../requirements.txt | 3 + .../src/__init__.py | 0 .../src/transcriber.py | 170 ++++++++++++++++++ .../tests/test_example.py | 108 +++++++++++ 6 files changed, 338 insertions(+) create mode 100644 examples/490-haystack-deepgram-stt-pipeline-python/.env.example create mode 100644 examples/490-haystack-deepgram-stt-pipeline-python/README.md create mode 100644 examples/490-haystack-deepgram-stt-pipeline-python/requirements.txt create mode 100644 examples/490-haystack-deepgram-stt-pipeline-python/src/__init__.py create mode 100644 examples/490-haystack-deepgram-stt-pipeline-python/src/transcriber.py create mode 100644 examples/490-haystack-deepgram-stt-pipeline-python/tests/test_example.py diff --git a/examples/490-haystack-deepgram-stt-pipeline-python/.env.example b/examples/490-haystack-deepgram-stt-pipeline-python/.env.example new file mode 100644 index 0000000..99314a3 --- /dev/null +++ b/examples/490-haystack-deepgram-stt-pipeline-python/.env.example @@ -0,0 +1,2 @@ +# Deepgram — https://console.deepgram.com/ +DEEPGRAM_API_KEY= diff --git a/examples/490-haystack-deepgram-stt-pipeline-python/README.md b/examples/490-haystack-deepgram-stt-pipeline-python/README.md new file mode 100644 index 0000000..9c4dfe1 --- /dev/null +++ b/examples/490-haystack-deepgram-stt-pipeline-python/README.md @@ -0,0 +1,55 @@ +# Haystack Audio Transcription Pipeline with Deepgram STT + +A Python example showing how to use Deepgram as a custom Haystack 2.x component for audio transcription in a RAG pipeline. 
Audio URLs go in, searchable Haystack Documents come out — complete with speaker labels, word timestamps, and confidence scores. + +## What you'll build + +A custom Haystack `@component` called `DeepgramTranscriber` that accepts audio URLs, transcribes them via Deepgram Pre-recorded STT (Nova-3), and outputs Haystack `Document` objects. The example includes a full ingestion pipeline that cleans transcripts and writes them to an in-memory document store for retrieval. + +## Prerequisites + +- Python 3.10+ +- Deepgram account — [get a free API key](https://console.deepgram.com/) + +## Environment variables + +| Variable | Where to find it | +|----------|-----------------| +| `DEEPGRAM_API_KEY` | [Deepgram console](https://console.deepgram.com/) → Settings → API Keys | + +## Install and run + +```bash +cp .env.example .env +# Add your DEEPGRAM_API_KEY to .env + +pip install -r requirements.txt + +# Transcribe a single audio file +python src/transcriber.py https://dpgr.am/spacewalk.wav + +# Batch transcribe multiple files +python src/transcriber.py https://dpgr.am/spacewalk.wav https://dpgr.am/spacewalk.wav +``` + +## Key parameters + +| Parameter | Value | Description | +|-----------|-------|-------------| +| `model` | `nova-3` | Deepgram's flagship speech model with highest accuracy | +| `smart_format` | `True` | Adds punctuation, capitalisation, and number formatting | +| `diarize` | `True` | Enables speaker diarization — each word gets a speaker ID | +| `language` | `en` | Language code for transcription | + +## How it works + +1. `DeepgramTranscriber` is a Haystack 2.x `@component` with a `run(urls=...)` method +2. For each URL, it calls Deepgram Pre-recorded STT with Nova-3 and diarization enabled +3. Deepgram fetches and processes the audio server-side (no local download needed) +4. Each transcript becomes a Haystack `Document` with metadata: duration, confidence, word timestamps, speaker labels +5. 
The ingestion pipeline passes documents through `DocumentCleaner` and into `InMemoryDocumentStore` +6. Documents in the store are ready for retrieval, filtering by metadata, or further RAG processing + +## Starter templates + +[deepgram-starters](https://github.com/orgs/deepgram-starters/repositories) diff --git a/examples/490-haystack-deepgram-stt-pipeline-python/requirements.txt b/examples/490-haystack-deepgram-stt-pipeline-python/requirements.txt new file mode 100644 index 0000000..70ae6b6 --- /dev/null +++ b/examples/490-haystack-deepgram-stt-pipeline-python/requirements.txt @@ -0,0 +1,3 @@ +deepgram-sdk==6.1.1 +haystack-ai==2.27.0 +python-dotenv==1.2.2 diff --git a/examples/490-haystack-deepgram-stt-pipeline-python/src/__init__.py b/examples/490-haystack-deepgram-stt-pipeline-python/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/490-haystack-deepgram-stt-pipeline-python/src/transcriber.py b/examples/490-haystack-deepgram-stt-pipeline-python/src/transcriber.py new file mode 100644 index 0000000..c3b7254 --- /dev/null +++ b/examples/490-haystack-deepgram-stt-pipeline-python/src/transcriber.py @@ -0,0 +1,170 @@ +"""Haystack 2.x component that transcribes audio via Deepgram Pre-recorded STT. 
+ +Usage: + # Transcribe a single URL and search the transcript + python src/transcriber.py https://dpgr.am/spacewalk.wav + + # Batch mode — transcribe multiple audio files + python src/transcriber.py https://dpgr.am/spacewalk.wav https://dpgr.am/spacewalk.wav +""" + +import os +import sys +from typing import Any, Dict, List, Optional + +from dotenv import load_dotenv + +load_dotenv() + +from deepgram import DeepgramClient +from haystack import Document, Pipeline, component +from haystack.components.preprocessors import DocumentCleaner +from haystack.document_stores.in_memory import InMemoryDocumentStore +from haystack.components.writers import DocumentWriter + + +@component +class DeepgramTranscriber: + """Haystack 2.x @component that accepts audio URLs, transcribes them with + Deepgram nova-3, and outputs Haystack Document objects with rich metadata + (speaker labels, word timestamps, confidence scores). + + Deepgram processes the audio server-side — the audio never passes through + this component. This is faster and more memory-efficient than downloading + the file first. + """ + + def __init__( + self, + model: str = "nova-3", + smart_format: bool = True, + diarize: bool = True, + language: str = "en", + ) -> None: + self.model = model + self.smart_format = smart_format + self.diarize = diarize + self.language = language + + @component.output_types(documents=List[Document]) + def run(self, urls: List[str]) -> Dict[str, List[Document]]: + """Transcribe each URL and return Haystack Documents. + + Returns a dict with key "documents" — the Haystack component contract. + Each Document contains the full transcript as text and Deepgram metadata + (duration, confidence, word-level timestamps, speaker labels) as metadata. + """ + api_key = os.environ.get("DEEPGRAM_API_KEY") + if not api_key: + raise RuntimeError( + "DEEPGRAM_API_KEY not set. 
Get one at https://console.deepgram.com/" + ) + + client = DeepgramClient() + documents = [] + + for url in urls: + doc = self._transcribe_url(client, url) + documents.append(doc) + + return {"documents": documents} + + def _transcribe_url(self, client: DeepgramClient, url: str) -> Document: + # diarize=True enables speaker labels — each word gets a speaker ID + # so you can reconstruct who said what in multi-speaker audio. + response = client.listen.v1.media.transcribe_url( + url=url, + model=self.model, + smart_format=self.smart_format, + diarize=self.diarize, + language=self.language, + tag="deepgram-examples", + ) + + # response.results.channels[0].alternatives[0].transcript + channel = response.results.channels[0] + alt = channel.alternatives[0] + transcript = alt.transcript + words = alt.words or [] + duration = words[-1].end if words else 0.0 + + speakers = set() + word_data = [] + for w in words: + word_data.append({ + "word": w.word, + "start": w.start, + "end": w.end, + "confidence": w.confidence, + "speaker": getattr(w, "speaker", None), + }) + if getattr(w, "speaker", None) is not None: + speakers.add(w.speaker) + + metadata: Dict[str, Any] = { + "source": url, + "duration_seconds": round(duration, 2), + "confidence": alt.confidence, + "model": self.model, + "language": self.language, + "word_count": len(words), + "speaker_count": len(speakers), + "words": word_data, + } + + return Document(content=transcript, meta=metadata) + + +def build_ingest_pipeline( + document_store: Optional[InMemoryDocumentStore] = None, +) -> Pipeline: + """Build a Haystack pipeline: transcribe → clean → write to document store. + + This is the pattern for audio ingestion in a RAG system: audio URLs go in, + searchable Documents come out in the document store. 
+ """ + if document_store is None: + document_store = InMemoryDocumentStore() + + pipeline = Pipeline() + pipeline.add_component("transcriber", DeepgramTranscriber()) + pipeline.add_component("cleaner", DocumentCleaner()) + pipeline.add_component("writer", DocumentWriter(document_store=document_store)) + + pipeline.connect("transcriber.documents", "cleaner.documents") + pipeline.connect("cleaner.documents", "writer.documents") + + return pipeline + + +def main() -> None: + if len(sys.argv) < 2: + print("Usage: python src/transcriber.py [ ...]") + sys.exit(1) + + audio_urls = sys.argv[1:] + + store = InMemoryDocumentStore() + pipeline = build_ingest_pipeline(document_store=store) + + print(f"Transcribing {len(audio_urls)} audio file(s)...") + result = pipeline.run({"transcriber": {"urls": audio_urls}}) + + written = result.get("writer", {}).get("documents_written", 0) + print(f"\n{written} document(s) written to the document store.") + + docs = store.filter_documents() + for i, doc in enumerate(docs): + print(f"\n{'='*60}") + print(f"Document {i + 1}") + print(f"{'='*60}") + print(f"Source: {doc.meta.get('source', 'unknown')}") + print(f"Duration: {doc.meta.get('duration_seconds', 0):.1f}s") + print(f"Confidence: {doc.meta.get('confidence', 0):.0%}") + print(f"Speakers: {doc.meta.get('speaker_count', 0)}") + print(f"Words: {doc.meta.get('word_count', 0)}") + print(f"\nTranscript preview:\n {doc.content[:300]}...") + + +if __name__ == "__main__": + main() diff --git a/examples/490-haystack-deepgram-stt-pipeline-python/tests/test_example.py b/examples/490-haystack-deepgram-stt-pipeline-python/tests/test_example.py new file mode 100644 index 0000000..2d9b203 --- /dev/null +++ b/examples/490-haystack-deepgram-stt-pipeline-python/tests/test_example.py @@ -0,0 +1,108 @@ +import os +import sys +from pathlib import Path + +# ── Credential check ──────────────────────────────────────────────────────── +env_example = Path(__file__).parent.parent / ".env.example" +required = [ 
+ line.split("=")[0].strip() + for line in env_example.read_text().splitlines() + if line and not line.startswith("#") and "=" in line and line[0].isupper() +] +missing = [k for k in required if not os.environ.get(k)] +if missing: + print(f"MISSING_CREDENTIALS: {','.join(missing)}", file=sys.stderr) + sys.exit(2) +# ──────────────────────────────────────────────────────────────────────────── + +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) +from transcriber import DeepgramTranscriber, build_ingest_pipeline + +from haystack import Document +from haystack.document_stores.in_memory import InMemoryDocumentStore + +AUDIO_URL = "https://dpgr.am/spacewalk.wav" + + +def test_transcriber_component(): + """Verify DeepgramTranscriber returns Documents with transcript and metadata.""" + transcriber = DeepgramTranscriber() + result = transcriber.run(urls=[AUDIO_URL]) + + assert "documents" in result, "Component must return dict with 'documents' key" + docs = result["documents"] + assert len(docs) == 1, f"Expected 1 document, got {len(docs)}" + + doc = docs[0] + assert isinstance(doc, Document), "Output must be a Haystack Document" + assert len(doc.content) > 50, f"Transcript too short ({len(doc.content)} chars)" + + duration = doc.meta.get("duration_seconds", 0) + assert duration > 0, "Duration metadata missing" + chars_per_sec = len(doc.content) / duration if duration else 0 + assert chars_per_sec > 2, f"Transcript too short for duration ({chars_per_sec:.1f} chars/s)" + + assert doc.meta.get("source") == AUDIO_URL, "Source metadata missing" + assert doc.meta.get("confidence", 0) > 0.5, "Confidence too low" + assert doc.meta.get("model") == "nova-3", "Model metadata incorrect" + assert doc.meta.get("word_count", 0) > 0, "Word count missing" + assert doc.meta.get("speaker_count", 0) >= 0, "Speaker count missing" + + words = doc.meta.get("words", []) + assert len(words) > 0, "Word-level data missing" + first_word = words[0] + assert "word" in first_word, "Word 
data missing 'word' field" + assert "start" in first_word, "Word data missing 'start' field" + assert "end" in first_word, "Word data missing 'end' field" + assert "confidence" in first_word, "Word data missing 'confidence' field" + + print("✓ DeepgramTranscriber component working") + print(f" Transcript length: {len(doc.content)} chars") + print(f" Duration: {duration:.1f}s") + print(f" Words: {doc.meta.get('word_count')}") + print(f" Speakers: {doc.meta.get('speaker_count')}") + + +def test_batch_transcription(): + """Verify the component handles multiple URLs in a single run.""" + transcriber = DeepgramTranscriber() + result = transcriber.run(urls=[AUDIO_URL, AUDIO_URL]) + + docs = result["documents"] + assert len(docs) == 2, f"Expected 2 documents, got {len(docs)}" + + for i, doc in enumerate(docs): + assert len(doc.content) > 50, f"Document {i} transcript too short" + assert doc.meta.get("source") == AUDIO_URL + + print("✓ Batch transcription working (2 documents)") + + +def test_ingest_pipeline(): + """Verify the full pipeline: transcribe → clean → write to document store.""" + store = InMemoryDocumentStore() + pipeline = build_ingest_pipeline(document_store=store) + + result = pipeline.run({"transcriber": {"urls": [AUDIO_URL]}}) + + written = result.get("writer", {}).get("documents_written", 0) + assert written == 1, f"Expected 1 document written, got {written}" + + docs = store.filter_documents() + assert len(docs) == 1, f"Expected 1 document in store, got {len(docs)}" + + doc = docs[0] + assert len(doc.content) > 50, f"Stored document transcript too short" + assert doc.meta.get("source") == AUDIO_URL + assert doc.meta.get("duration_seconds", 0) > 0 + + print("✓ Ingest pipeline working (transcribe → clean → write)") + print(f" Documents in store: {len(docs)}") + print(f" Transcript length: {len(doc.content)} chars") + + +if __name__ == "__main__": + test_transcriber_component() + test_batch_transcription() + test_ingest_pipeline() + print("\n✓ All tests 
passed")