# ======================== Global configuration ========================
# Whisper prints this notice on CPU-only machines; it is expected, so silence it.
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")

VIDEO_PATH = "D:/python项目文件/1/input.mp4"  # input video path
MODEL_DIR = "D:/whisper_models"  # Whisper model directory
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin"  # FFmpeg installation directory
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe"  # wkhtmltopdf executable
SSIM_THRESHOLD = 0.85  # key-frame de-duplication threshold
FRAME_INTERVAL = 2  # frame sampling interval (seconds)
# Raw string: "\桌", "\p" and "\o" are not valid escape sequences, and Python
# 3.12+ emits a SyntaxWarning for them in an ordinary literal. The resulting
# value is unchanged.
OUTPUT_DIR = r"D:\桌面文件\python\output1"  # output directory
TRANSITION_WORDS = ["接下来", "下一页", "如图"]  # sentences containing these are filtered out
HOG_THRESHOLD = 0.7  # HOG feature similarity threshold
COLOR_THRESHOLD = 0.8  # colour histogram similarity threshold
WHISPER_MODEL = "base"  # Whisper model size

# Glossary of technical terms: Chinese term -> English abbreviation appended
# to it in transcripts.
PROFESSIONAL_TERMS = {
    "人工智能": "AI",
    "机器学习": "ML",
    "深度学习": "DL",
    "神经网络": "NN",
    "卷积神经网络": "CNN",
    "循环神经网络": "RNN",
    "自然语言处理": "NLP",
    "计算机视觉": "CV",
    "大数据": "Big Data",
    "云计算": "Cloud Computing",
}

# Performance tuning
MAX_WORKERS = max(1, multiprocessing.cpu_count() - 1)  # worker threads; leave one core free
BATCH_SIZE = 10  # frames per batch
CACHE_SIZE = 100  # cache capacity
MEMORY_LIMIT = 0.8  # memory usage limit (fraction of total memory)
TIMEOUT_SECONDS = 300  # per-operation timeout (seconds)
PROGRESS_UPDATE_INTERVAL = 1  # minimum delay between progress lines (seconds)
class ProgressTracker:
    """Thread-safe, rate-limited console progress reporter.

    Counts completed steps against ``total_steps`` and prints a percentage
    line at most once every ``PROGRESS_UPDATE_INTERVAL`` seconds.

    Args:
        total_steps: Number of steps that make up 100%. May be 0 when a stage
            has nothing to process.
        description: Label printed in front of every progress line.
    """

    def __init__(self, total_steps, description="处理中"):
        self.total_steps = total_steps
        self.current_step = 0
        self.description = description
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self._lock = threading.Lock()

    def _percent(self):
        """Return the completion percentage, clamped to the range 0-100."""
        # A stage with nothing to do is reported as done instead of dividing
        # by zero.
        if not self.total_steps or self.total_steps <= 0:
            return 100.0
        # Callers sometimes add more steps than they announced; never print
        # more than 100%.
        return min(100.0, (self.current_step / self.total_steps) * 100)

    def update(self, step=1, message=None):
        """Advance the counter by ``step`` and print a progress line if due.

        Args:
            step: Amount added to the step counter.
            message: Optional text appended to the printed line.
        """
        with self._lock:
            self.current_step += step
            current_time = time.time()

            # Throttle output: the counter always advances, printing does not.
            if current_time - self.last_update_time < PROGRESS_UPDATE_INTERVAL:
                return

            line = (
                f"[进度] {self.description}: {self._percent():.1f}% "
                f"({self.current_step}/{self.total_steps})"
            )
            if message:
                line += f" - {message}"
            print(line)

            self.last_update_time = current_time

    def complete(self, message="完成"):
        """Print the final 100% line together with the total elapsed time."""
        with self._lock:
            elapsed = time.time() - self.start_time
            print(f"[完成] {self.description}: 100% - {message} (耗时: {elapsed:.1f}秒)")
self._lock = threading.Lock() + self.timeout_handler = TimeoutHandler() + + @staticmethod + def check_ffmpeg(): + """验证FFmpeg可用性""" + try: + subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + print("[系统] FFmpeg验证成功") + return True + except Exception as e: + print(f"[错误] FFmpeg验证失败: {str(e)}") + return False + + @lru_cache(maxsize=CACHE_SIZE) + def calculate_color_histogram(self, frame_key): + """计算颜色直方图特征(带缓存)""" + frame = self.frame_cache.get(frame_key) + if frame is None: + return None + hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]) + cv2.normalize(hist, hist) + return hist.flatten() + + @lru_cache(maxsize=CACHE_SIZE) + def calculate_hog_features(self, frame_key): + """计算HOG特征(带缓存)""" + frame = self.frame_cache.get(frame_key) + if frame is None: + return None + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + features = hog(gray, orientations=8, pixels_per_cell=(16, 16), + cells_per_block=(1, 1), visualize=False) + return features + + @staticmethod + def is_ppt_transition(frame1, frame2): + """检测PPT页面切换""" + # 转换为灰度图 + gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) + gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) + + # 计算边缘 + edges1 = cv2.Canny(gray1, 100, 200) + edges2 = cv2.Canny(gray2, 100, 200) + + # 计算边缘差异 + diff = cv2.absdiff(edges1, edges2) + return np.mean(diff) > 50 # 阈值可调整 + + @staticmethod + def is_blank_frame(frame, threshold=30): + """检测是否为无信息帧(纯黑屏或纯白屏)""" + try: + # 转换为灰度图 + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + + # 计算图像统计特征 + mean = np.mean(gray) + std_dev = np.std(gray) + + # 检查是否为纯黑或纯白 + is_black = mean < 10 and std_dev < 5 + is_white = mean > 245 and std_dev < 5 + + # 检查是否有足够的细节 + has_detail = std_dev > threshold + + return is_black or is_white or not has_detail + except Exception as e: + print(f"[警告] 检查无信息帧时出错: {str(e)}") + return True + + def process_frame_batch(self, frames_batch, start_idx): + """处理一批帧""" + results = [] + for i, frame in 
enumerate(frames_batch): + idx = start_idx + i + frame_key = f"frame_{idx}" + self.frame_cache[frame_key] = frame + results.append((idx, frame)) + return results + + def extract_keyframes(self, video_path: str) -> tuple: + """提取去重关键帧及其时间戳(多特征融合,并行处理)""" + try: + self.timeout_handler.start("关键帧提取") + reader = imageio.get_reader(video_path) + fps = reader.get_meta_data()["fps"] + total_frames = reader.count_frames() + print(f"[信息] 视频总帧数: {total_frames}") + + keyframes = [] + timestamps = [] + prev_frame = None + frame_count = 0 + + # 创建进度跟踪器 + progress = ProgressTracker(total_frames, "关键帧提取") + + # 设置最后处理帧的阈值和超时 + last_frames_threshold = 30 # 增加到30帧 + last_frame_time = time.time() + last_frame_timeout = 10 # 降低到10秒超时 + + # 批处理大小动态调整 + current_batch_size = BATCH_SIZE + + # 使用队列存储结果 + result_queue = queue.Queue() + + # 最后阶段的简化处理标志 + simplified_processing = False + + # 使用线程池进行并行处理 + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = [] + frames_batch = [] + batch_start_idx = 0 + + try: + for idx, frame in enumerate(reader): + # 更新进度 + progress.update(1) + + # 检查是否接近结束 + if idx >= total_frames - last_frames_threshold: + if not simplified_processing: + print("[信息] 进入最后阶段,启用简化处理模式") + simplified_processing = True + # 清理现有资源 + self.frame_cache.clear() + self.feature_cache.clear() + gc.collect() + + current_time = time.time() + if current_time - last_frame_time > last_frame_timeout: + print(f"[警告] 处理最后{last_frames_threshold}帧时卡住,跳过剩余帧") + # 强制处理当前批次 + if frames_batch: + future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx) + futures.append(future) + break + + # 在最后阶段使用最小批处理大小 + current_batch_size = 1 + last_frame_time = current_time + + curr_time = idx / fps + if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL: + continue + + # 检查是否为无信息帧(使用简化版本的检查) + if self.is_blank_frame(frame, simplified=True): + continue + + frames_batch.append(frame) + + # 当批次达到指定大小时提交处理 + if len(frames_batch) >= 
current_batch_size: + future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx) + futures.append(future) + batch_start_idx += len(frames_batch) + frames_batch = [] + + # 及时清理完成的future + self._clean_completed_futures(futures, result_queue) + + # 强制垃圾回收 + if frame_count % 20 == 0: # 更频繁的垃圾回收 + gc.collect() + + # 处理剩余的帧 + if frames_batch: + future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx) + futures.append(future) + + # 等待所有future完成,但设置更短的超时 + try: + for future in concurrent.futures.as_completed(futures, timeout=15): + try: + batch_results = future.result(timeout=3) # 更短的超时 + for idx, frame in batch_results: + result_queue.put((idx, frame)) + except Exception as e: + print(f"[警告] 处理批次时出错: {str(e)}") + except concurrent.futures.TimeoutError: + print("[警告] 部分批次处理超时,继续处理已完成的结果") + + except Exception as e: + print(f"[警告] 帧处理过程中出错: {str(e)}") + finally: + # 处理队列中的所有结果 + while not result_queue.empty(): + try: + idx, frame = result_queue.get_nowait() + curr_time = idx / fps + + # 使用简化版本的特征比较 + if prev_frame is not None: + try: + if not self._is_frame_different(prev_frame, frame, simplified=True): + continue + except Exception as e: + print(f"[警告] 特征比较失败: {str(e)}") + continue + + keyframes.append(Image.fromarray(frame)) + timestamps.append(curr_time) + prev_frame = frame + frame_count += 1 + + # 在最后阶段更频繁地清理资源 + if simplified_processing and frame_count % 5 == 0: + gc.collect() + except queue.Empty: + break + + reader.close() + print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧") + + # 清理资源 + self.frame_cache.clear() + self.feature_cache.clear() + gc.collect() + + # 停止超时处理 + self.timeout_handler.stop() + progress.complete(f"提取了{len(keyframes)}个关键帧") + + return keyframes, timestamps + except Exception as e: + print(f"[错误] 关键帧提取失败: {str(e)}") + self.timeout_handler.stop() + return [], [] + + def _clean_completed_futures(self, futures, result_queue): + """清理已完成的future并存储结果""" + done = [] + for future in futures: + if future.done(): + 
try: + batch_results = future.result(timeout=1) + for result in batch_results: + result_queue.put(result) + done.append(future) + except Exception as e: + print(f"[警告] 获取future结果时出错: {str(e)}") + + # 从futures列表中移除已完成的 + for future in done: + futures.remove(future) + + # 强制垃圾回收 + if len(done) > 0: + gc.collect() + + def _is_frame_different(self, frame1, frame2, simplified=False): + """简化版本的帧差异检测""" + if simplified: + try: + # 使用更简单的比较方法 + gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) + gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) + + # 计算平均差异 + diff = cv2.absdiff(gray1, gray2) + mean_diff = np.mean(diff) + + # 如果差异小于阈值,认为帧相同 + return mean_diff > 10 # 可调整的阈值 + except Exception: + return True + else: + # 完整的特征比较逻辑 + return True # 默认认为不同,具体实现可以根据需要添加 + + def is_blank_frame(self, frame, simplified=False): + """检测是否为无信息帧(支持简化版本)""" + try: + if simplified: + # 简化版本:只检查亮度和方差 + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + mean = np.mean(gray) + std = np.std(gray) + return mean < 10 or mean > 245 or std < 20 + else: + # 完整版本的检查逻辑 + return super().is_blank_frame(frame) + except Exception as e: + print(f"[警告] 检查无信息帧时出错: {str(e)}") + return True + + @staticmethod + def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list: + """语音识别与时间戳获取(支持中英文混合)""" + try: + # 创建进度跟踪器 + progress = ProgressTracker(100, "语音识别") + progress.update(10, "加载模型") + + # 使用更大的模型提高准确率 + model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR) + + progress.update(20, "开始转写") + + # 配置转写参数 + result = model.transcribe( + video_path, + fp16=False, + language="zh", + task="transcribe", + verbose=True, + initial_prompt="这是一段包含中英文的PPT讲解视频,可能包含专业术语。" + ) + + progress.update(60, "处理转写结果") + segments = result.get("segments", []) + + # 后处理:专业术语替换 + for i, seg in enumerate(segments): + text = seg["text"] + for cn, en in PROFESSIONAL_TERMS.items(): + text = text.replace(cn, f"{cn}({en})") + seg["text"] = text + progress.update(30 / len(segments), f"处理第{i + 
1}/{len(segments)}个片段") + + progress.complete(f"识别了{len(segments)}个语音片段") + return segments + except Exception as e: + print(f"[错误] 语音识别失败: {str(e)}") + return [] + + +# ---------------------- 业务逻辑模块 ---------------------- +class ContentAligner: + @staticmethod + def generate_page_intervals(timestamps: list, duration: float) -> list: + """生成页面时间段""" + intervals = [] + for i in range(len(timestamps)): + start = timestamps[i] + end = timestamps[i + 1] if i < len(timestamps) - 1 else duration + intervals.append((start, end)) + return intervals + + @staticmethod + @lru_cache(maxsize=CACHE_SIZE) + def calculate_text_similarity(text1: str, text2: str) -> float: + """计算文本相似度(带缓存)""" + # 使用简单的词重叠度计算 + words1 = set(re.findall(r'\w+', text1.lower())) + words2 = set(re.findall(r'\w+', text2.lower())) + if not words1 or not words2: + return 0.0 + intersection = words1.intersection(words2) + union = words1.union(words2) + return len(intersection) / len(union) + + @staticmethod + def _process_segment(seg, seg_start, intervals, all_segments): + """处理单个语音片段(用于并行处理)""" + # 首先尝试时间戳匹配 + for page_idx, (start, end) in enumerate(intervals): + if start <= seg_start < end: + return page_idx, seg + + # 如果时间戳匹配失败,尝试文本相似度匹配 + best_page = None + best_score = 0.0 + + for page_idx, (start, end) in enumerate(intervals): + # 获取该页面的所有文本 + page_text = " ".join([s["text"] for s in all_segments if start <= s["start"] < end]) + similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text) + if similarity > best_score: + best_score = similarity + best_page = page_idx + + if best_page is not None: + return best_page, seg + return None + + @staticmethod + def find_best_match(segments: list, intervals: list) -> dict: + """为每个语音片段找到最佳匹配的页面(并行处理)""" + page_texts = defaultdict(list) + unmatched_segments = [] + + # 创建进度跟踪器 + progress = ProgressTracker(len(segments), "内容对齐") + + # 使用线程池进行并行处理 + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = [] + + 
for seg in segments: + seg_start = seg["start"] + future = executor.submit(ContentAligner._process_segment, seg, seg_start, intervals, segments) + futures.append(future) + + # 收集结果 + for i, future in enumerate(concurrent.futures.as_completed(futures)): + try: + result = future.result() + if result: + page_idx, seg = result + page_texts[page_idx].append(seg) + else: + unmatched_segments.append(seg) + progress.update(1, f"处理第{i + 1}/{len(segments)}个片段") + except Exception as e: + print(f"[警告] 处理语音片段时出错: {str(e)}") + + # 处理未匹配的片段 + if unmatched_segments: + print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段") + # 将未匹配片段添加到最近的页面 + for seg in unmatched_segments: + closest_page = min(range(len(intervals)), + key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2)) + page_texts[closest_page].append(seg) + + progress.complete(f"对齐了{len(segments)}个语音片段") + return page_texts + + @staticmethod + def align_content(video_path: str, timestamps: list) -> list: + """语音-画面对齐主逻辑(改进版,并行处理)""" + try: + # 创建超时处理器 + timeout_handler = TimeoutHandler() + timeout_handler.start("内容对齐") + + reader = imageio.get_reader(video_path) + duration = reader.get_meta_data()["duration"] + reader.close() + except: + duration = timestamps[-1] + FRAME_INTERVAL + + segments = VideoProcessor.transcribe_audio(video_path) + intervals = ContentAligner.generate_page_intervals(timestamps, duration) + + # 使用改进的匹配算法(并行处理) + page_texts = ContentAligner.find_best_match(segments, intervals) + + # 生成最终的对齐数据 + aligned_data = [] + for idx in range(len(intervals)): + text = " ".join([seg["text"] for seg in page_texts.get(idx, [])]) + aligned_data.append({ + "page": idx, + "start_time": intervals[idx][0], + "end_time": intervals[idx][1], + "text": text + }) + + # 停止超时处理 + timeout_handler.stop() + + return aligned_data + + +# ---------------------- 摘要生成模块 ---------------------- +class SummaryGenerator: + @staticmethod + def optimize_text(text: str) -> str: + """文本浓缩优化,过滤重复句子""" + # 分割句子 + sentences = 
re.split(r'[。!?]', text) + filtered = [] + seen = defaultdict(int) # 用于记录句子出现次数 + + # 预处理句子:去除空白字符,转换为小写 + processed_sentences = [sent.strip().lower() for sent in sentences] + + # 过滤重复句子 + for sent, processed_sent in zip(sentences, processed_sentences): + sent = sent.strip() + if (len(sent) >= 10 # 句子长度至少10个字符 + and not any(word in sent for word in TRANSITION_WORDS) # 不包含过渡词 + and seen[processed_sent] < 5): # 出现次数少于5次 + filtered.append(sent) + seen[processed_sent] += 1 + + # 如果过滤后没有句子,返回空字符串 + if not filtered: + return "" + + # 重新组合句子 + return '。'.join(filtered) + '。' + + @staticmethod + def generate_html(aligned_data: list, keyframes: list, output_dir: str): + """生成HTML报告(并行处理)""" + # 创建超时处理器 + timeout_handler = TimeoutHandler() + timeout_handler.start("HTML报告生成") + + # 创建进度跟踪器 + progress = ProgressTracker(len(keyframes), "HTML报告生成") + + pages_data = [] + temp_img_dir = os.path.join(output_dir, "_temp_images") + os.makedirs(temp_img_dir, exist_ok=True) + + try: + # 使用线程池进行并行处理 + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = [] + + for idx, frame in enumerate(keyframes): + future = executor.submit(SummaryGenerator._process_frame, idx, frame, aligned_data, temp_img_dir) + futures.append(future) + + # 收集结果 + for i, future in enumerate(concurrent.futures.as_completed(futures)): + try: + result = future.result() + if result: + pages_data.append(result) + progress.update(1, f"处理第{i + 1}/{len(keyframes)}个页面") + except Exception as e: + print(f"[警告] 处理帧时出错: {str(e)}") + + # 按页面顺序排序 + pages_data.sort(key=lambda x: x["num"]) + + progress.update(10, "生成HTML模板") + env = Environment() + template = env.from_string(""" + + +
+ +