examples/490-haystack-deepgram-stt-pipeline-python/.env.example
@@ -0,0 +1,2 @@
# Deepgram — https://console.deepgram.com/
DEEPGRAM_API_KEY=
55 changes: 55 additions & 0 deletions examples/490-haystack-deepgram-stt-pipeline-python/README.md
@@ -0,0 +1,55 @@
# Haystack Audio Transcription Pipeline with Deepgram STT

A Python example showing how to use Deepgram as a custom Haystack 2.x component for audio transcription in a RAG pipeline. Audio URLs go in, searchable Haystack Documents come out — complete with speaker labels, word timestamps, and confidence scores.

## What you'll build

A custom Haystack `@component` called `DeepgramTranscriber` that accepts audio URLs, transcribes them via Deepgram Pre-recorded STT (Nova-3), and outputs Haystack `Document` objects. The example includes a full ingestion pipeline that cleans transcripts and writes them to an in-memory document store for retrieval.

## Prerequisites

- Python 3.10+
- Deepgram account — [get a free API key](https://console.deepgram.com/)

## Environment variables

| Variable | Where to find it |
|----------|-----------------|
| `DEEPGRAM_API_KEY` | [Deepgram console](https://console.deepgram.com/) → Settings → API Keys |

## Install and run

```bash
cp .env.example .env
# Add your DEEPGRAM_API_KEY to .env

pip install -r requirements.txt

# Transcribe a single audio file
python src/transcriber.py https://dpgr.am/spacewalk.wav

# Batch transcribe multiple files
python src/transcriber.py https://dpgr.am/spacewalk.wav https://dpgr.am/spacewalk.wav
```

## Key parameters

| Parameter | Value | Description |
|-----------|-------|-------------|
| `model` | `nova-3` | Deepgram's flagship speech model with highest accuracy |
| `smart_format` | `True` | Adds punctuation, capitalisation, and number formatting |
| `diarize` | `True` | Enables speaker diarization — each word gets a speaker ID |
| `language` | `en` | Language code for transcription |
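
These are plain constructor arguments on `DeepgramTranscriber`, so overriding a default is ordinary keyword passing. A minimal sketch using a stand-in class that mirrors the component's `__init__` signature (the stand-in is illustrative only; the real component lives in `src/transcriber.py`):

```python
# Stand-in with the same constructor signature as DeepgramTranscriber.
# Useful for seeing how the parameter table maps onto keyword arguments.
class TranscriberOptions:
    def __init__(self, model="nova-3", smart_format=True, diarize=True, language="en"):
        self.model = model
        self.smart_format = smart_format
        self.diarize = diarize
        self.language = language

# e.g. Spanish audio where speaker labels are not needed:
opts = TranscriberOptions(diarize=False, language="es")
print(opts.model, opts.diarize, opts.language)
# → nova-3 False es
```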

## How it works

1. `DeepgramTranscriber` is a Haystack 2.x `@component` with a `run(urls=...)` method
2. For each URL, it calls Deepgram Pre-recorded STT with Nova-3 and diarization enabled
3. Deepgram fetches and processes the audio server-side (no local download needed)
4. Each transcript becomes a Haystack `Document` with metadata: duration, confidence, word timestamps, speaker labels
5. The ingestion pipeline passes documents through `DocumentCleaner` and into `InMemoryDocumentStore`
6. Documents in the store are ready for retrieval, filtering by metadata, or further RAG processing
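
The word-level metadata produced in step 4 is enough to reconstruct who said what. A minimal sketch that regroups consecutive words into speaker turns; the `words` list here has the same shape as `doc.meta["words"]`, but the values are illustrative, not real transcription output:

```python
# Illustrative word-level metadata, shaped like doc.meta["words"].
words = [
    {"word": "So", "start": 0.08, "end": 0.24, "confidence": 0.99, "speaker": 0},
    {"word": "what", "start": 0.24, "end": 0.40, "confidence": 0.98, "speaker": 0},
    {"word": "Well", "start": 1.10, "end": 1.30, "confidence": 0.97, "speaker": 1},
    {"word": "yes", "start": 1.30, "end": 1.52, "confidence": 0.99, "speaker": 1},
]

def speaker_turns(words):
    """Collapse consecutive words by the same speaker into (speaker, text) turns."""
    turns = []
    for w in words:
        if turns and turns[-1][0] == w["speaker"]:
            turns[-1] = (w["speaker"], turns[-1][1] + " " + w["word"])
        else:
            turns.append((w["speaker"], w["word"]))
    return turns

print(speaker_turns(words))
# → [(0, 'So what'), (1, 'Well yes')]
```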

## Starter templates

[deepgram-starters](https://github.com/orgs/deepgram-starters/repositories)
examples/490-haystack-deepgram-stt-pipeline-python/requirements.txt
@@ -0,0 +1,3 @@
deepgram-sdk==6.1.1
haystack-ai==2.27.0
python-dotenv==1.2.2
170 changes: 170 additions & 0 deletions examples/490-haystack-deepgram-stt-pipeline-python/src/transcriber.py
@@ -0,0 +1,170 @@
"""Haystack 2.x component that transcribes audio via Deepgram Pre-recorded STT.

Usage:
# Transcribe a single URL and search the transcript
python src/transcriber.py https://dpgr.am/spacewalk.wav

# Batch mode — transcribe multiple audio files
python src/transcriber.py https://dpgr.am/spacewalk.wav https://dpgr.am/spacewalk.wav
"""

import os
import sys
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

load_dotenv()

from deepgram import DeepgramClient
from haystack import Document, Pipeline, component
from haystack.components.preprocessors import DocumentCleaner
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter


@component
class DeepgramTranscriber:
"""Haystack 2.x @component that accepts audio URLs, transcribes them with
Deepgram nova-3, and outputs Haystack Document objects with rich metadata
(speaker labels, word timestamps, confidence scores).

Deepgram processes the audio server-side — the audio never passes through
this component. This is faster and more memory-efficient than downloading
the file first.
"""

def __init__(
self,
model: str = "nova-3",
smart_format: bool = True,
diarize: bool = True,
language: str = "en",
) -> None:
self.model = model
self.smart_format = smart_format
self.diarize = diarize
self.language = language

@component.output_types(documents=List[Document])
def run(self, urls: List[str]) -> Dict[str, List[Document]]:
"""Transcribe each URL and return Haystack Documents.

Returns a dict with key "documents" — the Haystack component contract.
Each Document contains the full transcript as text and Deepgram metadata
(duration, confidence, word-level timestamps, speaker labels) as metadata.
"""
api_key = os.environ.get("DEEPGRAM_API_KEY")
if not api_key:
raise RuntimeError(
"DEEPGRAM_API_KEY not set. Get one at https://console.deepgram.com/"
)

client = DeepgramClient()
documents = []

for url in urls:
doc = self._transcribe_url(client, url)
documents.append(doc)

return {"documents": documents}

def _transcribe_url(self, client: DeepgramClient, url: str) -> Document:
# diarize=True enables speaker labels — each word gets a speaker ID
# so you can reconstruct who said what in multi-speaker audio.
response = client.listen.v1.media.transcribe_url(
url=url,
model=self.model,
smart_format=self.smart_format,
diarize=self.diarize,
language=self.language,
tag="deepgram-examples",
)

# response.results.channels[0].alternatives[0].transcript
channel = response.results.channels[0]
alt = channel.alternatives[0]
transcript = alt.transcript
words = alt.words or []
duration = words[-1].end if words else 0.0

speakers = set()
word_data = []
for w in words:
word_data.append({
"word": w.word,
"start": w.start,
"end": w.end,
"confidence": w.confidence,
"speaker": getattr(w, "speaker", None),
})
if getattr(w, "speaker", None) is not None:
speakers.add(w.speaker)

metadata: Dict[str, Any] = {
"source": url,
"duration_seconds": round(duration, 2),
"confidence": alt.confidence,
"model": self.model,
"language": self.language,
"word_count": len(words),
"speaker_count": len(speakers),
"words": word_data,
}

return Document(content=transcript, meta=metadata)


def build_ingest_pipeline(
document_store: Optional[InMemoryDocumentStore] = None,
) -> Pipeline:
"""Build a Haystack pipeline: transcribe → clean → write to document store.

This is the pattern for audio ingestion in a RAG system: audio URLs go in,
searchable Documents come out in the document store.
"""
if document_store is None:
document_store = InMemoryDocumentStore()

pipeline = Pipeline()
pipeline.add_component("transcriber", DeepgramTranscriber())
pipeline.add_component("cleaner", DocumentCleaner())
pipeline.add_component("writer", DocumentWriter(document_store=document_store))

pipeline.connect("transcriber.documents", "cleaner.documents")
pipeline.connect("cleaner.documents", "writer.documents")

return pipeline


def main() -> None:
if len(sys.argv) < 2:
print("Usage: python src/transcriber.py <audio-url> [<audio-url> ...]")
sys.exit(1)

audio_urls = sys.argv[1:]

store = InMemoryDocumentStore()
pipeline = build_ingest_pipeline(document_store=store)

print(f"Transcribing {len(audio_urls)} audio file(s)...")
result = pipeline.run({"transcriber": {"urls": audio_urls}})

written = result.get("writer", {}).get("documents_written", 0)
print(f"\n{written} document(s) written to the document store.")

docs = store.filter_documents()
for i, doc in enumerate(docs):
print(f"\n{'='*60}")
print(f"Document {i + 1}")
print(f"{'='*60}")
print(f"Source: {doc.meta.get('source', 'unknown')}")
print(f"Duration: {doc.meta.get('duration_seconds', 0):.1f}s")
print(f"Confidence: {doc.meta.get('confidence', 0):.0%}")
print(f"Speakers: {doc.meta.get('speaker_count', 0)}")
print(f"Words: {doc.meta.get('word_count', 0)}")
print(f"\nTranscript preview:\n {doc.content[:300]}...")


if __name__ == "__main__":
main()
@@ -0,0 +1,108 @@
import os
import sys
from pathlib import Path

# ── Credential check ────────────────────────────────────────────────────────
env_example = Path(__file__).parent.parent / ".env.example"
required = [
line.split("=")[0].strip()
for line in env_example.read_text().splitlines()
if line and not line.startswith("#") and "=" in line and line[0].isupper()
]
missing = [k for k in required if not os.environ.get(k)]
if missing:
print(f"MISSING_CREDENTIALS: {','.join(missing)}", file=sys.stderr)
sys.exit(2)
# ────────────────────────────────────────────────────────────────────────────

sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from transcriber import DeepgramTranscriber, build_ingest_pipeline

from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore

AUDIO_URL = "https://dpgr.am/spacewalk.wav"


def test_transcriber_component():
"""Verify DeepgramTranscriber returns Documents with transcript and metadata."""
transcriber = DeepgramTranscriber()
result = transcriber.run(urls=[AUDIO_URL])

assert "documents" in result, "Component must return dict with 'documents' key"
docs = result["documents"]
assert len(docs) == 1, f"Expected 1 document, got {len(docs)}"

doc = docs[0]
assert isinstance(doc, Document), "Output must be a Haystack Document"
assert len(doc.content) > 50, f"Transcript too short ({len(doc.content)} chars)"

duration = doc.meta.get("duration_seconds", 0)
assert duration > 0, "Duration metadata missing"
chars_per_sec = len(doc.content) / duration if duration else 0
assert chars_per_sec > 2, f"Transcript too short for duration ({chars_per_sec:.1f} chars/s)"

assert doc.meta.get("source") == AUDIO_URL, "Source metadata missing"
assert doc.meta.get("confidence", 0) > 0.5, "Confidence too low"
assert doc.meta.get("model") == "nova-3", "Model metadata incorrect"
assert doc.meta.get("word_count", 0) > 0, "Word count missing"
    assert "speaker_count" in doc.meta, "Speaker count missing"

words = doc.meta.get("words", [])
assert len(words) > 0, "Word-level data missing"
first_word = words[0]
assert "word" in first_word, "Word data missing 'word' field"
assert "start" in first_word, "Word data missing 'start' field"
assert "end" in first_word, "Word data missing 'end' field"
assert "confidence" in first_word, "Word data missing 'confidence' field"

print("✓ DeepgramTranscriber component working")
print(f" Transcript length: {len(doc.content)} chars")
print(f" Duration: {duration:.1f}s")
print(f" Words: {doc.meta.get('word_count')}")
print(f" Speakers: {doc.meta.get('speaker_count')}")


def test_batch_transcription():
"""Verify the component handles multiple URLs in a single run."""
transcriber = DeepgramTranscriber()
result = transcriber.run(urls=[AUDIO_URL, AUDIO_URL])

docs = result["documents"]
assert len(docs) == 2, f"Expected 2 documents, got {len(docs)}"

for i, doc in enumerate(docs):
assert len(doc.content) > 50, f"Document {i} transcript too short"
assert doc.meta.get("source") == AUDIO_URL

print("✓ Batch transcription working (2 documents)")


def test_ingest_pipeline():
"""Verify the full pipeline: transcribe → clean → write to document store."""
store = InMemoryDocumentStore()
pipeline = build_ingest_pipeline(document_store=store)

result = pipeline.run({"transcriber": {"urls": [AUDIO_URL]}})

written = result.get("writer", {}).get("documents_written", 0)
assert written == 1, f"Expected 1 document written, got {written}"

docs = store.filter_documents()
assert len(docs) == 1, f"Expected 1 document in store, got {len(docs)}"

doc = docs[0]
    assert len(doc.content) > 50, f"Stored document transcript too short ({len(doc.content)} chars)"
assert doc.meta.get("source") == AUDIO_URL
assert doc.meta.get("duration_seconds", 0) > 0

print("✓ Ingest pipeline working (transcribe → clean → write)")
print(f" Documents in store: {len(docs)}")
print(f" Transcript length: {len(doc.content)} chars")


if __name__ == "__main__":
test_transcriber_component()
test_batch_transcription()
test_ingest_pipeline()
print("\n✓ All tests passed")