Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions backend/video_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import uuid
import yt_dlp
import logging
import subprocess
from urllib.parse import urlparse, unquote
from pathlib import Path
from typing import Optional

Expand All @@ -23,12 +25,62 @@ def __init__(self):
'preferredquality': '192'
}],
# 全局FFmpeg参数:单声道 + 16k 采样率 + faststart
'postprocessor_args': ['-ac', '1', '-ar', '16000', '-movflags', '+faststart'],
'postprocessor_args': ['-ac', '1', '-ar', '16000', '-movflags', '+faststart' ],
'prefer_ffmpeg': True,
'quiet': True,
'no_warnings': True,
'noplaylist': True, # 强制只下载单个视频,不下载播放列表
}

def _resolve_local_path(self, url: str) -> Optional[Path]:
"""Resolve file:// URL or direct local path to a Path."""
parsed = urlparse(url)
if parsed.scheme == "file":
if parsed.netloc and parsed.netloc not in ("", "localhost"):
raise Exception("不支持的 file:// URL 主机,仅允许本机路径")
return Path(unquote(parsed.path)).expanduser()

# 支持用户直接传本地绝对/相对路径
candidate = Path(url).expanduser()
if candidate.exists():
return candidate
return None

async def _convert_local_video_to_audio(self, local_path: Path, output_dir: Path) -> tuple[str, str]:
"""Convert local video/audio file to normalized m4a."""
import asyncio

if not local_path.exists() or not local_path.is_file():
raise Exception(f"本地文件不存在: {local_path}")

unique_id = str(uuid.uuid4())[:8]
output_audio = output_dir / f"audio_{unique_id}.m4a"

ffmpeg_cmd = [
"ffmpeg",
"-y",
"-i",
str(local_path),
"-vn",
"-ac",
"1",
"-ar",
"16000",
"-c:a",
"aac",
"-b:a",
"192k",
"-movflags",
"+faststart",
str(output_audio),
]
logger.info(f"检测到本地文件输入,开始转换: {local_path}")
await asyncio.to_thread(subprocess.check_call, ffmpeg_cmd)

if not output_audio.exists():
raise Exception("本地文件转换失败,未生成音频文件")

return str(output_audio), local_path.stem

async def fetch_subtitles(self, url: str, output_dir: Path) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""
Expand Down Expand Up @@ -310,6 +362,12 @@ async def download_and_convert(self, url: str, output_dir: Path) -> tuple[str, s
try:
# 创建输出目录
output_dir.mkdir(exist_ok=True)

local_path = self._resolve_local_path(url)
if local_path is not None:
audio_file, video_title = await self._convert_local_video_to_audio(local_path, output_dir)
logger.info(f"音频文件已保存: {audio_file}")
return audio_file, video_title

# 生成唯一的文件名
unique_id = str(uuid.uuid4())[:8]
Expand Down Expand Up @@ -349,7 +407,7 @@ async def download_and_convert(self, url: str, output_dir: Path) -> tuple[str, s

# 校验时长,如果和源视频差异较大,尝试一次ffmpeg规范化重封装
try:
import subprocess, shlex
import shlex
probe_cmd = f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {shlex.quote(audio_file)}"
out = subprocess.check_output(probe_cmd, shell=True).decode().strip()
actual_duration = float(out) if out else 0.0
Expand Down