"""Turn a narrated slide-deck video into an illustrated HTML/PDF summary.

Pipeline:
    1. ``VideoProcessor.extract_keyframes`` samples the video and keeps one
       frame per distinct slide (SSIM + colour histogram + HOG + edge diff).
    2. ``VideoProcessor.transcribe_audio`` runs Whisper on the audio track.
    3. ``ContentAligner.align_content`` assigns each speech segment to the
       slide that was on screen when the segment started.
    4. ``SummaryGenerator.generate_all`` renders the result as HTML and PDF.
"""

import base64
import gc
import io
import math
import os
import re
import subprocess
import tempfile
import warnings
from collections import defaultdict
from pathlib import Path

import cv2
import imageio
import numpy as np
import pdfkit
import whisper
from jinja2 import Environment
from PIL import Image
from scipy.signal import find_peaks
from skimage.color import rgb2gray
from skimage.feature import hog
from skimage.metrics import structural_similarity as ssim

# ======================== Global configuration ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")

VIDEO_PATH = "D:/python项目文件/1/input3.mp4"  # input video
MODEL_DIR = "D:/whisper_models"  # Whisper model download/cache directory
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin"  # FFmpeg install directory
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe"  # wkhtmltopdf executable
SSIM_THRESHOLD = 0.85  # SSIM above this counts towards "same slide"
FRAME_INTERVAL = 2  # minimum spacing between kept keyframes, in seconds
# Raw string: the path contains backslashes that are not valid escapes.
OUTPUT_DIR = r"D:\桌面文件\python\output"  # report output directory
TRANSITION_WORDS = ["接下来", "下一页", "如图"]  # sentences containing these are dropped
HOG_THRESHOLD = 0.7  # HOG cosine similarity above this counts towards "same slide"
COLOR_THRESHOLD = 0.8  # histogram correlation above this counts towards "same slide"
WHISPER_MODEL = "base"  # Whisper model size
PROFESSIONAL_TERMS = {
    "人工智能": "AI",
    "机器学习": "ML",
    "深度学习": "DL",
    "神经网络": "NN",
    "卷积神经网络": "CNN",
    "循环神经网络": "RNN",
    "自然语言处理": "NLP",
    "计算机视觉": "CV",
    "大数据": "Big Data",
    "云计算": "Cloud Computing",
}  # Chinese term -> abbreviation appended in parentheses
# ======================================================================

# HOG is only computed when SSIM and colour similarity both exceed this gate.
_HOG_GATE = 0.8

# Longest terms first so "卷积神经网络" wins over its substring "神经网络".
_TERM_PATTERN = None
if PROFESSIONAL_TERMS:
    _TERM_PATTERN = re.compile(
        "|".join(
            re.escape(term)
            for term in sorted(PROFESSIONAL_TERMS, key=len, reverse=True)
        )
    )

# One token per CJK ideograph, one token per run of other word characters.
_TOKEN_PATTERN = re.compile(r"[\u4e00-\u9fff]|[^\W\u4e00-\u9fff]+")

# Sentence terminators: ideographic full stop plus full-width and ASCII ! and ?.
_SENTENCE_SPLIT = re.compile(r"[。!?!?]")

_HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>PPT视频摘要报告</title>
<style>
body { font-family: sans-serif; margin: 2em; }
.page { margin-bottom: 2em; }
.page img { max-width: 100%; }
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<h2>页面 {{ page.num }}</h2>
<p class="time">{{ page.time }}</p>
<img src="{{ page.image }}" alt="页面截图">
<p class="text">{{ page.text }}</p>
</div>
{% endfor %}
</body>
</html>
"""

_PDF_TEMPLATE = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<style>
body { font-family: sans-serif; }
.page { page-break-inside: avoid; margin-bottom: 2em; }
.page img { max-width: 100%; }
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<h2>第 {{ page.num }} 页</h2>
<p class="time">时间区间:{{ page.time }}</p>
<img src="{{ page.image_uri }}" alt="页面截图">
<p class="text">{{ page.text }}</p>
</div>
{% endfor %}
</body>
</html>
"""


def _annotate_terms(text: str) -> str:
    """Append the abbreviation after every known term, e.g. ``X`` -> ``X(AB)``.

    A single regex pass is used so that a term which is a substring of a
    longer term is not annotated a second time.
    """
    if _TERM_PATTERN is None:
        return text
    return _TERM_PATTERN.sub(
        lambda m: f"{m.group(0)}({PROFESSIONAL_TERMS[m.group(0)]})", text
    )


# ---------------------- Core processing ----------------------
class VideoProcessor:
    """Frame extraction and speech recognition for the input video."""

    def __init__(self):
        # Prepend the bundled FFmpeg so subprocess/imageio/whisper can find it.
        # .get() avoids a KeyError when PATH is not set at all.
        os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ.get("PATH", "")

    @staticmethod
    def check_ffmpeg():
        """Return True if ``ffmpeg -version`` runs successfully."""
        try:
            subprocess.run(
                ["ffmpeg", "-version"],
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            print("[系统] FFmpeg验证成功")
            return True
        except Exception as e:
            print(f"[错误] FFmpeg验证失败: {str(e)}")
            return False

    @staticmethod
    def _to_gray(frame):
        """Convert an RGB frame (as delivered by imageio) to grayscale."""
        # imageio yields RGB, not OpenCV's BGR; using the BGR constant would
        # swap the red and blue luma weights.
        return cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

    @staticmethod
    def calculate_color_histogram(frame):
        """Return a flattened, normalised 8x8x8 colour histogram of *frame*."""
        hist = cv2.calcHist(
            [frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]
        )
        cv2.normalize(hist, hist)
        return hist.flatten()

    @staticmethod
    def calculate_hog_features(frame):
        """Return the HOG descriptor of *frame* (RGB input)."""
        gray = VideoProcessor._to_gray(frame)
        return hog(
            gray,
            orientations=8,
            pixels_per_cell=(16, 16),
            cells_per_block=(1, 1),
            visualize=False,
        )

    @staticmethod
    def is_ppt_transition(frame1, frame2):
        """Return True if the Canny edge maps of the two frames differ strongly."""
        edges1 = cv2.Canny(VideoProcessor._to_gray(frame1), 100, 200)
        edges2 = cv2.Canny(VideoProcessor._to_gray(frame2), 100, 200)
        diff = cv2.absdiff(edges1, edges2)
        return np.mean(diff) > 50  # tunable threshold

    @staticmethod
    def _is_duplicate(prev_frame, frame) -> bool:
        """Return True if *frame* shows the same slide as *prev_frame*.

        A frame is a duplicate only when SSIM, colour-histogram correlation
        and HOG cosine similarity all exceed their thresholds and no edge
        transition is detected. Cheap checks run first so the expensive ones
        are skipped as soon as the answer is known.
        """
        ssim_score = ssim(
            VideoProcessor._to_gray(prev_frame),
            VideoProcessor._to_gray(frame),
            win_size=3,
        )
        if not ssim_score > SSIM_THRESHOLD:
            return False

        color_sim = cv2.compareHist(
            VideoProcessor.calculate_color_histogram(prev_frame),
            VideoProcessor.calculate_color_histogram(frame),
            cv2.HISTCMP_CORREL,
        )
        if not color_sim > COLOR_THRESHOLD:
            return False

        hog_sim = 0.0
        if ssim_score > _HOG_GATE and color_sim > _HOG_GATE:
            hog_prev = VideoProcessor.calculate_hog_features(prev_frame)
            hog_curr = VideoProcessor.calculate_hog_features(frame)
            denom = np.linalg.norm(hog_prev) * np.linalg.norm(hog_curr)
            # A blank frame has a zero descriptor; avoid producing NaN.
            if denom > 0:
                hog_sim = float(np.dot(hog_prev, hog_curr) / denom)
        if not hog_sim > HOG_THRESHOLD:
            return False

        return not VideoProcessor.is_ppt_transition(prev_frame, frame)

    @staticmethod
    def extract_keyframes(video_path: str) -> tuple:
        """Extract de-duplicated keyframes and their timestamps.

        Args:
            video_path: Path of the video to read.

        Returns:
            ``(keyframes, timestamps)`` where *keyframes* is a list of PIL
            images and *timestamps* the matching times in seconds. Both lists
            are empty if the video cannot be processed.
        """
        try:
            reader = imageio.get_reader(video_path)
            try:
                fps = reader.get_meta_data()["fps"]
                total_frames = reader.count_frames()
                print(f"[信息] 视频总帧数: {total_frames}")
                # Some containers report 0 or inf frames; skip the percentage then.
                can_report = (
                    isinstance(total_frames, (int, float))
                    and math.isfinite(total_frames)
                    and total_frames > 0
                )

                keyframes = []
                timestamps = []
                prev_frame = None
                kept = 0
                last_progress = 0

                for idx, frame in enumerate(reader):
                    if can_report:
                        progress = int((idx / total_frames) * 100)
                        # Report once per 5% step.
                        if progress != last_progress and progress % 5 == 0:
                            print(f"[进度] 处理中: {progress}% ({idx}/{total_frames}帧)")
                            last_progress = progress

                    curr_time = idx / fps
                    # Always keep the very first frame so that speech at the
                    # start of the video has a page to attach to; afterwards
                    # enforce the minimum spacing between keyframes.
                    if timestamps and curr_time - timestamps[-1] < FRAME_INTERVAL:
                        continue

                    if prev_frame is not None:
                        try:
                            if VideoProcessor._is_duplicate(prev_frame, frame):
                                continue
                        except Exception as e:
                            # Best effort: a frame whose features cannot be
                            # computed is skipped rather than aborting the run.
                            print(f"[警告] 特征计算失败: {str(e)}")
                            continue

                    keyframes.append(Image.fromarray(frame))
                    timestamps.append(curr_time)
                    prev_frame = frame
                    kept += 1

                    # Force a collection every 100 kept frames to cap memory.
                    if kept % 100 == 0:
                        gc.collect()
            finally:
                reader.close()

            print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧")
            return keyframes, timestamps
        except Exception as e:
            print(f"[错误] 关键帧提取失败: {str(e)}")
            return [], []

    @staticmethod
    def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
        """Transcribe the audio track with Whisper.

        Args:
            video_path: Path of the video whose audio is transcribed.
            model_name: Whisper model size to load.

        Returns:
            Whisper's segment dicts with known professional terms annotated
            in ``seg["text"]``; an empty list if transcription fails.
        """
        try:
            model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
            result = model.transcribe(
                video_path,
                fp16=False,
                language="zh",
                task="transcribe",
                verbose=True,
                initial_prompt="这是一段包含中英文的PPT讲解视频,可能包含专业术语。",
            )
            segments = result.get("segments", [])
            for seg in segments:
                seg["text"] = _annotate_terms(seg["text"])
            return segments
        except Exception as e:
            print(f"[错误] 语音识别失败: {str(e)}")
            return []


# ---------------------- Alignment logic ----------------------
class ContentAligner:
    """Assign transcribed speech segments to slide pages."""

    @staticmethod
    def generate_page_intervals(timestamps: list, duration: float) -> list:
        """Return ``(start, end)`` per page; the last page ends at *duration*."""
        ends = list(timestamps[1:]) + [duration]
        return list(zip(timestamps, ends))

    @staticmethod
    def calculate_text_similarity(text1: str, text2: str) -> float:
        """Return the Jaccard similarity of the two texts' token sets.

        CJK ideographs are tokenised one character at a time, because Chinese
        has no word separators and a plain ``\\w+`` would treat a whole
        sentence as a single token. Other scripts are split on word runs.
        """
        words1 = set(_TOKEN_PATTERN.findall(text1.lower()))
        words2 = set(_TOKEN_PATTERN.findall(text2.lower()))
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)

    @staticmethod
    def find_best_match(segments: list, intervals: list) -> dict:
        """Group speech segments by the index of the page they belong to.

        Matching order for each segment:
            1. the page whose interval contains the segment's start time;
            2. otherwise the page whose spoken text is most similar;
            3. otherwise the page whose interval midpoint is nearest in time.

        Args:
            segments: Whisper segment dicts (need ``"start"`` and ``"text"``).
            intervals: ``(start, end)`` tuples, one per page.

        Returns:
            A ``defaultdict(list)`` mapping page index to its segments.
        """
        page_texts = defaultdict(list)
        unmatched_segments = []
        joined_by_page = None  # built lazily, once, for the similarity fallback

        for seg in segments:
            seg_start = seg["start"]

            # 1. Time-based match.
            best_match = next(
                (
                    page_idx
                    for page_idx, (start, end) in enumerate(intervals)
                    if start <= seg_start < end
                ),
                None,
            )

            # 2. Text-similarity fallback.
            if best_match is None:
                if joined_by_page is None:
                    joined_by_page = [
                        " ".join(s["text"] for s in segments if start <= s["start"] < end)
                        for start, end in intervals
                    ]
                best_score = 0.0
                for page_idx, page_text in enumerate(joined_by_page):
                    similarity = ContentAligner.calculate_text_similarity(
                        seg["text"], page_text
                    )
                    if similarity > best_score:
                        best_score = similarity
                        best_match = page_idx

            if best_match is not None:
                page_texts[best_match].append(seg)
            else:
                unmatched_segments.append(seg)

        # 3. Nearest-page fallback for whatever is still unmatched.
        if unmatched_segments:
            print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
            if intervals:  # with no pages there is nothing to attach to
                for seg in unmatched_segments:
                    closest_page = min(
                        range(len(intervals)),
                        key=lambda i: abs(
                            seg["start"] - (intervals[i][0] + intervals[i][1]) / 2
                        ),
                    )
                    page_texts[closest_page].append(seg)

        return page_texts

    @staticmethod
    def align_content(video_path: str, timestamps: list) -> list:
        """Align transcribed speech with the extracted pages.

        Args:
            video_path: Path of the video (read for duration and audio).
            timestamps: Keyframe times in seconds, ascending.

        Returns:
            One dict per page with ``page``, ``start_time``, ``end_time`` and
            ``text``; an empty list when *timestamps* is empty.
        """
        if not timestamps:
            return []

        fallback_duration = timestamps[-1] + FRAME_INTERVAL
        try:
            reader = imageio.get_reader(video_path)
            try:
                duration = reader.get_meta_data()["duration"]
            finally:
                reader.close()
        except Exception:
            # Metadata without a duration (or an unreadable file) is not fatal.
            duration = fallback_duration
        # Guard against missing/zero durations that would make the last page empty.
        if not duration or duration <= timestamps[-1]:
            duration = fallback_duration

        segments = VideoProcessor.transcribe_audio(video_path)
        intervals = ContentAligner.generate_page_intervals(timestamps, duration)
        page_texts = ContentAligner.find_best_match(segments, intervals)

        return [
            {
                "page": idx,
                "start_time": start,
                "end_time": end,
                "text": " ".join(seg["text"] for seg in page_texts.get(idx, [])),
            }
            for idx, (start, end) in enumerate(intervals)
        ]


# ---------------------- Report generation ----------------------
class SummaryGenerator:
    """Render aligned pages as HTML and PDF reports."""

    @staticmethod
    def optimize_text(text: str) -> str:
        """Condense *text*: drop short, transitional and repeated sentences.

        Sentences shorter than 10 characters, sentences containing any of
        ``TRANSITION_WORDS`` and exact repeats are removed. The survivors are
        joined with "。"; an empty string is returned if none survive.
        """
        filtered = []
        seen = set()
        for sent in _SENTENCE_SPLIT.split(text):
            sent = sent.strip()
            if len(sent) < 10 or sent in seen:
                continue
            if any(word in sent for word in TRANSITION_WORDS):
                continue
            filtered.append(sent)
            seen.add(sent)
        return "。".join(filtered) + "。" if filtered else ""

    @staticmethod
    def _page_fields(idx: int, entry: dict) -> dict:
        """Return the template fields shared by the HTML and PDF reports."""
        return {
            "num": idx + 1,
            "time": f"{entry['start_time']:.1f}s - {entry['end_time']:.1f}s",
            "text": SummaryGenerator.optimize_text(entry["text"]),
        }

    @staticmethod
    def generate_html(aligned_data: list, keyframes: list, output_dir: str):
        """Write ``summary.html`` with screenshots embedded as data URIs.

        Args:
            aligned_data: Page dicts as returned by ``align_content``.
            keyframes: PIL images, one per page.
            output_dir: Directory the report is written to (created if absent).
        """
        os.makedirs(output_dir, exist_ok=True)

        pages_data = []
        # zip() stops at the shorter list, so a length mismatch cannot raise.
        for idx, (frame, entry) in enumerate(zip(keyframes, aligned_data)):
            buffer = io.BytesIO()
            # JPEG has no alpha channel; normalise the mode before encoding.
            frame.convert("RGB").save(buffer, format="JPEG")
            img_data = base64.b64encode(buffer.getvalue()).decode("ascii")
            page = SummaryGenerator._page_fields(idx, entry)
            page["image"] = f"data:image/jpeg;base64,{img_data}"
            pages_data.append(page)

        # Autoescape: the text comes from speech recognition and must not be
        # interpreted as markup.
        template = Environment(autoescape=True).from_string(_HTML_TEMPLATE)
        output_path = os.path.join(output_dir, "summary.html")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(template.render(pages=pages_data))
        print(f"[输出] HTML报告已生成: {output_path}")

    @staticmethod
    def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
        """Write ``summary.pdf`` via wkhtmltopdf.

        Screenshots and the intermediate HTML live in a temporary directory
        under *output_dir* that is removed afterwards, even on failure.

        Args:
            aligned_data: Page dicts as returned by ``align_content``.
            keyframes: PIL images, one per page.
            output_dir: Directory the report is written to (created if absent).
        """
        os.makedirs(output_dir, exist_ok=True)

        with tempfile.TemporaryDirectory(prefix="_temp_pdf_", dir=output_dir) as tmp:
            tmp_dir = Path(tmp).resolve()

            pages_data = []
            for idx, (frame, entry) in enumerate(zip(keyframes, aligned_data)):
                img_path = tmp_dir / f"page_{idx}.jpg"
                frame.convert("RGB").save(img_path, format="JPEG")
                page = SummaryGenerator._page_fields(idx, entry)
                page["image_path"] = str(img_path)
                # Absolute file:// URI so wkhtmltopdf resolves the image
                # regardless of its working directory.
                page["image_uri"] = img_path.as_uri()
                pages_data.append(page)

            template = Environment(autoescape=True).from_string(_PDF_TEMPLATE)
            temp_html = tmp_dir / "_temp_pdf.html"
            temp_html.write_text(template.render(pages=pages_data), encoding="utf-8")

            options = {
                "enable-local-file-access": "",
                "encoding": "UTF-8",
                "margin-top": "20mm",
                "margin-bottom": "20mm",
                "margin-left": "20mm",
                "margin-right": "20mm",
                "no-stop-slow-scripts": "",
                "quiet": "",
                "dpi": "300",
                "image-quality": "100",
                "enable-smart-shrinking": "",
                "print-media-type": "",
            }
            config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
            pdf_path = os.path.join(output_dir, "summary.pdf")
            pdfkit.from_file(
                str(temp_html),
                pdf_path,
                configuration=config,
                options=options,
            )
            print(f"[输出] PDF报告已生成: {pdf_path}")

    @classmethod
    def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
        """Generate every supported report format (HTML, then PDF)."""
        cls.generate_html(aligned_data, keyframes, output_dir)
        cls.generate_pdf(aligned_data, keyframes, output_dir)


# ---------------------- Entry point ----------------------
def main_process():
    """Run the full pipeline on ``VIDEO_PATH`` and write reports to ``OUTPUT_DIR``."""
    # Environment checks.
    processor = VideoProcessor()
    if not processor.check_ffmpeg():
        return
    if not os.path.exists(VIDEO_PATH):
        print(f"[错误] 视频文件不存在: {VIDEO_PATH}")
        return

    # Keyframe extraction.
    keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH)
    if not keyframes:
        print("[错误] 未提取到关键帧")
        return

    # Speech/slide alignment.
    aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps)
    if not aligned_data:
        print("[警告] 未识别到有效语音内容")

    # Report generation.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)


if __name__ == "__main__":
    main_process()