import os import re import base64 import warnings import imageio import whisper import numpy as np import pdfkit from PIL import Image from skimage.metrics import structural_similarity as ssim from collections import defaultdict import subprocess from jinja2 import Environment import cv2 from scipy.signal import find_peaks from skimage.feature import hog from skimage.color import rgb2gray import concurrent.futures import threading import queue import time import gc from functools import lru_cache import multiprocessing import signal import sys # ======================== 全局配置 ======================== warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead") VIDEO_PATH = "D:/python项目文件/1/input.mp4" # 输入视频路径 MODEL_DIR = "D:/whisper_models" # Whisper模型目录 FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin" # FFmpeg安装路径 WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe" # wkhtmltopdf路径 SSIM_THRESHOLD = 0.85 # 关键帧去重阈值 FRAME_INTERVAL = 2 # 抽帧间隔(秒) OUTPUT_DIR = "D:\桌面文件\python\output1" # 输出目录 TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列 HOG_THRESHOLD = 0.7 # HOG特征相似度阈值 COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值 WHISPER_MODEL = "base" # Whisper模型大小 PROFESSIONAL_TERMS = { "人工智能": "AI", "机器学习": "ML", "深度学习": "DL", "神经网络": "NN", "卷积神经网络": "CNN", "循环神经网络": "RNN", "自然语言处理": "NLP", "计算机视觉": "CV", "大数据": "Big Data", "云计算": "Cloud Computing" } # 专业术语词典 # 性能优化配置 MAX_WORKERS = max(1, multiprocessing.cpu_count() - 1) # 并行处理的工作线程数 BATCH_SIZE = 10 # 批处理大小 CACHE_SIZE = 100 # 缓存大小 MEMORY_LIMIT = 0.8 # 内存使用限制(占总内存的比例) TIMEOUT_SECONDS = 300 # 操作超时时间(秒) PROGRESS_UPDATE_INTERVAL = 1 # 进度更新间隔(秒) # ======================================================== # 进度跟踪类 class ProgressTracker: def __init__(self, total_steps, description="处理中"): self.total_steps = total_steps self.current_step = 0 self.description = description self.start_time = time.time() self.last_update_time = self.start_time self._lock = threading.Lock() def update(self, step=1, message=None): with self._lock: self.current_step += step current_time = time.time() # 控制更新频率 if current_time - self.last_update_time >= PROGRESS_UPDATE_INTERVAL: elapsed = current_time - self.start_time progress = (self.current_step / self.total_steps) * 100 if message: print( f"[进度] {self.description}: {progress:.1f}% ({self.current_step}/{self.total_steps}) - {message}") else: print(f"[进度] {self.description}: {progress:.1f}% ({self.current_step}/{self.total_steps})") self.last_update_time = current_time def complete(self, message="完成"): with self._lock: elapsed = time.time() - self.start_time print(f"[完成] {self.description}: 100% - {message} (耗时: {elapsed:.1f}秒)") # 超时处理类 class TimeoutHandler: def __init__(self, timeout_seconds=TIMEOUT_SECONDS): self.timeout_seconds = timeout_seconds self.timer = None self._lock = threading.Lock() def start(self, operation_name): with self._lock: if self.timer: self.timer.cancel() self.timer = threading.Timer(self.timeout_seconds, self._timeout_callback, args=[operation_name]) self.timer.start() print(f"[信息] 开始{operation_name},超时时间: {self.timeout_seconds}秒") def stop(self): with self._lock: if self.timer: self.timer.cancel() self.timer = None def _timeout_callback(self, operation_name): print(f"[警告] {operation_name}操作超时,正在尝试恢复...") # 这里可以添加恢复逻辑 # ---------------------- 核心功能模块 ---------------------- class VideoProcessor: def __init__(self): os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"] self.frame_cache = {} self.feature_cache = {} self._lock = threading.Lock() self.timeout_handler = TimeoutHandler() @staticmethod def check_ffmpeg(): """验证FFmpeg可用性""" try: subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print("[系统] FFmpeg验证成功") return True except Exception as e: print(f"[错误] FFmpeg验证失败: {str(e)}") return False @lru_cache(maxsize=CACHE_SIZE) def calculate_color_histogram(self, frame_key): """计算颜色直方图特征(带缓存)""" frame = self.frame_cache.get(frame_key) if frame is None: return None hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256]) cv2.normalize(hist, hist) return hist.flatten() @lru_cache(maxsize=CACHE_SIZE) def calculate_hog_features(self, frame_key): """计算HOG特征(带缓存)""" frame = self.frame_cache.get(frame_key) if frame is None: return None gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) features = hog(gray, orientations=8, pixels_per_cell=(16, 16), cells_per_block=(1, 1), visualize=False) return features @staticmethod def is_ppt_transition(frame1, frame2): """检测PPT页面切换""" # 转换为灰度图 gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) # 计算边缘 edges1 = cv2.Canny(gray1, 100, 200) edges2 = cv2.Canny(gray2, 100, 200) # 计算边缘差异 diff = cv2.absdiff(edges1, edges2) return np.mean(diff) > 50 # 阈值可调整 @staticmethod def is_blank_frame(frame, threshold=30): """检测是否为无信息帧(纯黑屏或纯白屏)""" try: # 转换为灰度图 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # 计算图像统计特征 mean = np.mean(gray) std_dev = np.std(gray) # 检查是否为纯黑或纯白 is_black = mean < 10 and std_dev < 5 is_white = mean > 245 and std_dev < 5 # 检查是否有足够的细节 has_detail = std_dev > threshold return is_black or is_white or not has_detail except Exception as e: print(f"[警告] 检查无信息帧时出错: {str(e)}") return True def process_frame_batch(self, frames_batch, start_idx): """处理一批帧""" results = [] for i, frame in enumerate(frames_batch): idx = start_idx + i frame_key = f"frame_{idx}" self.frame_cache[frame_key] = frame results.append((idx, frame)) return results def extract_keyframes(self, video_path: str) -> tuple: """提取去重关键帧及其时间戳(多特征融合,并行处理)""" try: self.timeout_handler.start("关键帧提取") reader = imageio.get_reader(video_path) fps = reader.get_meta_data()["fps"] total_frames = reader.count_frames() print(f"[信息] 视频总帧数: {total_frames}") keyframes = [] timestamps = [] prev_frame = None frame_count = 0 # 创建进度跟踪器 progress = ProgressTracker(total_frames, "关键帧提取") # 设置最后处理帧的阈值和超时 last_frames_threshold = 30 # 增加到30帧 last_frame_time = time.time() last_frame_timeout = 10 # 降低到10秒超时 # 批处理大小动态调整 current_batch_size = BATCH_SIZE # 使用队列存储结果 result_queue = queue.Queue() # 最后阶段的简化处理标志 simplified_processing = False # 使用线程池进行并行处理 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [] frames_batch = [] batch_start_idx = 0 try: for idx, frame in enumerate(reader): # 更新进度 progress.update(1) # 检查是否接近结束 if idx >= total_frames - last_frames_threshold: if not simplified_processing: print("[信息] 进入最后阶段,启用简化处理模式") simplified_processing = True # 清理现有资源 self.frame_cache.clear() self.feature_cache.clear() gc.collect() current_time = time.time() if current_time - last_frame_time > last_frame_timeout: print(f"[警告] 处理最后{last_frames_threshold}帧时卡住,跳过剩余帧") # 强制处理当前批次 if frames_batch: future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx) futures.append(future) break # 在最后阶段使用最小批处理大小 current_batch_size = 1 last_frame_time = current_time curr_time = idx / fps if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL: continue # 检查是否为无信息帧(使用简化版本的检查) if self.is_blank_frame(frame, simplified=True): continue frames_batch.append(frame) # 当批次达到指定大小时提交处理 if len(frames_batch) >= current_batch_size: future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx) futures.append(future) batch_start_idx += len(frames_batch) frames_batch = [] # 及时清理完成的future self._clean_completed_futures(futures, result_queue) # 强制垃圾回收 if frame_count % 20 == 0: # 更频繁的垃圾回收 gc.collect() # 处理剩余的帧 if frames_batch: future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx) futures.append(future) # 等待所有future完成,但设置更短的超时 try: for future in concurrent.futures.as_completed(futures, timeout=15): try: batch_results = future.result(timeout=3) # 更短的超时 for idx, frame in batch_results: result_queue.put((idx, frame)) except Exception as e: print(f"[警告] 处理批次时出错: {str(e)}") except concurrent.futures.TimeoutError: print("[警告] 部分批次处理超时,继续处理已完成的结果") except Exception as e: print(f"[警告] 帧处理过程中出错: {str(e)}") finally: # 处理队列中的所有结果 while not result_queue.empty(): try: idx, frame = result_queue.get_nowait() curr_time = idx / fps # 使用简化版本的特征比较 if prev_frame is not None: try: if not self._is_frame_different(prev_frame, frame, simplified=True): continue except Exception as e: print(f"[警告] 特征比较失败: {str(e)}") continue keyframes.append(Image.fromarray(frame)) timestamps.append(curr_time) prev_frame = frame frame_count += 1 # 在最后阶段更频繁地清理资源 if simplified_processing and frame_count % 5 == 0: gc.collect() except queue.Empty: break reader.close() print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧") # 清理资源 self.frame_cache.clear() self.feature_cache.clear() gc.collect() # 停止超时处理 self.timeout_handler.stop() progress.complete(f"提取了{len(keyframes)}个关键帧") return keyframes, timestamps except Exception as e: print(f"[错误] 关键帧提取失败: {str(e)}") self.timeout_handler.stop() return [], [] def _clean_completed_futures(self, futures, result_queue): """清理已完成的future并存储结果""" done = [] for future in futures: if future.done(): try: batch_results = future.result(timeout=1) for result in batch_results: result_queue.put(result) done.append(future) except Exception as e: print(f"[警告] 获取future结果时出错: {str(e)}") # 从futures列表中移除已完成的 for future in done: futures.remove(future) # 强制垃圾回收 if len(done) > 0: gc.collect() def _is_frame_different(self, frame1, frame2, simplified=False): """简化版本的帧差异检测""" if simplified: try: # 使用更简单的比较方法 gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) # 计算平均差异 diff = cv2.absdiff(gray1, gray2) mean_diff = np.mean(diff) # 如果差异小于阈值,认为帧相同 return mean_diff > 10 # 可调整的阈值 except Exception: return True else: # 完整的特征比较逻辑 return True # 默认认为不同,具体实现可以根据需要添加 def is_blank_frame(self, frame, simplified=False): """检测是否为无信息帧(支持简化版本)""" try: if simplified: # 简化版本:只检查亮度和方差 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) mean = np.mean(gray) std = np.std(gray) return mean < 10 or mean > 245 or std < 20 else: # 完整版本的检查逻辑 return super().is_blank_frame(frame) except Exception as e: print(f"[警告] 检查无信息帧时出错: {str(e)}") return True @staticmethod def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list: """语音识别与时间戳获取(支持中英文混合)""" try: # 创建进度跟踪器 progress = ProgressTracker(100, "语音识别") progress.update(10, "加载模型") # 使用更大的模型提高准确率 model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR) progress.update(20, "开始转写") # 配置转写参数 result = model.transcribe( video_path, fp16=False, language="zh", task="transcribe", verbose=True, initial_prompt="这是一段包含中英文的PPT讲解视频,可能包含专业术语。" ) progress.update(60, "处理转写结果") segments = result.get("segments", []) # 后处理:专业术语替换 for i, seg in enumerate(segments): text = seg["text"] for cn, en in PROFESSIONAL_TERMS.items(): text = text.replace(cn, f"{cn}({en})") seg["text"] = text progress.update(30 / len(segments), f"处理第{i + 1}/{len(segments)}个片段") progress.complete(f"识别了{len(segments)}个语音片段") return segments except Exception as e: print(f"[错误] 语音识别失败: {str(e)}") return [] # ---------------------- 业务逻辑模块 ---------------------- class ContentAligner: @staticmethod def generate_page_intervals(timestamps: list, duration: float) -> list: """生成页面时间段""" intervals = [] for i in range(len(timestamps)): start = timestamps[i] end = timestamps[i + 1] if i < len(timestamps) - 1 else duration intervals.append((start, end)) return intervals @staticmethod @lru_cache(maxsize=CACHE_SIZE) def calculate_text_similarity(text1: str, text2: str) -> float: """计算文本相似度(带缓存)""" # 使用简单的词重叠度计算 words1 = set(re.findall(r'\w+', text1.lower())) words2 = set(re.findall(r'\w+', text2.lower())) if not words1 or not words2: return 0.0 intersection = words1.intersection(words2) union = words1.union(words2) return len(intersection) / len(union) @staticmethod def _process_segment(seg, seg_start, intervals, all_segments): """处理单个语音片段(用于并行处理)""" # 首先尝试时间戳匹配 for page_idx, (start, end) in enumerate(intervals): if start <= seg_start < end: return page_idx, seg # 如果时间戳匹配失败,尝试文本相似度匹配 best_page = None best_score = 0.0 for page_idx, (start, end) in enumerate(intervals): # 获取该页面的所有文本 page_text = " ".join([s["text"] for s in all_segments if start <= s["start"] < end]) similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text) if similarity > best_score: best_score = similarity best_page = page_idx if best_page is not None: return best_page, seg return None @staticmethod def find_best_match(segments: list, intervals: list) -> dict: """为每个语音片段找到最佳匹配的页面(并行处理)""" page_texts = defaultdict(list) unmatched_segments = [] # 创建进度跟踪器 progress = ProgressTracker(len(segments), "内容对齐") # 使用线程池进行并行处理 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [] for seg in segments: seg_start = seg["start"] future = executor.submit(ContentAligner._process_segment, seg, seg_start, intervals, segments) futures.append(future) # 收集结果 for i, future in enumerate(concurrent.futures.as_completed(futures)): try: result = future.result() if result: page_idx, seg = result page_texts[page_idx].append(seg) else: unmatched_segments.append(seg) progress.update(1, f"处理第{i + 1}/{len(segments)}个片段") except Exception as e: print(f"[警告] 处理语音片段时出错: {str(e)}") # 处理未匹配的片段 if unmatched_segments: print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段") # 将未匹配片段添加到最近的页面 for seg in unmatched_segments: closest_page = min(range(len(intervals)), key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2)) page_texts[closest_page].append(seg) progress.complete(f"对齐了{len(segments)}个语音片段") return page_texts @staticmethod def align_content(video_path: str, timestamps: list) -> list: """语音-画面对齐主逻辑(改进版,并行处理)""" try: # 创建超时处理器 timeout_handler = TimeoutHandler() timeout_handler.start("内容对齐") reader = imageio.get_reader(video_path) duration = reader.get_meta_data()["duration"] reader.close() except: duration = timestamps[-1] + FRAME_INTERVAL segments = VideoProcessor.transcribe_audio(video_path) intervals = ContentAligner.generate_page_intervals(timestamps, duration) # 使用改进的匹配算法(并行处理) page_texts = ContentAligner.find_best_match(segments, intervals) # 生成最终的对齐数据 aligned_data = [] for idx in range(len(intervals)): text = " ".join([seg["text"] for seg in page_texts.get(idx, [])]) aligned_data.append({ "page": idx, "start_time": intervals[idx][0], "end_time": intervals[idx][1], "text": text }) # 停止超时处理 timeout_handler.stop() return aligned_data # ---------------------- 摘要生成模块 ---------------------- class SummaryGenerator: @staticmethod def optimize_text(text: str) -> str: """文本浓缩优化,过滤重复句子""" # 分割句子 sentences = re.split(r'[。!?]', text) filtered = [] seen = defaultdict(int) # 用于记录句子出现次数 # 预处理句子:去除空白字符,转换为小写 processed_sentences = [sent.strip().lower() for sent in sentences] # 过滤重复句子 for sent, processed_sent in zip(sentences, processed_sentences): sent = sent.strip() if (len(sent) >= 10 # 句子长度至少10个字符 and not any(word in sent for word in TRANSITION_WORDS) # 不包含过渡词 and seen[processed_sent] < 5): # 出现次数少于5次 filtered.append(sent) seen[processed_sent] += 1 # 如果过滤后没有句子,返回空字符串 if not filtered: return "" # 重新组合句子 return '。'.join(filtered) + '。' @staticmethod def generate_html(aligned_data: list, keyframes: list, output_dir: str): """生成HTML报告(并行处理)""" # 创建超时处理器 timeout_handler = TimeoutHandler() timeout_handler.start("HTML报告生成") # 创建进度跟踪器 progress = ProgressTracker(len(keyframes), "HTML报告生成") pages_data = [] temp_img_dir = os.path.join(output_dir, "_temp_images") os.makedirs(temp_img_dir, exist_ok=True) try: # 使用线程池进行并行处理 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [] for idx, frame in enumerate(keyframes): future = executor.submit(SummaryGenerator._process_frame, idx, frame, aligned_data, temp_img_dir) futures.append(future) # 收集结果 for i, future in enumerate(concurrent.futures.as_completed(futures)): try: result = future.result() if result: pages_data.append(result) progress.update(1, f"处理第{i + 1}/{len(keyframes)}个页面") except Exception as e: print(f"[警告] 处理帧时出错: {str(e)}") # 按页面顺序排序 pages_data.sort(key=lambda x: x["num"]) progress.update(10, "生成HTML模板") env = Environment() template = env.from_string(""" PPT视频摘要报告

PPT视频结构化摘要

{% for page in pages %}

页面 {{ page.num }}

{{ page.time }}
页面截图
{{ page.text }}
{% endfor %} """) progress.update(10, "保存HTML文件") output_path = os.path.join(output_dir, "summary.html") with open(output_path, "w", encoding="utf-8") as f: f.write(template.render(pages=pages_data)) print(f"[输出] HTML报告已生成: {output_path}") # 停止超时处理 timeout_handler.stop() progress.complete("HTML报告生成完成") finally: for f in os.listdir(temp_img_dir): os.remove(os.path.join(temp_img_dir, f)) os.rmdir(temp_img_dir) @staticmethod def _process_frame(idx, frame, aligned_data, temp_img_dir): """处理单个帧(用于并行处理)""" try: img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg") frame.save(img_path) with open(img_path, "rb") as f: img_data = base64.b64encode(f.read()).decode("utf-8") return { "num": idx + 1, "time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s", "image": f"data:image/jpeg;base64,{img_data}", "text": SummaryGenerator.optimize_text(aligned_data[idx]["text"]) } except Exception as e: print(f"[警告] 处理帧 {idx} 时出错: {str(e)}") return None @staticmethod def generate_pdf(aligned_data: list, keyframes: list, output_dir: str): """生成PDF报告(优化版,并行处理)""" # 创建超时处理器 timeout_handler = TimeoutHandler() timeout_handler.start("PDF报告生成") # 创建进度跟踪器 progress = ProgressTracker(len(keyframes) + 20, "PDF报告生成") temp_html = os.path.join(output_dir, "_temp_pdf.html") temp_img_dir = os.path.join(output_dir, "_temp_pdf_images") os.makedirs(temp_img_dir, exist_ok=True) try: # 使用绝对路径 abs_temp_img_dir = os.path.abspath(temp_img_dir) progress.update(5, "准备HTML模板") html_content = """

PPT视频结构化摘要

{% for page in pages %}
第 {{ page.num }} 页
时间区间:{{ page.time }}
页面截图
{{ page.text }}
{% endfor %} """ pages_data = [] # 使用线程池进行并行处理 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [] for idx, frame in enumerate(keyframes): future = executor.submit(SummaryGenerator._process_frame_for_pdf, idx, frame, aligned_data, abs_temp_img_dir) futures.append(future) # 收集结果 for i, future in enumerate(concurrent.futures.as_completed(futures)): try: result = future.result() if result: pages_data.append(result) progress.update(1, f"处理第{i + 1}/{len(keyframes)}个页面") except Exception as e: print(f"[警告] 处理帧时出错: {str(e)}") # 按页面顺序排序 pages_data.sort(key=lambda x: x["num"]) progress.update(5, "生成HTML文件") env = Environment() template = env.from_string(html_content) with open(temp_html, "w", encoding="utf-8") as f: f.write(template.render(pages=pages_data)) # PDF生成选项 progress.update(5, "配置PDF生成选项") options = { "enable-local-file-access": "", "encoding": "UTF-8", "margin-top": "20mm", "margin-bottom": "20mm", "margin-left": "20mm", "margin-right": "20mm", "no-stop-slow-scripts": "", "quiet": "", "dpi": "300", "image-quality": "100", "enable-smart-shrinking": "", "print-media-type": "" } config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH) progress.update(5, "生成PDF文件") pdf_path = os.path.join(output_dir, "summary.pdf") # 使用子进程生成PDF,设置超时 try: print("[信息] 尝试使用子进程生成PDF...") process = subprocess.Popen( [WKHTMLTOPDF_PATH, "--enable-local-file-access", temp_html, pdf_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # 等待进程完成,设置超时 try: print("[信息] 等待PDF生成进程完成...") stdout, stderr = process.communicate(timeout=60) if process.returncode != 0: print(f"[警告] PDF生成返回非零状态码: {process.returncode}") print(f"[警告] 错误输出: {stderr.decode('utf-8', errors='ignore')}") raise Exception(f"PDF生成失败,返回码: {process.returncode}") print("[信息] PDF生成进程完成") except subprocess.TimeoutExpired: print("[警告] PDF生成超时,终止进程") process.kill() print("[信息] 尝试使用备用方法") # 备用方法:使用pdfkit print("[信息] 使用pdfkit库生成PDF...") pdfkit.from_file( temp_html, pdf_path, configuration=config, options=options ) print("[信息] pdfkit生成PDF完成") except Exception as e: print(f"[警告] 使用子进程生成PDF失败: {str(e)}") # 备用方法:使用pdfkit print("[信息] 使用pdfkit库生成PDF...") try: pdfkit.from_file( temp_html, pdf_path, configuration=config, options=options ) print("[信息] pdfkit生成PDF完成") except Exception as e2: print(f"[错误] pdfkit生成PDF也失败: {str(e2)}") # 最后的备用方法:使用简化的HTML print("[信息] 尝试使用简化的HTML生成PDF...") try: # 创建一个简化的HTML文件 simple_html = os.path.join(output_dir, "_simple_pdf.html") with open(simple_html, "w", encoding="utf-8") as f: f.write(""" PPT视频摘要报告

PPT视频结构化摘要

""") for page in pages_data: f.write(f"""

页面 {page['num']}

时间区间:{page['time']}
{page['text']}
""") f.write("") # 使用简化的HTML生成PDF pdfkit.from_file( simple_html, pdf_path, configuration=config, options=options ) print("[信息] 使用简化HTML生成PDF完成") # 清理简化HTML if os.path.exists(simple_html): os.remove(simple_html) except Exception as e3: print(f"[错误] 所有PDF生成方法都失败: {str(e3)}") print("[警告] 无法生成PDF报告,请检查HTML报告") print(f"[输出] PDF报告已生成: {pdf_path}") # 停止超时处理 timeout_handler.stop() progress.complete("PDF报告生成完成") finally: # 清理临时文件 print("[信息] 清理临时文件...") try: if os.path.exists(temp_html): os.remove(temp_html) print("[信息] 已删除临时HTML文件") if os.path.exists(temp_img_dir): for f in os.listdir(temp_img_dir): try: os.remove(os.path.join(temp_img_dir, f)) except Exception as e: print(f"[警告] 删除临时图片文件失败: {str(e)}") try: os.rmdir(temp_img_dir) print("[信息] 已删除临时图片目录") except Exception as e: print(f"[警告] 删除临时图片目录失败: {str(e)}") except Exception as e: print(f"[警告] 清理临时文件时出错: {str(e)}") @classmethod def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str): """生成所有格式报告(并行处理)""" # 创建进度跟踪器 progress = ProgressTracker(2, "报告生成") # 使用线程池并行生成HTML和PDF with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: html_future = executor.submit(cls.generate_html, aligned_data, keyframes, output_dir) pdf_future = executor.submit(cls.generate_pdf, aligned_data, keyframes, output_dir) # 等待HTML生成完成 try: html_future.result(timeout=300) # 设置5分钟超时 progress.update(1, "HTML报告生成完成") except concurrent.futures.TimeoutError: print("[警告] HTML报告生成超时") except Exception as e: print(f"[警告] HTML报告生成出错: {str(e)}") # 等待PDF生成完成 try: pdf_future.result(timeout=300) # 设置5分钟超时 progress.update(1, "PDF报告生成完成") except concurrent.futures.TimeoutError: print("[警告] PDF报告生成超时") except Exception as e: print(f"[警告] PDF报告生成出错: {str(e)}") progress.complete("所有报告生成完成") @staticmethod def _process_frame_for_pdf(idx, frame, aligned_data, abs_temp_img_dir): """处理单个帧用于PDF生成(用于并行处理)""" try: img_filename = f"page_{idx}.jpg" img_path = os.path.join(abs_temp_img_dir, img_filename) frame.save(img_path) return { "num": idx + 1, "time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s", "image_path": img_path, "text": SummaryGenerator.optimize_text(aligned_data[idx]["text"]) } except Exception as e: print(f"[警告] 处理帧 {idx} 时出错: {str(e)}") return None # ---------------------- 主流程控制 ---------------------- def main_process(): # 环境检查 processor = VideoProcessor() if not processor.check_ffmpeg(): return if not os.path.exists(VIDEO_PATH): print(f"[错误] 视频文件不存在: {VIDEO_PATH}") return # 创建总进度跟踪器 total_progress = ProgressTracker(4, "总体进度") # 关键帧提取 total_progress.update(1, "开始关键帧提取") keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH) if not keyframes: print("[错误] 未提取到关键帧") return total_progress.update(1, "关键帧提取完成") # 内容对齐 total_progress.update(1, "开始内容对齐") aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps) if not aligned_data: print("[警告] 未识别到有效语音内容") total_progress.update(1, "内容对齐完成") # 生成摘要 print("[信息] 开始生成报告...") os.makedirs(OUTPUT_DIR, exist_ok=True) try: SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR) print("[信息] 报告生成完成") except Exception as e: print(f"[错误] 报告生成过程中出错: {str(e)}") total_progress.complete("处理完成") print("[完成] 所有处理已完成,请查看输出目录中的报告文件") if __name__ == "__main__": try: main_process() except KeyboardInterrupt: print("\n[中断] 用户中断了处理") except Exception as e: print(f"[错误] 程序执行过程中出现未处理的异常: {str(e)}") import traceback traceback.print_exc()