Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,4 @@ config.yaml
cache
.vscode

test-cache
test-cachedump.rdb
195 changes: 150 additions & 45 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import time
from hmac import compare_digest
from rq.worker import Worker
from rq.job import Job
from utils.test_utils import in_test
import logging

from utils.thumbnail import generate_thumbnail, get_latest_thumbnail_from_files, get_job_id, get_thumbnail_from_files, set_best_time
from utils.thumbnail import generate_thumbnail, generate_nebula_thumbnail, \
get_latest_thumbnail_from_files, get_job_id, get_thumbnail_from_files, set_best_time
from utils.video import valid_video_id
from utils.nebula import valid_nebula_slug

app = FastAPI()
app.add_middleware(
Expand Down Expand Up @@ -59,72 +62,175 @@ async def get_thumbnail(response: Response, request: Request,
# If we got here with a None time, then there is no thumbnail to pull from
return thumbnail_response_error(redirectUrl, "Thumbnail not cached")

try:
result = await _enqueue_and_wait(
job_id=get_job_id(videoID, time),
generate_now=generateNow,
request=request,
enqueue_fn=generate_thumbnail,
enqueue_args=(videoID, time, title, isLivestream, not in_test()),
label="YouTube",
)
except TimeoutError:
return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail due to timeout")

if result is None:
return thumbnail_response_error(redirectUrl, "Thumbnail not generated yet")
if result:
try:
return await handle_thumbnail_response(videoID, time, isLivestream, title, response)
except Exception as e:
log("Server error when getting thumbnails", e)
return thumbnail_response_error(redirectUrl, "Server error")
else:
return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail")


async def handle_thumbnail_response(video_id: str, time: float | None, is_livestream: bool, title: str | None, response: Response) -> Response:
thumbnail = await get_thumbnail_from_files(video_id, time, is_livestream, title) if time is not None else \
await get_latest_thumbnail_from_files(video_id, is_livestream)
response.headers["X-Timestamp"] = str(thumbnail.time)
response.headers["Cache-Control"] = "public, max-age=3600"
if thumbnail.title is not None:
try:
response.headers["X-Title"] = thumbnail.title.strip()
except UnicodeEncodeError:
pass

return Response(content=thumbnail.image, media_type="image/webp", headers=response.headers)

def thumbnail_response_error(redirect_url: str | None, text: str) -> Response:
if redirect_url is not None and redirect_url.startswith("https://i.ytimg.com"):
return RedirectResponse(redirect_url)
else:
raise HTTPException(status_code=204, headers={
"X-Failure-Reason": text
})


# ─── Shared queue management ─────────────────────────────────────────────────

job_id = get_job_id(videoID, time)
queue = queue_high if generateNow else queue_low
async def _enqueue_and_wait(
job_id: str,
generate_now: bool,
request: Request,
enqueue_fn: Any,
enqueue_args: tuple,
label: str,
) -> bool | None:
"""Reconcile queue priorities, enqueue if needed, and wait for result.

Returns ``True`` if the job succeeded, ``False`` if it failed, or
``None`` with a reason string if the caller should return an error
immediately (returned as a tuple ``(None, reason)``).
"""
queue = queue_high if generate_now else queue_low

job = queue.fetch_job(job_id)
other_queue_job = queue_low.fetch_job(job_id) if queue == queue_high else queue_high.fetch_job(job_id)
other_queue_job: Job | None = (
queue_low.fetch_job(job_id) if queue == queue_high
else queue_high.fetch_job(job_id)
)
if other_queue_job is not None:
if other_queue_job.is_started:
# It is already started, use it
job = other_queue_job
elif queue == queue_high:
# Old queue is low, prefer new one
queue_low.remove(other_queue_job)
elif job is not None:
# New queue is low, old queue is high, prefer old one
queue.remove(job)
job = other_queue_job
else:
# New queue is low, old queue is high, prefer old one
job = other_queue_job

if job is None or job.is_finished:
if len(queue) > config["thumbnail_storage"]["max_queue_size"]:
return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail due to queue being too big")

# Start the job if it is not already started
# TODO: Remove the ttl when proper priority is implemented
job = queue.enqueue(generate_thumbnail,
args=(videoID, time, title, isLivestream, not in_test()),
job_id=job_id,
job_timeout=30,
failure_ttl=500,
ttl=60,
at_front="front_auth" in config\
and config["front_auth"] is not None\
and request.headers.get("authorization") == config["front_auth"])
return None # caller handles "queue too big"

job = queue.enqueue(
enqueue_fn,
args=enqueue_args,
job_id=job_id,
job_timeout=30,
failure_ttl=500,
ttl=60,
at_front="front_auth" in config
and config["front_auth"] is not None
and request.headers.get("authorization") == config["front_auth"],
)

if job.is_failed:
return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail")
return False

result: bool = False
if ((job.get_position() or 0) < config["thumbnail_storage"]["max_before_async_generation"]
and (generateNow or len(queue_high) < config["thumbnail_storage"]["max_before_async_generation"])):
and (generate_now or len(queue_high) < config["thumbnail_storage"]["max_before_async_generation"])):
try:
result = (await wait_for_message(job_id)) == "true"
return (await wait_for_message(job_id)) == "true"
except TimeoutError:
log("Failed to generate thumbnail due to timeout")
return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail due to timeout")
log(f"{label} thumbnail generation timed out")
raise
else:
log("Thumbnail not generated yet", job.get_position())
return thumbnail_response_error(redirectUrl, "Thumbnail not generated yet")
log(f"{label} thumbnail not generated yet", job.get_position())
return None


# ─── Nebula thumbnails ────────────────────────────────────────────────────────

@app.get("/api/v1/getNebulaThumbnail")
async def get_nebula_thumbnail(response: Response, request: Request,
videoSlug: str, time: float | None = None,
generateNow: bool = False,
title: str | None = None,
officialTime: bool = False) -> Response:
if not config.get("nebula_worker_url"): # type: ignore[attr-defined]
raise HTTPException(status_code=501, detail="Nebula support is not configured")

if type(videoSlug) is not str or (type(time) is not float and time is not None) \
or type(generateNow) is not bool or not valid_nebula_slug(videoSlug):
raise HTTPException(status_code=400, detail="Invalid parameters")

if officialTime and time is not None:
await set_best_time(videoSlug, time, nebula=True)

try:
return await handle_nebula_thumbnail_response(videoSlug, time, title, response)
except FileNotFoundError:
pass

if time is None:
return nebula_thumbnail_response_error("Nebula thumbnail not cached")

try:
result = await _enqueue_and_wait(
job_id=get_job_id(videoSlug, time, nebula=True),
generate_now=generateNow,
request=request,
enqueue_fn=generate_nebula_thumbnail,
enqueue_args=(videoSlug, time, title, not in_test()),
label="Nebula",
)
except TimeoutError:
return nebula_thumbnail_response_error("Failed to generate Nebula thumbnail due to timeout")

if result is None:
return nebula_thumbnail_response_error("Nebula thumbnail not generated yet")
if result:
try:
return await handle_thumbnail_response(videoID, time, isLivestream, title, response)
return await handle_nebula_thumbnail_response(videoSlug, time, title, response)
except Exception as e:
log("Server error when getting thumbnails", e)
return thumbnail_response_error(redirectUrl, "Server error")
log("Server error when getting Nebula thumbnails", e)
return nebula_thumbnail_response_error("Server error")
else:
log("Failed to generate thumbnail")
return thumbnail_response_error(redirectUrl, "Failed to generate thumbnail")
return nebula_thumbnail_response_error("Failed to generate Nebula thumbnail")


async def handle_thumbnail_response(video_id: str, time: float | None, is_livestream: bool, title: str | None, response: Response) -> Response:
thumbnail = await get_thumbnail_from_files(video_id, time, is_livestream, title) if time is not None else \
await get_latest_thumbnail_from_files(video_id, is_livestream)
async def handle_nebula_thumbnail_response(
video_slug: str, time: float | None, title: str | None, response: Response
) -> Response:
thumbnail = (
await get_thumbnail_from_files(video_slug, time, title=title, nebula=True)
if time is not None
else await get_latest_thumbnail_from_files(video_slug, nebula=True)
)
response.headers["X-Timestamp"] = str(thumbnail.time)
response.headers["Cache-Control"] = "public, max-age=3600"
if thumbnail.title is not None:
Expand All @@ -133,15 +239,14 @@ async def handle_thumbnail_response(video_id: str, time: float | None, is_livest
except UnicodeEncodeError:
pass

return Response(content=thumbnail.image, media_type="image/webp", headers=response.headers)
return Response(content=thumbnail.image, media_type="image/webp",
headers=response.headers)

def thumbnail_response_error(redirect_url: str | None, text: str) -> Response:
if redirect_url is not None and redirect_url.startswith("https://i.ytimg.com"):
return RedirectResponse(redirect_url)
else:
raise HTTPException(status_code=204, headers={
"X-Failure-Reason": text
})

def nebula_thumbnail_response_error(text: str) -> Response:
raise HTTPException(status_code=204, headers={
"X-Failure-Reason": text
})

@app.get("/api/v1/status")
def get_status(includeDefault: bool = True, auth: str | None = None) -> dict[str, Any]:
Expand Down
4 changes: 4 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ server:
thumbnail_storage:
path: "cache"
max_size: 50000000
cleanup_multiplier: 0.9
redis_offset_allowed: 20
max_before_async_generation: 15
max_queue_size: 10000
Expand All @@ -21,4 +22,7 @@ status_auth_password: password
skip_local_ffmpeg: false
try_floatie: true
try_ytdlp: true
# Nebula thumbnail support works via Cloudflare Worker proxy
# Set nebula_worker_url to enable Nebula thumbnails
# nebula_worker_url: "https://nebula-thumbnail-worker.anoim.workers.dev"
debug: false
Loading