import os
import warnings
import imageio
import whisper
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from collections import defaultdict
import subprocess

# ======================== Configuration ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
VIDEO_PATH = "D:/python项目文件/1/input.mp4"   # input video path (change to your actual path)
MODEL_DIR = "D:/whisper_models"                # Whisper model directory
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin"    # FFmpeg installation directory
SSIM_THRESHOLD = 0.85    # keyframe dedup threshold (lower values keep more frames)
FRAME_INTERVAL = 2       # frame sampling interval (seconds)
OUTPUT_DIR = "output_summary"  # output directory
# ================================================================

# Prepend FFmpeg to PATH at runtime (fixes the core issue)
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]


def check_ffmpeg():
    """Verify that FFmpeg is accessible."""
    try:
        subprocess.run(["ffmpeg", "-version"], check=True,
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("FFmpeg check passed!")
        return True
    except Exception as e:
        print(f"FFmpeg check failed: {e}")
        print(f"Please confirm the FFmpeg path is correct: {FFMPEG_BIN}")
        return False


def extract_keyframes_with_time(video_path: str) -> tuple:
    """Extract keyframes and their timestamps."""
    try:
        # Initialize the video reader
        reader = imageio.get_reader(video_path, 'ffmpeg')
        fps = reader.get_meta_data().get('fps', 30)
        keyframes = []
        keyframe_times = []
        prev_frame = None
        for i, frame in enumerate(reader):
            # Sample at a fixed interval (one frame every FRAME_INTERVAL seconds)
            if i % int(fps * FRAME_INTERVAL) != 0:
                continue
            current_time = i / fps
            # Downsample to 320x240 to speed up processing
            curr_frame = Image.fromarray(frame).resize((320, 240))
            if prev_frame is None:
                # Always keep the first sampled frame
                keyframes.append(curr_frame)
                keyframe_times.append(current_time)
                prev_frame = np.array(curr_frame.convert('L'))
            else:
                # Compute grayscale SSIM against the last kept keyframe
                curr_gray = np.array(curr_frame.convert('L'))
                score = ssim(prev_frame, curr_gray, data_range=255)
                # A sufficiently different frame counts as a new keyframe
                if score < SSIM_THRESHOLD:
                    keyframes.append(curr_frame)
                    keyframe_times.append(current_time)
                    prev_frame = curr_gray
        reader.close()
        print(f"Keyframe extraction finished, {len(keyframes)} frames kept")
        return keyframes, keyframe_times
    except Exception as e:
        print(f"Keyframe extraction failed: {e}")
        return [], []


def generate_page_intervals(keyframe_times, video_duration):
    """Build the [start, end) time interval for each keyframe."""
    intervals = []
    n = len(keyframe_times)
    for i in range(n):
        start = keyframe_times[i]
        end = keyframe_times[i + 1] if i < n - 1 else video_duration
        intervals.append((start, end))
    return intervals


def align_text_with_keyframes(video_path: str, keyframe_times: list, video_duration: float) -> list:
    """Align transcribed speech with keyframes (fixed version)."""
    try:
        # Load the model (no longer passes ffmpeg_path)
        model = whisper.load_model("tiny", device="cpu", download_root=MODEL_DIR)
        result = model.transcribe(video_path, fp16=False)  # ffmpeg_path argument removed
        segments = result["segments"]
        # Build the page intervals and aggregate text per page
        page_intervals = generate_page_intervals(keyframe_times, video_duration)
        page_texts = defaultdict(list)
        # Assign each speech segment to a page by its start time
        for seg in segments:
            seg_start = seg["start"]
            for page_idx, (page_start, page_end) in enumerate(page_intervals):
                if page_start <= seg_start < page_end:
                    page_texts[page_idx].append(seg["text"].strip())
                    break  # each segment belongs to exactly one page
        # Merge the text of each page
        merged = []
        for page_idx in sorted(page_texts.keys()):
            full_text = " ".join(page_texts[page_idx])
            merged.append({
                "page": page_idx,
                "start_time": page_intervals[page_idx][0],
                "end_time": page_intervals[page_idx][1],
                "text": full_text
            })
        return merged
    except Exception as e:
        print(f"Speech processing failed: {e}")
        return []


def save_summary(merged_texts, keyframes, output_dir):
    """Save each keyframe together with its full transcript."""
    os.makedirs(output_dir, exist_ok=True)
    for item in merged_texts:
        # Index by the original page number so images stay aligned with their
        # text even when pages without speech are missing from merged_texts
        page_idx = item["page"]
        # Save the keyframe image
        img_path = os.path.join(output_dir, f"page_{page_idx}.jpg")
        keyframes[page_idx].save(img_path)
        # Save the transcript text
        txt_path = os.path.join(output_dir, f"page_{page_idx}.txt")
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(f"Page time range: {item['start_time']:.1f}s - {item['end_time']:.1f}s\n\n")
            f.write("Full narration:\n")
            f.write(item["text"])
        print(f"Saved: {txt_path}")


if __name__ == "__main__":
    # Step 0: verify FFmpeg
    if not check_ffmpeg():
        exit()
    # Step 1: make sure the video file exists
    if not os.path.exists(VIDEO_PATH):
        print(f"Error: video file {VIDEO_PATH} does not exist!")
        exit()
    # Step 2: extract keyframes
    keyframes, keyframe_times = extract_keyframes_with_time(VIDEO_PATH)
    if not keyframes:
        exit()
    # Step 3: get the total video duration
    reader = imageio.get_reader(VIDEO_PATH)
    video_duration = reader.get_meta_data()["duration"]
    reader.close()
    # Step 4: align speech with keyframes and aggregate text
    merged_texts = align_text_with_keyframes(VIDEO_PATH, keyframe_times, video_duration)
    # Step 5: save the results
    if merged_texts:
        save_summary(merged_texts, keyframes, OUTPUT_DIR)
        print(f"\nDone! Results saved to directory: {os.path.abspath(OUTPUT_DIR)}")
    else:
        print("No valid speech content to save")
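
# --- Optional sanity check (not part of the pipeline) -----------------------
# A minimal sketch of how generate_page_intervals maps keyframe timestamps to
# [start, end) pages; the timestamps and duration below are made-up example
# values, not output from a real run.
#
#   >>> generate_page_intervals([0.0, 4.0, 10.0], 15.0)
#   [(0.0, 4.0), (4.0, 10.0), (10.0, 15.0)]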