"""Module preamble: imports, environment setup, logging and global settings."""
import os
import re
import base64
import warnings
import sys  # needed for the sys.path adjustments and debug output below
import ctypes  # used by get_short_path_name
from ctypes import wintypes  # kept from the original import list
# NOTE: opencc is imported further down, after sys.path has been extended.

# --- Manually add D:\Lib\site-packages to sys.path ---
site_packages_path = r'D:\Lib\site-packages'
if site_packages_path not in sys.path:
    sys.path.append(site_packages_path)
# --- End of manual addition ---

# --- Manually add path for opencc if needed ---
opencc_site_packages_path = r'C:\Users\86138\AppData\Roaming\Python\Python311\site-packages'
if opencc_site_packages_path not in sys.path:
    sys.path.insert(0, opencc_site_packages_path)  # searched first
    print(f"--- Debug --- Added to sys.path for opencc: {opencc_site_packages_path}")
# --- End of manual addition for opencc ---

import opencc

print(f"--- Debug --- Attempting to import imageio in: {__file__}")
print(f"--- Debug --- Python executable: {sys.executable}")
print(f"--- Debug --- sys.path AFTER manual add: {sys.path}")
try:
    import imageio as test_imageio_module
    print(f"--- Debug --- Found 'imageio' at: {test_imageio_module.__file__}")
    print(f"--- Debug --- Version of 'imageio': {test_imageio_module.__version__}")
except ImportError as e:
    print(f"--- Debug --- ImportError for imageio: {e}")
except AttributeError:  # __file__ or __version__ may be missing
    print("--- Debug --- Found 'imageio', but cannot get __file__ or __version__.")

import imageio
import whisper
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from collections import defaultdict
import subprocess
from jinja2 import Environment
import cv2
from scipy.signal import find_peaks
from skimage.feature import hog
from skimage.color import rgb2gray
import concurrent.futures
import threading
import queue
import time
import gc
from functools import lru_cache
import multiprocessing
import signal
import traceback
import logging
import json
import shutil
import importlib

# Configure logging BEFORE the first logging call.  logging.info() on an
# unconfigured root logger installs a default handler, after which
# basicConfig() silently does nothing (no process.log, no DEBUG/INFO output).
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('process.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

# Patch module that works around the wkhtmltopdf dependency.
try:
    import pdfkit_patch as pdfkit
    logging.info("已加载pdfkit补丁模块")
except ImportError:
    pdfkit = None  # keep the name defined so later references cannot NameError
    logging.info("未找到pdfkit补丁模块,PDF生成功能可能不可用")

# Limit the numeric libraries to one thread each.
# NOTE(review): these are set after numpy/cv2 were imported; confirm they
# still take effect for the libraries in use.
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['NUMEXPR_NUM_THREADS'] = '1'
os.environ['OMP_NUM_THREADS'] = '1'

# Bundled FFmpeg directory next to this file; empty string means "use PATH".
FFMPEG_BIN = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ffmpeg", "bin")
print(f"--- Debug --- FFMPEG_BIN calculated as: {repr(FFMPEG_BIN)}")
if not os.path.exists(FFMPEG_BIN):
    FFMPEG_BIN = ""
    print("--- Debug --- FFMPEG_BIN reset to empty string because path does not exist.")


def get_short_path_name(long_name):
    """Return the Windows short (8.3) form of *long_name*.

    The original path is returned unchanged when the conversion fails or when
    the Win32 API is unavailable (``ctypes.windll`` exists only on Windows).
    """
    windll = getattr(ctypes, "windll", None)
    if windll is None:
        return long_name

    get_short_path = windll.kernel32.GetShortPathNameW
    # Calling with a null buffer makes the API report the required size.
    output_buf_size = get_short_path(long_name, None, 0)
    if output_buf_size == 0:
        print(f"--- Debug --- GetShortPathNameW failed to get buffer size for: {long_name}, error code: {ctypes.GetLastError()}")
        return long_name

    output_buf = ctypes.create_unicode_buffer(output_buf_size)
    needed = get_short_path(long_name, output_buf, output_buf_size)
    if needed == 0:
        print(f"--- Debug --- GetShortPathNameW failed to convert: {long_name}, error code: {ctypes.GetLastError()}")
        return long_name
    return output_buf.value


def check_dependencies():
    """Return True when FFmpeg, OpenCV and Whisper are all usable.

    FFmpeg is probed by running ``ffmpeg -version`` (resolved through PATH);
    OpenCV and Whisper are probed by importing them.  Every failure is logged
    and yields False.
    """
    try:
        try:
            subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
            logging.info("FFmpeg 检查通过")
        except Exception as e:
            logging.error(f"FFmpeg 检查失败: {str(e)}")
            return False

        try:
            import cv2
            logging.info("OpenCV 检查通过")
        except Exception as e:
            logging.error(f"OpenCV 检查失败: {str(e)}")
            return False

        try:
            import whisper
            logging.info("Whisper 检查通过")
        except Exception as e:
            logging.error(f"Whisper 检查失败: {str(e)}")
            return False

        # The wkhtmltopdf check is disabled; only the HTML report is required.
        logging.info("wkhtmltopdf检查已禁用,仅生成HTML报告")

        logging.info("所有依赖项检查通过")
        return True
    except Exception as e:
        logging.error(f"依赖项检查失败: {str(e)}")
        return False


# ======================== Global configuration ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")

# Paths relative to this file
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_DIR = os.path.join(BASE_DIR, "models")
OUTPUT_DIR = os.path.join(BASE_DIR, "output")

os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

SSIM_THRESHOLD = 0.85  # key-frame de-duplication threshold
FRAME_INTERVAL = 2  # frame sampling interval in seconds
TRANSITION_WORDS = ["接下来", "下一页", "如图"]  # transition phrases
HOG_THRESHOLD = 0.7  # HOG feature similarity threshold
COLOR_THRESHOLD = 0.8  # colour-histogram similarity threshold
WHISPER_MODEL = "small"  # Whisper model size
# Chinese term -> abbreviation appended to transcripts
PROFESSIONAL_TERMS = {
    "人工智能": "AI",
    "机器学习": "ML",
    "深度学习": "DL",
    "神经网络": "NN",
    "卷积神经网络": "CNN",
    "循环神经网络": "RNN",
    "自然语言处理": "NLP",
    "计算机视觉": "CV",
    "大数据": "Big Data",
    "云计算": "Cloud Computing"
}

# Performance tuning
MAX_WORKERS = max(1, multiprocessing.cpu_count() - 1)  # worker threads
BATCH_SIZE = 15  # frames per batch
CACHE_SIZE = 150  # maximum number of cached feature entries
MEMORY_LIMIT = 0.8  # fraction of total memory
TIMEOUT_SECONDS = 200  # default watchdog timeout
PROGRESS_UPDATE_INTERVAL = 1  # minimum seconds between progress lines
MAX_KEYFRAMES = 30  # upper bound on key frames before sub-sampling


# ========================================================

class ProgressTracker:
    """Thread-safe console progress reporter with rate-limited output."""

    def __init__(self, total_steps, description="处理中"):
        self.total_steps = total_steps
        self.current_step = 0
        self.description = description
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self._lock = threading.Lock()

    def _percent(self):
        """Return completion in [0, 100]; a non-positive total counts as done."""
        if self.total_steps <= 0:
            return 100.0
        return min(100.0, self.current_step / self.total_steps * 100)

    def update(self, step=1, message=None):
        """Advance by *step* and print at most once per PROGRESS_UPDATE_INTERVAL."""
        with self._lock:
            self.current_step += step
            now = time.time()
            if now - self.last_update_time < PROGRESS_UPDATE_INTERVAL:
                return

            line = f"[进度] {self.description}: {self._percent():.1f}% ({self.current_step}/{self.total_steps})"
            if message:
                line += f" - {message}"
            print(line)
            self.last_update_time = now

    def complete(self, message="完成"):
        """Print the final line together with the elapsed time."""
        with self._lock:
            elapsed = time.time() - self.start_time
            print(f"[完成] {self.description}: 100% - {message} (耗时: {elapsed:.1f}秒)")


class TimeoutHandler:
    """Watchdog that prints a warning when an operation exceeds its time budget."""

    def __init__(self, timeout_seconds=TIMEOUT_SECONDS):
        self.timeout_seconds = timeout_seconds
        self.timer = None
        self._lock = threading.Lock()

    def start(self, operation_name):
        """(Re)start the watchdog for *operation_name*."""
        with self._lock:
            if self.timer:
                self.timer.cancel()
            self.timer = threading.Timer(self.timeout_seconds, self._timeout_callback, args=[operation_name])
            # Daemon thread: a watchdog nobody stopped must not keep the
            # interpreter alive until the timeout expires.
            self.timer.daemon = True
            self.timer.start()
            print(f"[信息] 开始{operation_name},超时时间: {self.timeout_seconds}秒")

    def stop(self):
        """Cancel the pending watchdog, if any.  Safe to call repeatedly."""
        with self._lock:
            if self.timer:
                self.timer.cancel()
                self.timer = None

    def _timeout_callback(self, operation_name):
        print(f"[警告] {operation_name}操作超时,正在尝试恢复...")
        # Recovery logic can be added here.


# ---------------------- Core processing ----------------------
class VideoProcessor:
    def __init__(self):
        if FFMPEG_BIN:  # only point imageio at the bundled binary when it exists
            ffmpeg_exe_path = os.path.join(FFMPEG_BIN, 'ffmpeg.exe')
            print(f"--- Debug VideoProcessor --- Setting IMAGEIO_FFMPEG_EXE to: {repr(ffmpeg_exe_path)}")
            os.environ['IMAGEIO_FFMPEG_EXE'] = ffmpeg_exe_path
        else:
            print("--- Debug VideoProcessor --- FFMPEG_BIN is empty, not setting IMAGEIO_FFMPEG_EXE.")
        self.frame_cache = {}
        self.feature_cache = {}
        self._lock = threading.Lock()
        self.timeout_handler = TimeoutHandler()

    @staticmethod
    def check_ffmpeg():
        """Return True when ``ffmpeg -version`` runs successfully."""
        try:
            subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print("[系统] FFmpeg验证成功")
            return True
        except Exception as e:
            print(f"[错误] FFmpeg验证失败: {str(e)}")
            return False

    def _cached_feature(self, kind, frame_key, compute):
        """Return ``compute(frame)`` for the frame stored under *frame_key*.

        Results live in ``self.feature_cache`` per instance and are reused only
        while ``frame_cache[frame_key]`` is still the same object, so replacing
        a frame invalidates its features.  Missing frames yield None and are
        not cached.
        """
        frame = self.frame_cache.get(frame_key)
        if frame is None:
            return None
        cache_key = (kind, frame_key)
        with self._lock:
            hit = self.feature_cache.get(cache_key)
        if hit is not None and hit[0] is frame:
            return hit[1]
        features = compute(frame)
        with self._lock:
            if cache_key not in self.feature_cache and len(self.feature_cache) >= CACHE_SIZE:
                # Drop the oldest entry to keep the cache bounded.
                self.feature_cache.pop(next(iter(self.feature_cache)))
            self.feature_cache[cache_key] = (frame, features)
        return features

    def calculate_color_histogram(self, frame_key):
        """Return the flattened, normalised 8x8x8 colour histogram (cached)."""
        def compute(frame):
            hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
            cv2.normalize(hist, hist)
            return hist.flatten()

        return self._cached_feature("color_hist", frame_key, compute)

    def calculate_hog_features(self, frame_key):
        """Return the HOG descriptor of the cached frame (cached)."""
        def compute(frame):
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            return hog(gray, orientations=8, pixels_per_cell=(16, 16),
                       cells_per_block=(1, 1), visualize=False)

        return self._cached_feature("hog", frame_key, compute)
@staticmethod + def is_ppt_transition(frame1, frame2): + """检测PPT页面切换""" + # 转换为灰度图 + gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) + gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) + + # 计算边缘 + edges1 = cv2.Canny(gray1, 100, 200) + edges2 = cv2.Canny(gray2, 100, 200) + + # 计算边缘差异 + diff = cv2.absdiff(edges1, edges2) + return np.mean(diff) > 50 # 阈值可调整 + + @staticmethod + def is_blank_frame(frame, threshold=30): + """检测是否为无信息帧(纯黑屏或纯白屏)""" + try: + # 转换为灰度图 + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + + # 计算图像统计特征 + mean = np.mean(gray) + std_dev = np.std(gray) + + # 检查是否为纯黑或纯白 + is_black = mean < 10 and std_dev < 5 + is_white = mean > 245 and std_dev < 5 + + # 检查是否有足够的细节 + has_detail = std_dev > threshold + + return is_black or is_white or not has_detail + except Exception as e: + print(f"[警告] 检查无信息帧时出错: {str(e)}") + return True + + def process_frame_batch(self, frames_batch, start_idx): + """处理一批帧""" + results = [] + for i, frame in enumerate(frames_batch): + idx = start_idx + i + frame_key = f"frame_{idx}" + self.frame_cache[frame_key] = frame + results.append((idx, frame)) + return results + + def extract_keyframes(self, video_path: str) -> tuple: + """提取去重关键帧及其时间戳(多特征融合,并行处理)""" + cap = None + try: + self.timeout_handler.start("关键帧提取") + + logging.info(f"[Debug extract_keyframes] Original video_path type: {type(video_path)}, value: {repr(video_path)}") + + # --- OpenCV VideoCapture for reading frames --- + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + logging.error(f"OpenCV: Failed to open video file: {video_path}") + raise ValueError(f"无法打开视频文件: {video_path}") + + # 获取视频元数据 + fps = cap.get(cv2.CAP_PROP_FPS) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + duration = total_frames / fps if fps > 0 else 0 + + if total_frames <= 0 or fps <= 0 or duration <= 0: + error_message = f"视频元数据读取不完整或无效: fps={fps}, total_frames={total_frames}, duration={duration}。" + logging.error(error_message) + raise ValueError(error_message) + + 
print(f"[信息] 视频元数据: 总帧数={total_frames}, 时长={duration:.2f}秒, FPS={fps:.2f}") + + keyframes = [] + timestamps = [] + prev_frame_bgr = None + prev_frame_gray = None + frame_count = 0 + + progress = ProgressTracker(total_frames, "关键帧提取 (OpenCV)") + + # 根据视频长度调整采样间隔 + is_short_video = duration < 30 + if is_short_video: + sample_interval_frames = max(int(fps * 0.5), 1) # 短视频每0.5秒采样一次 + else: + sample_interval_frames = max(int(fps * 1.0), 1) # 长视频每1秒采样一次 + + logging.info(f"使用采样间隔: {sample_interval_frames}帧 (约{sample_interval_frames/fps:.1f}秒/帧)") + + while cap.isOpened(): + ret, frame_bgr = cap.read() + if not ret: + break + + frame_count += 1 + progress.update(1) + + # 按采样间隔处理帧 + if frame_count % sample_interval_frames != 0: + continue + + # 检查是否为空白帧 + if self.is_blank_frame(frame_bgr, simplified=True): + continue + + # 转换为灰度图用于SSIM计算 + frame_gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) + + # 与前一帧比较 + if prev_frame_gray is not None: + # 计算SSIM + ssim_value = ssim(prev_frame_gray, frame_gray) + logging.debug(f"[Debug SSIM] Frame {frame_count}, SSIM: {ssim_value:.4f}") + if ssim_value > 0.95: + logging.info(f"[Debug SSIM] Frame {frame_count} is too similar to previous frame, skipping.") + continue + + # 保存关键帧 + frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) + frame_pil = Image.fromarray(frame_rgb) + keyframes.append(frame_pil) + timestamps.append(frame_count / fps) + + # 更新前一帧 + prev_frame_bgr = frame_bgr.copy() + prev_frame_gray = frame_gray.copy() + + # 如果没有提取到任何关键帧,进行强制采样 + if not keyframes: + logging.warning("自动提取关键帧为0,进行强制均匀采样") + cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # 重置到视频开始 + frame_count = 0 + + # 强制采样间隔 + force_sample_interval = max(total_frames // 10, 1) # 至少提取10帧 + + while cap.isOpened(): + ret, frame_bgr = cap.read() + if not ret: + break + + frame_count += 1 + if frame_count % force_sample_interval == 0: + frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) + frame_pil = Image.fromarray(frame_rgb) + keyframes.append(frame_pil) + 
timestamps.append(frame_count / fps) + + if not keyframes: + raise RuntimeError("关键帧提取失败:未能提取到任何关键帧") + + print(f"[图像] 关键帧提取完成 (OpenCV),共{len(keyframes)}帧") + self.timeout_handler.stop() + progress.complete(f"提取了{len(keyframes)}个关键帧 (OpenCV)") + + return keyframes, duration + + except Exception as e: + logging.error(f"[错误] 关键帧提取失败 (OpenCV流程): {str(e)}") + logging.error(traceback.format_exc()) + self.timeout_handler.stop() + return [], 0.0 + finally: + if cap and cap.isOpened(): + cap.release() + logging.info("OpenCV VideoCapture released in finally block.") + + def _is_frame_different(self, frame1_bgr_np, frame2_bgr_np, simplified=False, threshold=0.8): + """简化版本的帧差异检测. Expects BGR NumPy arrays from OpenCV.""" + if simplified: + try: + gray1 = cv2.cvtColor(frame1_bgr_np, cv2.COLOR_BGR2GRAY) + gray2 = cv2.cvtColor(frame2_bgr_np, cv2.COLOR_BGR2GRAY) + diff = cv2.absdiff(gray1, gray2) + mean_diff = np.mean(diff) + + # 降低阈值,使检测更敏感 + required_mean_diff = threshold * 3.0 # 从7.5降低到3.0 + logging.debug(f"[Debug Diff] mean_diff: {mean_diff:.2f}, required_mean_diff (threshold={threshold:.2f}*3.0): {required_mean_diff:.2f}") + return mean_diff > required_mean_diff + except Exception as e_diff_internal: + logging.error(f"Error in _is_frame_different simplified: {e_diff_internal}") + return True # On error, assume different to avoid losing a frame + else: + # 完整的特征比较逻辑 (当前未被调用,因为 simplified=True) + logging.warning("_is_frame_different called with simplified=False, but non-simplified path is not fully implemented.") + return True + + def is_blank_frame(self, frame_bgr_np, simplified=False, threshold=20): + """检测是否为无信息帧(支持简化版本). 
Expects BGR NumPy array from OpenCV.""" + try: + gray = cv2.cvtColor(frame_bgr_np, cv2.COLOR_BGR2GRAY) + mean = np.mean(gray) + std_dev = np.std(gray) + + if simplified: # Simplified version for main loop + # 调整阈值,使其更宽松 + is_black = mean < 40 and std_dev < 20 # 从35/15调整到40/20 + is_white = mean > 215 and std_dev < 20 # 从220/15调整到215/20 + + # 降低细节检测阈值 + has_enough_detail = std_dev >= threshold * 0.8 # 降低阈值要求 + + is_actually_blank = (is_black or is_white) or not has_enough_detail + logging.debug(f"[Debug BlankS] mean: {mean:.2f}, std_dev: {std_dev:.2f}, threshold: {threshold}, is_black: {is_black}, is_white: {is_white}, has_enough_detail: {has_enough_detail}, result: {is_actually_blank}") + return is_actually_blank + else: + # Original more complex logic (currently not used by main path) + is_black_orig = mean < 10 and std_dev < 5 + is_white_orig = mean > 245 and std_dev < 5 + has_detail_orig = std_dev > threshold + is_actually_blank_orig = (is_black_orig or is_white_orig) or not has_detail_orig + logging.debug(f"[Debug BlankNS] mean: {mean:.2f}, std_dev: {std_dev:.2f}, threshold: {threshold}, is_black: {is_black_orig}, is_white: {is_white_orig}, has_detail: {has_detail_orig}, result: {is_actually_blank_orig}") + return is_actually_blank_orig + except Exception as e_blank_internal: + print(f"[警告] 检查无信息帧时出错 (OpenCV): {str(e_blank_internal)}") + logging.error(f"Error in is_blank_frame: {e_blank_internal}") + return False # On error, assume NOT blank to avoid wrongly discarding a frame + + @staticmethod + def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list: + """语音识别与时间戳获取(支持中英文混合,通过语言自动检测,并转换为简体中文)""" + try: + # 创建进度跟踪器 + progress = ProgressTracker(100, "语音识别") + progress.update(10, "加载模型") + + # 使用更大的模型提高准确率 + model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR) + + progress.update(20, "开始转写") + logging.info(f"[Whisper] Starting transcription for: {video_path} with model: {model_name}. 
Language auto-detection ON.") + + result = model.transcribe( + video_path, + fp16=False, + task="transcribe", + verbose=True, + initial_prompt=None + ) + + detected_language = result.get("language", "unknown") + logging.info(f"[Whisper] Transcription complete. Detected language: {detected_language}") + + progress.update(60, f"处理转写结果 (语言: {detected_language})") + segments = result.get("segments", []) + + if detected_language == 'zh': + logging.info("[Whisper] 检测到中文,将进行繁体到简体转换。") + try: + # 尝试不带 .json 后缀初始化,让库自行查找标准配置文件 + converter = opencc.OpenCC('t2s') # <--- 修改点:移除了 .json + for i, seg in enumerate(segments): + original_text = seg['text'] + simplified_text = converter.convert(original_text) + if original_text != simplified_text: + logging.debug(f"[OpenCC] 片段 {i} 转换: '{original_text[:30]}...' -> '{simplified_text[:30]}...'") + seg['text'] = simplified_text + logging.info("[OpenCC] 繁体到简体转换完成。") + except Exception as e_opencc: + logging.error(f"[OpenCC] 繁体到简体转换失败: {e_opencc}。将使用原始转录文本。") + + logging.info("[Whisper] 应用中文专业术语替换。") + for i, seg in enumerate(segments): + text = seg["text"] + for cn, en in PROFESSIONAL_TERMS.items(): + text = text.replace(cn, f"{cn}({en})") + seg["text"] = text + else: + logging.info(f"[Whisper] Detected language is {detected_language}. 
# ---------------------- Alignment logic ----------------------
class ContentAligner:
    """Assigns transcribed speech segments to key-frame ("page") intervals."""

    # Matches the "Duration: HH:MM:SS.xx" line that ffmpeg prints for an input.
    _FFMPEG_DURATION_RE = re.compile(r'Duration: (\d+):(\d{2}):(\d{2}(?:\.\d+)?)')

    @staticmethod
    def generate_page_intervals(timestamps: list, duration: float) -> list:
        """Return ``(start, end)`` per timestamp; the last interval ends at *duration*."""
        intervals = []
        last = len(timestamps) - 1
        for i, start in enumerate(timestamps):
            end = timestamps[i + 1] if i < last else duration
            intervals.append((start, end))
        return intervals

    @staticmethod
    @lru_cache(maxsize=CACHE_SIZE)
    def calculate_text_similarity(text1: str, text2: str) -> float:
        """Return the Jaccard overlap of the ``\\w+`` tokens of both texts (cached)."""
        words1 = set(re.findall(r'\w+', text1.lower()))
        words2 = set(re.findall(r'\w+', text2.lower()))
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)

    @staticmethod
    def _process_segment(seg, seg_start, intervals, all_segments):
        """Return ``(page_index, seg)`` for one segment, or None when nothing fits.

        The start time is tried first; if it falls in no interval, the page
        whose time-matched text is most similar to the segment wins.
        """
        for page_idx, (start, end) in enumerate(intervals):
            if start <= seg_start < end:
                return page_idx, seg

        best_page = None
        best_score = 0.0
        for page_idx, (start, end) in enumerate(intervals):
            page_text = " ".join([s["text"] for s in all_segments if start <= s["start"] < end])
            similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
            if similarity > best_score:
                best_score = similarity
                best_page = page_idx

        if best_page is not None:
            return best_page, seg
        return None

    @staticmethod
    def _filter_repetitive_segments(segments: list, min_repeats_for_deduplication: int = 3) -> list:
        """Collapse runs of consecutive segments with identical text.

        Runs shorter than *min_repeats_for_deduplication* are kept whole;
        longer runs are reduced to their first segment.
        """
        if not segments:
            return []

        filtered_segments = []
        i = 0
        n = len(segments)
        while i < n:
            text_to_match = segments[i]['text']

            # Find the end of the run of identical texts starting at i.
            k = i
            while k < n and segments[k]['text'] == text_to_match:
                k += 1
            count = k - i

            if count < min_repeats_for_deduplication:
                filtered_segments.extend(segments[i:k])
            else:
                filtered_segments.append(segments[i])
                logging.info(f"文本去重:'{text_to_match[:50]}...' 连续出现 {count} 次,已保留1次。原始首片段信息:Start={segments[i]['start']}, End={segments[i]['end']}")

            i = k

        return filtered_segments

    @staticmethod
    def find_best_match(segments: list, intervals: list) -> dict:
        """Map each page index to its speech segments, ordered by start time.

        Segments that match no page by time or by text are attached to the
        page whose interval midpoint is closest to their start.
        """
        page_texts = defaultdict(list)
        if not intervals:
            # Nothing to align against; previously this raised inside min().
            if segments:
                logging.warning("没有可用的页面时间段,无法对齐语音片段")
            return page_texts

        unmatched_segments = []
        progress = ProgressTracker(len(segments), "内容对齐")

        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Each future stays paired with its own segment, and results are
            # collected in submission order to keep the outcome deterministic.
            submitted = [
                (seg, executor.submit(ContentAligner._process_segment, seg, seg["start"], intervals, segments))
                for seg in segments
            ]

            for i, (seg, future) in enumerate(submitted):
                try:
                    result = future.result()
                    if result:
                        page_idx, matched = result
                        page_texts[page_idx].append(matched)
                    else:
                        unmatched_segments.append(seg)
                    progress.update(1, f"处理第{i + 1}/{len(segments)}个片段")
                except Exception as e:
                    print(f"[警告] 处理语音片段时出错: {str(e)}")

        if unmatched_segments:
            print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
            for seg in unmatched_segments:
                closest_page = min(range(len(intervals)),
                                   key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
                page_texts[closest_page].append(seg)

        # Late additions must not break the chronological order of a page.
        for page_segments in page_texts.values():
            page_segments.sort(key=lambda s: s["start"])

        progress.complete(f"对齐了{len(segments)}个语音片段")
        return page_texts

    @staticmethod
    def _parse_ffmpeg_duration(output):
        """Return the seconds encoded in ffmpeg's ``Duration:`` line, or None."""
        match = ContentAligner._FFMPEG_DURATION_RE.search(output or "")
        if not match:
            return None
        hours, minutes, seconds = map(float, match.groups())
        return hours * 3600 + minutes * 60 + seconds

    @staticmethod
    def _probe_duration_with_ffmpeg(video_path: str) -> float:
        """Ask the ffmpeg CLI for the video length; return 0 when unavailable."""
        try:
            ffmpeg_exe_path = os.path.join(FFMPEG_BIN, 'ffmpeg.exe')
            if not (FFMPEG_BIN and os.path.exists(ffmpeg_exe_path)):
                ffmpeg_exe_path = 'ffmpeg'
            # errors="replace": undecodable bytes in ffmpeg's banner must not abort the probe.
            process = subprocess.run([ffmpeg_exe_path, '-i', video_path], capture_output=True,
                                     text=True, errors="replace", timeout=30)
            # ffmpeg normally reports on stderr; stdout is checked as a fallback.
            for label, output in (("", process.stderr), (" (stdout)", process.stdout)):
                duration = ContentAligner._parse_ffmpeg_duration(output)
                if duration is not None:
                    logging.info(f"[AlignContent] FFmpeg CLI{label}成功获取duration: {duration:.2f}秒")
                    return duration
            logging.error("[AlignContent] FFmpeg CLI无法获取duration。将使用timestamps。")
        except Exception as ff_exc_align:
            logging.error(f"[AlignContent] FFmpeg CLI获取duration时出错: {ff_exc_align}。将使用timestamps。")
        return 0

    @staticmethod
    def align_content(video_path: str, timestamps: list) -> list:
        """Transcribe the video and align the speech with the key-frame pages.

        Args:
            video_path: path of the video file.
            timestamps: start time in seconds of every key frame.

        Returns:
            One dict per page with ``page``, ``start_time``, ``end_time`` and
            ``text``; ``[]`` when alignment fails.
        """
        timeout_handler_align = None
        cap_align = None
        try:
            timeout_handler_align = TimeoutHandler()
            timeout_handler_align.start("内容对齐")

            # Video length: OpenCV first, then the ffmpeg CLI, then an estimate.
            duration = 0
            cap_align = cv2.VideoCapture(video_path)
            if cap_align.isOpened():
                fps_align = cap_align.get(cv2.CAP_PROP_FPS)
                total_frames_align = int(cap_align.get(cv2.CAP_PROP_FRAME_COUNT))
                if fps_align > 0 and total_frames_align > 0:
                    duration = total_frames_align / fps_align
                    logging.info(f"[AlignContent] OpenCV获取视频时长: {duration:.2f}秒")

            if duration <= 0:
                logging.warning("[AlignContent] OpenCV未能获取有效时长,尝试FFmpeg CLI。")
                duration = ContentAligner._probe_duration_with_ffmpeg(video_path)

            if duration <= 0:
                if timestamps and len(timestamps) > 0:
                    duration = timestamps[-1] + FRAME_INTERVAL
                    logging.warning(f"[AlignContent] 无法获取精确视频时长,根据最后关键帧估算为: {duration:.2f}秒")
                else:
                    # Proceeds with duration == 0; no intervals are produced below.
                    logging.error("[AlignContent] 无法确定视频时长,内容对齐可能不准确。")

            segments = VideoProcessor.transcribe_audio(video_path)
            if not segments:
                logging.warning("未识别到语音内容,将生成空文本摘要")
                segments = []
            else:
                original_segment_count = len(segments)
                segments = ContentAligner._filter_repetitive_segments(segments)
                if len(segments) < original_segment_count:
                    logging.info(f"语音片段去重处理完成:从 {original_segment_count} 个片段减少到 {len(segments)} 个片段。")
                else:
                    logging.info("语音片段无需去重处理。")

            intervals = ContentAligner.generate_page_intervals(timestamps, duration)
            page_texts = ContentAligner.find_best_match(segments, intervals)

            aligned_data = []
            for idx, (start_time, end_time) in enumerate(intervals):
                text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
                aligned_data.append({
                    "page": idx,
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": text if text else "未识别到相关语音内容"
                })

            return aligned_data
        except Exception as e:
            logging.error(f"内容对齐失败: {str(e)}")
            logging.error(traceback.format_exc())
            return []
        finally:
            if timeout_handler_align:
                timeout_handler_align.stop()
            if cap_align and cap_align.isOpened():
                cap_align.release()
                logging.info("[AlignContent] OpenCV VideoCapture released.")
text.replace(term, f'{term} ({abbr})') + + # 优化过渡词 + for word in TRANSITION_WORDS: + text = text.replace(word, f'{word}') + + return text + + @staticmethod + def generate_html(aligned_data: list, keyframes: list, output_dir: str): + """生成HTML格式的报告""" + # 创建临时目录用于存储图片 + temp_img_dir = os.path.join(output_dir, "temp_images") + os.makedirs(temp_img_dir, exist_ok=True) + + # 创建进度跟踪器 + progress = ProgressTracker(len(aligned_data) + 1, "HTML报告生成") + + # 创建超时处理器 + timeout_handler = TimeoutHandler() + timeout_handler.start("HTML报告生成") + + try: + # 检查输出目录权限 + try: + # 尝试在输出目录创建测试文件以验证权限 + test_file = os.path.join(output_dir, "test_write_permission.tmp") + with open(test_file, 'w') as f: + f.write("test") + os.remove(test_file) + logging.info(f"输出目录权限检查通过: {output_dir}") + except Exception as e: + logging.error(f"输出目录权限检查失败: {str(e)},尝试使用当前目录") + # 如果指定的输出目录不可写,则使用当前目录 + output_dir = os.path.abspath(".") + temp_img_dir = os.path.join(output_dir, "temp_images") + os.makedirs(temp_img_dir, exist_ok=True) + logging.info(f"已切换到当前目录作为输出: {output_dir}") + + # 性能优化:减小图片大小,加快处理 + logging.info("优化图片尺寸以提高性能") + optimized_keyframes = [] + for frame in keyframes: + # 限制图片最大尺寸为720p + if frame.width > 1280 or frame.height > 720: + aspect_ratio = frame.width / frame.height + if aspect_ratio > 16/9: # 宽屏 + new_width = 1280 + new_height = int(new_width / aspect_ratio) + else: + new_height = 720 + new_width = int(new_height * aspect_ratio) + frame = frame.resize((new_width, new_height), Image.LANCZOS) + optimized_keyframes.append(frame) + + keyframes = optimized_keyframes + logging.info("图片尺寸优化完成") + + # 处理所有帧 + pages_data = [] + for idx, frame in enumerate(keyframes): + try: + page_data = SummaryGenerator._process_frame(idx, frame, aligned_data, temp_img_dir) + if page_data: + pages_data.append(page_data) + progress.update(1, f"处理第 {idx + 1} 页") + except Exception as e: + logging.error(f"处理帧 {idx} 时出错: {str(e)}") + logging.error(traceback.format_exc()) + continue + + # 检查是否有成功处理的页面 + if not 
pages_data: + logging.error("没有成功处理任何页面,无法生成HTML报告") + raise RuntimeError("没有成功处理任何页面,无法生成HTML报告") + + # 生成HTML模板 + template = Environment().from_string(""" + + + + + PPT视频结构化摘要 + + + +

PPT视频结构化摘要

+ {% for page in pages %} +
+ +
+
+ 页面截图 +
+
{{ page.text }}
+
+
+ {% endfor %} + + + + """) + + # 保存HTML文件 + output_path = os.path.join(output_dir, "summary.html") + try: + with open(output_path, "w", encoding="utf-8") as f: + f.write(template.render(pages=pages_data)) + logging.info(f"HTML报告已生成: {output_path}") + # 检查文件是否已成功写入 + if os.path.exists(output_path) and os.path.getsize(output_path) > 0: + logging.info(f"HTML报告验证成功: {output_path},大小: {os.path.getsize(output_path)} 字节") + else: + logging.error(f"HTML报告生成失败: 文件不存在或为空: {output_path}") + raise IOError(f"HTML报告生成失败: 文件不存在或为空: {output_path}") + except Exception as e: + logging.error(f"HTML报告保存失败: {str(e)}") + # 尝试使用备用路径 + backup_path = os.path.join(os.path.abspath("."), f"summary_{int(time.time())}.html") + logging.info(f"尝试使用备用路径保存HTML: {backup_path}") + with open(backup_path, "w", encoding="utf-8") as f: + f.write(template.render(pages=pages_data)) + logging.info(f"HTML报告已使用备用路径生成: {backup_path}") + output_path = backup_path # 更新输出路径 + + # 停止超时处理 + timeout_handler.stop() + progress.complete(f"HTML报告生成完成: {output_path}") + + # 打印明确的文件位置信息以便用户查找 + print(f"\n[重要] HTML报告已生成在: {os.path.abspath(output_path)}\n") + + except Exception as e: + logging.error(f"HTML报告生成过程中发生错误: {str(e)}") + try: + logging.error(traceback.format_exc()) + except Exception: + logging.error("无法获取详细错误信息,traceback模块不可用") + # 停止超时处理 + timeout_handler.stop() + raise + finally: + # 清理临时文件 + try: + if os.path.exists(temp_img_dir): + for f in os.listdir(temp_img_dir): + try: + os.remove(os.path.join(temp_img_dir, f)) + except Exception as e: + logging.error(f"删除临时图片文件失败: {str(e)}") + try: + os.rmdir(temp_img_dir) + logging.info("已删除临时图片目录") + except Exception as e: + logging.error(f"删除临时图片目录失败: {str(e)}") + except Exception as e: + logging.error(f"清理临时文件时出错: {str(e)}") + + return output_path # 返回生成的HTML文件路径 + + @staticmethod + def _process_frame(idx, frame, aligned_data, temp_img_dir): + """处理单个帧""" + try: + img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg") + frame.save(img_path, quality=85) + with 
open(img_path, "rb") as f: + img_data = base64.b64encode(f.read()).decode("utf-8") + + return { + "num": idx + 1, + "time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s", + "image": f"data:image/jpeg;base64,{img_data}", + "text": SummaryGenerator.optimize_text(aligned_data[idx]["text"]) + } + except Exception as e: + logging.error(f"处理帧 {idx} 时出错: {str(e)}") + return None + + @staticmethod + def generate_pdf(aligned_data: list, keyframes: list, output_dir: str): + """生成PDF格式的报告""" + # 首先生成HTML文件 + html_path = os.path.join(output_dir, "summary.html") + if not os.path.exists(html_path): + SummaryGenerator.generate_html(aligned_data, keyframes, output_dir) + + # 创建进度跟踪器 + progress = ProgressTracker(1, "PDF报告生成") + + # 创建超时处理器 + timeout_handler = TimeoutHandler() + timeout_handler.start("PDF报告生成") + + try: + logging.info("开始将HTML转换为PDF...") + + # 设置PDF配置选项 + options = { + 'page-size': 'A4', + 'margin-top': '0.75in', + 'margin-right': '0.75in', + 'margin-bottom': '0.75in', + 'margin-left': '0.75in', + 'encoding': 'UTF-8', + 'no-outline': None, + 'quiet': '' + } + + # 生成PDF文件路径 + pdf_path = os.path.join(output_dir, "summary.pdf") + + # 使用pdfkit生成PDF + try: + pdfkit.from_file(html_path, pdf_path, options=options) + logging.info(f"PDF报告已生成: {pdf_path}") + + # 停止超时处理 + timeout_handler.stop() + progress.complete("PDF报告生成完成") + return True + except Exception as e: + logging.error(f"PDF生成失败: {str(e)}") + return False + + except Exception as e: + logging.error(f"PDF报告生成过程出错: {str(e)}") + timeout_handler.stop() + return False + + @classmethod + def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str): + """生成所有格式报告""" + try: + # 首先生成HTML报告 + html_path = cls.generate_html(aligned_data, keyframes, output_dir) + + # 输出明确的报告位置提示 + print(f"\n[完成] 报告生成成功!\n") + print(f"HTML报告地址: {os.path.abspath(html_path)}") + + # 尝试生成PDF报告 + pdf_success = False + try: + # 检查pdfkit模块是否可用 + if 'pdfkit' in sys.modules: + pdf_success = 
@classmethod
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
    """Generate the HTML report and, when possible, the PDF report.

    A PDF failure is not fatal: the HTML report alone counts as success.
    When HTML generation itself fails, a minimal emergency page describing
    the error is written to the current working directory.

    Args:
        aligned_data: Per-page alignment records.
        keyframes: Keyframe images, one per page.
        output_dir: Directory that receives the reports.

    Returns:
        ``True`` when the HTML report was generated, ``False`` otherwise.
    """
    try:
        # The HTML report is mandatory.
        html_path = cls.generate_html(aligned_data, keyframes, output_dir)

        # Tell the user where the report ended up.
        print(f"\n[完成] 报告生成成功!\n")
        print(f"HTML报告地址: {os.path.abspath(html_path)}")

        # The PDF report is optional.
        pdf_success = False
        try:
            # The module is bound to the global name ``pdfkit`` via
            # ``import pdfkit_patch as pdfkit``; sys.modules registers it as
            # 'pdfkit_patch', so test the binding rather than sys.modules.
            if globals().get("pdfkit") is not None:
                pdf_success = cls.generate_pdf(aligned_data, keyframes, output_dir)
            else:
                logging.info("pdfkit模块不可用,跳过PDF生成")
        except Exception as e:
            logging.error(f"PDF报告生成失败: {str(e)}")

        if not pdf_success:
            logging.warning("PDF生成功能不可用或生成失败,仅生成HTML报告")

        return True
    except Exception as e:
        logging.error(f"报告生成出错: {str(e)}")
        logging.error(traceback.format_exc())
        # Write a minimal page so the user still gets some visible result.
        try:
            from html import escape

            # Exception text may contain '<', '>' or '&'; escape it so it
            # cannot break the page markup.
            safe_error = escape(str(e))
            fallback_path = os.path.join(os.path.abspath("."), "emergency_report.html")
            # NOTE(review): the original markup of this page was not
            # recoverable from the reviewed text; the tags below are a
            # reconstruction around the original wording -- confirm.
            with open(fallback_path, "w", encoding="utf-8") as f:
                f.write(f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>应急报告</title>
</head>
<body>
    <h1>视频处理完成,但报告生成失败</h1>
    <p>处理过程中发生了以下错误:</p>
    <pre>{safe_error}</pre>
    <p>请查看日志文件以获取更多信息。</p>
</body>
</html>
""")
            print(f"\n[警告] 正常报告生成失败,已创建应急报告: {fallback_path}\n")
        except Exception:
            logging.error("创建应急报告也失败了")
        return False
def _log_traceback():
    """Log the traceback of the exception currently being handled."""
    try:
        logging.error(traceback.format_exc())
    except Exception:
        logging.error("无法获取详细错误信息,traceback模块不可用")


def _build_placeholder_alignment(keyframes, timestamps, duration):
    """Return one placeholder alignment record per keyframe.

    Each record ends where the next timestamp starts; the last one ends at
    ``duration``.
    """
    last_index = len(timestamps) - 1
    return [
        {
            "page": i,
            "start_time": timestamps[i],
            "end_time": timestamps[i + 1] if i < last_index else duration,
            "text": "未能识别到相关语音内容"
        }
        for i in range(len(keyframes))
    ]


def _write_simple_report(keyframes, output_dir):
    """Write ``simple_report.html`` plus one JPEG per keyframe; return its path."""
    simple_html = os.path.join(output_dir, "simple_report.html")
    # NOTE(review): the original markup of this page was not recoverable
    # from the reviewed text; the tags below are a reconstruction around the
    # original wording -- confirm.
    with open(simple_html, "w", encoding="utf-8") as f:
        f.write("""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>简单报告</title>
</head>
<body>
    <h1>视频简单报告</h1>
    <p>完整报告生成失败,这是一个简化版本</p>
""")

        for i, frame in enumerate(keyframes):
            # Images are stored next to the HTML file and referenced by name.
            img_path = os.path.join(output_dir, f"frame_{i}.jpg")
            frame.save(img_path)

            f.write(f"""
    <div>
        <h2>第 {i+1} 帧</h2>
        <img src="frame_{i}.jpg" alt="关键帧 {i+1}">
    </div>
""")

        f.write("</body></html>")
    return simple_html


def main_process(video_path, output_dir=None, progress_callback=None):
    """Run the whole pipeline: keyframes, transcription, alignment, reports.

    Args:
        video_path: Path of the video file to process.
        output_dir: Directory for the generated reports. Defaults to the
            module-level ``OUTPUT_DIR``; falls back to the current directory
            when the chosen directory cannot be created or written to.
        progress_callback: Optional callable ``(progress, message)`` that is
            invoked with a percentage and a status text. Exceptions raised
            by it are logged and otherwise ignored.

    Returns:
        ``True`` when every step completed, ``False`` on any failure. All
        exceptions are caught, logged and reported through the callback.
    """
    try:
        logging.info(f"开始处理视频文件: {video_path}")

        if output_dir is None:
            output_dir = OUTPUT_DIR

        # Create the output directory and verify that it is writable.
        try:
            os.makedirs(output_dir, exist_ok=True)
            logging.info(f"使用输出目录: {output_dir}")

            test_file = os.path.join(output_dir, "test_permission.tmp")
            with open(test_file, "w") as f:
                f.write("test")
            os.remove(test_file)
        except Exception as e:
            logging.error(f"输出目录异常: {str(e)},使用当前目录作为替代")
            output_dir = os.path.abspath(".")
            os.makedirs(output_dir, exist_ok=True)
            logging.info(f"已切换到当前目录: {output_dir}")

        def update_progress(progress, message=None):
            """Forward progress to the callback (if any) and log it."""
            if progress_callback:
                try:
                    progress_callback(progress, message)
                except Exception as e:
                    logging.error(f"进度回调函数执行失败: {str(e)}")
            logging.info(f"处理进度: {progress}% - {message if message else ''}")

        update_progress(0, "开始处理视频")

        # --- Input validation ---
        if not os.path.exists(video_path):
            error_msg = f"视频文件不存在: {video_path}"
            logging.error(error_msg)
            update_progress(0, f"错误: {error_msg}")
            raise FileNotFoundError(error_msg)

        file_size = os.path.getsize(video_path) / (1024 * 1024)  # size in MB
        logging.info(f"视频文件大小: {file_size:.2f}MB")

        if file_size == 0:
            error_msg = "视频文件为空"
            logging.error(error_msg)
            update_progress(0, f"错误: {error_msg}")
            raise ValueError(error_msg)

        try:
            with open(video_path, 'rb') as f:
                f.read(1024)  # reading a small chunk proves the file is readable
        except Exception as e:
            error_msg = f"视频文件无法读取: {str(e)}"
            logging.error(error_msg)
            update_progress(0, f"错误: {error_msg}")
            raise IOError(error_msg) from e

        # --- Dependencies ---
        update_progress(5, "检查系统依赖")
        if not check_dependencies():
            error_msg = "依赖项检查失败"
            logging.error(error_msg)
            update_progress(5, f"错误: {error_msg}")
            raise RuntimeError(f"{error_msg},请检查日志获取详细信息")

        update_progress(10, "依赖项检查通过")

        processor = VideoProcessor()

        # --- Keyframe extraction ---
        logging.info("开始提取关键帧...")
        update_progress(15, "开始提取关键帧")
        try:
            keyframes, duration = processor.extract_keyframes(video_path)
            if not keyframes:
                error_msg = "关键帧提取失败:未能提取到任何关键帧"
                logging.error(error_msg)
                update_progress(15, "错误: 未能提取到关键帧")
                raise RuntimeError(error_msg)
            logging.info(f"成功提取 {len(keyframes)} 个关键帧,视频时长:{duration:.2f}秒")
            update_progress(40, f"已提取 {len(keyframes)} 个关键帧")
        except Exception as e:
            error_msg = f"关键帧提取过程出错: {str(e)}"
            logging.error(error_msg)
            _log_traceback()
            update_progress(15, f"错误: 关键帧提取失败 - {str(e)}")
            raise RuntimeError(error_msg) from e

        # --- Audio transcription ---
        # NOTE(review): ``segments`` is only logged here; the aligner below is
        # given the video path, not these segments -- confirm whether this
        # pass is still required.
        logging.info("开始转录音频...")
        update_progress(45, "开始转录音频")
        try:
            segments = VideoProcessor.transcribe_audio(video_path)
            if not segments:
                logging.warning("音频转录失败:未能识别到任何语音内容")
                update_progress(45, "警告: 未识别到语音内容,将生成空文本摘要")
                segments = []
            else:
                logging.info(f"成功转录 {len(segments)} 个音频片段")
                update_progress(65, f"已转录 {len(segments)} 个音频片段")
                for i, seg in enumerate(segments[:3], 1):  # log only the first three as a sample
                    logging.debug(f"音频片段 {i}: {seg['text'][:50]}...")
        except Exception as e:
            error_msg = f"音频转录过程出错: {str(e)}"
            logging.error(error_msg)
            _log_traceback()
            update_progress(45, f"错误: 音频转录失败 - {str(e)}")
            raise RuntimeError(error_msg) from e

        # --- Timestamps: keyframes are spread evenly over the duration ---
        frame_count = len(keyframes)
        timestamps = [0] + [
            frame_idx * duration / frame_count for frame_idx in range(1, frame_count)
        ]

        # --- Content alignment ---
        logging.info("开始对齐内容...")
        update_progress(70, "开始对齐内容")
        try:
            aligned_data = ContentAligner.align_content(video_path, timestamps)
            if not aligned_data:
                error_msg = "内容对齐失败:未能生成对齐数据"
                logging.error(error_msg)
                update_progress(70, "错误: 内容对齐失败")
                # Placeholder records let report generation continue.
                aligned_data = _build_placeholder_alignment(keyframes, timestamps, duration)
                logging.info(f"已创建{len(aligned_data)}个空内容对齐数据")
                update_progress(75, "使用空内容继续处理")
            else:
                logging.info(f"成功对齐 {len(aligned_data)} 个内容片段")
                update_progress(80, f"已对齐 {len(aligned_data)} 个内容片段")
                for i, data in enumerate(aligned_data[:3], 1):  # log only the first three as a sample
                    logging.debug(f"对齐片段 {i}: {data.get('start_time', 'N/A')}s - {data.get('end_time', 'N/A')}s")
        except Exception as e:
            error_msg = f"内容对齐过程出错: {str(e)}"
            logging.error(error_msg)
            _log_traceback()
            update_progress(70, f"错误: 内容对齐失败 - {str(e)}")

            # Alignment failure is not fatal: continue with placeholders.
            aligned_data = _build_placeholder_alignment(keyframes, timestamps, duration)
            logging.info(f"已创建{len(aligned_data)}个空内容对齐数据")
            update_progress(75, "使用空内容继续处理")

        # --- Report generation ---
        logging.info("开始生成总结...")
        update_progress(85, "开始生成报告")
        try:
            if SummaryGenerator.generate_all(aligned_data, keyframes, output_dir):
                logging.info(f"总结生成完成,输出目录: {output_dir}")
                update_progress(100, "处理完成")

                html_path = os.path.join(output_dir, "summary.html")
                if os.path.exists(html_path):
                    logging.info(f"报告验证成功: {html_path}")
                    print(f"\n[成功] 报告生成完成,位置: {os.path.abspath(html_path)}\n")
                else:
                    logging.warning(f"报告文件不存在: {html_path}")
                    print("\n[警告] 处理似乎完成但未找到报告文件,请检查日志\n")
            else:
                error_msg = "报告生成失败"
                logging.error(error_msg)
                update_progress(85, f"错误: {error_msg}")
                raise RuntimeError(error_msg)
        except Exception as e:
            error_msg = f"总结生成过程出错: {str(e)}"
            logging.error(error_msg)
            _log_traceback()
            update_progress(85, f"错误: 报告生成失败 - {str(e)}")

            # Last resort: a bare page that just lists the keyframes.
            try:
                simple_html = _write_simple_report(keyframes, output_dir)
                logging.info(f"简单报告已生成: {simple_html}")
                print(f"\n[恢复] 创建了简单报告: {os.path.abspath(simple_html)}\n")
            except Exception as inner_e:
                logging.error(f"简单报告生成也失败了: {str(inner_e)}")

            raise RuntimeError(error_msg) from e

        logging.info("所有处理步骤已完成")
        return True

    except Exception as e:
        logging.error(f"处理过程中发生错误: {str(e)}")
        logging.error("详细错误信息:")
        _log_traceback()
        if progress_callback:
            try:
                progress_callback(0, f"处理失败: {str(e)}")
            except Exception as callback_error:
                # Was a bare ``except: pass``, which also hid KeyboardInterrupt
                # and SystemExit and left no trace of the failure.
                logging.error(f"进度回调函数执行失败: {str(callback_error)}")
        print(f"\n[错误] 处理失败: {str(e)}\n")
        return False
if __name__ == "__main__":
    # Command-line entry point: the first argument is the video file path.
    # Exit codes: 0 success, 1 failure or bad usage, 130 interrupted by user.
    try:
        if not len(sys.argv) >= 2:
            print("使用方法: python 毕设.py <视频文件路径>")
            sys.exit(1)

        video_path = sys.argv[1]
        if not main_process(video_path):
            print("[错误] 处理失败,请查看日志文件了解详情")
            sys.exit(1)
        print("[完成] 处理成功")
        sys.exit(0)
    except KeyboardInterrupt:
        print("\n[中断] 用户中断了处理")
        sys.exit(130)
    except Exception as e:
        # sys.exit raises SystemExit, which is not an Exception subclass, so
        # the exits above pass through this handler untouched.
        print(f"[错误] 程序执行过程中出现未处理的异常: {str(e)}")
        try:
            traceback.print_exc()
        except Exception:
            print("无法打印详细错误信息,traceback模块不可用")
        sys.exit(1)
+ """) + + f.write("") + + logging.info(f"简单报告已生成: {simple_html}") + print(f"\n[恢复] 创建了简单报告: {os.path.abspath(simple_html)}\n") + except Exception as inner_e: + logging.error(f"简单报告生成也失败了: {str(inner_e)}") + + raise RuntimeError(error_msg) + + logging.info("所有处理步骤已完成") + return True + + except Exception as e: + logging.error(f"处理过程中发生错误: {str(e)}") + logging.error("详细错误信息:") + try: + logging.error(traceback.format_exc()) + except Exception: + logging.error("无法获取详细错误信息,traceback模块不可用") + if progress_callback: + try: + progress_callback(0, f"处理失败: {str(e)}") + except: + pass + print(f"\n[错误] 处理失败: {str(e)}\n") + return False + + +if __name__ == "__main__": + try: + if len(sys.argv) < 2: + print("使用方法: python 毕设.py <视频文件路径>") + sys.exit(1) + + video_path = sys.argv[1] + if main_process(video_path): + print("[完成] 处理成功") + sys.exit(0) + else: + print("[错误] 处理失败,请查看日志文件了解详情") + sys.exit(1) + except KeyboardInterrupt: + print("\n[中断] 用户中断了处理") + sys.exit(130) + except Exception as e: + print(f"[错误] 程序执行过程中出现未处理的异常: {str(e)}") + try: + traceback.print_exc() + except Exception: + print("无法打印详细错误信息,traceback模块不可用") + sys.exit(1)