""" FastAPI application that exposes endpoints for inspecting and downloading videos via yt-dlp. """ from __future__ import annotations import mimetypes import shutil import tempfile from dataclasses import dataclass from pathlib import Path from typing import Any from uuid import uuid4 from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, Request from fastapi.concurrency import run_in_threadpool from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, HttpUrl from yt_dlp import YoutubeDL from yt_dlp.utils import DownloadError app = FastAPI(title="Video Download API", version="0.1.0") BASE_DIR = Path(__file__).resolve().parent.parent DOWNLOADS_DIR = BASE_DIR / "tmp_downloads" DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True) WORKING_DIR = BASE_DIR / "tmp_work" WORKING_DIR.mkdir(parents=True, exist_ok=True) DOWNLOADS_ROOT = DOWNLOADS_DIR.resolve() DEFAULT_FORMAT = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]" ADDITIONAL_FALLBACKS: list[str] = [ "bestvideo+bestaudio/best", "best[ext=mp4]/best", "best", ] @dataclass(frozen=True) class FormatChoice: selector: str container: str | None = None # Allow the front-end (likely running on localhost:3000) to call the API. 
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

app.mount("/downloads", StaticFiles(directory=DOWNLOADS_DIR), name="downloads")


class FormatInfo(BaseModel):
    """Subset of a single yt-dlp format entry that is exposed through the API."""

    format_id: str
    ext: str | None = None
    resolution: str | None = None
    fps: float | None = None
    vcodec: str | None = None
    acodec: str | None = None
    filesize: int | None = None
    filesize_approx: int | None = None


class VideoInfo(BaseModel):
    """Video metadata plus the list of formats returned by ``/api/info``."""

    id: str
    title: str
    duration: int | None = None
    uploader: str | None = None
    thumbnail: HttpUrl | None = None
    webpage_url: HttpUrl | None = None
    formats: list[FormatInfo]


class DownloadRequest(BaseModel):
    """Request body for ``/api/download``."""

    url: HttpUrl
    # Explicit yt-dlp format selector; when omitted the server picks one with fallbacks.
    format_id: str | None = None
    # Optional output file name; only its final path component is used.
    filename: str | None = None


class DownloadResponse(BaseModel):
    """Response body for ``/api/download``."""

    file_name: str
    download_url: HttpUrl


@app.get("/health")
async def healthcheck() -> dict[str, str]:
    """Lightweight readiness probe for container orchestration."""
    return {"status": "ok"}


@app.get("/api/info", response_model=VideoInfo)
async def get_video_info(
    url: HttpUrl = Query(..., description="Public video URL to inspect")
) -> VideoInfo:
    """Return metadata and available formats for a given video URL.

    Raises:
        HTTPException: 400 when yt-dlp reports a ``DownloadError``, 500 for any
            other failure while fetching the metadata.
    """
    try:
        # yt-dlp is blocking, so it runs in the thread pool rather than on the event loop.
        info = await run_in_threadpool(_fetch_video_info, str(url))
    except DownloadError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:  # pragma: no cover - defensive catch for unexpected errors
        raise HTTPException(status_code=500, detail="Failed to fetch video info") from exc

    return _serialize_video_info(info)


@app.post("/api/download", response_model=DownloadResponse)
async def download_video(
    payload: DownloadRequest, request: Request, background_tasks: BackgroundTasks
) -> DownloadResponse:
    """Download the requested video and return an accessible URL to the stored file.

    Raises:
        HTTPException: 400 when yt-dlp reports a ``DownloadError``; 500 for a
            ``FileNotFoundError`` or any other unexpected failure. An
            ``HTTPException`` raised by a helper is passed through unchanged.
    """
    try:
        file_path, temp_dir = await run_in_threadpool(
            _download_video, str(payload.url), payload.format_id, payload.filename
        )
    except HTTPException:
        # _store_download raises a ready-made HTTP error; without this clause the
        # generic handler below would replace its detail message.
        raise
    except DownloadError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except FileNotFoundError as exc:
        raise HTTPException(status_code=500, detail="Video file missing after download") from exc
    except Exception as exc:  # pragma: no cover - defensive catch for unexpected errors
        raise HTTPException(status_code=500, detail="Failed to download video") from exc

    # The file has already been moved out of temp_dir; remove the leftovers after responding.
    background_tasks.add_task(_cleanup_temp_dir, temp_dir)

    download_url = str(request.url_for("download_file", file_name=file_path.name))
    return DownloadResponse(
        file_name=file_path.name,
        download_url=download_url,
    )


def _coerce_int(value: Any) -> int | None:
    """Convert a yt-dlp numeric field to ``int``, returning None when that is not possible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _serialize_video_info(info: dict[str, Any]) -> VideoInfo:
    """Select and sanitize the fields returned by yt-dlp for the API response.

    Formats without a ``format_id`` are skipped. Integer fields are coerced with
    ``_coerce_int`` so fractional values cannot fail model validation, and a
    missing or ``None`` id/title falls back to ``""`` / ``"unknown"``.
    """
    formats: list[FormatInfo] = []
    for fmt in info.get("formats") or []:
        format_id = fmt.get("format_id")
        if not format_id:
            continue

        resolution = fmt.get("resolution")
        if not resolution:
            # Rebuild "WIDTHxHEIGHT" when yt-dlp did not supply a resolution string.
            width, height = fmt.get("width"), fmt.get("height")
            if width and height:
                resolution = f"{width}x{height}"

        formats.append(
            FormatInfo(
                format_id=str(format_id),
                ext=fmt.get("ext"),
                resolution=resolution,
                fps=fmt.get("fps"),
                vcodec=fmt.get("vcodec"),
                acodec=fmt.get("acodec"),
                filesize=_coerce_int(fmt.get("filesize")),
                filesize_approx=_coerce_int(fmt.get("filesize_approx")),
            )
        )

    return VideoInfo(
        id=str(info.get("id") or ""),
        title=info.get("title") or "unknown",
        duration=_coerce_int(info.get("duration")),
        uploader=info.get("uploader"),
        thumbnail=info.get("thumbnail"),
        webpage_url=info.get("webpage_url"),
        formats=formats,
    )


def _fetch_video_info(url: str) -> dict[str, Any]:
    """Retrieve metadata for a URL without downloading the media."""
    options = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "noplaylist": True,
    }
    with YoutubeDL(options) as ydl:
        return ydl.extract_info(url, download=False)


def _derive_format_selectors(info: dict[str, Any]) -> list[FormatChoice]:
    """Build a prioritized list of format selectors based on available formats.

    Order: best mp4+m4a pair, best pair of any extension, best progressive mp4,
    best progressive of any extension, the ``format_id`` declared on ``info``,
    then the static fallbacks. Duplicate selectors keep their first occurrence.
    """
    choices: list[FormatChoice] = []

    # Separate video + audio streams that yt-dlp merges after download.
    for video_ext, audio_ext in (("mp4", "m4a"), (None, None)):
        combo = _pick_best_combo(info, video_ext=video_ext, audio_ext=audio_ext)
        if combo:
            video_fmt, audio_fmt = combo
            selector = f"{video_fmt['format_id']}+{audio_fmt['format_id']}"
            choices.append(
                FormatChoice(selector, _guess_merge_container(video_fmt, audio_fmt))
            )

    # Single files that already contain both audio and video.
    for preferred_ext in ("mp4", None):
        progressive = _pick_best_progressive(info, preferred_ext=preferred_ext)
        if progressive:
            choices.append(FormatChoice(str(progressive["format_id"])))

    best_declared = info.get("format_id")
    if best_declared:
        choices.append(FormatChoice(str(best_declared)))

    choices.extend(
        FormatChoice(fallback) for fallback in (DEFAULT_FORMAT, *ADDITIONAL_FALLBACKS)
    )

    seen: set[str] = set()
    unique: list[FormatChoice] = []
    for choice in choices:
        if not choice.selector or choice.selector in seen:
            continue
        seen.add(choice.selector)
        unique.append(choice)
    return unique


def _has_codec(codec: Any) -> bool:
    """Return True when a codec field denotes an actual stream (not None / "none")."""
    return codec not in (None, "none")


def _candidate_formats(
    info: dict[str, Any],
    *,
    has_video: bool,
    has_audio: bool,
    preferred_ext: str | None = None,
) -> list[dict[str, Any]]:
    """List formats with a ``format_id`` that match the requested stream layout and extension."""
    return [
        fmt
        for fmt in info.get("formats") or []
        if _has_codec(fmt.get("vcodec")) == has_video
        and _has_codec(fmt.get("acodec")) == has_audio
        and fmt.get("format_id")
        and (not preferred_ext or fmt.get("ext") == preferred_ext)
    ]


def _pick_best_progressive(
    info: dict[str, Any], preferred_ext: str | None = None
) -> dict[str, Any] | None:
    """Return the highest-quality format carrying both audio and video, or None."""
    return _select_highest_quality(
        _candidate_formats(info, has_video=True, has_audio=True, preferred_ext=preferred_ext)
    )


def _pick_best_combo(
    info: dict[str, Any],
    video_ext: str | None = None,
    audio_ext: str | None = None,
) -> tuple[dict[str, Any], dict[str, Any]] | None:
    """Return the best (video-only, audio-only) pair, or None if either half is missing."""
    video_fmt = _pick_best_video(info, preferred_ext=video_ext)
    audio_fmt = _pick_best_audio(info, preferred_ext=audio_ext)
    if video_fmt and audio_fmt:
        return video_fmt, audio_fmt
    return None


def _pick_best_video(
    info: dict[str, Any], preferred_ext: str | None = None
) -> dict[str, Any] | None:
    """Return the highest-quality video-only format, or None."""
    return _select_highest_quality(
        _candidate_formats(info, has_video=True, has_audio=False, preferred_ext=preferred_ext)
    )


def _pick_best_audio(
    info: dict[str, Any], preferred_ext: str | None = None
) -> dict[str, Any] | None:
    """Return the highest-quality audio-only format, or None."""
    return _select_highest_quality(
        _candidate_formats(info, has_video=False, has_audio=True, preferred_ext=preferred_ext)
    )


def _select_highest_quality(candidates: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Return the candidate with the greatest quality key, or None for an empty list."""
    return max(candidates, key=_format_quality_key, default=None)


def _format_quality_key(fmt: dict[str, Any]) -> tuple[int, float, float, float]:
    """Sort key comparing height, then fps, then tbr, then (approximate) file size.

    Missing or falsy values count as zero.
    """
    height = fmt.get("height") or 0
    fps = fmt.get("fps") or 0.0
    tbr = fmt.get("tbr") or 0.0
    filesize = fmt.get("filesize") or fmt.get("filesize_approx") or 0.0
    return (height, fps, tbr, filesize)


def _guess_merge_container(
    video_fmt: dict[str, Any], audio_fmt: dict[str, Any]
) -> str | None:
    """Choose a container for merging the two streams from their extensions.

    Returns None when no rule applies, which leaves the container unset.
    """
    video_ext = (video_fmt.get("ext") or "").lower()
    audio_ext = (audio_fmt.get("ext") or "").lower()

    if video_ext == "mp4" and audio_ext in {"m4a", "mp4", "aac", "unknown", ""}:
        return "mp4"
    if video_ext == "webm" and audio_ext in {"webm", "opus", "vorbis"}:
        return "webm"
    if video_ext in {"mkv", "flv", "3gp"}:
        return video_ext
    if video_ext == audio_ext and video_ext:
        return video_ext
    if video_ext == "mp4":
        # Fallback to mkv when mixing mp4 video with non-mp4 audio
        return "mkv"
    return None


def _download_video(
    url: str, format_id: str | None, filename: str | None
) -> tuple[Path, Path]:
    """Download a video with yt-dlp and persist it to the downloads directory.

    Uses ``format_id`` when given; otherwise tries the selectors derived from
    the video's metadata in order until one succeeds.

    Returns:
        ``(stored_path, temp_dir)``. The caller is responsible for removing
        ``temp_dir`` after a successful download.

    Raises:
        DownloadError: when every attempted selector fails.
    """
    temp_dir = Path(tempfile.mkdtemp(prefix="yt_dlp_", dir=str(WORKING_DIR)))
    try:
        output_template = _build_output_template(temp_dir, filename)
        stored_path = _download_with_fallbacks(url, format_id, temp_dir, output_template)
    except BaseException:
        # The caller only receives temp_dir on success, so a failure here would
        # otherwise leave the directory behind permanently.
        _cleanup_temp_dir(temp_dir)
        raise
    return stored_path, temp_dir


def _download_with_fallbacks(
    url: str, format_id: str | None, temp_dir: Path, output_template: str
) -> Path:
    """Try each candidate selector in turn and return the stored path of the first success."""
    selectors: list[FormatChoice]
    if format_id:
        # An explicit format is tried once; no fallbacks are attempted.
        selectors = [FormatChoice(format_id)]
    else:
        selectors = _derive_format_selectors(_fetch_video_info(url))

    last_error: Exception | None = None
    for choice in selectors:
        try:
            file_path = _execute_download(url, choice, output_template)
            # A missing or empty file is treated like a failed attempt.
            if not file_path.exists() or file_path.stat().st_size == 0:
                raise FileNotFoundError(file_path)
            return _store_download(file_path)
        except (DownloadError, FileNotFoundError) as exc:
            last_error = exc
            _cleanup_partial_downloads(temp_dir)

    if isinstance(last_error, DownloadError):
        raise last_error
    message = str(last_error) if last_error else "Unknown download failure"
    raise DownloadError(message) from last_error


def _execute_download(url: str, choice: FormatChoice, output_template: str) -> Path:
    """Run yt-dlp with the provided selector and return the resulting file path."""
    ydl_opts: dict[str, Any] = {
        "format": choice.selector,
        "outtmpl": output_template,
        "noplaylist": True,
        "quiet": True,
        "no_warnings": True,
    }
    if choice.container:
        ydl_opts["merge_output_format"] = choice.container

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        downloads = info.get("requested_downloads") or []
        primary = downloads[0] if downloads else info
        # "filepath" is tried first because yt-dlp records the final on-disk path
        # there; "_filename" is the template-derived name computed before
        # post-processing and may not match the final file.
        candidate = (
            primary.get("filepath")
            or primary.get("_filename")
            or info.get("_filename")
            or ydl.prepare_filename(info)
        )
    return Path(candidate)


def _cleanup_partial_downloads(temp_dir: Path) -> None:
    """Remove partial files from a temporary directory before retrying a download."""
    for partial in temp_dir.glob("*"):
        try:
            if partial.is_file():
                partial.unlink(missing_ok=True)
        except OSError:
            # Ignore partial cleanup issues; retries can still proceed.
            continue


def _store_download(file_path: Path) -> Path:
    """Move a completed download into the project downloads directory.

    An existing file or directory with the same name is replaced.

    Raises:
        HTTPException: 500 when the existing target cannot be removed.
    """
    target = DOWNLOADS_DIR / file_path.name
    if target.exists():
        try:
            if target.is_file():
                target.unlink(missing_ok=True)
            else:
                shutil.rmtree(target, ignore_errors=True)
        except OSError as exc:
            raise HTTPException(
                status_code=500, detail="Failed to replace existing file"
            ) from exc

    shutil.move(str(file_path), target)
    return target


def _build_output_template(temp_dir: Path, filename: str | None) -> str:
    """Construct the yt-dlp output template, ensuring an extension placeholder exists."""
    default_template = "%(title)s.%(ext)s"
    if not filename:
        return str(temp_dir / default_template)

    # Keep only the final path component so the output stays inside temp_dir.
    filename = Path(filename).name or "download"

    # Allow users to omit the extension; yt-dlp will substitute it using %(ext)s.
    if "%(ext)s" not in filename and not Path(filename).suffix:
        filename = f"{filename}.%(ext)s"

    return str(temp_dir / filename)


def _cleanup_temp_dir(temp_dir: Path) -> None:
    """Remove the temporary directory created for a download.

    Best effort: removal errors are ignored via ``ignore_errors=True``.
    """
    shutil.rmtree(temp_dir, ignore_errors=True)


@app.get("/api/download/{file_name}", name="download_file")
async def stream_download(file_name: str) -> FileResponse:
    """Serve a stored download, advertising byte-range support via ``Accept-Ranges``.

    Raises:
        HTTPException: 404 when the file does not exist in the downloads directory.
    """
    file_path = _resolve_download_path(file_name)
    media_type = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
    stat_result = file_path.stat()

    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type=media_type,
        stat_result=stat_result,
        headers={"Accept-Ranges": "bytes"},
    )


def _resolve_download_path(file_name: str) -> Path:
    """Ensure the requested file lives inside the downloads directory and exists.

    Raises:
        HTTPException: 404 when the resolved path falls outside the downloads
            root or is not a regular file.
    """
    # Drop any directory components before joining, then verify containment after
    # resolving so that symlinks or ".." cannot point outside the downloads root.
    safe_name = Path(file_name).name
    candidate = (DOWNLOADS_DIR / safe_name).resolve()

    try:
        candidate.relative_to(DOWNLOADS_ROOT)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail="File not found") from exc

    if not candidate.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    return candidate