import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
from moviepy.editor import VideoFileClip
from PIL import Image
import os
from scipy.signal import find_peaks
import torch
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import whisper
from collections import defaultdict
import re
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import Paragraph, Image as RLImage
from reportlab.lib.units import inch
import threading
import pdfkit
from jinja2 import Environment
import io
import base64

# Global configuration
SSIM_THRESHOLD = 0.85  # key-frame de-duplication threshold (SSIM similarity)
FRAME_INTERVAL = 2  # frame sampling interval in seconds
TRANSITION_WORDS = ["接下来", "下一页", "如图"]  # filler/transition words filtered out of transcripts
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe"  # wkhtmltopdf binary used by pdfkit


class PPTSummarizer:
    """Turn a lecture/PPT screen-recording into a structured PDF summary.

    Pipeline: sample frames from the video, detect slide changes via SSIM
    peaks, transcribe the audio with Whisper, align transcript segments to
    slide time intervals, then render slides + text into a PDF via
    wkhtmltopdf (pdfkit) from a Jinja2 HTML template.
    """

    def __init__(self, video_path, output_dir=None):
        """
        Args:
            video_path: path to the input video file.
            output_dir: where results are written; defaults to
                ``output/<video basename>``.
        """
        self.video_path = video_path
        # Default output directory is derived from the video file name.
        if output_dir is None:
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            self.output_dir = os.path.join("output", video_name)
        else:
            self.output_dir = output_dir

        self.frames = []  # sampled frames (BGR ndarrays)
        self.key_frames = []  # de-duplicated slide frames
        self.text_content = []
        self.frame_timestamps = []  # timestamp (s) of each sampled frame
        self.aligned_data = []  # per-page dicts: page/start_time/end_time/text
        self.processing_complete = threading.Event()

        # exist_ok avoids the check-then-create race of the original code.
        os.makedirs(self.output_dir, exist_ok=True)

        # Small CPU model keeps startup cheap; accuracy traded for speed.
        self.whisper_model = whisper.load_model("tiny", device="cpu")

    def extract_frames(self):
        """Sample frames from the video every FRAME_INTERVAL seconds.

        Fills ``self.frames`` (BGR) and ``self.frame_timestamps``.

        Returns:
            The video FPS, or 0 on failure.
        """
        try:
            # moviepy handles containers/codecs more robustly than raw cv2.
            video = VideoFileClip(self.video_path)
            duration = video.duration
            fps = video.fps

            # Never sample faster than the native frame rate.
            sample_interval = max(1 / fps, FRAME_INTERVAL)
            timestamps = np.arange(0, duration, sample_interval)

            print(f"开始提取帧,视频时长:{duration:.2f}秒,FPS:{fps}")

            for t in timestamps:
                try:
                    frame = video.get_frame(t)
                    # moviepy yields RGB; convert to OpenCV's BGR layout.
                    frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    self.frames.append(frame_bgr)
                    self.frame_timestamps.append(t)
                except Exception as e:
                    print(f"提取帧 {t}s 时出错: {str(e)}")
                    continue

            video.close()
            print(f"成功提取 {len(self.frames)} 帧")
            return fps

        except Exception as e:
            print(f"视频处理出错: {str(e)}")
            return 0

    def process_audio(self):
        """Transcribe audio and align transcript segments to page intervals.

        Produces ``self.aligned_data``: one dict per page interval with the
        optimized transcript text, and writes a human-readable dump to
        ``aligned_data.txt``.
        """
        try:
            print("开始语音识别...")
            result = self.whisper_model.transcribe(
                self.video_path,
                fp16=False,  # CPU inference: fp16 unsupported
                language="zh",
                task="transcribe",
                verbose=True
            )
            segments = result.get("segments", [])
            print(f"语音识别完成,共识别出 {len(segments)} 个片段")

            # Log every recognized segment (console encoding may reject
            # some characters, hence the fallback message).
            for i, seg in enumerate(segments):
                try:
                    print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: {seg['text']}")
                except UnicodeEncodeError:
                    print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: [文本包含特殊字符]")

            # Build one (start, end) interval per sampled frame; the last
            # interval is padded by 1s so trailing speech still matches.
            intervals = []
            for i in range(len(self.frame_timestamps)):
                start = self.frame_timestamps[i]
                end = (self.frame_timestamps[i + 1]
                       if i < len(self.frame_timestamps) - 1
                       else self.frame_timestamps[-1] + 1)
                intervals.append((start, end))
                print(f"页面 {i + 1} 时间段: {start:.1f}s - {end:.1f}s")

            page_texts = defaultdict(list)
            page_segments = defaultdict(list)  # page index -> [(start, end, text)]
            all_texts = []  # every transcript piece as (start, end, text)
            used_texts = set()  # texts already assigned to a page

            # Step 1: collect all transcript pieces.
            for seg in segments:
                try:
                    seg_start = seg["start"]
                    seg_end = seg["end"]
                    seg_text = seg["text"].strip()
                    all_texts.append((seg_start, seg_end, seg_text))
                except Exception as e:
                    print(f"处理片段时出错: {str(e)}")
                    continue

            # Step 2: assign each piece to the page it overlaps the longest.
            for start, end, text in all_texts:
                try:
                    overlapping_pages = []
                    for page_idx, (page_start, page_end) in enumerate(intervals):
                        if start <= page_end and end >= page_start:
                            overlapping_pages.append((page_idx, page_start, page_end))

                    if overlapping_pages:
                        # Rank candidate pages by overlap duration.
                        page_overlaps = []
                        for page_idx, page_start, page_end in overlapping_pages:
                            overlap_start = max(start, page_start)
                            overlap_end = min(end, page_end)
                            page_overlaps.append((page_idx, overlap_end - overlap_start))
                        page_overlaps.sort(key=lambda x: x[1], reverse=True)

                        best_page = page_overlaps[0][0]
                        if text not in used_texts:  # each text used once
                            page_texts[best_page].append(text)
                            page_segments[best_page].append((start, end, text))
                            used_texts.add(text)
                            print(f"将文本 '{text}' 添加到页面 {best_page + 1}")
                except Exception as e:
                    print(f"分配文本时出错: {str(e)}")
                    continue

            # Step 3: merge and optimize each page's text.
            self.aligned_data = []
            for idx in range(len(intervals)):
                try:
                    # NOTE: original code shadowed the outer `segments` here.
                    page_segs = page_segments[idx]
                    page_segs.sort(key=lambda x: x[0])

                    # Merge pieces separated by short pauses (< 3s).
                    merged_texts = []
                    current_text = ""
                    last_end_time = 0

                    for start, end, text in page_segs:
                        if not current_text:
                            current_text = text
                            last_end_time = end
                            continue

                        time_gap = start - last_end_time
                        if time_gap < 3.0:
                            current_text += " " + text
                        else:
                            merged_texts.append(current_text)
                            current_text = text
                        last_end_time = end

                    if current_text:
                        merged_texts.append(current_text)

                    final_text = " ".join(merged_texts)

                    # Empty page inherits the previous page's text so every
                    # slide in the PDF has some narration attached.
                    if not final_text and idx > 0:
                        final_text = self.aligned_data[idx - 1]["text"]

                    optimized_text = self.optimize_text(final_text)
                    if optimized_text:
                        print(f"页面 {idx + 1} 的优化后文本内容: {optimized_text}")

                    self.aligned_data.append({
                        "page": idx,
                        "start_time": intervals[idx][0],
                        "end_time": intervals[idx][1],
                        "text": optimized_text
                    })
                except Exception as e:
                    print(f"处理页面 {idx + 1} 时出错: {str(e)}")
                    # Keep the page list index-aligned with intervals.
                    self.aligned_data.append({
                        "page": idx,
                        "start_time": intervals[idx][0],
                        "end_time": intervals[idx][1],
                        "text": ""
                    })

            # Step 4: append any transcript piece that was never assigned
            # to the last page that has text, so nothing is lost.
            for start, end, text in all_texts:
                try:
                    if text not in used_texts:
                        last_non_empty_page = -1
                        for i in range(len(self.aligned_data) - 1, -1, -1):
                            if self.aligned_data[i]["text"]:
                                last_non_empty_page = i
                                break
                        if last_non_empty_page >= 0:
                            self.aligned_data[last_non_empty_page]["text"] += " " + text
                            print(f"将遗漏的文本 '{text}' 添加到页面 {last_non_empty_page + 1}")
                except Exception as e:
                    print(f"处理遗漏文本时出错: {str(e)}")
                    continue

            # Persist alignment for debugging/inspection.
            try:
                with open(os.path.join(self.output_dir, 'aligned_data.txt'), 'w', encoding='utf-8') as f:
                    for data in self.aligned_data:
                        f.write(f"页面 {data['page'] + 1}:\n")
                        f.write(f"时间: {data['start_time']:.1f}s - {data['end_time']:.1f}s\n")
                        f.write(f"文本: {data['text']}\n\n")
                print("对齐数据已保存到文件")
            except Exception as e:
                print(f"保存对齐数据时出错: {str(e)}")

        except Exception as e:
            print(f"音频处理出错: {str(e)}")
            self.aligned_data = []

    def process_frames(self):
        """Select key frames (slide changes) via SSIM dissimilarity peaks."""
        try:
            print("开始处理视频帧...")
            # Guard: with <2 frames there are no pairs to compare and
            # np.mean([]) / find_peaks([]) would warn and yield nothing.
            if len(self.frames) < 2:
                if self.frames and not self.is_blank_frame(self.frames[0]):
                    self.key_frames.append(self.frames[0])
                print(f"找到 {len(self.key_frames)} 个关键帧")
                return

            # Pairwise similarity of consecutive sampled frames.
            similarities = []
            for i in range(len(self.frames) - 1):
                try:
                    frame1 = cv2.cvtColor(self.frames[i], cv2.COLOR_BGR2GRAY)
                    frame2 = cv2.cvtColor(self.frames[i + 1], cv2.COLOR_BGR2GRAY)
                    similarities.append(ssim(frame1, frame2))
                except Exception as e:
                    print(f"计算帧 {i} 相似度时出错: {str(e)}")
                    similarities.append(1.0)  # on error, assume similar

            # Adaptive threshold: 2 standard deviations below the mean.
            mean_similarity = np.mean(similarities)
            std_similarity = np.std(similarities)
            adaptive_threshold = mean_similarity - 2 * std_similarity

            # Peaks of (1 - similarity) mark slide transitions; enforce a
            # minimum spacing of ~5% of the timeline between key frames.
            peaks, _ = find_peaks([1 - s for s in similarities],
                                  height=1 - adaptive_threshold,
                                  distance=int(len(similarities) / 20))

            for peak in peaks:
                if not self.is_blank_frame(self.frames[peak]):
                    self.key_frames.append(self.frames[peak])

            print(f"找到 {len(self.key_frames)} 个关键帧")

        except Exception as e:
            print(f"处理视频帧时出错: {str(e)}")

    def is_blank_frame(self, frame, threshold=30):
        """Return True if the frame is blank (near-black/white or low detail).

        Args:
            frame: BGR image (ndarray).
            threshold: minimum grayscale std-dev counted as "has detail".
        """
        try:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            mean = np.mean(gray)
            std_dev = np.std(gray)

            # Near-uniform dark or bright frames.
            is_black = mean < 10 and std_dev < 5
            is_white = mean > 245 and std_dev < 5

            has_detail = std_dev > threshold

            return is_black or is_white or not has_detail
        except Exception as e:
            print(f"检查空白帧时出错: {str(e)}")
            # On error, err on the side of dropping the frame.
            return True

    def optimize_text(self, text):
        """Clean a page's transcript: drop short/duplicate sentences and
        sentences containing transition filler words.

        Returns the cleaned text ('' when nothing survives; on internal
        error, the input text unchanged).
        """
        try:
            if not text:
                return ""

            # Split on Chinese sentence terminators.
            sentences = re.split(r'[。!?]', text)
            filtered = []
            seen = set()
            for sent in sentences:
                sent = sent.strip()
                if (len(sent) >= 10
                        and not any(word in sent for word in TRANSITION_WORDS)
                        and sent not in seen):
                    filtered.append(sent)
                    seen.add(sent)

            result = '。'.join(filtered) + '。' if filtered else ""
            if result:
                print(f"优化后的文本: {result}")
            return result
        except Exception as e:
            print(f"文本优化时出错: {str(e)}")
            return text

    def save_results(self):
        """Render key frames + aligned text into ``summary.pdf``.

        Writes a temporary HTML file plus per-page JPEGs, converts them with
        wkhtmltopdf via pdfkit, then removes the temporaries.
        """
        try:
            if not os.access(self.output_dir, os.W_OK):
                print(f"错误:没有写入权限: {self.output_dir}")
                return

            pdf_path = os.path.join(self.output_dir, 'summary.pdf')

            # Temporary HTML + image assets consumed by wkhtmltopdf.
            temp_html = os.path.join(self.output_dir, "_temp_pdf.html")
            temp_img_dir = os.path.join(self.output_dir, "_temp_pdf_images")
            os.makedirs(temp_img_dir, exist_ok=True)

            try:
                # wkhtmltopdf needs absolute file:// paths for local images.
                abs_temp_img_dir = os.path.abspath(temp_img_dir)

                # NOTE(review): the original template was corrupted in the
                # patch; reconstructed here from its surviving fragments.
                html_content = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>PPT视频结构化摘要</title>
<style>
    body { font-family: "Microsoft YaHei", "SimSun", sans-serif; margin: 0; padding: 20px; }
    h1 { text-align: center; }
    .page { page-break-after: always; margin-bottom: 30px; }
    .page-title { font-size: 18px; font-weight: bold; }
    .page-time { color: #666666; font-size: 12px; margin: 5px 0; }
    .page-image { max-width: 100%; }
    .page-text { font-size: 14px; line-height: 1.6; margin-top: 10px; }
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
    <div class="page-title">页面 {{ page.num }}</div>
    <div class="page-time">{{ page.time }}</div>
    <img class="page-image" src="{{ page.image_path }}" alt="页面截图">
    <div class="page-text">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
"""

                pages_data = []
                for idx, frame in enumerate(self.key_frames):
                    try:
                        img_filename = f"page_{idx}.jpg"
                        img_path = os.path.join(abs_temp_img_dir, img_filename)

                        # BGR -> RGB for PIL; grayscale frames pass through.
                        if len(frame.shape) == 3 and frame.shape[2] == 3:
                            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        else:
                            frame_rgb = frame

                        img = Image.fromarray(frame_rgb)

                        # Downscale wide frames to keep the PDF small.
                        max_width = 800
                        if img.width > max_width:
                            ratio = max_width / img.width
                            new_height = int(img.height * ratio)
                            img = img.resize((max_width, new_height), Image.Resampling.LANCZOS)

                        img.save(img_path, format='JPEG', quality=85, optimize=True)

                        # Accumulate all narration up to this page's end time.
                        current_time = self.aligned_data[idx]['end_time']
                        texts = []
                        for data in self.aligned_data:
                            if data['end_time'] <= current_time:
                                if data['text']:
                                    texts.append(data['text'])
                        combined_text = " ".join(texts)

                        # Single-key-frame video: show the full transcript.
                        if len(self.key_frames) == 1:
                            all_texts = []
                            for data in self.aligned_data:
                                if data['text']:
                                    all_texts.append(data['text'])
                            combined_text = " ".join(all_texts)

                        # file:/// URL with forward slashes for wkhtmltopdf.
                        img_path_with_prefix = f"file:///{img_path.replace(os.sep, '/')}"

                        pages_data.append({
                            "num": idx + 1,
                            "time": f"{self.aligned_data[idx]['start_time']:.1f}s - {self.aligned_data[idx]['end_time']:.1f}s",
                            "image_path": img_path_with_prefix,
                            "text": combined_text
                        })
                    except Exception as e:
                        print(f"处理页面 {idx + 1} 时出错: {str(e)}")
                        continue

                # Render the Jinja2 template to the temporary HTML file.
                env = Environment()
                template = env.from_string(html_content)
                with open(temp_html, "w", encoding="utf-8") as f:
                    f.write(template.render(pages=pages_data))

                # wkhtmltopdf options; local file access is required for
                # the file:/// image URLs.
                options = {
                    "enable-local-file-access": "",
                    "encoding": "UTF-8",
                    "margin-top": "15mm",
                    "margin-bottom": "15mm",
                    "margin-left": "15mm",
                    "margin-right": "15mm",
                    "quiet": "",
                    "print-media-type": "",
                    "page-size": "A4",
                    "orientation": "Portrait"
                }
                config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)

                pdfkit.from_file(
                    temp_html,
                    pdf_path,
                    configuration=config,
                    options=options
                )
                print(f"PDF已保存到: {pdf_path}")

            finally:
                # Always remove the temporary HTML and images.
                if os.path.exists(temp_html):
                    os.remove(temp_html)
                if os.path.exists(temp_img_dir):
                    for f in os.listdir(temp_img_dir):
                        os.remove(os.path.join(temp_img_dir, f))
                    os.rmdir(temp_img_dir)

        except Exception as e:
            print(f"保存结果时出错: {str(e)}")

    def process(self):
        """Run the full pipeline: frames -> (audio ∥ key frames) -> PDF."""
        try:
            start_time = time.time()
            print("开始处理视频...")

            # Frame extraction must finish first: both parallel tasks read
            # self.frames / self.frame_timestamps.
            self.extract_frames()

            # Audio transcription and key-frame detection are independent.
            with ThreadPoolExecutor(max_workers=2) as executor:
                audio_future = executor.submit(self.process_audio)
                frames_future = executor.submit(self.process_frames)
                audio_future.result()
                frames_future.result()

            print("生成摘要...")
            self.save_results()

            end_time = time.time()
            print(f"处理完成!总耗时:{end_time - start_time:.2f}秒")

        except Exception as e:
            print(f"处理过程出错: {str(e)}")
            raise  # re-raise so failures surface during debugging


if __name__ == "__main__":
    # Example usage
    video_path = "D:/python项目文件/1/input3.mp4"  # replace with the actual video path
    output_dir = "custom_output"  # custom output directory
    summarizer = PPTSummarizer(video_path, output_dir)
    summarizer.process()