From e7a646dcc00d099ad967b617be0059e0985d7259 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 31 Mar 2026 23:28:26 +0200 Subject: [PATCH 1/3] Add Nebula thumbnail support via Cloudflare Worker proxy - utils/nebula.py: calls the CF Worker to get authenticated HLS stream URLs - utils/thumbnail.py: adds Nebula generation/retrieval, refactors shared helpers (file paths, redis keys, FFmpeg extraction) to support both YouTube and Nebula via nebula=True keyword argument - app.py: adds /api/v1/getNebulaThumbnail endpoint - utils/config.py: adds nebula_worker_url and nebula_worker_auth_secret - config.yaml.example: documents nebula config options - tests/test_nebula_thumbnail.py: yt-dlp based integration tests --- app.py | 195 +++++++++++++---- config.yaml.example | 5 + tests/test_nebula_thumbnail.py | 311 +++++++++++++++++++++++++++ utils/config.py | 6 + utils/nebula.py | 118 +++++++++++ utils/thumbnail.py | 374 +++++++++++++++++++++++++-------- 6 files changed, 872 insertions(+), 137 deletions(-) create mode 100644 tests/test_nebula_thumbnail.py create mode 100644 utils/nebula.py diff --git a/app.py b/app.py index 0675203..cb0f217 100644 --- a/app.py +++ b/app.py @@ -12,11 +12,14 @@ import time from hmac import compare_digest from rq.worker import Worker +from rq.job import Job from utils.test_utils import in_test import logging -from utils.thumbnail import generate_thumbnail, get_latest_thumbnail_from_files, get_job_id, get_thumbnail_from_files, set_best_time +from utils.thumbnail import generate_thumbnail, generate_nebula_thumbnail, \ + get_latest_thumbnail_from_files, get_job_id, get_thumbnail_from_files, set_best_time from utils.video import valid_video_id +from utils.nebula import valid_nebula_slug app = FastAPI() app.add_middleware( @@ -59,72 +62,175 @@ async def get_thumbnail(response: Response, request: Request, # If we got here with a None time, then there is no thumbnail to pull from return thumbnail_response_error(redirectUrl, "Thumbnail not cached") + try: + result = await _enqueue_and_wait( + job_id=get_job_id(videoID, time), + generate_now=generateNow, + request=request, + enqueue_fn=generate_thumbnail, + enqueue_args=(videoID, time, title, isLivestream, not in_test()), + label="YouTube", + ) + except TimeoutError: + return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail due to timeout") + + if result is None: + return thumbnail_response_error(redirectUrl, "Thumbnail not generated yet") + if result: + try: + return await handle_thumbnail_response(videoID, time, isLivestream, title, response) + except Exception as e: + log("Server error when getting thumbnails", e) + return thumbnail_response_error(redirectUrl, "Server error") + else: + return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail") + + +async def handle_thumbnail_response(video_id: str, time: float | None, is_livestream: bool, title: str | None, response: Response) -> Response: + thumbnail = await get_thumbnail_from_files(video_id, time, is_livestream, title) if time is not None else \ + await get_latest_thumbnail_from_files(video_id, is_livestream) + response.headers["X-Timestamp"] = str(thumbnail.time) + response.headers["Cache-Control"] = "public, max-age=3600" + if thumbnail.title is not None: + try: + response.headers["X-Title"] = thumbnail.title.strip() + except UnicodeEncodeError: + pass + + return Response(content=thumbnail.image, media_type="image/webp", headers=response.headers) + +def thumbnail_response_error(redirect_url: str | None, text: str) -> Response: + if redirect_url is not None and redirect_url.startswith("https://i.ytimg.com"): + return RedirectResponse(redirect_url) + else: + raise HTTPException(status_code=204, headers={ + "X-Failure-Reason": text + }) + + +# ─── Shared queue management ───────────────────────────────────────────────── - job_id = get_job_id(videoID, time) - queue = queue_high if generateNow else queue_low +async def _enqueue_and_wait( + job_id: str, + generate_now: bool, + request: Request, + enqueue_fn: Any, + enqueue_args: tuple, + label: str, +) -> bool | None: + """Reconcile queue priorities, enqueue if needed, and wait for result. + + Returns ``True`` if the job succeeded, ``False`` if it failed, or + ``None`` with a reason string if the caller should return an error + immediately (returned as a tuple ``(None, reason)``). + """ + queue = queue_high if generate_now else queue_low job = queue.fetch_job(job_id) - other_queue_job = queue_low.fetch_job(job_id) if queue == queue_high else queue_high.fetch_job(job_id) + other_queue_job: Job | None = ( + queue_low.fetch_job(job_id) if queue == queue_high + else queue_high.fetch_job(job_id) + ) if other_queue_job is not None: if other_queue_job.is_started: - # It is already started, use it job = other_queue_job elif queue == queue_high: - # Old queue is low, prefer new one queue_low.remove(other_queue_job) elif job is not None: - # New queue is low, old queue is high, prefer old one queue.remove(job) job = other_queue_job else: - # New queue is low, old queue is high, prefer old one job = other_queue_job if job is None or job.is_finished: if len(queue) > config["thumbnail_storage"]["max_queue_size"]: - return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail due to queue being too big") - - # Start the job if it is not already started - # TODO: Remove the ttl when proper priority is implemented - job = queue.enqueue(generate_thumbnail, - args=(videoID, time, title, isLivestream, not in_test()), - job_id=job_id, - job_timeout=30, - failure_ttl=500, - ttl=60, - at_front="front_auth" in config\ - and config["front_auth"] is not None\ - and request.headers.get("authorization") == config["front_auth"]) + return None # caller handles "queue too big" + + job = queue.enqueue( + enqueue_fn, + args=enqueue_args, + job_id=job_id, + job_timeout=30, + failure_ttl=500, + ttl=60, + at_front="front_auth" in config + and config["front_auth"] is not None + and request.headers.get("authorization") == config["front_auth"], + ) if job.is_failed: - return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail") + return False - result: bool = False if ((job.get_position() or 0) < config["thumbnail_storage"]["max_before_async_generation"] - and (generateNow or len(queue_high) < config["thumbnail_storage"]["max_before_async_generation"])): + and (generate_now or len(queue_high) < config["thumbnail_storage"]["max_before_async_generation"])): try: - result = (await wait_for_message(job_id)) == "true" + return (await wait_for_message(job_id)) == "true" except TimeoutError: - log("Failed to generate thumbnail due to timeout") - return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail due to timeout") + log(f"{label} thumbnail generation timed out") + raise else: - log("Thumbnail not generated yet", job.get_position()) - return thumbnail_response_error(redirectUrl, "Thumbnail not generated yet") + log(f"{label} thumbnail not generated yet", job.get_position()) + return None + + +# ─── Nebula thumbnails ──────────────────────────────────────────────────────── +@app.get("/api/v1/getNebulaThumbnail") +async def get_nebula_thumbnail(response: Response, request: Request, + videoSlug: str, time: float | None = None, + generateNow: bool = False, + title: str | None = None, + officialTime: bool = False) -> Response: + if not config.get("nebula_worker_url"): # type: ignore[attr-defined] + raise HTTPException(status_code=501, detail="Nebula support is not configured") + + if type(videoSlug) is not str or (type(time) is not float and time is not None) \ + or type(generateNow) is not bool or not valid_nebula_slug(videoSlug): + raise HTTPException(status_code=400, detail="Invalid parameters") + + if officialTime and time is not None: + await set_best_time(videoSlug, time, nebula=True) + + try: + return await handle_nebula_thumbnail_response(videoSlug, time, title, response) + except FileNotFoundError: + pass + + if time is None: + return nebula_thumbnail_response_error("Nebula thumbnail not cached") + + try: + result = await _enqueue_and_wait( + job_id=get_job_id(videoSlug, time, nebula=True), + generate_now=generateNow, + request=request, + enqueue_fn=generate_nebula_thumbnail, + enqueue_args=(videoSlug, time, title, not in_test()), + label="Nebula", + ) + except TimeoutError: + return nebula_thumbnail_response_error("Failed to generate Nebula thumbnail due to timeout") + + if result is None: + return nebula_thumbnail_response_error("Nebula thumbnail not generated yet") if result: try: - return await handle_thumbnail_response(videoID, time, isLivestream, title, response) + return await handle_nebula_thumbnail_response(videoSlug, time, title, response) except Exception as e: - log("Server error when getting thumbnails", e) - return thumbnail_response_error(redirectUrl, "Server error") + log("Server error when getting Nebula thumbnails", e) + return nebula_thumbnail_response_error("Server error") else: - log("Failed to generate thumbnail") - return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail") + return nebula_thumbnail_response_error("Failed to generate Nebula thumbnail") -async def handle_thumbnail_response(video_id: str, time: float | None, is_livestream: bool, title: str | None, response: Response) -> Response: - thumbnail = await get_thumbnail_from_files(video_id, time, is_livestream, title) if time is not None else \ - await get_latest_thumbnail_from_files(video_id, is_livestream) +async def handle_nebula_thumbnail_response( + video_slug: str, time: float | None, title: str | None, response: Response +) -> Response: + thumbnail = ( + await get_thumbnail_from_files(video_slug, time, title=title, nebula=True) + if time is not None + else await get_latest_thumbnail_from_files(video_slug, nebula=True) + ) response.headers["X-Timestamp"] = str(thumbnail.time) response.headers["Cache-Control"] = "public, max-age=3600" if thumbnail.title is not None: @@ -133,15 +239,14 @@ async def handle_thumbnail_response(video_id: str, time: float | None, is_livest except UnicodeEncodeError: pass - return Response(content=thumbnail.image, media_type="image/webp", headers=response.headers) + return Response(content=thumbnail.image, media_type="image/webp", + headers=response.headers) -def thumbnail_response_error(redirect_url: str | None, text: str) -> Response: - if redirect_url is not None and redirect_url.startswith("https://i.ytimg.com"): - return RedirectResponse(redirect_url) - else: - raise HTTPException(status_code=204, headers={ - "X-Failure-Reason": text - }) + +def nebula_thumbnail_response_error(text: str) -> Response: + raise HTTPException(status_code=204, headers={ + "X-Failure-Reason": text + }) @app.get("/api/v1/status") def get_status(includeDefault: bool = True, auth: str | None = None) -> dict[str, Any]: diff --git a/config.yaml.example b/config.yaml.example index 9cd5edd..b5be85a 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -6,6 +6,7 @@ server: thumbnail_storage: path: "cache" max_size: 50000000 + cleanup_multiplier: 0.9 redis_offset_allowed: 20 max_before_async_generation: 15 max_queue_size: 10000 @@ -21,4 +22,8 @@ status_auth_password: password skip_local_ffmpeg: false try_floatie: true try_ytdlp: true +# Nebula thumbnail support works via Cloudflare Worker proxy +# Set nebula_worker_url to enable Nebula thumbnails +# nebula_worker_url: "https://nebula-thumbnail-worker.example.workers.dev" +# nebula_worker_auth_secret: "shared-secret-between-server-and-worker" debug: false \ No newline at end of file diff --git a/tests/test_nebula_thumbnail.py b/tests/test_nebula_thumbnail.py new file mode 100644 index 0000000..292e72d --- /dev/null +++ b/tests/test_nebula_thumbnail.py @@ -0,0 +1,311 @@ +""" +Test for Nebula thumbnail generation using yt-dlp with authentication. + +Nebula requires authentication to access video streams. This test verifies +that yt-dlp can authenticate with Nebula and that a thumbnail frame can be +extracted using FFmpeg. + +Usage: + + # API token from browser DevTools (Application > Cookies > nebula_auth.apiToken) + export NEBULA_API_TOKEN="" + + # Or: Netscape-format cookies file + export NEBULA_COOKIES_FILE="/path/to/cookies.txt" + + # Optional: save the thumbnail to a fixed path for visual inspection + export NEBULA_SAVE_PATH="/tmp/nebula_test_thumb.webp" + + Then run: + pytest tests/test_nebula_thumbnail.py -v -s +""" + +import os +import http.cookiejar +import shutil +import tempfile + +import pytest +import yt_dlp # pyright: ignore[reportMissingTypeStubs] +from typing import Any, cast + +from utils.ffmpeg import run_ffmpeg +from constants.thumbnail import image_format + +NEBULA_VIDEO_URL = "https://nebula.tv/videos/downieexpress-s2e1" +NEBULA_TIMESTAMP = 60.0 # 1:00 + +NEBULA_API_TOKEN = os.environ.get("NEBULA_API_TOKEN") +NEBULA_COOKIES_FILE = os.environ.get("NEBULA_COOKIES_FILE") + +# If set, the end-to-end test will also copy the thumbnail to the save path for visual inspection +NEBULA_SAVE_PATH = os.environ.get("NEBULA_SAVE_PATH") + +has_credentials = bool(NEBULA_API_TOKEN or NEBULA_COOKIES_FILE) + +skip_reason = ( + "Nebula credentials not provided. " + "Set NEBULA_API_TOKEN (value of the nebula_auth.apiToken browser cookie) " + "or NEBULA_COOKIES_FILE to a Netscape-format cookies file." +) + + +def create_nebula_ytdlp(cookies_file: str | None = None) -> yt_dlp.YoutubeDL: + """Create a yt-dlp instance configured for Nebula authentication. + + Auth priority: API token cookie > cookies file. + """ + opts: dict[str, object] = { + "retries": 2, + "fragment_retries": 2, + "extractor_retries": 2, + "socket_timeout": 30, + "quiet": False, + "no_warnings": False, + } + + if cookies_file: + opts["cookiefile"] = cookies_file + + ydl = yt_dlp.YoutubeDL(opts) # pyright: ignore[reportArgumentType] + + if NEBULA_API_TOKEN: + # Inject the API token as a cookie so yt-dlp's Nebula extractor + # picks it up during _real_initialize(). + cookie = http.cookiejar.Cookie( + version=0, + name="nebula_auth.apiToken", + value=NEBULA_API_TOKEN, + port=None, + port_specified=False, + domain="nebula.tv", + domain_specified=True, + domain_initial_dot=False, + path="/", + path_specified=True, + secure=True, + expires=None, + discard=True, + comment=None, + comment_url=None, + rest={}, + ) + ydl.cookiejar.set_cookie(cookie) + + return ydl + + +@pytest.mark.skipif(not has_credentials, reason=skip_reason) +class TestNebulaThumbnail: + """Tests for Nebula video thumbnail extraction via yt-dlp + FFmpeg.""" + + output_dir: str + + @pytest.fixture(autouse=True) + def setup_and_teardown(self): + """Create and clean up a temporary output directory.""" + self.output_dir = tempfile.mkdtemp(prefix="nebula_thumb_test_") + yield + shutil.rmtree(self.output_dir, ignore_errors=True) + + def test_ytdlp_can_extract_nebula_info(self): + """Test that yt-dlp can authenticate and extract video info from Nebula.""" + ydl = create_nebula_ytdlp(NEBULA_COOKIES_FILE) + + info = ydl.extract_info(NEBULA_VIDEO_URL, download=False) + assert info is not None + + sanitized = cast(dict[str, Any], ydl.sanitize_info(info)) + print(f"\nVideo title: {sanitized.get('title')}") + print(f"Video duration: {sanitized.get('duration')}s") + print(f"Video ID: {sanitized.get('id')}") + print(f"Uploader: {sanitized.get('uploader')}") + + formats: list[dict[str, Any]] = sanitized.get("formats", []) + assert len(formats) > 0, "No formats found — authentication may have failed" + + # Filter to video-only formats (with height info) + video_formats = [f for f in formats if f.get("height") is not None and f.get("height", 0) > 0] + assert len(video_formats) > 0, "No video formats with height info found" + + print(f"\nAvailable video formats ({len(video_formats)}):") + for fmt in video_formats: + print(f" {fmt.get('format_id')}: {fmt.get('width')}x{fmt.get('height')} " + f"fps={fmt.get('fps')} vcodec={fmt.get('vcodec')} " + f"ext={fmt.get('ext')}") + + # Verify duration is long enough for test timestamp + duration = cast(int, sanitized.get("duration", 0)) + assert duration >= NEBULA_TIMESTAMP, ( + f"Video duration ({duration}s) is shorter than test timestamp ({NEBULA_TIMESTAMP}s)" + ) + + def test_ytdlp_can_get_playback_url(self): + """Test that a usable playback URL can be obtained for a Nebula video.""" + ydl = create_nebula_ytdlp(NEBULA_COOKIES_FILE) + + info = ydl.extract_info(NEBULA_VIDEO_URL, download=False) + assert info is not None + sanitized = cast(dict[str, Any], ydl.sanitize_info(info)) + + formats: list[dict[str, Any]] = sanitized.get("formats", []) + video_formats = [ + f for f in formats + if f.get("height") is not None + and f.get("height", 0) > 0 + and f.get("url") + ] + assert len(video_formats) > 0, "No video formats with URLs found" + + # Pick a format with height <= 720 + video_formats.sort(key=lambda f: f.get("height", 0), reverse=True) + selected = None + for fmt in video_formats: + if fmt.get("height", 9999) <= 720: + selected = fmt + break + + if selected is None: + # Fallback: just pick the smallest available + selected = video_formats[-1] + + print(f"\nSelected format: {selected.get('format_id')} " + f"{selected.get('width')}x{selected.get('height')} " + f"fps={selected.get('fps')} vcodec={selected.get('vcodec')}") + print(f"URL length: {len(selected.get('url', ''))}") + + assert selected.get("url"), "Selected format has no URL" + assert selected.get("height", 0) > 0, "Selected format has no height" + + def test_generate_nebula_thumbnail(self): + """End-to-end: authenticate, get stream URL, extract thumbnail at 1:00 with FFmpeg.""" + ydl = create_nebula_ytdlp(NEBULA_COOKIES_FILE) + + # Step 1: Extract info + print(f"\n[1/3] Extracting video info from {NEBULA_VIDEO_URL}...") + info = ydl.extract_info(NEBULA_VIDEO_URL, download=False) + assert info is not None + sanitized = cast(dict[str, Any], ydl.sanitize_info(info)) + + title = sanitized.get("title", "Unknown") + print(f" Title: {title}") + + # Step 2: Pick a suitable format + formats: list[dict[str, Any]] = sanitized.get("formats", []) + video_formats = [ + f for f in formats + if f.get("height") is not None + and f.get("height", 0) > 0 + and f.get("url") + ] + assert len(video_formats) > 0, "No usable video formats found" + + video_formats.sort(key=lambda f: f.get("height", 0), reverse=True) + selected = None + for fmt in video_formats: + if fmt.get("height", 9999) <= 720: + selected = fmt + break + if selected is None: + selected = video_formats[-1] + + playback_url = selected["url"] + height = selected.get("height", 0) + width = selected.get("width", 0) + fps = selected.get("fps", 30) + + print(f"[2/3] Selected format: {width}x{height} @ {fps}fps") + + # Step 3: Extract thumbnail with FFmpeg + output_filename = os.path.join(self.output_dir, f"{NEBULA_TIMESTAMP}{image_format}") + + # Round time to nearest frame (same logic as the main code) + rounded_time = int(NEBULA_TIMESTAMP * fps) / fps + + print(f"[3/3] Extracting thumbnail at {NEBULA_TIMESTAMP}s (rounded: {rounded_time}s)...") + + run_ffmpeg( + "-y", + "-ss", str(rounded_time), + "-i", playback_url, + "-vframes", "1", + "-lossless", "0", + "-pix_fmt", "bgra", + output_filename, + "-timelimit", "30", + timeout=30, + ) + + # Verify the thumbnail was generated + assert os.path.isfile(output_filename), f"Thumbnail file was not created at {output_filename}" + + file_size = os.path.getsize(output_filename) + print(f"\n ✓ Thumbnail generated successfully!") + print(f" File: {output_filename}") + print(f" Size: {file_size} bytes") + + assert file_size > 0, "Thumbnail file is empty" + assert file_size > 100, f"Thumbnail file is suspiciously small ({file_size} bytes)" + + # Read and verify it looks like a valid image + with open(output_filename, "rb") as f: + header = f.read(16) + # WebP files start with "RIFF" and contain "WEBP" + if image_format == ".webp": + assert header[:4] == b"RIFF", f"Output doesn't look like a WebP file (header: {header[:4]})" + assert header[8:12] == b"WEBP", f"Output doesn't look like a WebP file" + print(f" Format verified: {image_format}") + + # If NEBULA_SAVE_PATH is set, copy to a persistent location for visual inspection + if NEBULA_SAVE_PATH: + shutil.copy2(output_filename, NEBULA_SAVE_PATH) + print(f" Saved for visual inspection: {NEBULA_SAVE_PATH}") + + def test_generate_nebula_thumbnail_low_res(self): + """ + Test thumbnail generation with the lowest available resolution + to minimize bandwidth during testing. + """ + ydl = create_nebula_ytdlp(NEBULA_COOKIES_FILE) + + info = ydl.extract_info(NEBULA_VIDEO_URL, download=False) + assert info is not None + sanitized = cast(dict[str, Any], ydl.sanitize_info(info)) + + formats: list[dict[str, Any]] = sanitized.get("formats", []) + video_formats = [ + f for f in formats + if f.get("height") is not None + and f.get("height", 0) > 0 + and f.get("url") + ] + assert len(video_formats) > 0 + + # Pick the lowest resolution + video_formats.sort(key=lambda f: f.get("height", 0)) + selected = video_formats[0] + + playback_url = selected["url"] + fps = selected.get("fps", 30) + rounded_time = int(NEBULA_TIMESTAMP * fps) / fps + + output_filename = os.path.join(self.output_dir, f"lowres_{NEBULA_TIMESTAMP}{image_format}") + + print(f"\nGenerating low-res thumbnail at {selected.get('width')}x{selected.get('height')}...") + + run_ffmpeg( + "-y", + "-ss", str(rounded_time), + "-i", playback_url, + "-vframes", "1", + "-lossless", "0", + "-pix_fmt", "bgra", + output_filename, + "-timelimit", "30", + timeout=30, + ) + + assert os.path.isfile(output_filename) + file_size = os.path.getsize(output_filename) + print(f" ✓ Low-res thumbnail: {file_size} bytes") + assert file_size > 100 diff --git a/utils/config.py b/utils/config.py index cc9f70b..3aab0b4 100644 --- a/utils/config.py +++ b/utils/config.py @@ -47,6 +47,8 @@ class Config(TypedDict): proxy_token: str | None front_auth: str | None floatie_auth: str | None + nebula_worker_url: str | None + nebula_worker_auth_secret: str | None debug: bool @@ -56,3 +58,7 @@ class Config(TypedDict): config["proxy_url"] = None if "proxy_token" not in config: config["proxy_token"] = None +if "nebula_worker_url" not in config: + config["nebula_worker_url"] = None +if "nebula_worker_auth_secret" not in config: + config["nebula_worker_auth_secret"] = None diff --git a/utils/nebula.py b/utils/nebula.py new file mode 100644 index 0000000..9bc947a --- /dev/null +++ b/utils/nebula.py @@ -0,0 +1,118 @@ +""" +Nebula video support via the Nebula Thumbnail Worker. Hosted on cloudflare workers, maintained by Furdiburd. + +The worker holds Nebula credentials and exposes an API that returns +time-limited, JWT-authenticated HLS stream URLs. This module calls that +API and returns a PlaybackUrl that the rest of the thumbnail pipeline +(FFmpeg frame extraction) can consume unchanged. +""" + +import re +from dataclasses import dataclass +from typing import Any + +import requests + +from utils.config import config +from utils.logger import log +from utils.video import PlaybackUrl + + +# Nebula video slugs: lowercase alphanumerics, hyphens, underscores. +# e.g. "downieexpress-s2e1", "practical-engineering-rebuilding-the-oroville-dam-spillways" +_SLUG_RE = re.compile(r"^[\w-]+$") + +# Default FPS assumption for Nebula HLS streams. +# The worker returns the actual FPS from the manifest when available. +_DEFAULT_FPS = 30 + + +class NebulaError(Exception): + """Raised when the Nebula worker returns an error.""" + pass + + +def valid_nebula_slug(slug: str) -> bool: + """Validate a Nebula video slug.""" + return type(slug) is str and len(slug) > 0 and len(slug) <= 200 \ + and _SLUG_RE.match(slug) is not None + + +@dataclass +class NebulaVideoInfo: + """Metadata returned by the Nebula worker alongside the stream URL.""" + title: str + duration: float + slug: str + + +def get_nebula_playback_url( + video_slug: str, + height: int = config["default_max_height"], +) -> tuple[PlaybackUrl, NebulaVideoInfo]: + """ + Call the Nebula Thumbnail Worker to get an authenticated HLS stream URL. + + Returns a (PlaybackUrl, NebulaVideoInfo) tuple. The PlaybackUrl.url + points to an HLS variant playlist on starlight.nebula.tv with an + embedded JWT — FFmpeg can consume it directly. + + Raises NebulaError on any failure. + """ + worker_url = config.get("nebula_worker_url") # type: ignore[attr-defined] + if not worker_url: + raise NebulaError("nebula_worker_url is not configured") + + if not valid_nebula_slug(video_slug): + raise ValueError(f"Invalid Nebula video slug: {video_slug}") + + url = f"{worker_url.rstrip('/')}/api/v1/nebula/streamUrl" + params = {"videoSlug": video_slug, "height": str(height)} + headers: dict[str, str] = {} + + worker_auth = config.get("nebula_worker_auth_secret") # type: ignore[attr-defined] + if worker_auth: + headers["Authorization"] = f"Bearer {worker_auth}" + + log(f"Fetching Nebula stream URL for {video_slug} at {height}p") + + try: + resp = requests.get(url, params=params, headers=headers, timeout=15) + except requests.RequestException as e: + raise NebulaError(f"Failed to reach Nebula worker: {e}") from e + + if resp.status_code != 200: + try: + detail = resp.json().get("error", resp.text) + except Exception: + detail = resp.text + raise NebulaError( + f"Nebula worker returned {resp.status_code}: {detail}" + ) + + try: + data: dict[str, Any] = resp.json() + except Exception as e: + raise NebulaError(f"Invalid JSON from Nebula worker: {e}") from e + + selected = data.get("selectedFormat") + if not selected or "url" not in selected: + raise NebulaError("Nebula worker response missing selectedFormat.url") + + playback_url = PlaybackUrl( + url=selected["url"], + width=int(selected.get("width", 1280)), + height=int(selected.get("height", 720)), + fps=int(selected.get("fps", _DEFAULT_FPS)), + ) + + video_info = NebulaVideoInfo( + title=data.get("title", video_slug), + duration=float(data.get("duration", 0)), + slug=video_slug, + ) + + log(f"Got Nebula stream: {playback_url.width}x{playback_url.height} " + f"{selected.get('vcodec', '?')} for {video_slug}") + + return playback_url, video_info diff --git a/utils/thumbnail.py b/utils/thumbnail.py index 9b13526..5d6bec2 100644 --- a/utils/thumbnail.py +++ b/utils/thumbnail.py @@ -5,7 +5,7 @@ import random import re import sys -from typing import cast +from typing import Awaitable, Callable, cast import requests from .ffmpeg import run_ffmpeg, FFmpegError @@ -15,6 +15,7 @@ from utils.cleanup import add_storage_used, check_if_cleanup_needed, update_last_used from utils.proxy import get_proxy_url from utils.video import PlaybackUrl, get_playback_url, valid_video_id +from utils.nebula import get_nebula_playback_url, valid_nebula_slug, NebulaError from utils.config import config import time as time_module from utils.redis_handler import get_async_redis_conn, redis_conn @@ -50,32 +51,12 @@ def generate_thumbnail(video_id: str, time: float, title: str | None, is_livestr generate_and_store_thumbnail(video_id, time, is_livestream) - _, output_filename, metadata_filename, _ = get_file_paths(video_id, time, is_livestream) - if title is not None: - with open(metadata_filename, "w") as metadata_file: - metadata_file.write(title) - - title_file_size = len(title.encode("utf-8")) if title else 0 - image_file_size = os.path.getsize(output_filename) - storage_used = title_file_size + image_file_size - - if image_file_size < minimum_file_size: - os.remove(output_filename) - if update_redis: - try: - asyncio.get_event_loop().run_until_complete(add_storage_used(title_file_size)) - except Exception as e: - log_error("Failed to update storage used", e) - - raise ThumbnailGenerationError(f"Image file for {video_id} at {time} is too small, probably a premiere: {image_file_size} bytes") - - if update_redis: - try: - asyncio.get_event_loop().run_until_complete(add_storage_used(storage_used)) - except Exception as e: - log_error("Failed to update storage used", e) - publish_job_status(video_id, time, "true") - check_if_cleanup_needed() + _finish_thumbnail_job( + video_id, time, title, update_redis, + get_file_paths_fn=lambda vid, t: get_file_paths(vid, t, is_livestream), + publish_fn=lambda vid, t, s: publish_job_status(vid, t, s), + label="YouTube", + ) log(f"Generated thumbnail for {video_id} at {time} in {time_module.time() - now} seconds") @@ -211,51 +192,252 @@ def trace_function(*_): redis_conn.zrem("concurrent_renders", f"{video_id} {time} {is_livestream}") -async def get_latest_thumbnail_from_files(video_id: str, is_livestream: bool) -> Thumbnail: - if not valid_video_id(video_id): - raise ValueError(f"Invalid video ID: {video_id}") +async def get_latest_thumbnail_from_files(video_id: str, is_livestream: bool = False, + *, nebula: bool = False) -> Thumbnail: + _validate_key(video_id, nebula=nebula) + return await _get_latest_thumbnail( + get_folder_path(video_id, nebula=nebula), + get_best_time(video_id, nebula=nebula), + lambda t: get_thumbnail_from_files(video_id, t, is_livestream, nebula=nebula), + video_id, + ) + + +async def get_thumbnail_from_files(video_id: str, time: float, is_livestream: bool = False, + title: str | None = None, + *, nebula: bool = False) -> Thumbnail: + _validate_key(video_id, nebula=nebula) + if type(time) is not float: + raise ValueError(f"Invalid time: {time}") + return await _read_thumbnail_files( + get_folder_path(video_id, nebula=nebula), time, title, + lambda t: get_file_paths(video_id, t, is_livestream, nebula=nebula), + get_redis_key(video_id, nebula=nebula), + ) + +def get_file_paths(video_id: str, time: float, is_livestream: bool = False, + *, nebula: bool = False) -> tuple[str, str, str, str]: + _validate_key(video_id, nebula=nebula) + if type(time) is not float: + raise ValueError(f"Invalid time: {time}") - output_folder = get_folder_path(video_id) + output_folder = get_folder_path(video_id, nebula=nebula) + output_filename = f"{output_folder}/{time}{'-live' if is_livestream else ''}{image_format}" + metadata_filename = f"{output_folder}/{time}{metadata_format}" + video_filename = f"{output_folder}/{time}.mp4" + + return (output_folder, output_filename, metadata_filename, video_filename) + + +def get_folder_path(video_id: str, *, nebula: bool = False) -> str: + _validate_key(video_id, nebula=nebula) + base = config['thumbnail_storage']['path'] + return f"{base}/nebula/{video_id}" if nebula else f"{base}/{video_id}" + + +def get_job_id(video_id: str, time: float, *, nebula: bool = False) -> str: + return f"nebula_{video_id}-{time}" if nebula else f"{video_id}-{time}" + + +def get_best_time_key(video_id: str, *, nebula: bool = False) -> str: + return f"best-nebula:{video_id}" if nebula else f"best-{video_id}" - files = os.listdir(output_folder) - files.sort(key=lambda x: os.path.getmtime(os.path.join(output_folder, x)), reverse=True) - best_time = await get_best_time(video_id) +def get_redis_key(video_id: str, *, nebula: bool = False) -> str: + return f"nebula:{video_id}" if nebula else video_id - selected_file: str | None = f"{best_time.decode()}{image_format}" if best_time is not None else None - # Fallback to latest image +@retry(tries=5, delay=0.1, backoff=3) +def publish_job_status(video_id: str, time: float, status: str, + *, nebula: bool = False) -> None: + redis_conn.publish(get_job_id(video_id, time, nebula=nebula), status) + + +async def set_best_time(video_id: str, time: float, *, nebula: bool = False) -> None: + await (await get_async_redis_conn()).set( + get_best_time_key(video_id, nebula=nebula), time) + + +async def get_best_time(video_id: str, *, nebula: bool = False) -> bytes | None: + return cast(bytes | None, await (await get_async_redis_conn()).get( + get_best_time_key(video_id, nebula=nebula))) + + +def _validate_key(video_id: str, *, nebula: bool = False) -> None: + if nebula: + if not valid_nebula_slug(video_id): + raise ValueError(f"Invalid Nebula slug: {video_id}") + else: + if not valid_video_id(video_id): + raise ValueError(f"Invalid video ID: {video_id}") + +def send_fail_status(proxy_status_url: str) -> None: + url = proxy_status_url + "api/fail" + print(f"Sending fail status to {url}") + try: + requests.post(url) + except Exception: + pass + +def send_success_status(proxy_status_url: str) -> None: + url = proxy_status_url + "api/success" + print(f"Sending success status to {url}") + try: + requests.post(url) + except Exception: + pass + + +# ─── Shared helpers (YouTube + Nebula) ─────────────────────────────────────── + +def _finish_thumbnail_job( + video_key: str, + time: float, + title: str | None, + update_redis: bool, + get_file_paths_fn: Callable[[str, float], tuple[str, str, str, str]], + publish_fn: Callable[[str, float, str], None], + label: str = "", +) -> None: + """Store metadata, track storage, publish success, trigger cleanup. + + Shared by both YouTube and Nebula thumbnail generation jobs. + *video_key* is the raw ID/slug (not prefixed); *get_file_paths_fn* must + accept ``(video_key, time)`` and return the standard 4-tuple. + """ + _, output_filename, metadata_filename, _ = get_file_paths_fn(video_key, time) + + if title is not None: + with open(metadata_filename, "w") as metadata_file: + metadata_file.write(title) + + title_file_size = len(title.encode("utf-8")) if title else 0 + image_file_size = os.path.getsize(output_filename) + storage_used = title_file_size + image_file_size + + if image_file_size < minimum_file_size: + os.remove(output_filename) + if update_redis: + try: + asyncio.get_event_loop().run_until_complete(add_storage_used(title_file_size)) + except Exception as e: + log_error(f"Failed to update storage used ({label})", e) + raise ThumbnailGenerationError( + f"{label} image for {video_key} at {time} is too small: {image_file_size} bytes") + + if update_redis: + try: + asyncio.get_event_loop().run_until_complete(add_storage_used(storage_used)) + except Exception as e: + log_error(f"Failed to update storage used ({label})", e) + + publish_fn(video_key, time, "true") + check_if_cleanup_needed() + + +def _run_ffmpeg_frame_extract( + render_key: str, + time: float, + playback_url: PlaybackUrl, + file_paths: tuple[str, str, str, str], + extra_input_args: list[str] | None = None, +) -> None: + """Wait for concurrency slot, extract one frame with FFmpeg. + + *render_key* is used for the Redis concurrent_renders sorted-set. + *extra_input_args* allows passing e.g. ``["-http_proxy", url]``. + """ + wait_time = 0 + while redis_conn.zcard("concurrent_renders") > config["max_concurrent_renders"]: + print("Waiting for other renders to finish") + wait_time += 1 + if wait_time % 10 == 0: + redis_conn.zremrangebyscore("concurrent_renders", "-inf", time_module.time() - 60) + time_module.sleep(0.1 + 0.05 * random.random()) + redis_conn.zadd("concurrent_renders", {render_key: time_module.time()}) + + output_folder, output_filename, _, _ = file_paths + pathlib.Path(output_folder).mkdir(parents=True, exist_ok=True) + + # Round down time to nearest frame to be consistent with browsers + rounded_time = int(time * playback_url.fps) / playback_url.fps + if playback_url.fps == 60: + rounded_time = max(0, rounded_time - 1 / 100) + + try: + run_ffmpeg( + "-y", + *(extra_input_args or []), + "-ss", str(rounded_time), "-i", playback_url.url, + "-vframes", "1", "-lossless", "0", "-pix_fmt", "bgra", output_filename, + "-timelimit", "20", + timeout=20, + ) + except FFmpegError: + try: + os.remove(output_filename) + except FileNotFoundError: + pass + raise + finally: + redis_conn.zrem("concurrent_renders", render_key) + + +async def _get_latest_thumbnail( + output_folder: str, + best_time_coro: Awaitable[bytes | None], + get_thumbnail_fn: Callable[[float], Awaitable[Thumbnail]], + video_key: str, +) -> Thumbnail: + """Find the most recent thumbnail on disk, preferring *best_time*. + + Shared by YouTube and Nebula retrieval paths. + """ + try: + files = os.listdir(output_folder) + except FileNotFoundError: + # Ensure the coroutine is consumed even when the folder doesn't exist + await best_time_coro + raise FileNotFoundError(f"Failed to find thumbnail for {video_key}") + + files.sort(key=lambda x: os.path.getmtime(os.path.join(output_folder, x)), reverse=True) + + best_time = await best_time_coro + selected_file: str | None = ( + f"{best_time.decode()}{image_format}" if best_time is not None else None + ) + if selected_file is None or selected_file not in files: selected_file = None - for file in files: - # First try latest metadata file - # Most recent with a title is probably best if file.endswith(metadata_format): selected_file = file break - if selected_file is None: - # Fallback to latest image for file in files: if file.endswith(image_format): selected_file = file break if selected_file is not None: - # Remove file extension time = float(re.sub(r"(?:-live)?\.\S{3,4}$", "", selected_file)) - return await get_thumbnail_from_files(video_id, time, is_livestream) + return await get_thumbnail_fn(time) - raise FileNotFoundError(f"Failed to find thumbnail for {video_id}") + raise FileNotFoundError(f"Failed to find thumbnail for {video_key}") -async def get_thumbnail_from_files(video_id: str, time: float, is_livestream: bool, title: str | None = None) -> Thumbnail: - if not valid_video_id(video_id): - raise ValueError(f"Invalid video ID: {video_id}") - if type(time) is not float: - raise ValueError(f"Invalid time: {time}") - with os.scandir(get_folder_path(video_id)) as it: +async def _read_thumbnail_files( + folder_path: str, + time: float, + title: str | None, + get_file_paths_fn: Callable[[float], tuple[str, str, str, str]], + redis_key: str, +) -> Thumbnail: + """Read a thumbnail image + metadata from disk. + + Shared by YouTube and Nebula retrieval paths. + """ + with os.scandir(folder_path) as it: truncated_time = math.floor((time * 1000)) / 1000 truncated_time_string = str(truncated_time) if "." in truncated_time_string: @@ -268,19 +450,20 @@ async def get_thumbnail_from_files(video_id: str, time: float, is_livestream: bo continue break - _, output_filename, metadata_filename, _ = get_file_paths(video_id, time, is_livestream) + _, output_filename, metadata_filename, _ = get_file_paths_fn(time) with open(output_filename, "rb") as file: image_data = file.read() if image_data == b"": - raise FileNotFoundError(f"Image file for {video_id} at {time} zero bytes") + raise FileNotFoundError( + f"Image for {redis_key} at {time} zero bytes") if title is not None: with open(metadata_filename, "w") as metadata_file: metadata_file.write(title) try: - await update_last_used(video_id) + await update_last_used(redis_key) except Exception as e: log_error(f"Failed to update last used {e}") @@ -290,54 +473,61 @@ async def get_thumbnail_from_files(video_id: str, time: float, is_livestream: bo else: return Thumbnail(image_data, time) -def get_file_paths(video_id: str, time: float, is_livestream: bool) -> tuple[str, str, str, str]: - if not valid_video_id(video_id): - raise ValueError(f"Invalid video ID: {video_id}") - if type(time) is not float: - raise ValueError(f"Invalid time: {time}") +# ─── Nebula thumbnail support ──────────────────────────────────────────────── - output_folder = get_folder_path(video_id) - output_filename = f"{output_folder}/{time}{'-live' if is_livestream else ''}{image_format}" - metadata_filename = f"{output_folder}/{time}{metadata_format}" - video_filename = f"{output_folder}/{time}.mp4" - - return (output_folder, output_filename, metadata_filename, video_filename) - -def get_folder_path(video_id: str) -> str: - if not valid_video_id(video_id): - raise ValueError(f"Invalid video ID: {video_id}") +def generate_nebula_thumbnail(video_slug: str, time: float, title: str | None, + update_redis: bool = True) -> None: + """Generate a thumbnail for a Nebula video via the CF Worker.""" + try: + now = time_module.time() + _validate_key(video_slug, nebula=True) + if type(time) is not float: + raise ValueError(f"Invalid time: {time}") - return f"{config['thumbnail_storage']['path']}/{video_id}" + if update_redis: + try: + asyncio.get_event_loop().run_until_complete( + update_last_used(get_redis_key(video_slug, nebula=True))) + except Exception as e: + log_error("Failed to update last used (nebula)", e) -def get_job_id(video_id: str, time: float) -> str: - return f"{video_id}-{time}" + _generate_and_store_nebula_thumbnail(video_slug, time) -def get_best_time_key(video_id: str) -> str: - return f"best-{video_id}" + _finish_thumbnail_job( + video_slug, time, title, update_redis, + get_file_paths_fn=lambda vid, t: get_file_paths(vid, t, nebula=True), + publish_fn=lambda vid, t, s: publish_job_status(vid, t, s, nebula=True), + label="Nebula", + ) + log(f"Generated Nebula thumbnail for {video_slug} at {time} " + f"in {time_module.time() - now} seconds") -@retry(tries=5, delay=0.1, backoff=3) -def publish_job_status(video_id: str, time: float, status: str) -> None: - redis_conn.publish(get_job_id(video_id, time), status) + except Exception as e: + log(f"Failed to generate Nebula thumbnail for {video_slug} at {time}: {e}") + publish_job_status(video_slug, time, "false", nebula=True) + raise e -async def set_best_time(video_id: str, time: float) -> None: - await (await get_async_redis_conn()).set(get_best_time_key(video_id), time) -async def get_best_time(video_id: str) -> bytes | None: - return cast(bytes | None, await (await get_async_redis_conn()).get(get_best_time_key(video_id))) +@retry(ThumbnailGenerationError, tries=2, delay=1) +def _generate_and_store_nebula_thumbnail(video_slug: str, time: float) -> None: + """Fetch Nebula stream URL from the worker and extract a frame with FFmpeg.""" + print("nebula playback url start", time_module.time()) -def send_fail_status(proxy_status_url: str) -> None: - url = proxy_status_url + "api/fail" - print(f"Sending fail status to {url}") try: - requests.post(url) - except Exception: - pass + playback_url, _ = get_nebula_playback_url(video_slug) + except NebulaError as e: + raise ThumbnailGenerationError( + f"Failed to get Nebula playback URL for {video_slug}: {e}") from e + + print("nebula playback url done", time_module.time()) -def send_success_status(proxy_status_url: str) -> None: - url = proxy_status_url + "api/success" - print(f"Sending success status to {url}") try: - requests.post(url) - except Exception: - pass + _run_ffmpeg_frame_extract( + f"nebula:{video_slug}", time, playback_url, + get_file_paths(video_slug, time, nebula=True), + ) + print("nebula generated", time_module.time()) + except FFmpegError as e: + raise ThumbnailGenerationError( + f"Failed to generate Nebula thumbnail for {video_slug} at {time}: {e}") from e From 27406b99c752cbbd529979180eb76d9da5b87c7f Mon Sep 17 00:00:00 2001 From: furdiburd Date: Wed, 1 Apr 2026 23:08:09 +0200 Subject: [PATCH 2/3] Add dump.rdb to .gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 45c3fa5..9f1360a 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,4 @@ config.yaml cache .vscode -test-cache \ No newline at end of file +test-cachedump.rdb From 3c4f0006f01dcf238851668c35212f0fbe4c79f9 Mon Sep 17 00:00:00 2001 From: furdiburd Date: Wed, 1 Apr 2026 23:33:51 +0200 Subject: [PATCH 3/3] Remove unused nebula_worker_auth_secret; use public worker URL as default example --- config.yaml.example | 3 +-- utils/config.py | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/config.yaml.example b/config.yaml.example index b5be85a..8800dc9 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -24,6 +24,5 @@ try_floatie: true try_ytdlp: true # Nebula thumbnail support works via Cloudflare Worker proxy # Set nebula_worker_url to enable Nebula thumbnails -# nebula_worker_url: "https://nebula-thumbnail-worker.example.workers.dev" -# nebula_worker_auth_secret: "shared-secret-between-server-and-worker" +# nebula_worker_url: "https://nebula-thumbnail-worker.anoim.workers.dev" debug: false \ No newline at end of file diff --git a/utils/config.py b/utils/config.py index 3aab0b4..0b17d57 100644 --- a/utils/config.py +++ b/utils/config.py @@ -48,7 +48,6 @@ class Config(TypedDict): front_auth: str | None floatie_auth: str | None nebula_worker_url: str | None - nebula_worker_auth_secret: str | None debug: bool @@ -60,5 +59,3 @@ class Config(TypedDict): config["proxy_token"] = None if "nebula_worker_url" not in config: config["nebula_worker_url"] = None -if "nebula_worker_auth_secret" not in config: - config["nebula_worker_auth_secret"] = None