"""
FastAPI application that exposes endpoints for inspecting and downloading
videos via yt-dlp.
"""

from __future__ import annotations

import logging
import mimetypes
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import uuid4

from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError


app = FastAPI(title="Video Download API", version="0.1.0")

# Parent of the directory that contains this module.
BASE_DIR = Path(__file__).resolve().parent.parent

# Completed downloads are moved here; the directory is created at import time.
DOWNLOADS_DIR = BASE_DIR / "tmp_downloads"
DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)

# Per-download scratch directories are created underneath this one.
WORKING_DIR = BASE_DIR / "tmp_work"
WORKING_DIR.mkdir(parents=True, exist_ok=True)

# Fully resolved downloads directory, used for path-containment checks.
DOWNLOADS_ROOT = DOWNLOADS_DIR.resolve()

# yt-dlp format selectors tried after the selectors derived from the
# video's own format list, in this order.
DEFAULT_FORMAT = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]"
ADDITIONAL_FALLBACKS: list[str] = [
    "bestvideo+bestaudio/best",
    "best[ext=mp4]/best",
    "best",
]


@dataclass(frozen=True)
|
|
class FormatChoice:
|
|
selector: str
|
|
container: str | None = None
|
|
|
|
# Allow the front-end (likely running on localhost:3000) to call the API.
# NOTE(review): the wildcard origin admits every site, not only the
# front-end; narrow it if that is not intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Also expose stored downloads as plain static files under /downloads.
app.mount("/downloads", StaticFiles(directory=DOWNLOADS_DIR), name="downloads")


class FormatInfo(BaseModel):
    """One downloadable format of a video, as reported by yt-dlp."""

    format_id: str
    ext: str | None = None
    # Either yt-dlp's own "resolution" value or "<width>x<height>".
    resolution: str | None = None
    fps: float | None = None
    vcodec: str | None = None
    acodec: str | None = None
    filesize: int | None = None
    filesize_approx: int | None = None


class VideoInfo(BaseModel):
    """Metadata and available formats returned by the info endpoint."""

    id: str
    title: str
    duration: int | None = None
    uploader: str | None = None
    thumbnail: HttpUrl | None = None
    webpage_url: HttpUrl | None = None
    formats: list[FormatInfo]


class DownloadRequest(BaseModel):
    """Request body for the download endpoint."""

    url: HttpUrl
    # Explicit yt-dlp format selector; when omitted one is derived automatically.
    format_id: str | None = None
    # Optional output file name (or yt-dlp output template) for the download.
    filename: str | None = None


class DownloadResponse(BaseModel):
    """Name of the stored file and the URL it can be fetched from."""

    file_name: str
    download_url: HttpUrl


@app.get("/health")
async def healthcheck() -> dict[str, str]:
    """Lightweight readiness probe for container orchestration."""
    return {"status": "ok"}


@app.get("/api/info", response_model=VideoInfo)
async def get_video_info(
    url: HttpUrl = Query(..., description="Public video URL to inspect")
) -> VideoInfo:
    """Return metadata and available formats for a given video URL.

    Raises:
        HTTPException: 400 when yt-dlp reports a ``DownloadError``; 500 for
            any other failure while fetching the metadata.
    """
    try:
        # yt-dlp is blocking, so the extraction runs in the thread pool.
        info = await run_in_threadpool(_fetch_video_info, str(url))
    except DownloadError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:  # pragma: no cover - defensive catch for unexpected errors
        raise HTTPException(status_code=500, detail="Failed to fetch video info") from exc

    return _serialize_video_info(info)


@app.post("/api/download", response_model=DownloadResponse)
async def download_video(
    payload: DownloadRequest, request: Request, background_tasks: BackgroundTasks
) -> DownloadResponse:
    """Download the requested video and return an accessible URL to the stored file.

    Raises:
        HTTPException: 400 when yt-dlp reports a ``DownloadError``; 500 when
            the file is missing after the download or on any other failure.
            An ``HTTPException`` raised by a helper is passed through as is.
    """
    try:
        # yt-dlp is blocking, so the download runs in the thread pool.
        file_path, temp_dir = await run_in_threadpool(
            _download_video, str(payload.url), payload.format_id, payload.filename
        )
    except HTTPException:
        # Helpers may already have produced a precise HTTP error; without this
        # clause the generic handler below would replace its detail.
        raise
    except DownloadError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except FileNotFoundError as exc:
        raise HTTPException(status_code=500, detail="Video file missing after download") from exc
    except Exception as exc:  # pragma: no cover - defensive catch for unexpected errors
        raise HTTPException(status_code=500, detail="Failed to download video") from exc

    # The scratch directory is removed after the response has been sent.
    background_tasks.add_task(_cleanup_temp_dir, temp_dir)
    download_url = str(request.url_for("download_file", file_name=file_path.name))

    return DownloadResponse(
        file_name=file_path.name,
        download_url=download_url,
    )


def _serialize_video_info(info: dict[str, Any]) -> VideoInfo:
    """Select and sanitize the fields returned by yt-dlp for the API response.

    Formats without a ``format_id`` are skipped. Integer fields are coerced
    from whatever numeric value is present, and missing or null ``id`` and
    ``title`` fall back to ``""`` and ``"unknown"``.
    """

    def _as_int(value: Any) -> int | None:
        # Fractional values (e.g. a float duration) would otherwise be
        # rejected by integer validation on the response models.
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    formats: list[FormatInfo] = []
    # "formats" may be present but null, so ``or []`` rather than a default.
    for fmt in info.get("formats") or []:
        format_id = fmt.get("format_id")
        if not format_id:
            continue

        resolution = fmt.get("resolution")
        if not resolution:
            # Build "<width>x<height>" when no resolution string is given.
            width, height = fmt.get("width"), fmt.get("height")
            if width and height:
                resolution = f"{width}x{height}"

        formats.append(
            FormatInfo(
                format_id=str(format_id),
                ext=fmt.get("ext"),
                resolution=resolution,
                fps=fmt.get("fps"),
                vcodec=fmt.get("vcodec"),
                acodec=fmt.get("acodec"),
                filesize=_as_int(fmt.get("filesize")),
                filesize_approx=_as_int(fmt.get("filesize_approx")),
            )
        )

    return VideoInfo(
        # ``or`` also covers keys that exist with a null value.
        id=str(info.get("id") or ""),
        title=info.get("title") or "unknown",
        duration=_as_int(info.get("duration")),
        uploader=info.get("uploader"),
        thumbnail=info.get("thumbnail"),
        webpage_url=info.get("webpage_url"),
        formats=formats,
    )


def _fetch_video_info(url: str) -> dict[str, Any]:
    """Retrieve metadata for a URL without downloading the media."""
    options = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "noplaylist": True,
    }
    with YoutubeDL(options) as ydl:
        return ydl.extract_info(url, download=False)


def _derive_format_selectors(info: dict[str, Any]) -> list[FormatChoice]:
    """Build a prioritized list of format selectors based on available formats.

    Order of preference: an mp4 video + m4a audio pair, the best
    video + audio pair of any type, the best mp4 format carrying both
    streams, the best such format of any type, the ``format_id`` declared on
    ``info`` itself, then the generic fallback selectors. Empty and repeated
    selectors are dropped, keeping the first occurrence.
    """
    choices: list[FormatChoice] = []

    # Separate video and audio streams that yt-dlp merges after download.
    for video_ext, audio_ext in (("mp4", "m4a"), (None, None)):
        combo = _pick_best_combo(info, video_ext=video_ext, audio_ext=audio_ext)
        if combo:
            video_fmt, audio_fmt = combo
            choices.append(
                FormatChoice(
                    f"{video_fmt['format_id']}+{audio_fmt['format_id']}",
                    _guess_merge_container(video_fmt, audio_fmt),
                )
            )

    # Single formats that already contain both audio and video.
    for preferred_ext in ("mp4", None):
        progressive = _pick_best_progressive(info, preferred_ext=preferred_ext)
        if progressive:
            choices.append(FormatChoice(str(progressive["format_id"])))

    best_declared = info.get("format_id")
    if best_declared:
        choices.append(FormatChoice(str(best_declared)))

    choices.extend(
        FormatChoice(fallback) for fallback in (DEFAULT_FORMAT, *ADDITIONAL_FALLBACKS)
    )

    # dicts keep insertion order, so this keeps the first choice per selector.
    unique: dict[str, FormatChoice] = {}
    for choice in choices:
        if choice.selector:
            unique.setdefault(choice.selector, choice)
    return list(unique.values())


def _pick_best_progressive(
    info: dict[str, Any], preferred_ext: str | None = None
) -> dict[str, Any] | None:
    """Return the best format that carries both audio and video, or None.

    A codec counts as absent when it is missing or the string ``"none"``.
    When ``preferred_ext`` is given, only formats with that ``ext`` qualify.
    """
    candidates = [
        fmt
        for fmt in info.get("formats", [])
        if fmt.get("acodec") not in (None, "none")
        and fmt.get("vcodec") not in (None, "none")
        and fmt.get("format_id")
    ]
    if preferred_ext:
        candidates = [fmt for fmt in candidates if fmt.get("ext") == preferred_ext]
    return _select_highest_quality(candidates)


def _pick_best_combo(
    info: dict[str, Any],
    video_ext: str | None = None,
    audio_ext: str | None = None,
) -> tuple[dict[str, Any], dict[str, Any]] | None:
    """Return the best (video-only, audio-only) format pair, or None.

    None is returned unless both a video-only and an audio-only format
    matching the optional extension filters are available.
    """
    video_fmt = _pick_best_video(info, preferred_ext=video_ext)
    audio_fmt = _pick_best_audio(info, preferred_ext=audio_ext)
    if video_fmt and audio_fmt:
        return video_fmt, audio_fmt
    return None


def _pick_best_video(
    info: dict[str, Any], preferred_ext: str | None = None
) -> dict[str, Any] | None:
    """Return the best video-only format (has a video codec, no audio), or None.

    A codec counts as absent when it is missing or the string ``"none"``.
    When ``preferred_ext`` is given, only formats with that ``ext`` qualify.
    """
    candidates = [
        fmt
        for fmt in info.get("formats", [])
        if fmt.get("vcodec") not in (None, "none")
        and fmt.get("acodec") in (None, "none")
        and fmt.get("format_id")
    ]
    if preferred_ext:
        candidates = [fmt for fmt in candidates if fmt.get("ext") == preferred_ext]
    return _select_highest_quality(candidates)


def _pick_best_audio(
    info: dict[str, Any], preferred_ext: str | None = None
) -> dict[str, Any] | None:
    """Return the best audio-only format (has an audio codec, no video), or None.

    A codec counts as absent when it is missing or the string ``"none"``.
    When ``preferred_ext`` is given, only formats with that ``ext`` qualify.
    """
    candidates = [
        fmt
        for fmt in info.get("formats", [])
        if fmt.get("acodec") not in (None, "none")
        and fmt.get("vcodec") in (None, "none")
        and fmt.get("format_id")
    ]
    if preferred_ext:
        candidates = [fmt for fmt in candidates if fmt.get("ext") == preferred_ext]
    return _select_highest_quality(candidates)


def _select_highest_quality(candidates: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Return the candidate with the largest quality key, or None if there are none."""
    return max(candidates or (), key=_format_quality_key, default=None)


def _format_quality_key(fmt: dict[str, Any]) -> tuple[int, float, float, float]:
    """Return a sort key ranking a format by height, fps, bitrate, then size.

    Missing or falsy values count as zero; ``filesize_approx`` is used when
    ``filesize`` is unavailable.
    """
    height = fmt.get("height") or 0
    fps = fmt.get("fps") or 0.0
    tbr = fmt.get("tbr") or 0.0
    filesize = fmt.get("filesize") or fmt.get("filesize_approx") or 0.0
    return (height, fps, tbr, filesize)


def _guess_merge_container(
|
|
video_fmt: dict[str, Any], audio_fmt: dict[str, Any]
|
|
) -> str | None:
|
|
video_ext = (video_fmt.get("ext") or "").lower()
|
|
audio_ext = (audio_fmt.get("ext") or "").lower()
|
|
|
|
if video_ext == "mp4" and audio_ext in {"m4a", "mp4", "aac", "unknown", ""}:
|
|
return "mp4"
|
|
if video_ext == "webm" and audio_ext in {"webm", "opus", "vorbis"}:
|
|
return "webm"
|
|
if video_ext in {"mkv", "flv", "3gp"}:
|
|
return video_ext
|
|
if video_ext == audio_ext and video_ext:
|
|
return video_ext
|
|
if video_ext == "mp4":
|
|
# Fallback to mkv when mixing mp4 video with non-mp4 audio
|
|
return "mkv"
|
|
return None
|
|
|
|
|
|
def _download_video(
    url: str, format_id: str | None, filename: str | None
) -> tuple[Path, Path]:
    """Download a video with yt-dlp and persist it to the downloads directory.

    An explicit ``format_id`` is used as the only selector; otherwise a
    prioritized list of selectors is derived from the video's metadata and
    tried in order until one succeeds.

    Returns:
        ``(stored_path, temp_dir)``. The caller is responsible for removing
        ``temp_dir`` after a successful return.

    Raises:
        DownloadError: when the last attempted selector failed in yt-dlp.
        FileNotFoundError: when the last attempt produced no file or an
            empty one.
    """
    temp_dir = Path(tempfile.mkdtemp(prefix="yt_dlp_", dir=str(WORKING_DIR)))
    try:
        stored_path = _download_with_fallbacks(url, format_id, filename, temp_dir)
    except BaseException:
        # Nobody else learns about temp_dir when this function fails, so it
        # has to be removed here or it is leaked.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    return stored_path, temp_dir


def _download_with_fallbacks(
    url: str, format_id: str | None, filename: str | None, temp_dir: Path
) -> Path:
    """Try each candidate selector inside ``temp_dir`` and store the first success."""
    output_template = _build_output_template(temp_dir, filename)

    selectors: list[FormatChoice]
    if format_id:
        selectors = [FormatChoice(format_id)]
    else:
        selectors = _derive_format_selectors(_fetch_video_info(url))

    last_error: Exception | None = None
    for choice in selectors:
        try:
            file_path = Path(_execute_download(url, choice, output_template))
            if not file_path.exists() or file_path.stat().st_size == 0:
                raise FileNotFoundError(file_path)
            return _store_download(file_path)
        except (DownloadError, FileNotFoundError) as exc:
            last_error = exc
            # Drop leftovers so the next selector starts from a clean directory.
            _cleanup_partial_downloads(temp_dir)

    if last_error is None:
        raise DownloadError("Unknown download failure")
    # Re-raise with the original type so callers can tell a yt-dlp failure
    # from a missing output file.
    raise last_error


def _execute_download(url: str, choice: FormatChoice, output_template: str) -> Path:
    """Run yt-dlp with the provided selector and return the resulting file path.

    The path is taken from the first ``requested_downloads`` entry when
    there is one, preferring its final ``filepath`` over ``_filename``, and
    otherwise from the info dict or yt-dlp's prepared file name.
    """
    ydl_opts: dict[str, Any] = {
        "format": choice.selector,
        "outtmpl": output_template,
        "noplaylist": True,
        "quiet": True,
        "no_warnings": True,
    }
    if choice.container:
        ydl_opts["merge_output_format"] = choice.container

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        downloads = info.get("requested_downloads") or []
        primary = downloads[0] if downloads else info
        candidate = (
            # "filepath" is the file as it exists after merging/post-processing.
            primary.get("filepath")
            or primary.get("_filename")
            or info.get("_filename")
            or ydl.prepare_filename(info)
        )
        return Path(candidate)


def _cleanup_partial_downloads(temp_dir: Path) -> None:
    """Remove partial files from a temporary directory before retrying a download.

    Only regular files directly inside ``temp_dir`` are deleted;
    subdirectories are left untouched.
    """
    for partial in temp_dir.glob("*"):
        try:
            if partial.is_file():
                partial.unlink(missing_ok=True)
        except OSError:
            # Ignore partial cleanup issues; retries can still proceed.
            pass


def _store_download(file_path: Path) -> Path:
    """Move a completed download into the project downloads directory.

    Anything already stored under the same name is replaced.

    Raises:
        HTTPException: 500 when the existing entry cannot be removed.
    """
    target = DOWNLOADS_DIR / file_path.name

    # is_symlink() also catches dangling links, which exists() reports as absent.
    if target.exists() or target.is_symlink():
        try:
            if target.is_dir() and not target.is_symlink():
                # Errors must surface here: if the directory survived,
                # shutil.move would place the file inside it.
                shutil.rmtree(target)
            else:
                target.unlink(missing_ok=True)
        except OSError as exc:
            raise HTTPException(
                status_code=500, detail="Failed to replace existing file"
            ) from exc

    shutil.move(str(file_path), target)
    return target


def _build_output_template(temp_dir: Path, filename: str | None) -> str:
|
|
"""Construct the yt-dlp output template, ensuring an extension placeholder exists."""
|
|
default_template = "%(title)s.%(ext)s"
|
|
if not filename:
|
|
return str(temp_dir / default_template)
|
|
|
|
safe_filename = Path(filename).name
|
|
if not safe_filename:
|
|
safe_filename = "download"
|
|
filename = safe_filename
|
|
|
|
# Allow users to omit the extension; yt-dlp will substitute it using %(ext)s.
|
|
if "%(ext)s" not in filename and not Path(filename).suffix:
|
|
filename = f"{filename}.%(ext)s"
|
|
|
|
return str(temp_dir / filename)
|
|
|
|
|
|
def _cleanup_temp_dir(temp_dir: Path) -> None:
    """Remove the temporary directory created for a download.

    Best effort: a directory that is already gone is ignored, and any other
    failure is logged as a warning instead of being raised.
    """
    try:
        shutil.rmtree(temp_dir)
    except FileNotFoundError:
        # Already removed; nothing left to do.
        return
    except OSError as exc:
        logging.getLogger(__name__).warning(
            "Could not remove temporary directory %s: %s", temp_dir, exc
        )


@app.get("/api/download/{file_name}", name="download_file")
async def stream_download(file_name: str) -> FileResponse:
    """Serve a stored download as a file response.

    The response advertises ``Accept-Ranges: bytes``.
    NOTE(review): whether range requests are actually honoured depends on
    the installed ``FileResponse`` implementation; confirm.

    Raises:
        HTTPException: 404 when the file does not exist, lies outside the
            downloads directory, or disappears before it can be read.
    """
    file_path = _resolve_download_path(file_name)
    try:
        stat_result = file_path.stat()
    except OSError as exc:
        # The file can vanish between the existence check and this call.
        raise HTTPException(status_code=404, detail="File not found") from exc

    media_type = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type=media_type,
        stat_result=stat_result,
        headers={"Accept-Ranges": "bytes"},
    )


def _resolve_download_path(file_name: str) -> Path:
    """Ensure the requested file lives inside the downloads directory and exists.

    Raises:
        HTTPException: 404 when the resolved path falls outside the downloads
            directory or is not a regular file.
    """
    # Drop any directory components supplied by the client.
    safe_name = Path(file_name).name
    candidate = (DOWNLOADS_DIR / safe_name).resolve()

    try:
        # After resolving, the path must still sit under the downloads root
        # (this rejects names such as ".." and links that point elsewhere).
        candidate.relative_to(DOWNLOADS_ROOT)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail="File not found") from exc

    if not candidate.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    return candidate