diff --git a/helpers/transcribe.py b/helpers/transcribe.py index 26d3906..df51545 100644 --- a/helpers/transcribe.py +++ b/helpers/transcribe.py @@ -1,16 +1,19 @@ -"""Transcribe a video with ElevenLabs Scribe. +"""Transcribe a video with OpenAI Whisper (local, no API key required). -Extracts mono 16kHz audio via ffmpeg, uploads to Scribe with verbatim + -diarize + audio events + word-level timestamps, writes the full response -to /transcripts/.json. +Extracts mono 16kHz audio via ffmpeg, runs Whisper with word-level timestamps, +writes the full response to /transcripts/.json in the +same schema that the rest of video-use expects from ElevenLabs Scribe: -Cached: if the output file already exists, the upload is skipped. + {"words": [{"type": "word", "text": str, "start": float, "end": float, + "speaker_id": "S0"}, ...]} + +Cached: if the output file already exists, transcription is skipped. Usage: python helpers/transcribe.py python helpers/transcribe.py --edit-dir /custom/edit python helpers/transcribe.py --language en - python helpers/transcribe.py --num-speakers 2 + python helpers/transcribe.py --model medium """ from __future__ import annotations @@ -24,75 +27,56 @@ import time from pathlib import Path -import requests - - -SCRIBE_URL = "https://api.elevenlabs.io/v1/speech-to-text" - - -def load_api_key() -> str: - for candidate in [Path(__file__).resolve().parent.parent / ".env", Path(".env")]: - if candidate.exists(): - for line in candidate.read_text().splitlines(): - line = line.strip() - if not line or line.startswith("#") or "=" not in line: - continue - k, v = line.split("=", 1) - if k.strip() == "ELEVENLABS_API_KEY": - return v.strip().strip('"').strip("'") - v = os.environ.get("ELEVENLABS_API_KEY", "") - if not v: - sys.exit("ELEVENLABS_API_KEY not found in .env or environment") - return v +# Allow callers to override the ffmpeg binary path via env var +FFMPEG_BIN = os.environ.get("FFMPEG_BIN", str(Path.home() / ".local/bin/ffmpeg")) def extract_audio(video_path: Path, dest: Path) -> None: cmd = [ - "ffmpeg", "-y", "-i", str(video_path), + FFMPEG_BIN, "-y", "-i", str(video_path), "-vn", "-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le", str(dest), ] subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) -def call_scribe( +def call_whisper( audio_path: Path, - api_key: str, language: str | None = None, - num_speakers: int | None = None, + model_name: str = "base", ) -> dict: - data: dict[str, str] = { - "model_id": "scribe_v1", - "diarize": "true", - "tag_audio_events": "true", - "timestamps_granularity": "word", - } - if language: - data["language_code"] = language - if num_speakers: - data["num_speakers"] = str(num_speakers) - - with open(audio_path, "rb") as f: - resp = requests.post( - SCRIBE_URL, - headers={"xi-api-key": api_key}, - files={"file": (audio_path.name, f, "audio/wav")}, - data=data, - timeout=1800, - ) - - if resp.status_code != 200: - raise RuntimeError(f"Scribe returned {resp.status_code}: {resp.text[:500]}") - - return resp.json() + import whisper # imported here so the module is usable without whisper installed globally + + model = whisper.load_model(model_name) + result = model.transcribe( + str(audio_path), + language=language, + word_timestamps=True, + verbose=False, + ) + + words = [] + for segment in result.get("segments", []): + for w in segment.get("words", []): + text = w["word"].strip() + if not text: + continue + words.append({ + "type": "word", + "text": text, + "start": round(w["start"], 3), + "end": round(w["end"], 3), + "speaker_id": "S0", + }) + + return {"words": words, "text": result.get("text", "").strip()} def transcribe_one( video: Path, edit_dir: Path, - api_key: str, language: str | None = None, - num_speakers: int | None = None, + model_name: str = "base", verbose: bool = True, ) -> Path: """Transcribe a single video. Returns path to transcript JSON. @@ -108,17 +92,16 @@ def transcribe_one( print(f"cached: {out_path.name}") return out_path - if verbose: - print(f" extracting audio from {video.name}", flush=True) - t0 = time.time() with tempfile.TemporaryDirectory() as tmp: audio = Path(tmp) / f"{video.stem}.wav" + if verbose: + print(f" extracting audio from {video.name}", flush=True) extract_audio(video, audio) size_mb = audio.stat().st_size / (1024 * 1024) if verbose: - print(f" uploading {video.stem}.wav ({size_mb:.1f} MB)", flush=True) - payload = call_scribe(audio, api_key, language, num_speakers) + print(f" transcribing {video.stem}.wav ({size_mb:.1f} MB) with whisper:{model_name}", flush=True) + payload = call_whisper(audio, language=language, model_name=model_name) out_path.write_text(json.dumps(payload, indent=2)) dt = time.time() - t0 @@ -126,14 +109,13 @@ def transcribe_one( if verbose: kb = out_path.stat().st_size / 1024 print(f" saved: {out_path.name} ({kb:.1f} KB) in {dt:.1f}s") - if isinstance(payload, dict) and "words" in payload: - print(f" words: {len(payload['words'])}") + print(f" words: {len(payload['words'])}") return out_path def main() -> None: - ap = argparse.ArgumentParser(description="Transcribe a video with ElevenLabs Scribe") + ap = argparse.ArgumentParser(description="Transcribe a video with OpenAI Whisper") ap.add_argument("video", type=Path, help="Path to video file") ap.add_argument( "--edit-dir", @@ -148,10 +130,11 @@ def main() -> None: help="Optional ISO language code (e.g., 'en'). Omit to auto-detect.", ) ap.add_argument( - "--num-speakers", - type=int, - default=None, - help="Optional number of speakers when known. Improves diarization accuracy.", + "--model", + type=str, + default="base", + choices=["tiny", "base", "small", "medium", "large"], + help="Whisper model size (default: base). Larger = more accurate but slower.", ) args = ap.parse_args() @@ -160,14 +143,12 @@ def main() -> None: sys.exit(f"video not found: {video}") edit_dir = (args.edit_dir or (video.parent / "edit")).resolve() - api_key = load_api_key() transcribe_one( video=video, edit_dir=edit_dir, - api_key=api_key, language=args.language, - num_speakers=args.num_speakers, + model_name=args.model, ) diff --git a/helpers/transcribe_batch.py b/helpers/transcribe_batch.py index 5aeb1d6..a1d70e7 100644 --- a/helpers/transcribe_batch.py +++ b/helpers/transcribe_batch.py @@ -1,6 +1,6 @@ """Batch-transcribe every video in a directory with 4 parallel workers. -Walks for common video extensions, runs ElevenLabs Scribe on +Walks for common video extensions, runs OpenAI Whisper on each, writes transcripts to /edit/transcripts/.json. Cached per-file: any source that already has a transcript is skipped. @@ -20,7 +20,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from transcribe import load_api_key, transcribe_one +from transcribe import transcribe_one VIDEO_EXTS = {".mp4", ".MP4", ".mov", ".MOV", ".mkv", ".MKV", ".avi", ".AVI", ".m4v"} @@ -51,10 +51,11 @@ def main() -> None: help="Optional ISO language code. Omit to auto-detect per file.", ) ap.add_argument( - "--num-speakers", - type=int, - default=None, - help="Optional number of speakers. Improves diarization when known.", + "--model", + type=str, + default="base", + choices=["tiny", "base", "small", "medium", "large"], + help="Whisper model size (default: base).", ) args = ap.parse_args() @@ -77,8 +78,6 @@ def main() -> None: print("nothing to do") return - api_key = load_api_key() - print(f"transcribing {len(pending)} files with {args.workers} parallel workers") t0 = time.time() @@ -89,9 +88,8 @@ def main() -> None: transcribe_one, video=v, edit_dir=edit_dir, - api_key=api_key, language=args.language, - num_speakers=args.num_speakers, + model_name=args.model, verbose=False, ): v for v in pending